OSDN Git Service

cdb0343d104cfec50b751edb83dae2239d89687a
[pg-rex/syncrep.git] / src / backend / access / gist / gist.c
1 /*-------------------------------------------------------------------------
2  *
3  * gist.c
4  *        interface routines for the postgres GiST index access method.
5  *
6  *
7  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *        src/backend/access/gist/gist.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/gist_private.h"
19 #include "catalog/index.h"
20 #include "catalog/pg_collation.h"
21 #include "miscadmin.h"
22 #include "storage/bufmgr.h"
23 #include "storage/indexfsm.h"
24 #include "utils/memutils.h"
25
26 /* Working state for gistbuild and its callback */
27 typedef struct
28 {
29         GISTSTATE       giststate;
30         int                     numindexattrs;
31         double          indtuples;
32         MemoryContext tmpCtx;
33 } GISTBuildState;
34
35 /* A List of these is used represent a split-in-progress. */
36 typedef struct
37 {
38         Buffer          buf;                    /* the split page "half" */
39         IndexTuple      downlink;               /* downlink for this half. */
40 } GISTPageSplitInfo;
41
42 /* non-export function prototypes */
43 static void gistbuildCallback(Relation index,
44                                   HeapTuple htup,
45                                   Datum *values,
46                                   bool *isnull,
47                                   bool tupleIsAlive,
48                                   void *state);
49 static void gistdoinsert(Relation r,
50                          IndexTuple itup,
51                          Size freespace,
52                          GISTSTATE *GISTstate);
53 static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate);
54 static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
55                                  GISTSTATE *giststate,
56                                  IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
57                                  Buffer leftchild);
58 static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
59                                 GISTSTATE *giststate, List *splitinfo);
60
61
/*
 * ROTATEDIST(d)
 *
 * Allocate a zero-filled SplitedPageLayout and push it onto the front of the
 * singly-linked list whose head is 'd'.  The new entry starts out with no
 * block number and no buffer assigned.
 *
 * 'd' must be a modifiable lvalue.  It is evaluated more than once, so it
 * must not have side effects.
 */
