OSDN Git Service

Remove columns pg_index.haskeytype and pg_index.indisclustered. Not used.
[pg-rex/syncrep.git] / src / backend / access / gist / gist.c
1 /*-------------------------------------------------------------------------
2  *
3  * gist.c
4  *        interface routines for the postgres GiST index access method.
5  *
6  *
7  *
8  * IDENTIFICATION
9  *        $Header: /cvsroot/pgsql/src/backend/access/gist/gist.c,v 1.74 2001/05/14 21:53:16 momjian Exp $
10  *
11  *-------------------------------------------------------------------------
12  */
13
14 #include "postgres.h"
15
16 #include "access/genam.h"
17 #include "access/gist.h"
18 #include "access/gistscan.h"
19 #include "access/heapam.h"
20 #include "catalog/index.h"
21 #include "catalog/pg_index.h"
22 #include "executor/executor.h"
23 #include "miscadmin.h"
24 #include "utils/syscache.h"
25
26 #include "access/xlogutils.h"
27
28 /* result's status */
29 #define INSERTED        0x01
30 #define SPLITED         0x02
31
32 /* non-export function prototypes */
33 static void gistdoinsert(Relation r,
34                          IndexTuple itup,
35                          InsertIndexResult *res,
36                          GISTSTATE *GISTstate);
37 static int gistlayerinsert(Relation r, BlockNumber blkno,
38                                 IndexTuple **itup,
39                                 int *len,
40                                 InsertIndexResult *res,
41                                 GISTSTATE *giststate);
42 static OffsetNumber gistwritebuffer(Relation r,
43                                 Page page,
44                                 IndexTuple *itup,
45                                 int len,
46                                 OffsetNumber off,
47                                 GISTSTATE *giststate);
48 static int gistnospace(Page page,
49                         IndexTuple *itvec, int len);
50 static IndexTuple *gistreadbuffer(Relation r,
51                            Buffer buffer, int *len);
52 static IndexTuple *gistjoinvector(
53                            IndexTuple *itvec, int *len,
54                            IndexTuple *additvec, int addlen);
55 static IndexTuple gistunion(Relation r, IndexTuple *itvec,
56                   int len, GISTSTATE *giststate);
57 static IndexTuple gistgetadjusted(Relation r,
58                                 IndexTuple oldtup,
59                                 IndexTuple addtup,
60                                 GISTSTATE *giststate);
61 static IndexTuple *gistSplit(Relation r,
62                   Buffer buffer,
63                   IndexTuple *itup,
64                   int *len,
65                   GISTSTATE *giststate,
66                   InsertIndexResult *res);
67 static void gistnewroot(GISTSTATE *giststate, Relation r,
68                         IndexTuple *itup, int len);
69 static void GISTInitBuffer(Buffer b, uint32 f);
70 static OffsetNumber gistchoose(Relation r, Page p,
71                    IndexTuple it,
72                    GISTSTATE *giststate);
73 static IndexTuple gist_tuple_replacekey(Relation r,
74                                           GISTENTRY entry, IndexTuple t);
75 static void gistcentryinit(GISTSTATE *giststate,
76                            GISTENTRY *e, char *pr,
77                            Relation r, Page pg,
78                            OffsetNumber o, int b, bool l);
79
80 #undef GISTDEBUG
81 #ifdef GISTDEBUG
82 static void gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff);
83
84 #endif
85
86 /*
87  * gistbuild -- build a GiST index.  Basically calls insert over and over:
    * every heap tuple (that passes the partial-index predicates, if any) has
    * its key attributes compressed, is formed into an index tuple, and is
    * handed to gistdoinsert.
    *
    * fmgr arguments: 0 = heap relation, 1 = index relation, 2 = IndexInfo,
    * 3 = old predicate (non-NULL means this is an EXTEND INDEX on an index
    * that already holds data).  Returns void.
88  */
89 Datum
90 gistbuild(PG_FUNCTION_ARGS)
91 {
92         Relation        heap = (Relation) PG_GETARG_POINTER(0);
93         Relation        index = (Relation) PG_GETARG_POINTER(1);
94         IndexInfo  *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
95         Node       *oldPred = (Node *) PG_GETARG_POINTER(3);
96
97 #ifdef NOT_USED
98         IndexStrategy istrat = (IndexStrategy) PG_GETARG_POINTER(4);
99
100 #endif
101         HeapScanDesc hscan;
102         HeapTuple       htup;
103         IndexTuple      itup;
104         TupleDesc       htupdesc,
105                                 itupdesc;
106         Datum           attdata[INDEX_MAX_KEYS];
107         char            nulls[INDEX_MAX_KEYS];
108         double          nhtups,         /* heap tuples scanned */
109                                 nitups;         /* tuples counted as being in the index */
110         Node       *pred = indexInfo->ii_Predicate;
111
112 #ifndef OMIT_PARTIAL_INDEX
113         TupleTable      tupleTable;
114         TupleTableSlot *slot;
115
116 #endif
117         ExprContext *econtext;
118         GISTSTATE       giststate;
119         GISTENTRY       tmpcentry;
120         Buffer          buffer = InvalidBuffer;
121         bool       *compvec;    /* per attribute: compressed key must be pfree'd */
122         int                     i;
123
124         /* no locking is needed */
125
126         initGISTstate(&giststate, index);
127
128         /*
129          * We expect to be called exactly once for any index relation. If
130          * that's not the case, big trouble's what we have.
131          */
132         if (oldPred == NULL && RelationGetNumberOfBlocks(index) != 0)
133                 elog(ERROR, "%s already contains data", RelationGetRelationName(index));
134
135         /* initialize the root page (if this is a new index) */
136         if (oldPred == NULL)
137         {
138                 buffer = ReadBuffer(index, P_NEW);
139                 GISTInitBuffer(buffer, F_LEAF);
140                 WriteBuffer(buffer);
141         }
142
143         /* get tuple descriptors for heap and index relations */
144         htupdesc = RelationGetDescr(heap);
145         itupdesc = RelationGetDescr(index);
146
147         /*
148          * If this is a predicate (partial) index, we will need to evaluate
149          * the predicate using ExecQual, which requires the current tuple to
150          * be in a slot of a TupleTable.  In addition, ExecQual must have an
151          * ExprContext referring to that slot.  Here, we initialize dummy
152          * TupleTable and ExprContext objects for this purpose. --Nels, Feb 92
153          *
154          * We construct the ExprContext anyway since we need a per-tuple
155          * temporary memory context for function evaluation -- tgl July 00
156          */
157 #ifndef OMIT_PARTIAL_INDEX
158         if (pred != NULL || oldPred != NULL)
159         {
160                 tupleTable = ExecCreateTupleTable(1);
161                 slot = ExecAllocTableSlot(tupleTable);
162                 ExecSetSlotDescriptor(slot, htupdesc, false);
163         }
164         else
165         {
166                 tupleTable = NULL;
167                 slot = NULL;
168         }
169         econtext = MakeExprContext(slot, TransactionCommandContext);
170 #else
171         econtext = MakeExprContext(NULL, TransactionCommandContext);
172 #endif   /* OMIT_PARTIAL_INDEX */
173
174         /* build the index */
175         nhtups = nitups = 0.0;
176
177         compvec = (bool *) palloc(sizeof(bool) * indexInfo->ii_NumIndexAttrs);
178
179         /* start a heap scan */
180         hscan = heap_beginscan(heap, 0, SnapshotNow, 0, (ScanKey) NULL);
181
182         while (HeapTupleIsValid(htup = heap_getnext(hscan, 0)))
183         {
                /* release whatever the previous iteration left in per-tuple memory */
184                 MemoryContextReset(econtext->ecxt_per_tuple_memory);
185
186                 nhtups += 1.0;
187
188 #ifndef OMIT_PARTIAL_INDEX
189
190                 /*
191                  * If oldPred != NULL, this is an EXTEND INDEX command, so skip
192                  * this tuple if it was already in the existing partial index
                  * (it is still counted in nitups).
193                  */
194                 if (oldPred != NULL)
195                 {
196                         slot->val = htup;
197                         if (ExecQual((List *) oldPred, econtext, false))
198                         {
199                                 nitups += 1.0;
200                                 continue;
201                         }
202                 }
203
204                 /*
205                  * Skip this tuple if it doesn't satisfy the partial-index
206                  * predicate
207                  */
208                 if (pred != NULL)
209                 {
210                         slot->val = htup;
211                         if (!ExecQual((List *) pred, econtext, false))
212                                 continue;
213                 }
214 #endif   /* OMIT_PARTIAL_INDEX */
215
216                 nitups += 1.0;
217
218                 /*
219                  * For the current heap tuple, extract all the attributes we use
220                  * in this index, and note which are null.
221                  */
222                 FormIndexDatum(indexInfo,
223                                            htup,
224                                            htupdesc,
225                                            econtext->ecxt_per_tuple_memory,
226                                            attdata,
227                                            nulls);
228
229                 /*
                  * Immediately compress keys to normalize.  compvec[i] is set
                  * when compression returned a different pointer and the key type
                  * is not pass-by-value, i.e. when the compressed key has to be
                  * pfree'd once the tuple has been inserted.
                  */
230                 for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
231                 {
232                         gistcentryinit(&giststate, &tmpcentry, (char *) attdata[i],
233                                                    (Relation) NULL, (Page) NULL, (OffsetNumber) 0,
234                                                    -1 /* size is currently bogus */ , TRUE);
235                         if (attdata[i] != (Datum) tmpcentry.pred &&
236                                 !(giststate.keytypbyval))
237                                 compvec[i] = TRUE;
238                         else
239                                 compvec[i] = FALSE;
240                         attdata[i] = (Datum) tmpcentry.pred;
241                 }
242
243                 /* form an index tuple and point it at the heap tuple */
244                 itup = index_formtuple(itupdesc, attdata, nulls);
245                 itup->t_tid = htup->t_self;
246
247                 /*
248                  * Since we already have the index relation locked, we call
249                  * gistdoinsert directly.  Normal access method calls dispatch
250                  * through gistinsert, which locks the relation for write.  This
251                  * is the right thing to do if you're inserting single tups, but
252                  * not when you're initializing the whole index at once.
                  *
                  * NOTE(review): gistinsert in this file has its
                  * RelationSetLockForWrite call commented out, so the statement
                  * about locking above looks stale -- confirm.
253                  */
254
255                 gistdoinsert(index, itup, NULL, &giststate);
256
                /* free the compressed keys that were freshly allocated above */
257                 for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
258                         if (compvec[i])
259                                 pfree(DatumGetPointer(attdata[i]));
260
261                 pfree(itup);
262         }
263
264         /* okay, all heap tuples are indexed */
265         heap_endscan(hscan);
266
267         pfree(compvec);
268
269 #ifndef OMIT_PARTIAL_INDEX
270         if (pred != NULL || oldPred != NULL)
271                 ExecDropTupleTable(tupleTable, true);
272 #endif   /* OMIT_PARTIAL_INDEX */
273         FreeExprContext(econtext);
274
275         /*
276          * Since we just counted the tuples in the heap, we update its stats
277          * in pg_class to guarantee that the planner takes advantage of the
278          * index we just created.  But, only update statistics during normal
279          * index definitions, not for indices on system catalogs created
280          * during bootstrap processing.  We must close the relations before
281          * updating statistics to guarantee that the relcache entries are
282          * flushed when we increment the command counter in UpdateStats(). But
283          * we do not release any locks on the relations; those will be held
284          * until end of transaction.
285          */
286         if (IsNormalProcessingMode())
287         {
288                 Oid                     hrelid = RelationGetRelid(heap);
289                 Oid                     irelid = RelationGetRelid(index);
290
291                 heap_close(heap, NoLock);
292                 index_close(index);
293                 UpdateStats(hrelid, nhtups);
294                 UpdateStats(irelid, nitups);
295                 if (oldPred != NULL)
296                 {
                        /* every heap tuple is now indexed: no predicate remains */
297                         if (nitups == nhtups)
298                                 pred = NULL;
299                         UpdateIndexPredicate(irelid, oldPred, pred);
300                 }
301         }
302
        /*
         * NOTE(review): in normal processing mode "index" was closed just
         * above, yet the debug dump below still uses it.  Only compiled when
         * GISTDEBUG is defined -- confirm before enabling.
         */
303 #ifdef GISTDEBUG
304         gist_dumptree(index, 0, GISTP_ROOT, 0);
305 #endif
306
307         PG_RETURN_VOID();
308 }
309
310 /*
311  *      gistinsert -- wrapper for GiST tuple insertion.
312  *
313  *        This is the public interface routine for tuple insertion in GiSTs.
314  *        It doesn't do any work; just locks the relation and passes the buck.
315  */
316 Datum
317 gistinsert(PG_FUNCTION_ARGS)
318 {
319         Relation        r = (Relation) PG_GETARG_POINTER(0);
320         Datum      *datum = (Datum *) PG_GETARG_POINTER(1);
321         char       *nulls = (char *) PG_GETARG_POINTER(2);
322         ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
323
324 #ifdef NOT_USED
325         Relation        heapRel = (Relation) PG_GETARG_POINTER(4);
326
327 #endif
328         InsertIndexResult res;
329         IndexTuple      itup;
330         GISTSTATE       giststate;
331         GISTENTRY       tmpentry;
332         int                     i;
333         bool       *compvec;
334
335         initGISTstate(&giststate, r);
336
337         /* immediately compress keys to normalize */
338         compvec = (bool *) palloc(sizeof(bool) * r->rd_att->natts);
339         for (i = 0; i < r->rd_att->natts; i++)
340         {
341                 gistcentryinit(&giststate, &tmpentry, (char *) datum[i],
342                                            (Relation) NULL, (Page) NULL, (OffsetNumber) 0,
343                                            -1 /* size is currently bogus */ , TRUE);
344                 if (datum[i] != (Datum) tmpentry.pred && !(giststate.keytypbyval))
345                         compvec[i] = TRUE;
346                 else
347                         compvec[i] = FALSE;
348                 datum[i] = (Datum) tmpentry.pred;
349         }
350         itup = index_formtuple(RelationGetDescr(r), datum, nulls);
351         itup->t_tid = *ht_ctid;
352
353         /*
354          * Notes in ExecUtils:ExecOpenIndices()
355          *
356          * RelationSetLockForWrite(r);
357          */
358
359         res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
360         gistdoinsert(r, itup, &res, &giststate);
361         for (i = 0; i < r->rd_att->natts; i++)
362                 if (compvec[i] == TRUE)
363                         pfree((char *) datum[i]);
364         pfree(itup);
365         pfree(compvec);
366
367         PG_RETURN_POINTER(res);
368 }
369
370 /*
371 ** Take a compressed entry, and install it on a page.  Since we now know
372 ** where the entry will live, we decompress it and recompress it using
373 ** that knowledge (some compression routines may want to fish around
374 ** on the page, for example, or do something special for leaf nodes.)
375 */
376 static OffsetNumber
377 gistPageAddItem(GISTSTATE *giststate,
378                                 Relation r,
379                                 Page page,
380                                 Item item,
381                                 Size size,
382                                 OffsetNumber offsetNumber,
383                                 ItemIdFlags flags,
384                                 GISTENTRY *dentry,
385                                 IndexTuple *newtup)
386 {
387         GISTENTRY       tmpcentry;
388         IndexTuple      itup = (IndexTuple) item;
389         OffsetNumber retval;
390
391         /*
392          * recompress the item given that we now know the exact page and
393          * offset for insertion
394          */
395         gistdentryinit(giststate, dentry,
396                                    (((char *) itup) + sizeof(IndexTupleData)),
397                           (Relation) 0, (Page) 0, (OffsetNumber) InvalidOffsetNumber,
398                                    IndexTupleSize(itup) - sizeof(IndexTupleData), FALSE);
399         gistcentryinit(giststate, &tmpcentry, dentry->pred, r, page,
400                                    offsetNumber, dentry->bytes, FALSE);
401         *newtup = gist_tuple_replacekey(r, tmpcentry, itup);
402         retval = PageAddItem(page, (Item) *newtup, IndexTupleSize(*newtup),
403                                                  offsetNumber, flags);
404         if (retval == InvalidOffsetNumber)
405                 elog(ERROR, "gist: failed to add index item to %s",
406                          RelationGetRelationName(r));
407         /* be tidy */
408         if (tmpcentry.pred && tmpcentry.pred != dentry->pred
409                 && tmpcentry.pred != (((char *) itup) + sizeof(IndexTupleData)))
410                 pfree(tmpcentry.pred);
411         return (retval);
412 }
413
414 static void
415 gistdoinsert(Relation r,
416                          IndexTuple itup,
417                          InsertIndexResult *res,
418                          GISTSTATE *giststate)
419 {
420         IndexTuple *instup;
421         int                     i,
422                                 ret,
423                                 len = 1;
424
425         instup = (IndexTuple *) palloc(sizeof(IndexTuple));
426         instup[0] = (IndexTuple) palloc(IndexTupleSize(itup));
427         memcpy(instup[0], itup, IndexTupleSize(itup));
428
429         ret = gistlayerinsert(r, GISTP_ROOT, &instup, &len, res, giststate);
430         if (ret & SPLITED)
431                 gistnewroot(giststate, r, instup, len);
432
433         for (i = 0; i < len; i++)
434                 pfree(instup[i]);
435         pfree(instup);
436 }
437
/*
 * gistlayerinsert -- recursive worker: insert the tuple vector *itup into
 * the subtree rooted at block blkno.
 *
 * On a non-leaf page a child is picked with gistchoose and the function
 * recurses into it.  Afterwards the old downlink key is deleted (gistdelete)
 * and replaced either by the adjusted key or by the keys of the split child.
 * The tuples are then placed on this page, which is split with gistSplit
 * when gistnospace reports that they do not fit.
 *
 * Returns a bitmask: 0 when nothing had to change on this level, INSERTED
 * when tuples were placed on the page, INSERTED | SPLITED when the page was
 * split.  On return *itup / *len describe what the parent has to deal with:
 * the keys produced by the split, or a single tuple otherwise.
 */