#define ROTATEDIST(d) do { \
	SplitedPageLayout *tmp = (SplitedPageLayout *) palloc0(sizeof(SplitedPageLayout)); \
	tmp->block.blkno = InvalidBlockNumber; \
	tmp->buffer = InvalidBuffer; \
	tmp->next = (d); \
	(d) = tmp; \
} while(0)
70
71
72 /*
73  * Create and return a temporary memory context for use by GiST. We
74  * _always_ invoke user-provided methods in a temporary memory
75  * context, so that memory leaks in those functions cannot cause
76  * problems. Also, we use some additional temporary contexts in the
77  * GiST code itself, to avoid the need to do some awkward manual
78  * memory management.
79  */
80 MemoryContext
81 createTempGistContext(void)
82 {
83         return AllocSetContextCreate(CurrentMemoryContext,
84                                                                  "GiST temporary context",
85                                                                  ALLOCSET_DEFAULT_MINSIZE,
86                                                                  ALLOCSET_DEFAULT_INITSIZE,
87                                                                  ALLOCSET_DEFAULT_MAXSIZE);
88 }
89
90 /*
91  * Routine to build an index.  Basically calls insert over and over.
92  *
93  * XXX: it would be nice to implement some sort of bulk-loading
94  * algorithm, but it is not clear how to do that.
95  */
96 Datum
97 gistbuild(PG_FUNCTION_ARGS)
98 {
99         Relation        heap = (Relation) PG_GETARG_POINTER(0);
100         Relation        index = (Relation) PG_GETARG_POINTER(1);
101         IndexInfo  *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
102         IndexBuildResult *result;
103         double          reltuples;
104         GISTBuildState buildstate;
105         Buffer          buffer;
106         Page            page;
107
108         /*
109          * We expect to be called exactly once for any index relation. If that's
110          * not the case, big trouble's what we have.
111          */
112         if (RelationGetNumberOfBlocks(index) != 0)
113                 elog(ERROR, "index \"%s\" already contains data",
114                          RelationGetRelationName(index));
115
116         /* no locking is needed */
117         initGISTstate(&buildstate.giststate, index);
118
119         /* initialize the root page */
120         buffer = gistNewBuffer(index);
121         Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
122         page = BufferGetPage(buffer);
123
124         START_CRIT_SECTION();
125
126         GISTInitBuffer(buffer, F_LEAF);
127
128         MarkBufferDirty(buffer);
129
130         if (RelationNeedsWAL(index))
131         {
132                 XLogRecPtr      recptr;
133                 XLogRecData rdata;
134
135                 rdata.data = (char *) &(index->rd_node);
136                 rdata.len = sizeof(RelFileNode);
137                 rdata.buffer = InvalidBuffer;
138                 rdata.next = NULL;
139
140                 recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX, &rdata);
141                 PageSetLSN(page, recptr);
142                 PageSetTLI(page, ThisTimeLineID);
143         }
144         else
145                 PageSetLSN(page, GetXLogRecPtrForTemp());
146
147         UnlockReleaseBuffer(buffer);
148
149         END_CRIT_SECTION();
150
151         /* build the index */
152         buildstate.numindexattrs = indexInfo->ii_NumIndexAttrs;
153         buildstate.indtuples = 0;
154
155         /*
156          * create a temporary memory context that is reset once for each tuple
157          * inserted into the index
158          */
159         buildstate.tmpCtx = createTempGistContext();
160
161         /* do the heap scan */
162         reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
163                                                                    gistbuildCallback, (void *) &buildstate);
164
165         /* okay, all heap tuples are indexed */
166         MemoryContextDelete(buildstate.tmpCtx);
167
168         freeGISTstate(&buildstate.giststate);
169
170         /*
171          * Return statistics
172          */
173         result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
174
175         result->heap_tuples = reltuples;
176         result->index_tuples = buildstate.indtuples;
177
178         PG_RETURN_POINTER(result);
179 }
180
181 /*
182  * Per-tuple callback from IndexBuildHeapScan
183  */
184 static void
185 gistbuildCallback(Relation index,
186                                   HeapTuple htup,
187                                   Datum *values,
188                                   bool *isnull,
189                                   bool tupleIsAlive,
190                                   void *state)
191 {
192         GISTBuildState *buildstate = (GISTBuildState *) state;
193         IndexTuple      itup;
194         MemoryContext oldCtx;
195
196         oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
197
198         /* form an index tuple and point it at the heap tuple */
199         itup = gistFormTuple(&buildstate->giststate, index,
200                                                  values, isnull, true /* size is currently bogus */ );
201         itup->t_tid = htup->t_self;
202
203         /*
204          * Since we already have the index relation locked, we call gistdoinsert
205          * directly.  Normal access method calls dispatch through gistinsert,
206          * which locks the relation for write.  This is the right thing to do if
207          * you're inserting single tups, but not when you're initializing the
208          * whole index at once.
209          *
210          * In this path we respect the fillfactor setting, whereas insertions
211          * after initial build do not.
212          */
213         gistdoinsert(index, itup,
214                           RelationGetTargetPageFreeSpace(index, GIST_DEFAULT_FILLFACTOR),
215                                  &buildstate->giststate);
216
217         buildstate->indtuples += 1;
218         MemoryContextSwitchTo(oldCtx);
219         MemoryContextReset(buildstate->tmpCtx);
220 }
221
222 /*
223  *      gistbuildempty() -- build an empty gist index in the initialization fork
224  */
225 Datum
226 gistbuildempty(PG_FUNCTION_ARGS)
227 {
228         ereport(ERROR,
229                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
230                          errmsg("unlogged GiST indexes are not supported")));
231
232         PG_RETURN_VOID();
233 }
234
235 /*
236  *      gistinsert -- wrapper for GiST tuple insertion.
237  *
238  *        This is the public interface routine for tuple insertion in GiSTs.
239  *        It doesn't do any work; just locks the relation and passes the buck.
240  */
241 Datum
242 gistinsert(PG_FUNCTION_ARGS)
243 {
244         Relation        r = (Relation) PG_GETARG_POINTER(0);
245         Datum      *values = (Datum *) PG_GETARG_POINTER(1);
246         bool       *isnull = (bool *) PG_GETARG_POINTER(2);
247         ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
248
249 #ifdef NOT_USED
250         Relation        heapRel = (Relation) PG_GETARG_POINTER(4);
251         IndexUniqueCheck checkUnique = (IndexUniqueCheck) PG_GETARG_INT32(5);
252 #endif
253         IndexTuple      itup;
254         GISTSTATE       giststate;
255         MemoryContext oldCtx;
256         MemoryContext insertCtx;
257
258         insertCtx = createTempGistContext();
259         oldCtx = MemoryContextSwitchTo(insertCtx);
260
261         initGISTstate(&giststate, r);
262
263         itup = gistFormTuple(&giststate, r,
264                                                  values, isnull, true /* size is currently bogus */ );
265         itup->t_tid = *ht_ctid;
266
267         gistdoinsert(r, itup, 0, &giststate);
268
269         /* cleanup */
270         freeGISTstate(&giststate);
271         MemoryContextSwitchTo(oldCtx);
272         MemoryContextDelete(insertCtx);
273
274         PG_RETURN_BOOL(false);
275 }
276
277
278 /*
279  * Place tuples from 'itup' to 'buffer'. If 'oldoffnum' is valid, the tuple
280  * at that offset is atomically removed along with inserting the new tuples.
281  * This is used to replace a tuple with a new one.
282  *
283  * If 'leftchildbuf' is valid, we're inserting the downlink for the page
284  * to the right of 'leftchildbuf', or updating the downlink for 'leftchildbuf'.
285  * F_FOLLOW_RIGHT flag on 'leftchildbuf' is cleared and NSN is set.
286  *
287  * If there is not enough room on the page, it is split. All the split
288  * pages are kept pinned and locked and returned in *splitinfo, the caller
289  * is responsible for inserting the downlinks for them. However, if
290  * 'buffer' is the root page and it needs to be split, gistplacetopage()
291  * performs the split as one atomic operation, and *splitinfo is set to NIL.
292  * In that case, we continue to hold the root page locked, and the child
293  * pages are released; note that new tuple(s) are *not* on the root page
294  * but in one of the new child pages.
295  */
296 static bool
297 gistplacetopage(GISTInsertState *state, GISTSTATE *giststate,
298                                 Buffer buffer,
299                                 IndexTuple *itup, int ntup, OffsetNumber oldoffnum,
300                                 Buffer leftchildbuf,
301                                 List **splitinfo)
302 {
303         Page            page = BufferGetPage(buffer);
304         bool            is_leaf = (GistPageIsLeaf(page)) ? true : false;
305         XLogRecPtr      recptr;
306         int                     i;
307         bool            is_split;
308
309         /*
310          * Refuse to modify a page that's incompletely split. This should not
311          * happen because we finish any incomplete splits while we walk down the
312          * tree. However, it's remotely possible that another concurrent inserter
313          * splits a parent page, and errors out before completing the split. We
314          * will just throw an error in that case, and leave any split we had in
315          * progress unfinished too. The next insert that comes along will clean up
316          * the mess.
317          */
318         if (GistFollowRight(page))
319                 elog(ERROR, "concurrent GiST page split was incomplete");
320
321         *splitinfo = NIL;
322
323         /*
324          * if isupdate, remove old key: This node's key has been modified, either
325          * because a child split occurred or because we needed to adjust our key
326          * for an insert in a child node. Therefore, remove the old version of
327          * this node's key.
328          *
329          * for WAL replay, in the non-split case we handle this by setting up a
330          * one-element todelete array; in the split case, it's handled implicitly
331          * because the tuple vector passed to gistSplit won't include this tuple.
332          */
333         is_split = gistnospace(page, itup, ntup, oldoffnum, state->freespace);
334         if (is_split)
335         {
336                 /* no space for insertion */
337                 IndexTuple *itvec;
338                 int                     tlen;
339                 SplitedPageLayout *dist = NULL,
340                                    *ptr;
341                 BlockNumber oldrlink = InvalidBlockNumber;
342                 GistNSN         oldnsn = {0, 0};
343                 SplitedPageLayout rootpg;
344                 BlockNumber blkno = BufferGetBlockNumber(buffer);
345                 bool            is_rootsplit;
346
347                 is_rootsplit = (blkno == GIST_ROOT_BLKNO);
348
349                 /*
350                  * Form index tuples vector to split. If we're replacing an old tuple,
351                  * remove the old version from the vector.
352                  */
353                 itvec = gistextractpage(page, &tlen);
354                 if (OffsetNumberIsValid(oldoffnum))
355                 {
356                         /* on inner page we should remove old tuple */
357                         int                     pos = oldoffnum - FirstOffsetNumber;
358
359                         tlen--;
360                         if (pos != tlen)
361                                 memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
362                 }
363                 itvec = gistjoinvector(itvec, &tlen, itup, ntup);
364                 dist = gistSplit(state->r, page, itvec, tlen, giststate);
365
366                 /*
367                  * Set up pages to work with. Allocate new buffers for all but the
368                  * leftmost page. The original page becomes the new leftmost page, and
369                  * is just replaced with the new contents.
370                  *
371                  * For a root-split, allocate new buffers for all child pages, the
372                  * original page is overwritten with new root page containing
373                  * downlinks to the new child pages.
374                  */
375                 ptr = dist;
376                 if (!is_rootsplit)
377                 {
378                         /* save old rightlink and NSN */
379                         oldrlink = GistPageGetOpaque(page)->rightlink;
380                         oldnsn = GistPageGetOpaque(page)->nsn;
381
382                         dist->buffer = buffer;
383                         dist->block.blkno = BufferGetBlockNumber(buffer);
384                         dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer));
385
386                         /* clean all flags except F_LEAF */
387                         GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
388
389                         ptr = ptr->next;
390                 }
391                 for (; ptr; ptr = ptr->next)
392                 {
393                         /* Allocate new page */
394                         ptr->buffer = gistNewBuffer(state->r);
395                         GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
396                         ptr->page = BufferGetPage(ptr->buffer);
397                         ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
398                 }
399
400                 /*
401                  * Now that we know whick blocks the new pages go to, set up downlink
402                  * tuples to point to them.
403                  */
404                 for (ptr = dist; ptr; ptr = ptr->next)
405                 {
406                         ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
407                         GistTupleSetValid(ptr->itup);
408                 }
409
410                 /*
411                  * If this is a root split, we construct the new root page with the
412                  * downlinks here directly, instead of requiring the caller to insert
413                  * them. Add the new root page to the list along with the child pages.
414                  */
415                 if (is_rootsplit)
416                 {
417                         IndexTuple *downlinks;
418                         int                     ndownlinks = 0;
419                         int                     i;
420
421                         rootpg.buffer = buffer;
422                         rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer));
423                         GistPageGetOpaque(rootpg.page)->flags = 0;
424
425                         /* Prepare a vector of all the downlinks */
426                         for (ptr = dist; ptr; ptr = ptr->next)
427                                 ndownlinks++;
428                         downlinks = palloc(sizeof(IndexTuple) * ndownlinks);
429                         for (i = 0, ptr = dist; ptr; ptr = ptr->next)
430                                 downlinks[i++] = ptr->itup;
431
432                         rootpg.block.blkno = GIST_ROOT_BLKNO;
433                         rootpg.block.num = ndownlinks;
434                         rootpg.list = gistfillitupvec(downlinks, ndownlinks,
435                                                                                   &(rootpg.lenlist));
436                         rootpg.itup = NULL;
437
438                         rootpg.next = dist;
439                         dist = &rootpg;
440                 }
441                 else
442                 {
443                         /* Prepare split-info to be returned to caller */
444                         for (ptr = dist; ptr; ptr = ptr->next)
445                         {
446                                 GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
447
448                                 si->buf = ptr->buffer;
449                                 si->downlink = ptr->itup;
450                                 *splitinfo = lappend(*splitinfo, si);
451                         }
452                 }
453
454                 /*
455                  * Fill all pages. All the pages are new, ie. freshly allocated empty
456                  * pages, or a temporary copy of the old page.
457                  */
458                 for (ptr = dist; ptr; ptr = ptr->next)
459                 {
460                         char       *data = (char *) (ptr->list);
461
462                         for (i = 0; i < ptr->block.num; i++)
463                         {
464                                 if (PageAddItem(ptr->page, (Item) data, IndexTupleSize((IndexTuple) data), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
465                                         elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(state->r));
466                                 data += IndexTupleSize((IndexTuple) data);
467                         }
468
469                         /* Set up rightlinks */
470                         if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO)
471                                 GistPageGetOpaque(ptr->page)->rightlink =
472                                         ptr->next->block.blkno;
473                         else
474                                 GistPageGetOpaque(ptr->page)->rightlink = oldrlink;
475
476                         if (ptr->next && !is_rootsplit)
477                                 GistMarkFollowRight(ptr->page);
478                         else
479                                 GistClearFollowRight(ptr->page);
480
481                         /*
482                          * Copy the NSN of the original page to all pages. The
483                          * F_FOLLOW_RIGHT flags ensure that scans will follow the
484                          * rightlinks until the downlinks are inserted.
485                          */
486                         GistPageGetOpaque(ptr->page)->nsn = oldnsn;
487                 }
488
489                 START_CRIT_SECTION();
490
491                 /*
492                  * Must mark buffers dirty before XLogInsert, even though we'll still
493                  * be changing their opaque fields below.
494                  */
495                 for (ptr = dist; ptr; ptr = ptr->next)
496                         MarkBufferDirty(ptr->buffer);
497                 if (BufferIsValid(leftchildbuf))
498                         MarkBufferDirty(leftchildbuf);
499
500                 /*
501                  * The first page in the chain was a temporary working copy meant to
502                  * replace the old page. Copy it over the old page.
503                  */
504                 PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
505                 dist->page = BufferGetPage(dist->buffer);
506
507                 /* Write the WAL record */
508                 if (RelationNeedsWAL(state->r))
509                         recptr = gistXLogSplit(state->r->rd_node, blkno, is_leaf,
510                                                                    dist, oldrlink, oldnsn, leftchildbuf);
511                 else
512                         recptr = GetXLogRecPtrForTemp();
513
514                 for (ptr = dist; ptr; ptr = ptr->next)
515                 {
516                         PageSetLSN(ptr->page, recptr);
517                         PageSetTLI(ptr->page, ThisTimeLineID);
518                 }
519
520                 /*
521                  * Return the new child buffers to the caller.
522                  *
523                  * If this was a root split, we've already inserted the downlink
524                  * pointers, in the form of a new root page. Therefore we can release
525                  * all the new buffers, and keep just the root page locked.
526                  */
527                 if (is_rootsplit)
528                 {
529                         for (ptr = dist->next; ptr; ptr = ptr->next)
530                                 UnlockReleaseBuffer(ptr->buffer);
531                 }
532         }
533         else
534         {
535                 /*
536                  * Enough space. We also get here if ntuples==0.
537                  */
538                 START_CRIT_SECTION();
539
540                 if (OffsetNumberIsValid(oldoffnum))
541                         PageIndexTupleDelete(page, oldoffnum);
542                 gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
543
544                 MarkBufferDirty(buffer);
545
546                 if (BufferIsValid(leftchildbuf))
547                         MarkBufferDirty(leftchildbuf);
548
549                 if (RelationNeedsWAL(state->r))
550                 {
551                         OffsetNumber ndeloffs = 0,
552                                                 deloffs[1];
553
554                         if (OffsetNumberIsValid(oldoffnum))
555                         {
556                                 deloffs[0] = oldoffnum;
557                                 ndeloffs = 1;
558                         }
559
560                         recptr = gistXLogUpdate(state->r->rd_node, buffer,
561                                                                         deloffs, ndeloffs, itup, ntup,
562                                                                         leftchildbuf);
563
564                         PageSetLSN(page, recptr);
565                         PageSetTLI(page, ThisTimeLineID);
566                 }
567                 else
568                 {
569                         recptr = GetXLogRecPtrForTemp();
570                         PageSetLSN(page, recptr);
571                 }
572
573                 *splitinfo = NIL;
574         }
575
576         /*
577          * If we inserted the downlink for a child page, set NSN and clear
578          * F_FOLLOW_RIGHT flag on the left child, so that concurrent scans know to
579          * follow the rightlink if and only if they looked at the parent page
580          * before we inserted the downlink.
581          *
582          * Note that we do this *after* writing the WAL record. That means that
583          * the possible full page image in the WAL record does not include these
584          * changes, and they must be replayed even if the page is restored from
585          * the full page image. There's a chicken-and-egg problem: if we updated
586          * the child pages first, we wouldn't know the recptr of the WAL record
587          * we're about to write.
588          */
589         if (BufferIsValid(leftchildbuf))
590         {
591                 Page            leftpg = BufferGetPage(leftchildbuf);
592
593                 GistPageGetOpaque(leftpg)->nsn = recptr;
594                 GistClearFollowRight(leftpg);
595
596                 PageSetLSN(leftpg, recptr);
597                 PageSetTLI(leftpg, ThisTimeLineID);
598         }
599
600         END_CRIT_SECTION();
601
602         return is_split;
603 }
604
605 /*
606  * Workhorse routine for doing insertion into a GiST index. Note that
607  * this routine assumes it is invoked in a short-lived memory context,
608  * so it does not bother releasing palloc'd allocations.
609  */
610 static void
611 gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
612 {
613         ItemId          iid;
614         IndexTuple      idxtuple;
615         GISTInsertStack firststack;
616         GISTInsertStack *stack;
617         GISTInsertState state;
618         bool            xlocked = false;
619
620         memset(&state, 0, sizeof(GISTInsertState));
621         state.freespace = freespace;
622         state.r = r;
623
624         /* Start from the root */
625         firststack.blkno = GIST_ROOT_BLKNO;
626         firststack.lsn.xrecoff = 0;
627         firststack.parent = NULL;
628         state.stack = stack = &firststack;
629
630         /*
631          * Walk down along the path of smallest penalty, updating the parent
632          * pointers with the key we're inserting as we go. If we crash in the
633          * middle, the tree is consistent, although the possible parent updates
634          * were a waste.
635          */
636         for (;;)
637         {
638                 if (XLogRecPtrIsInvalid(stack->lsn))
639                         stack->buffer = ReadBuffer(state.r, stack->blkno);
640
641                 /*
642                  * Be optimistic and grab shared lock first. Swap it for an exclusive
643                  * lock later if we need to update the page.
644                  */
645                 if (!xlocked)
646                 {
647                         LockBuffer(stack->buffer, GIST_SHARE);
648                         gistcheckpage(state.r, stack->buffer);
649                 }
650
651                 stack->page = (Page) BufferGetPage(stack->buffer);
652                 stack->lsn = PageGetLSN(stack->page);
653                 Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn));
654
655                 /*
656                  * If this page was split but the downlink was never inserted to the
657                  * parent because the inserting backend crashed before doing that, fix
658                  * that now.
659                  */
660                 if (GistFollowRight(stack->page))
661                 {
662                         if (!xlocked)
663                         {
664                                 LockBuffer(stack->buffer, GIST_UNLOCK);
665                                 LockBuffer(stack->buffer, GIST_EXCLUSIVE);
666                                 xlocked = true;
667                                 /* someone might've completed the split when we unlocked */
668                                 if (!GistFollowRight(stack->page))
669                                         continue;
670                         }
671                         gistfixsplit(&state, giststate);
672
673                         UnlockReleaseBuffer(stack->buffer);
674                         xlocked = false;
675                         state.stack = stack = stack->parent;
676                         continue;
677                 }
678
679                 if (stack->blkno != GIST_ROOT_BLKNO &&
680                         XLByteLT(stack->parent->lsn,
681                                          GistPageGetOpaque(stack->page)->nsn))
682                 {
683                         /*
684                          * Concurrent split detected. There's no guarantee that the
685                          * downlink for this page is consistent with the tuple we're
686                          * inserting anymore, so go back to parent and rechoose the best
687                          * child.
688                          */
689                         UnlockReleaseBuffer(stack->buffer);
690                         xlocked = false;
691                         state.stack = stack = stack->parent;
692                         continue;
693                 }
694
695                 if (!GistPageIsLeaf(stack->page))
696                 {
697                         /*
698                          * This is an internal page so continue to walk down the tree.
699                          * Find the child node that has the minimum insertion penalty.
700                          */
701                         BlockNumber childblkno;
702                         IndexTuple      newtup;
703                         GISTInsertStack *item;
704
705                         stack->childoffnum = gistchoose(state.r, stack->page, itup, giststate);
706                         iid = PageGetItemId(stack->page, stack->childoffnum);
707                         idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
708                         childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
709
710                         /*
711                          * Check that it's not a leftover invalid tuple from pre-9.1
712                          */
713                         if (GistTupleIsInvalid(idxtuple))
714                                 ereport(ERROR,
715                                                 (errmsg("index \"%s\" contains an inner tuple marked as invalid",
716                                                                 RelationGetRelationName(r)),
717                                                  errdetail("This is caused by an incomplete page split at crash recovery before upgrading to PostgreSQL 9.1."),
718                                                  errhint("Please REINDEX it.")));
719
720                         /*
721                          * Check that the key representing the target child node is
722                          * consistent with the key we're inserting. Update it if it's not.
723                          */
724                         newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
725                         if (newtup)
726                         {
727                                 /*
728                                  * Swap shared lock for an exclusive one. Beware, the page may
729                                  * change while we unlock/lock the page...
730                                  */
731                                 if (!xlocked)
732                                 {
733                                         LockBuffer(stack->buffer, GIST_UNLOCK);
734                                         LockBuffer(stack->buffer, GIST_EXCLUSIVE);
735                                         xlocked = true;
736                                         stack->page = (Page) BufferGetPage(stack->buffer);
737
738                                         if (!XLByteEQ(PageGetLSN(stack->page), stack->lsn))
739                                         {
740                                                 /* the page was changed while we unlocked it, retry */
741                                                 continue;
742                                         }
743                                 }
744
745                                 /*
746                                  * Update the tuple.
747                                  *
748                                  * We still hold the lock after gistinserttuples(), but it
749                                  * might have to split the page to make the updated tuple fit.
750                                  * In that case the updated tuple might migrate to the other
751                                  * half of the split, so we have to go back to the parent and
752                                  * descend back to the half that's a better fit for the new
753                                  * tuple.
754                                  */
755                                 if (gistinserttuples(&state, stack, giststate, &newtup, 1,
756                                                                          stack->childoffnum, InvalidBuffer))
757                                 {
758                                         /*
759                                          * If this was a root split, the root page continues to be
760                                          * the parent and the updated tuple went to one of the
761                                          * child pages, so we just need to retry from the root
762                                          * page.
763                                          */
764                                         if (stack->blkno != GIST_ROOT_BLKNO)
765                                         {
766                                                 UnlockReleaseBuffer(stack->buffer);
767                                                 xlocked = false;
768                                                 state.stack = stack = stack->parent;
769                                         }
770                                         continue;
771                                 }
772                         }
773                         LockBuffer(stack->buffer, GIST_UNLOCK);
774                         xlocked = false;
775
776                         /* descend to the chosen child */
777                         item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
778                         item->blkno = childblkno;
779                         item->parent = stack;
780                         state.stack = stack = item;
781                 }
782                 else
783                 {
784                         /*
785                          * Leaf page. Insert the new key. We've already updated all the
786                          * parents on the way down, but we might have to split the page if
787                          * it doesn't fit. gistinserttuples() will take care of that.
788                          */
789
790                         /*
791                          * Swap shared lock for an exclusive one. Be careful, the page may
792                          * change while we unlock/lock the page...
793                          */
794                         if (!xlocked)
795                         {
796                                 LockBuffer(stack->buffer, GIST_UNLOCK);
797                                 LockBuffer(stack->buffer, GIST_EXCLUSIVE);
798                                 xlocked = true;
799                                 stack->page = (Page) BufferGetPage(stack->buffer);
800                                 stack->lsn = PageGetLSN(stack->page);
801
802                                 if (stack->blkno == GIST_ROOT_BLKNO)
803                                 {
804                                         /*
805                                          * the only page that can become inner instead of leaf is
806                                          * the root page, so for root we should recheck it
807                                          */
808                                         if (!GistPageIsLeaf(stack->page))
809                                         {
810                                                 /*
811                                                  * very rare situation: during unlock/lock index with
812                                                  * number of pages = 1 was increased
813                                                  */
814                                                 LockBuffer(stack->buffer, GIST_UNLOCK);
815                                                 xlocked = false;
816                                                 continue;
817                                         }
818
819                                         /*
820                                          * we don't need to check root split, because checking
821                                          * leaf/inner is enough to recognize split for root
822                                          */
823                                 }
824                                 else if (GistFollowRight(stack->page) ||
825                                                  XLByteLT(stack->parent->lsn,
826                                                                   GistPageGetOpaque(stack->page)->nsn))
827                                 {
828                                         /*
829                                          * The page was split while we momentarily unlocked the
830                                          * page. Go back to parent.
831                                          */
832                                         UnlockReleaseBuffer(stack->buffer);
833                                         xlocked = false;
834                                         state.stack = stack = stack->parent;
835                                         continue;
836                                 }
837                         }
838
839                         /* now state.stack->(page, buffer and blkno) points to leaf page */
840
841                         gistinserttuples(&state, stack, giststate, &itup, 1,
842                                                          InvalidOffsetNumber, InvalidBuffer);
843                         LockBuffer(stack->buffer, GIST_UNLOCK);
844
845                         /* Release any pins we might still hold before exiting */
846                         for (; stack; stack = stack->parent)
847                                 ReleaseBuffer(stack->buffer);
848                         break;
849                 }
850         }
851 }
852
853 /*
854  * Traverse the tree to find path from root page to specified "child" block.
855  *
856  * returns from the beginning of closest parent;
857  *
858  * To prevent deadlocks, this should lock only one page at a time.
859  */
860 GISTInsertStack *
861 gistFindPath(Relation r, BlockNumber child)
862 {
863         Page            page;
864         Buffer          buffer;
865         OffsetNumber i,
866                                 maxoff;
867         ItemId          iid;
868         IndexTuple      idxtuple;
869         GISTInsertStack *top,
870                            *tail,
871                            *ptr;
872         BlockNumber blkno;
873
874         top = tail = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
875         top->blkno = GIST_ROOT_BLKNO;
876
877         while (top && top->blkno != child)
878         {
879                 buffer = ReadBuffer(r, top->blkno);
880                 LockBuffer(buffer, GIST_SHARE);
881                 gistcheckpage(r, buffer);
882                 page = (Page) BufferGetPage(buffer);
883
884                 if (GistPageIsLeaf(page))
885                 {
886                         /* we can safety go away, follows only leaf pages */
887                         UnlockReleaseBuffer(buffer);
888                         return NULL;
889                 }
890
891                 top->lsn = PageGetLSN(page);
892
893                 /*
894                  * If F_FOLLOW_RIGHT is set, the page to the right doesn't have a
895                  * downlink. This should not normally happen..
896                  */
897                 if (GistFollowRight(page))
898                         elog(ERROR, "concurrent GiST page split was incomplete");
899
900                 if (top->parent && XLByteLT(top->parent->lsn, GistPageGetOpaque(page)->nsn) &&
901                         GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
902                 {
903                         /* page splited while we thinking of... */
904                         ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
905                         ptr->blkno = GistPageGetOpaque(page)->rightlink;
906                         ptr->childoffnum = InvalidOffsetNumber;
907                         ptr->parent = top;
908                         ptr->next = NULL;
909                         tail->next = ptr;
910                         tail = ptr;
911                 }
912
913                 maxoff = PageGetMaxOffsetNumber(page);
914
915                 for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
916                 {
917                         iid = PageGetItemId(page, i);
918                         idxtuple = (IndexTuple) PageGetItem(page, iid);
919                         blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
920                         if (blkno == child)
921                         {
922                                 OffsetNumber poff = InvalidOffsetNumber;
923
924                                 /* make childs links */
925                                 ptr = top;
926                                 while (ptr->parent)
927                                 {
928                                         /* move childoffnum.. */
929                                         if (ptr == top)
930                                         {
931                                                 /* first iteration */
932                                                 poff = ptr->parent->childoffnum;
933                                                 ptr->parent->childoffnum = ptr->childoffnum;
934                                         }
935                                         else
936                                         {
937                                                 OffsetNumber tmp = ptr->parent->childoffnum;
938
939                                                 ptr->parent->childoffnum = poff;
940                                                 poff = tmp;
941                                         }
942                                         ptr = ptr->parent;
943                                 }
944                                 top->childoffnum = i;
945                                 UnlockReleaseBuffer(buffer);
946                                 return top;
947                         }
948                         else
949                         {
950                                 /* Install next inner page to the end of stack */
951                                 ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
952                                 ptr->blkno = blkno;
953                                 ptr->childoffnum = i;   /* set offsetnumber of child to child
954                                                                                  * !!! */
955                                 ptr->parent = top;
956                                 ptr->next = NULL;
957                                 tail->next = ptr;
958                                 tail = ptr;
959                         }
960                 }
961
962                 UnlockReleaseBuffer(buffer);
963                 top = top->next;
964         }
965
966         return NULL;
967 }
968
969 /*
970  * Updates the stack so that child->parent is the correct parent of the
971  * child. child->parent must be exclusively locked on entry, and will
972  * remain so at exit, but it might not be the same page anymore.
973  */
974 static void
975 gistFindCorrectParent(Relation r, GISTInsertStack *child)
976 {
977         GISTInsertStack *parent = child->parent;
978
979         gistcheckpage(r, parent->buffer);
980         parent->page = (Page) BufferGetPage(parent->buffer);
981
982         /* here we don't need to distinguish between split and page update */
983         if (parent->childoffnum == InvalidOffsetNumber || !XLByteEQ(parent->lsn, PageGetLSN(parent->page)))
984         {
985                 /* parent is changed, look child in right links until found */
986                 OffsetNumber i,
987                                         maxoff;
988                 ItemId          iid;
989                 IndexTuple      idxtuple;
990                 GISTInsertStack *ptr;
991
992                 while (true)
993                 {
994                         maxoff = PageGetMaxOffsetNumber(parent->page);
995                         for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
996                         {
997                                 iid = PageGetItemId(parent->page, i);
998                                 idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
999                                 if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno)
1000                                 {
1001                                         /* yes!!, found */
1002                                         parent->childoffnum = i;
1003                                         return;
1004                                 }
1005                         }
1006
1007                         parent->blkno = GistPageGetOpaque(parent->page)->rightlink;
1008                         UnlockReleaseBuffer(parent->buffer);
1009                         if (parent->blkno == InvalidBlockNumber)
1010
1011                                 /*
1012                                  * end of chain and still didn't found parent, It's very-very
1013                                  * rare situation when root splited
1014                                  */
1015                                 break;
1016                         parent->buffer = ReadBuffer(r, parent->blkno);
1017                         LockBuffer(parent->buffer, GIST_EXCLUSIVE);
1018                         gistcheckpage(r, parent->buffer);
1019                         parent->page = (Page) BufferGetPage(parent->buffer);
1020                 }
1021
1022                 /*
1023                  * awful!!, we need search tree to find parent ... , but before we
1024                  * should release all old parent
1025                  */
1026
1027                 ptr = child->parent->parent;    /* child->parent already released
1028                                                                                  * above */
1029                 while (ptr)
1030                 {
1031                         ReleaseBuffer(ptr->buffer);
1032                         ptr = ptr->parent;
1033                 }
1034
1035                 /* ok, find new path */
1036                 ptr = parent = gistFindPath(r, child->blkno);
1037                 Assert(ptr != NULL);
1038
1039                 /* read all buffers as expected by caller */
1040                 /* note we don't lock them or gistcheckpage them here! */
1041                 while (ptr)
1042                 {
1043                         ptr->buffer = ReadBuffer(r, ptr->blkno);
1044                         ptr->page = (Page) BufferGetPage(ptr->buffer);
1045                         ptr = ptr->parent;
1046                 }
1047
1048                 /* install new chain of parents to stack */
1049                 child->parent = parent;
1050
1051                 /* make recursive call to normal processing */
1052                 LockBuffer(child->parent->buffer, GIST_EXCLUSIVE);
1053                 gistFindCorrectParent(r, child);
1054         }
1055
1056         return;
1057 }
1058
1059 /*
1060  * Form a downlink pointer for the page in 'buf'.
1061  */
1062 static IndexTuple
1063 gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate,
1064                                  GISTInsertStack *stack)
1065 {
1066         Page            page = BufferGetPage(buf);
1067         OffsetNumber maxoff;
1068         OffsetNumber offset;
1069         IndexTuple      downlink = NULL;
1070
1071         maxoff = PageGetMaxOffsetNumber(page);
1072         for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
1073         {
1074                 IndexTuple      ituple = (IndexTuple)
1075                 PageGetItem(page, PageGetItemId(page, offset));
1076
1077                 if (downlink == NULL)
1078                         downlink = CopyIndexTuple(ituple);
1079                 else
1080                 {
1081                         IndexTuple      newdownlink;
1082
1083                         newdownlink = gistgetadjusted(rel, downlink, ituple,
1084                                                                                   giststate);
1085                         if (newdownlink)
1086                                 downlink = newdownlink;
1087                 }
1088         }
1089
1090         /*
1091          * If the page is completely empty, we can't form a meaningful downlink
1092          * for it. But we have to insert a downlink for the page. Any key will do,
1093          * as long as its consistent with the downlink of parent page, so that we
1094          * can legally insert it to the parent. A minimal one that matches as few
1095          * scans as possible would be best, to keep scans from doing useless work,
1096          * but we don't know how to construct that. So we just use the downlink of
1097          * the original page that was split - that's as far from optimal as it can
1098          * get but will do..
1099          */
1100         if (!downlink)
1101         {
1102                 ItemId          iid;
1103
1104                 LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
1105                 gistFindCorrectParent(rel, stack);
1106                 iid = PageGetItemId(stack->parent->page, stack->parent->childoffnum);
1107                 downlink = (IndexTuple) PageGetItem(stack->parent->page, iid);
1108                 downlink = CopyIndexTuple(downlink);
1109                 LockBuffer(stack->parent->buffer, GIST_UNLOCK);
1110         }
1111
1112         ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf));
1113         GistTupleSetValid(downlink);
1114
1115         return downlink;
1116 }
1117
1118
1119 /*
1120  * Complete the incomplete split of state->stack->page.
1121  */
1122 static void
1123 gistfixsplit(GISTInsertState *state, GISTSTATE *giststate)
1124 {
1125         GISTInsertStack *stack = state->stack;
1126         Buffer          buf;
1127         Page            page;
1128         List       *splitinfo = NIL;
1129
1130         elog(LOG, "fixing incomplete split in index \"%s\", block %u",
1131                  RelationGetRelationName(state->r), stack->blkno);
1132
1133         Assert(GistFollowRight(stack->page));
1134         Assert(OffsetNumberIsValid(stack->parent->childoffnum));
1135
1136         buf = stack->buffer;
1137
1138         /*
1139          * Read the chain of split pages, following the rightlinks. Construct a
1140          * downlink tuple for each page.
1141          */
1142         for (;;)
1143         {
1144                 GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
1145                 IndexTuple      downlink;
1146
1147                 page = BufferGetPage(buf);
1148
1149                 /* Form the new downlink tuples to insert to parent */
1150                 downlink = gistformdownlink(state->r, buf, giststate, stack);
1151
1152                 si->buf = buf;
1153                 si->downlink = downlink;
1154
1155                 splitinfo = lappend(splitinfo, si);
1156
1157                 if (GistFollowRight(page))
1158                 {
1159                         /* lock next page */
1160                         buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink);
1161                         LockBuffer(buf, GIST_EXCLUSIVE);
1162                 }
1163                 else
1164                         break;
1165         }
1166
1167         /* Insert the downlinks */
1168         gistfinishsplit(state, stack, giststate, splitinfo);
1169 }
1170
1171 /*
1172  * Insert tuples to stack->buffer. If 'oldoffnum' is valid, the new tuples
1173  * replace an old tuple at oldoffnum. The caller must hold an exclusive lock
1174  * on the page.
1175  *
1176  * If leftchild is valid, we're inserting/updating the downlink for the
1177  * page to the right of leftchild. We clear the F_FOLLOW_RIGHT flag and
1178  * update NSN on leftchild, atomically with the insertion of the downlink.
1179  *
1180  * Returns 'true' if the page had to be split. On return, we will continue
1181  * to hold an exclusive lock on state->stack->buffer, but if we had to split
1182  * the page, it might not contain the tuple we just inserted/updated.
1183  */
1184 static bool
1185 gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
1186                                  GISTSTATE *giststate,
1187                                  IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
1188                                  Buffer leftchild)
1189 {
1190         List       *splitinfo;
1191         bool            is_split;
1192
1193         is_split = gistplacetopage(state, giststate, stack->buffer,
1194                                                            tuples, ntup, oldoffnum,
1195                                                            leftchild,
1196                                                            &splitinfo);
1197         if (splitinfo)
1198                 gistfinishsplit(state, stack, giststate, splitinfo);
1199
1200         return is_split;
1201 }
1202
1203 /*
1204  * Finish an incomplete split by inserting/updating the downlinks in
1205  * parent page. 'splitinfo' contains all the child pages, exclusively-locked,
1206  * involved in the split, from left-to-right.
1207  */
1208 static void
1209 gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
1210                                 GISTSTATE *giststate, List *splitinfo)
1211 {
1212         ListCell   *lc;
1213         List       *reversed;
1214         GISTPageSplitInfo *right;
1215         GISTPageSplitInfo *left;
1216         IndexTuple      tuples[2];
1217
1218         /* A split always contains at least two halves */
1219         Assert(list_length(splitinfo) >= 2);
1220
1221         /*
1222          * We need to insert downlinks for each new page, and update the downlink
1223          * for the original (leftmost) page in the split. Begin at the rightmost
1224          * page, inserting one downlink at a time until there's only two pages
1225          * left. Finally insert the downlink for the last new page and update the
1226          * downlink for the original page as one operation.
1227          */
1228
1229         /* for convenience, create a copy of the list in reverse order */
1230         reversed = NIL;
1231         foreach(lc, splitinfo)
1232         {
1233                 reversed = lcons(lfirst(lc), reversed);
1234         }
1235
1236         LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
1237         gistFindCorrectParent(state->r, stack);
1238
1239         while (list_length(reversed) > 2)
1240         {
1241                 right = (GISTPageSplitInfo *) linitial(reversed);
1242                 left = (GISTPageSplitInfo *) lsecond(reversed);
1243
1244                 if (gistinserttuples(state, stack->parent, giststate,
1245                                                          &right->downlink, 1,
1246                                                          InvalidOffsetNumber,
1247                                                          left->buf))
1248                 {
1249                         /*
1250                          * If the parent page was split, need to relocate the original
1251                          * parent pointer.
1252                          */
1253                         gistFindCorrectParent(state->r, stack);
1254                 }
1255                 UnlockReleaseBuffer(right->buf);
1256                 reversed = list_delete_first(reversed);
1257         }
1258
1259         right = (GISTPageSplitInfo *) linitial(reversed);
1260         left = (GISTPageSplitInfo *) lsecond(reversed);
1261
1262         /*
1263          * Finally insert downlink for the remaining right page and update the
1264          * downlink for the original page to not contain the tuples that were
1265          * moved to the new pages.
1266          */
1267         tuples[0] = left->downlink;
1268         tuples[1] = right->downlink;
1269         gistinserttuples(state, stack->parent, giststate,
1270                                          tuples, 2,
1271                                          stack->parent->childoffnum,
1272                                          left->buf);
1273         LockBuffer(stack->parent->buffer, GIST_UNLOCK);
1274         UnlockReleaseBuffer(right->buf);
1275         Assert(left->buf == stack->buffer);
1276 }
1277
1278 /*
1279  * gistSplit -- split a page in the tree and fill struct
1280  * used for XLOG and real writes buffers. Function is recursive, ie
1281  * it will split page until keys will fit in every page.
1282  */
1283 SplitedPageLayout *
1284 gistSplit(Relation r,
1285                   Page page,
1286                   IndexTuple *itup,             /* contains compressed entry */
1287                   int len,
1288                   GISTSTATE *giststate)
1289 {
1290         IndexTuple *lvectup,
1291                            *rvectup;
1292         GistSplitVector v;
1293         GistEntryVector *entryvec;
1294         int                     i;
1295         SplitedPageLayout *res = NULL;
1296
1297         /* generate the item array */
1298         entryvec = palloc(GEVHDRSZ + (len + 1) * sizeof(GISTENTRY));
1299         entryvec->n = len + 1;
1300
1301         memset(v.spl_lisnull, TRUE, sizeof(bool) * giststate->tupdesc->natts);
1302         memset(v.spl_risnull, TRUE, sizeof(bool) * giststate->tupdesc->natts);
1303         gistSplitByKey(r, page, itup, len, giststate,
1304                                    &v, entryvec, 0);
1305
1306         /* form left and right vector */
1307         lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
1308         rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
1309
1310         for (i = 0; i < v.splitVector.spl_nleft; i++)
1311                 lvectup[i] = itup[v.splitVector.spl_left[i] - 1];
1312
1313         for (i = 0; i < v.splitVector.spl_nright; i++)
1314                 rvectup[i] = itup[v.splitVector.spl_right[i] - 1];
1315
1316         /* finalize splitting (may need another split) */
1317         if (!gistfitpage(rvectup, v.splitVector.spl_nright))
1318         {
1319                 res = gistSplit(r, page, rvectup, v.splitVector.spl_nright, giststate);
1320         }
1321         else
1322         {
1323                 ROTATEDIST(res);
1324                 res->block.num = v.splitVector.spl_nright;
1325                 res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
1326                 res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
1327         }
1328
1329         if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
1330         {
1331                 SplitedPageLayout *resptr,
1332                                    *subres;
1333
1334                 resptr = subres = gistSplit(r, page, lvectup, v.splitVector.spl_nleft, giststate);
1335
1336                 /* install on list's tail */
1337                 while (resptr->next)
1338                         resptr = resptr->next;
1339
1340                 resptr->next = res;
1341                 res = subres;
1342         }
1343         else
1344         {
1345                 ROTATEDIST(res);
1346                 res->block.num = v.splitVector.spl_nleft;
1347                 res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
1348                 res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
1349         }
1350
1351         return res;
1352 }
1353
1354 /*
1355  * Fill a GISTSTATE with information about the index
1356  */
1357 void
1358 initGISTstate(GISTSTATE *giststate, Relation index)
1359 {
1360         int                     i;
1361
1362         if (index->rd_att->natts > INDEX_MAX_KEYS)
1363                 elog(ERROR, "numberOfAttributes %d > %d",
1364                          index->rd_att->natts, INDEX_MAX_KEYS);
1365
1366         giststate->tupdesc = index->rd_att;
1367
1368         for (i = 0; i < index->rd_att->natts; i++)
1369         {
1370                 fmgr_info_copy(&(giststate->consistentFn[i]),
1371                                            index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC),
1372                                            CurrentMemoryContext);
1373                 fmgr_info_copy(&(giststate->unionFn[i]),
1374                                            index_getprocinfo(index, i + 1, GIST_UNION_PROC),
1375                                            CurrentMemoryContext);
1376                 fmgr_info_copy(&(giststate->compressFn[i]),
1377                                            index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC),
1378                                            CurrentMemoryContext);
1379                 fmgr_info_copy(&(giststate->decompressFn[i]),
1380                                            index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC),
1381                                            CurrentMemoryContext);
1382                 fmgr_info_copy(&(giststate->penaltyFn[i]),
1383                                            index_getprocinfo(index, i + 1, GIST_PENALTY_PROC),
1384                                            CurrentMemoryContext);
1385                 fmgr_info_copy(&(giststate->picksplitFn[i]),
1386                                            index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC),
1387                                            CurrentMemoryContext);
1388                 fmgr_info_copy(&(giststate->equalFn[i]),
1389                                            index_getprocinfo(index, i + 1, GIST_EQUAL_PROC),
1390                                            CurrentMemoryContext);
1391                 /* opclasses are not required to provide a Distance method */
1392                 if (OidIsValid(index_getprocid(index, i + 1, GIST_DISTANCE_PROC)))
1393                         fmgr_info_copy(&(giststate->distanceFn[i]),
1394                                                  index_getprocinfo(index, i + 1, GIST_DISTANCE_PROC),
1395                                                    CurrentMemoryContext);
1396                 else
1397                         giststate->distanceFn[i].fn_oid = InvalidOid;
1398
1399                 /*
1400                  * If the index column has a specified collation, we should honor that
1401                  * while doing comparisons.  However, we may have a collatable storage
1402                  * type for a noncollatable indexed data type.  If there's no index
1403                  * collation then specify default collation in case the support
1404                  * functions need collation.  This is harmless if the support
1405                  * functions don't care about collation, so we just do it
1406                  * unconditionally.  (We could alternatively call get_typcollation,
1407                  * but that seems like expensive overkill --- there aren't going to be
1408                  * any cases where a GiST storage type has a nondefault collation.)
1409                  */
1410                 if (OidIsValid(index->rd_indcollation[i]))
1411                         giststate->supportCollation[i] = index->rd_indcollation[i];
1412                 else
1413                         giststate->supportCollation[i] = DEFAULT_COLLATION_OID;
1414         }
1415 }
1416
1417 void
1418 freeGISTstate(GISTSTATE *giststate)
1419 {
1420         /* no work */
1421 }