438 static int
439 gistlayerinsert(Relation r, BlockNumber blkno,
440                                 IndexTuple **itup,              /* in - out, has compressed entry */
441                                 int *len,               /* in - out */
442                                 InsertIndexResult *res, /* out */
443                                 GISTSTATE *giststate)
444 {
445         Buffer          buffer;
446         Page            page;
447         OffsetNumber child;
448         int                     ret;
449         GISTPageOpaque opaque;
450
451         buffer = ReadBuffer(r, blkno);
452         page = (Page) BufferGetPage(buffer);
453         opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
454
455         if (!(opaque->flags & F_LEAF))
456         {
457                 /* internal page, so we must walk down the tree */
458                 /* len is equal to 1 here */
459                 ItemId          iid;
460                 BlockNumber nblkno;
461                 ItemPointerData oldtid;
462                 IndexTuple      oldtup;
463
464                 child = gistchoose(r, page, *(*itup), giststate);
465                 iid = PageGetItemId(page, child);
466                 oldtup = (IndexTuple) PageGetItem(page, iid);
467                 nblkno = ItemPointerGetBlockNumber(&(oldtup->t_tid));
468
469                 /*
470                  * After this call: 1. if the child page was split, then itup
471                  * contains keys for each page 2. if the child page wasn't split,
472                  * then itup contains the addition for adjustment of current key
473                  */
474                 ret = gistlayerinsert(r, nblkno, itup, len, res, giststate);
475
476                 /* nothing inserted in child */
477                 if (!(ret & INSERTED))
478                 {
479                         ReleaseBuffer(buffer);
480                         return 0x00;
481                 }
482
483                 /* child was not split */
484                 if (!(ret & SPLITED))
485                 {
486                         IndexTuple      newtup = gistgetadjusted(r, oldtup, (*itup)[0], giststate);
487
488                         if (!newtup)
489                         {
490                                 /* no need to update key */
491                                 ReleaseBuffer(buffer);
492                                 return 0x00;
493                         }
494
495                         pfree((*itup)[0]);      /* !!! */
496                         (*itup)[0] = newtup;
497                 }
498
499                 /* key is modified, so old version must be deleted */
500                 ItemPointerSet(&oldtid, blkno, child);
501                 DirectFunctionCall2(gistdelete,
502                                                         PointerGetDatum(r),
503                                                         PointerGetDatum(&oldtid));
504         }
505
506         ret = INSERTED;
507
508         if (gistnospace(page, (*itup), *len))
509         {
510                 /* no space for insertion: split the page */
511                 IndexTuple *itvec;
512                 int                     tlen;
513
514                 ret |= SPLITED;
                /* gather existing page items plus the new ones into one vector */
515                 itvec = gistreadbuffer(r, buffer, &tlen);
516                 itvec = gistjoinvector(itvec, &tlen, (*itup), *len);
517                 pfree((*itup));
518                 (*itup) = gistSplit(r, buffer, itvec, &tlen, giststate,
519                                                         (opaque->flags & F_LEAF) ? res : NULL);         /* res only for
520                                                                                                                                                  * inserting in leaf */
521                 ReleaseBuffer(buffer);
522                 pfree(itvec);
523                 *len = tlen;                    /* now tlen >= 2 */
524         }
525         else
526         {
527                 /* enough space */
528                 OffsetNumber off,
529                                         l;
530
                /* append after the last existing item */
531                 off = (PageIsEmpty(page)) ?
532                         FirstOffsetNumber
533                         :
534                         OffsetNumberNext(PageGetMaxOffsetNumber(page));
535                 l = gistwritebuffer(r, page, (*itup), *len, off, giststate);
536                 WriteBuffer(buffer);
537
538                 /*
539                  * set res if insert into leaf page, in this case, len = 1 always
540                  */
541                 if (res && (opaque->flags & F_LEAF))
542                         ItemPointerSet(&((*res)->pointerData), blkno, l);
543
544                 if (*len > 1)
545                 {                                               /* previous insert had ret & SPLITED != 0 */
546                         int                     i;
547
548                         /*
549                          * child was split, so we must form union for insertion in
550                          * parent
551                          */
552                         IndexTuple      newtup = gistunion(r, (*itup), *len, giststate);
553
554                         for (i = 0; i < *len; i++)
555                                 pfree((*itup)[i]);
556                         (*itup)[0] = newtup;
557                         *len = 1;
558                 }
559         }
560
561         return ret;
562 }
563
564 /*
565  * Write itup vector to page, has no control of free space
566  */
567 static OffsetNumber
568 gistwritebuffer(Relation r, Page page, IndexTuple *itup,
569                                 int len, OffsetNumber off, GISTSTATE *giststate)
570 {
571         OffsetNumber l = InvalidOffsetNumber;
572         int                     i;
573         GISTENTRY       tmpdentry;
574         IndexTuple      newtup;
575
576         for (i = 0; i < len; i++)
577         {
578                 l = gistPageAddItem(giststate, r, page,
579                                                         (Item) itup[i], IndexTupleSize(itup[i]),
580                                                         off, LP_USED, &tmpdentry, &newtup);
581                 off = OffsetNumberNext(off);
582                 if (tmpdentry.pred != (((char *) itup[i]) + sizeof(IndexTupleData)) && tmpdentry.pred)
583                         pfree(tmpdentry.pred);
584                 if (itup[i] != newtup)
585                         pfree(newtup);
586         }
587         return l;
588 }
589
590 /*
591  * Check space for itup vector on page
592  */
593 static int
594 gistnospace(Page page, IndexTuple *itvec, int len)
595 {
596         int                     size = 0;
597         int                     i;
598
599         for (i = 0; i < len; i++)
600                 size += IndexTupleSize(itvec[i]) + 4;   /* ??? */
601
602         return (PageGetFreeSpace(page) < size);
603 }
604
605 /*
606  * Read buffer into itup vector
607  */
608 static IndexTuple *
609 gistreadbuffer(Relation r, Buffer buffer, int *len /* out */ )
610 {
611         OffsetNumber i,
612                                 maxoff;
613         IndexTuple *itvec;
614         Page            p = (Page) BufferGetPage(buffer);
615
616         *len = 0;
617         maxoff = PageGetMaxOffsetNumber(p);
618         itvec = palloc(sizeof(IndexTuple) * maxoff);
619         for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
620                 itvec[(*len)++] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
621
622         return itvec;
623 }
624
625 /*
626  * join two vectors into one
627  */
628 static IndexTuple *
629 gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
630 {
631         itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen));
632         memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen);
633         *len += addlen;
634         return itvec;
635 }
636
637 /*
638  * return union of itup vector
639  */
640 static IndexTuple
641 gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
642 {
643         bytea      *evec;
644         char       *datum;
645         int                     datumsize,
646                                 i;
647         GISTENTRY       centry;
648         char            isnull;
649         IndexTuple      newtup;
650
651         evec = (bytea *) palloc(len * sizeof(GISTENTRY) + VARHDRSZ);
652         VARATT_SIZEP(evec) = len * sizeof(GISTENTRY) + VARHDRSZ;
653
654         for (i = 0; i < len; i++)
655                 gistdentryinit(giststate, &((GISTENTRY *) VARDATA(evec))[i],
656                                            (char *) itvec[i] + sizeof(IndexTupleData),
657                                            (Relation) NULL, (Page) NULL, (OffsetNumber) NULL,
658                                            IndexTupleSize((IndexTuple) itvec[i]) - sizeof(IndexTupleData), FALSE);
659
660         datum = (char *)
661                 DatumGetPointer(FunctionCall2(&giststate->unionFn,
662                                                                           PointerGetDatum(evec),
663                                                                           PointerGetDatum(&datumsize)));
664
665         for (i = 0; i < len; i++)
666                 if (((GISTENTRY *) VARDATA(evec))[i].pred &&
667                         ((GISTENTRY *) VARDATA(evec))[i].pred !=
668                         ((char *) (itvec[i]) + sizeof(IndexTupleData)))
669                         pfree(((GISTENTRY *) VARDATA(evec))[i].pred);
670
671         pfree(evec);
672
673         gistcentryinit(giststate, &centry, datum,
674                                    (Relation) NULL, (Page) NULL, (OffsetNumber) NULL,
675                                    datumsize, FALSE);
676
677         isnull = (centry.pred) ? ' ' : 'n';
678         newtup = (IndexTuple) index_formtuple(r->rd_att, (Datum *) &centry.pred, &isnull);
679         if (centry.pred != datum)
680                 pfree(datum);
681
682         return newtup;
683 }
684
685 /*
686  * Forms union of oldtup and addtup, if union == oldtup then return NULL
687  */
688 static IndexTuple
689 gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate)
690 {
691         bytea      *evec;
692         char       *datum;
693         int                     datumsize;
694         bool            result;
695         char            isnull;
696         GISTENTRY       centry,
697                            *ev0p,
698                            *ev1p;
699         IndexTuple      newtup = NULL;
700
701         evec = (bytea *) palloc(2 * sizeof(GISTENTRY) + VARHDRSZ);
702         VARATT_SIZEP(evec) = 2 * sizeof(GISTENTRY) + VARHDRSZ;
703
704         gistdentryinit(giststate, &((GISTENTRY *) VARDATA(evec))[0],
705                            (char *) oldtup + sizeof(IndexTupleData), (Relation) NULL,
706                                    (Page) NULL, (OffsetNumber) 0,
707         IndexTupleSize((IndexTuple) oldtup) - sizeof(IndexTupleData), FALSE);
708         ev0p = &((GISTENTRY *) VARDATA(evec))[0];
709
710         gistdentryinit(giststate, &((GISTENTRY *) VARDATA(evec))[1],
711                            (char *) addtup + sizeof(IndexTupleData), (Relation) NULL,
712                                    (Page) NULL, (OffsetNumber) 0,
713         IndexTupleSize((IndexTuple) addtup) - sizeof(IndexTupleData), FALSE);
714         ev1p = &((GISTENTRY *) VARDATA(evec))[1];
715
716         datum = (char *)
717                 DatumGetPointer(FunctionCall2(&giststate->unionFn,
718                                                                           PointerGetDatum(evec),
719                                                                           PointerGetDatum(&datumsize)));
720
721         if (!(ev0p->pred && ev1p->pred))
722                 result = (ev0p->pred == NULL && ev1p->pred == NULL);
723         else
724         {
725                 FunctionCall3(&giststate->equalFn,
726                                           PointerGetDatum(ev0p->pred),
727                                           PointerGetDatum(datum),
728                                           PointerGetDatum(&result));
729         }
730
731         if (result)
732         {
733                 /* not need to update key */
734                 pfree(datum);
735         }
736         else
737         {
738                 gistcentryinit(giststate, &centry, datum, ev0p->rel, ev0p->page,
739                                            ev0p->offset, datumsize, FALSE);
740
741                 isnull = (centry.pred) ? ' ' : 'n';
742                 newtup = (IndexTuple) index_formtuple(r->rd_att, (Datum *) &centry.pred, &isnull);
743                 newtup->t_tid = oldtup->t_tid;
744                 if (centry.pred != datum)
745                         pfree(datum);
746         }
747
748         if (ev0p->pred &&
749                 ev0p->pred != (char *) oldtup + sizeof(IndexTupleData))
750                 pfree(ev0p->pred);
751         if (ev1p->pred &&
752                 ev1p->pred != (char *) addtup + sizeof(IndexTupleData))
753                 pfree(ev1p->pred);
754         pfree(evec);
755
756         return newtup;
757 }
758
759 /*
760  *      gistSplit -- split a page in the tree.
761  */
762 static IndexTuple *
763 gistSplit(Relation r,
764                   Buffer buffer,
765                   IndexTuple *itup,             /* contains compressed entry */
766                   int *len,
767                   GISTSTATE *giststate,
768                   InsertIndexResult *res)
769 {
770         Page            p;
771         Buffer          leftbuf,
772                                 rightbuf;
773         Page            left,
774                                 right;
775         OffsetNumber *spl_left,
776                            *spl_right;
777         IndexTuple *lvectup,
778                            *rvectup,
779                            *newtup;
780         int                     leftoff,
781                                 rightoff;
782         BlockNumber lbknum,
783                                 rbknum;
784         GISTPageOpaque opaque;
785         char            isnull;
786         GIST_SPLITVEC v;
787         bytea      *entryvec;
788         bool       *decompvec;
789         GISTENTRY       tmpentry;
790         int                     i,
791                                 nlen;
792
793         p = (Page) BufferGetPage(buffer);
794         opaque = (GISTPageOpaque) PageGetSpecialPointer(p);
795
796
797         /*
798          * The root of the tree is the first block in the relation.  If we're
799          * about to split the root, we need to do some hocus-pocus to enforce
800          * this guarantee.
801          */
802
803         if (BufferGetBlockNumber(buffer) == GISTP_ROOT)
804         {
805                 leftbuf = ReadBuffer(r, P_NEW);
806                 GISTInitBuffer(leftbuf, opaque->flags);
807                 lbknum = BufferGetBlockNumber(leftbuf);
808                 left = (Page) BufferGetPage(leftbuf);
809         }
810         else
811         {
812                 leftbuf = buffer;
813                 IncrBufferRefCount(buffer);
814                 lbknum = BufferGetBlockNumber(buffer);
815                 left = (Page) PageGetTempPage(p, sizeof(GISTPageOpaqueData));
816         }
817
818         rightbuf = ReadBuffer(r, P_NEW);
819         GISTInitBuffer(rightbuf, opaque->flags);
820         rbknum = BufferGetBlockNumber(rightbuf);
821         right = (Page) BufferGetPage(rightbuf);
822
823         /* generate the item array */
824         entryvec = (bytea *) palloc(VARHDRSZ + (*len + 1) * sizeof(GISTENTRY));
825         decompvec = (bool *) palloc(VARHDRSZ + (*len + 1) * sizeof(bool));
826         VARATT_SIZEP(entryvec) = (*len + 1) * sizeof(GISTENTRY) + VARHDRSZ;
827         for (i = 1; i <= *len; i++)
828         {
829                 gistdentryinit(giststate, &((GISTENTRY *) VARDATA(entryvec))[i],
830                                            (((char *) itup[i - 1]) + sizeof(IndexTupleData)),
831                                            r, p, i,
832                         IndexTupleSize(itup[i - 1]) - sizeof(IndexTupleData), FALSE);
833                 if ((char *) (((GISTENTRY *) VARDATA(entryvec))[i].pred)
834                         == (((char *) itup[i - 1]) + sizeof(IndexTupleData)))
835                         decompvec[i] = FALSE;
836                 else
837                         decompvec[i] = TRUE;
838         }
839
840         /* now let the user-defined picksplit function set up the split vector */
841         FunctionCall2(&giststate->picksplitFn,
842                                   PointerGetDatum(entryvec),
843                                   PointerGetDatum(&v));
844
845         /* clean up the entry vector: its preds need to be deleted, too */
846         for (i = 1; i <= *len; i++)
847                 if (decompvec[i] && ((GISTENTRY *) VARDATA(entryvec))[i].pred)
848                         pfree(((GISTENTRY *) VARDATA(entryvec))[i].pred);
849         pfree(entryvec);
850         pfree(decompvec);
851
852         spl_left = v.spl_left;
853         spl_right = v.spl_right;
854
855         /* form left and right vector */
856         lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * v.spl_nleft);
857         rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * v.spl_nright);
858         leftoff = rightoff = 0;
859         for (i = 1; i <= *len; i++)
860         {
861                 if (i == *(spl_left) || (i == *len && *(spl_left) != FirstOffsetNumber))
862                 {
863                         lvectup[leftoff++] = itup[i - 1];
864                         spl_left++;
865                 }
866                 else
867                 {
868                         rvectup[rightoff++] = itup[i - 1];
869                         spl_right++;
870                 }
871         }
872
873         /* write on disk (may be need another split) */
874         if (gistnospace(right, rvectup, v.spl_nright))
875         {
876                 nlen = v.spl_nright;
877                 newtup = gistSplit(r, rightbuf, rvectup, &nlen, giststate,
878                           (res && rvectup[nlen - 1] == itup[*len - 1]) ? res : NULL);
879                 ReleaseBuffer(rightbuf);
880         }
881         else
882         {
883                 OffsetNumber l;
884
885                 l = gistwritebuffer(r, right, rvectup, v.spl_nright, FirstOffsetNumber, giststate);
886                 WriteBuffer(rightbuf);
887
888                 if (res)
889                         ItemPointerSet(&((*res)->pointerData), rbknum, l);
890                 gistcentryinit(giststate, &tmpentry, v.spl_rdatum, (Relation) NULL,
891                                            (Page) NULL, (OffsetNumber) 0,
892                                            -1, FALSE);
893                 if (v.spl_rdatum != tmpentry.pred)
894                         pfree(v.spl_rdatum);
895                 v.spl_rdatum = tmpentry.pred;
896
897                 nlen = 1;
898                 newtup = (IndexTuple *) palloc(sizeof(IndexTuple) * 1);
899                 isnull = (v.spl_rdatum) ? ' ' : 'n';
900                 newtup[0] = (IndexTuple) index_formtuple(r->rd_att, (Datum *) &(v.spl_rdatum), &isnull);
901                 ItemPointerSet(&(newtup[0]->t_tid), rbknum, 1);
902         }
903
904         if (gistnospace(left, lvectup, v.spl_nleft))
905         {
906                 int                     llen = v.spl_nleft;
907                 IndexTuple *lntup;
908
909                 lntup = gistSplit(r, leftbuf, lvectup, &llen, giststate,
910                           (res && lvectup[llen - 1] == itup[*len - 1]) ? res : NULL);
911                 ReleaseBuffer(leftbuf);
912
913                 newtup = gistjoinvector(newtup, &nlen, lntup, llen);
914                 pfree(lntup);
915         }
916         else
917         {
918                 OffsetNumber l;
919
920                 l = gistwritebuffer(r, left, lvectup, v.spl_nleft, FirstOffsetNumber, giststate);
921                 if (BufferGetBlockNumber(buffer) != GISTP_ROOT)
922                         PageRestoreTempPage(left, p);
923
924                 WriteBuffer(leftbuf);
925
926                 if (res)
927                         ItemPointerSet(&((*res)->pointerData), lbknum, l);
928                 gistcentryinit(giststate, &tmpentry, v.spl_ldatum, (Relation) NULL,
929                                            (Page) NULL, (OffsetNumber) 0,
930                                            -1, FALSE);
931                 if (v.spl_ldatum != tmpentry.pred)
932                         pfree(v.spl_ldatum);
933                 v.spl_ldatum = tmpentry.pred;
934
935                 nlen += 1;
936                 newtup = (IndexTuple *) repalloc((void *) newtup, sizeof(IndexTuple) * nlen);
937                 isnull = (v.spl_ldatum) ? ' ' : 'n';
938                 newtup[nlen - 1] = (IndexTuple) index_formtuple(r->rd_att, (Datum *) &(v.spl_ldatum), &isnull);
939                 ItemPointerSet(&(newtup[nlen - 1]->t_tid), lbknum, 1);
940         }
941
942
943         /* adjust active scans */
944         gistadjscans(r, GISTOP_SPLIT, BufferGetBlockNumber(buffer), FirstOffsetNumber);
945
946         /* !!! pfree */
947         pfree(rvectup);
948         pfree(lvectup);
949         pfree(v.spl_left);
950         pfree(v.spl_right);
951
952         *len = nlen;
953         return newtup;
954 }
955
956 static void
957 gistnewroot(GISTSTATE *giststate, Relation r, IndexTuple *itup, int len)
958 {
959         Buffer          b;
960         Page            p;
961
962         b = ReadBuffer(r, GISTP_ROOT);
963         GISTInitBuffer(b, 0);
964         p = BufferGetPage(b);
965
966         gistwritebuffer(r, p, itup, len, FirstOffsetNumber, giststate);
967         WriteBuffer(b);
968 }
969
970 static void
971 GISTInitBuffer(Buffer b, uint32 f)
972 {
973         GISTPageOpaque opaque;
974         Page            page;
975         Size            pageSize;
976
977         pageSize = BufferGetPageSize(b);
978
979         page = BufferGetPage(b);
980         MemSet(page, 0, (int) pageSize);
981         PageInit(page, pageSize, sizeof(GISTPageOpaqueData));
982
983         opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
984         opaque->flags = f;
985 }
986
987
988 /*
989 ** find entry with lowest penalty
990 */
991 static OffsetNumber
992 gistchoose(Relation r, Page p, IndexTuple it,   /* it has compressed entry */
993                    GISTSTATE *giststate)
994 {
995         OffsetNumber maxoff;
996         OffsetNumber i;
997         char       *id;
998         char       *datum;
999         float           usize;
1000         OffsetNumber which;
1001         float           which_grow;
1002         GISTENTRY       entry,
1003                                 identry;
1004         int                     size,
1005                                 idsize;
1006
1007         idsize = IndexTupleSize(it) - sizeof(IndexTupleData);
1008         id = ((char *) it) + sizeof(IndexTupleData);
1009         maxoff = PageGetMaxOffsetNumber(p);
1010         which_grow = -1.0;
1011         which = -1;
1012
1013         gistdentryinit(giststate, &identry, id, (Relation) NULL, (Page) NULL,
1014                                    (OffsetNumber) 0, idsize, FALSE);
1015
1016         for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
1017         {
1018                 datum = (char *) PageGetItem(p, PageGetItemId(p, i));
1019                 size = IndexTupleSize(datum) - sizeof(IndexTupleData);
1020                 datum += sizeof(IndexTupleData);
1021                 gistdentryinit(giststate, &entry, datum, r, p, i, size, FALSE);
1022                 FunctionCall3(&giststate->penaltyFn,
1023                                           PointerGetDatum(&entry),
1024                                           PointerGetDatum(&identry),
1025                                           PointerGetDatum(&usize));
1026                 if (which_grow < 0 || usize < which_grow)
1027                 {
1028                         which = i;
1029                         which_grow = usize;
1030                         if (which_grow == 0)
1031                                 break;
1032                 }
1033                 if (entry.pred && entry.pred != datum)
1034                         pfree(entry.pred);
1035         }
1036         if (identry.pred && identry.pred != id)
1037                 pfree(identry.pred);
1038
1039         return which;
1040 }
1041
1042 void
1043 gistfreestack(GISTSTACK *s)
1044 {
1045         GISTSTACK  *p;
1046
1047         while (s != (GISTSTACK *) NULL)
1048         {
1049                 p = s->gs_parent;
1050                 pfree(s);
1051                 s = p;
1052         }
1053 }
1054
1055
1056 /*
1057 ** remove an entry from a page
1058 */
1059 Datum
1060 gistdelete(PG_FUNCTION_ARGS)
1061 {
1062         Relation        r = (Relation) PG_GETARG_POINTER(0);
1063         ItemPointer tid = (ItemPointer) PG_GETARG_POINTER(1);
1064         BlockNumber blkno;
1065         OffsetNumber offnum;
1066         Buffer          buf;
1067         Page            page;
1068
1069         /*
1070          * Notes in ExecUtils:ExecOpenIndices() Also note that only vacuum
1071          * deletes index tuples now...
1072          *
1073          * RelationSetLockForWrite(r);
1074          */
1075
1076         blkno = ItemPointerGetBlockNumber(tid);
1077         offnum = ItemPointerGetOffsetNumber(tid);
1078
1079         /* adjust any scans that will be affected by this deletion */
1080         gistadjscans(r, GISTOP_DEL, blkno, offnum);
1081
1082         /* delete the index tuple */
1083         buf = ReadBuffer(r, blkno);
1084         page = BufferGetPage(buf);
1085
1086         PageIndexTupleDelete(page, offnum);
1087
1088         WriteBuffer(buf);
1089
1090         PG_RETURN_VOID();
1091 }
1092
1093 void
1094 initGISTstate(GISTSTATE *giststate, Relation index)
1095 {
1096         RegProcedure consistent_proc,
1097                                 union_proc,
1098                                 compress_proc,
1099                                 decompress_proc;
1100         RegProcedure penalty_proc,
1101                                 picksplit_proc,
1102                                 equal_proc;
1103         HeapTuple       htup;
1104         Form_pg_index itupform;
1105         Oid                     indexrelid;
1106
1107         consistent_proc = index_getprocid(index, 1, GIST_CONSISTENT_PROC);
1108         union_proc = index_getprocid(index, 1, GIST_UNION_PROC);
1109         compress_proc = index_getprocid(index, 1, GIST_COMPRESS_PROC);
1110         decompress_proc = index_getprocid(index, 1, GIST_DECOMPRESS_PROC);
1111         penalty_proc = index_getprocid(index, 1, GIST_PENALTY_PROC);
1112         picksplit_proc = index_getprocid(index, 1, GIST_PICKSPLIT_PROC);
1113         equal_proc = index_getprocid(index, 1, GIST_EQUAL_PROC);
1114         fmgr_info(consistent_proc, &giststate->consistentFn);
1115         fmgr_info(union_proc, &giststate->unionFn);
1116         fmgr_info(compress_proc, &giststate->compressFn);
1117         fmgr_info(decompress_proc, &giststate->decompressFn);
1118         fmgr_info(penalty_proc, &giststate->penaltyFn);
1119         fmgr_info(picksplit_proc, &giststate->picksplitFn);
1120         fmgr_info(equal_proc, &giststate->equalFn);
1121
1122         /* see if key type is different from type of attribute being indexed */
1123         htup = SearchSysCache(INDEXRELID,
1124                                                   ObjectIdGetDatum(RelationGetRelid(index)),
1125                                                   0, 0, 0);
1126         if (!HeapTupleIsValid(htup))
1127                 elog(ERROR, "initGISTstate: index %u not found",
1128                          RelationGetRelid(index));
1129         itupform = (Form_pg_index) GETSTRUCT(htup);
1130         indexrelid = itupform->indexrelid;
1131         ReleaseSysCache(htup);
1132
1133         if (giststate->haskeytype)
1134         {
1135                 /* key type is different -- is it byval? */
1136                 htup = SearchSysCache(ATTNUM,
1137                                                           ObjectIdGetDatum(indexrelid),
1138                                                           UInt16GetDatum(FirstOffsetNumber),
1139                                                           0, 0);
1140                 if (!HeapTupleIsValid(htup))
1141                         elog(ERROR, "initGISTstate: no attribute tuple %u %d",
1142                                  indexrelid, FirstOffsetNumber);
1143                 giststate->keytypbyval = (((Form_pg_attribute) htup)->attbyval);
1144                 ReleaseSysCache(htup);
1145         }
1146         else
1147                 giststate->keytypbyval = FALSE;
1148 }
1149
1150
1151 /*
1152 ** Given an IndexTuple to be inserted on a page, this routine replaces
1153 ** the key with another key, which may involve generating a new IndexTuple
1154 ** if the sizes don't match
1155 */
1156 static IndexTuple
1157 gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t)
1158 {
1159         char       *datum = (((char *) t) + sizeof(IndexTupleData));
1160
1161         /* if new entry fits in index tuple, copy it in */
1162         if ((Size) entry.bytes < IndexTupleSize(t) - sizeof(IndexTupleData) || (Size) entry.bytes == 0)
1163         {
1164                 memcpy(datum, entry.pred, entry.bytes);
1165                 /* clear out old size */
1166                 t->t_info &= ~INDEX_SIZE_MASK;
1167                 /* or in new size */
1168                 t->t_info |= MAXALIGN(entry.bytes + sizeof(IndexTupleData));
1169
1170                 return t;
1171         }
1172         else
1173         {
1174                 /* generate a new index tuple for the compressed entry */
1175                 TupleDesc       tupDesc = r->rd_att;
1176                 IndexTuple      newtup;
1177                 char            isnull;
1178
1179                 isnull = (entry.pred) ? ' ' : 'n';
1180                 newtup = (IndexTuple) index_formtuple(tupDesc,
1181                                                                                           (Datum *) &(entry.pred),
1182                                                                                           &isnull);
1183                 newtup->t_tid = t->t_tid;
1184                 return newtup;
1185         }
1186 }
1187
1188
1189 /*
1190 ** initialize a GiST entry with a decompressed version of pred
1191 */
1192 void
1193 gistdentryinit(GISTSTATE *giststate, GISTENTRY *e, char *pr, Relation r,
1194                            Page pg, OffsetNumber o, int b, bool l)
1195 {
1196         GISTENTRY  *dep;
1197
1198         gistentryinit(*e, pr, r, pg, o, b, l);
1199         if (giststate->haskeytype)
1200         {
1201                 dep = (GISTENTRY *)
1202                         DatumGetPointer(FunctionCall1(&giststate->decompressFn,
1203                                                                                   PointerGetDatum(e)));
1204                 gistentryinit(*e, dep->pred, dep->rel, dep->page, dep->offset, dep->bytes,
1205                                           dep->leafkey);
1206                 if (dep != e)
1207                         pfree(dep);
1208         }
1209 }
1210
1211
1212 /*
1213 ** initialize a GiST entry with a compressed version of pred
1214 */
1215 static void
1216 gistcentryinit(GISTSTATE *giststate, GISTENTRY *e, char *pr, Relation r,
1217                            Page pg, OffsetNumber o, int b, bool l)
1218 {
1219         GISTENTRY  *cep;
1220
1221         gistentryinit(*e, pr, r, pg, o, b, l);
1222         if (giststate->haskeytype)
1223         {
1224                 cep = (GISTENTRY *)
1225                         DatumGetPointer(FunctionCall1(&giststate->compressFn,
1226                                                                                   PointerGetDatum(e)));
1227                 gistentryinit(*e, cep->pred, cep->rel, cep->page, cep->offset, cep->bytes,
1228                                           cep->leafkey);
1229                 if (cep != e)
1230                         pfree(cep);
1231         }
1232 }
1233
1234 #ifdef GISTDEBUG
1235 static void
1236 gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff)
1237 {
1238         Buffer          buffer;
1239         Page            page;
1240         GISTPageOpaque opaque;
1241         IndexTuple      which;
1242         ItemId          iid;
1243         OffsetNumber i,
1244                                 maxoff;
1245         BlockNumber cblk;
1246         char       *pred;
1247
1248         pred = (char *) palloc(sizeof(char) * level + 1);
1249         MemSet(pred, '\t', level);
1250         pred[level] = '\0';
1251
1252         buffer = ReadBuffer(r, blk);
1253         page = (Page) BufferGetPage(buffer);
1254         opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
1255
1256         maxoff = PageGetMaxOffsetNumber(page);
1257
1258         elog(NOTICE, "%sPage: %d %s blk: %d maxoff: %d free: %d", pred, coff, (opaque->flags & F_LEAF) ? "LEAF" : "INTE", (int) blk, (int) maxoff, PageGetFreeSpace(page));
1259
1260         for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
1261         {
1262                 iid = PageGetItemId(page, i);
1263                 which = (IndexTuple) PageGetItem(page, iid);
1264                 cblk = ItemPointerGetBlockNumber(&(which->t_tid));
1265 #ifdef PRINTTUPLE
1266                 elog(NOTICE, "%s  Tuple. blk: %d size: %d", pred, (int) cblk, IndexTupleSize(which));
1267 #endif
1268
1269                 if (!(opaque->flags & F_LEAF))
1270                         gist_dumptree(r, level + 1, cblk, i);
1271         }
1272         ReleaseBuffer(buffer);
1273         pfree(pred);
1274 }
1275
1276 #endif   /* defined GISTDEBUG */
1277
1278 void
1279 gist_redo(XLogRecPtr lsn, XLogRecord *record)
1280 {
1281         elog(STOP, "gist_redo: unimplemented");
1282 }
1283
1284 void
1285 gist_undo(XLogRecPtr lsn, XLogRecord *record)
1286 {
1287         elog(STOP, "gist_undo: unimplemented");
1288 }
1289
1290 void
1291 gist_desc(char *buf, uint8 xl_info, char *rec)
1292 {
1293 }