OSDN Git Service

Fix typo in sslmode documentation
[pg-rex/syncrep.git] / src / backend / access / gist / gist.c
1 /*-------------------------------------------------------------------------
2  *
3  * gist.c
4  *        interface routines for the postgres GiST index access method.
5  *
6  *
7  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *        src/backend/access/gist/gist.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/gist_private.h"
19 #include "catalog/index.h"
20 #include "catalog/pg_collation.h"
21 #include "miscadmin.h"
22 #include "storage/bufmgr.h"
23 #include "storage/indexfsm.h"
24 #include "utils/memutils.h"
25 #include "utils/rel.h"
26
27 /* Working state for gistbuild and its callback */
28 typedef struct
29 {
30         GISTSTATE       giststate;
31         int                     numindexattrs;
32         double          indtuples;
33         MemoryContext tmpCtx;
34 } GISTBuildState;
35
36 /* A List of these is used represent a split-in-progress. */
37 typedef struct
38 {
39         Buffer          buf;                    /* the split page "half" */
40         IndexTuple      downlink;               /* downlink for this half. */
41 } GISTPageSplitInfo;
42
43 /* non-export function prototypes */
44 static void gistbuildCallback(Relation index,
45                                   HeapTuple htup,
46                                   Datum *values,
47                                   bool *isnull,
48                                   bool tupleIsAlive,
49                                   void *state);
50 static void gistdoinsert(Relation r,
51                          IndexTuple itup,
52                          Size freespace,
53                          GISTSTATE *GISTstate);
54 static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate);
55 static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
56                                  GISTSTATE *giststate,
57                                  IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
58                                  Buffer leftchild);
59 static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
60                                 GISTSTATE *giststate, List *splitinfo);
61
62
/*
 * ROTATEDIST(d)
 *
 * Push a freshly palloc'd SplitedPageLayout onto the front of the singly
 * linked list whose head pointer is 'd'.  The new element is zero-filled
 * except that its block number is InvalidBlockNumber and its buffer is
 * InvalidBuffer; its 'next' points at the previous head.
 *
 * 'd' must be a modifiable lvalue of type SplitedPageLayout *, and it is
 * evaluated twice, so it must not have side effects.  The macro-local
 * variable has a deliberately unusual name so that it cannot capture an
 * identifier used in the argument expression.
 */
#define ROTATEDIST(d) do { \
	SplitedPageLayout *rotatedist_new_ = \
		(SplitedPageLayout *) palloc0(sizeof(SplitedPageLayout)); \
	rotatedist_new_->block.blkno = InvalidBlockNumber; \
	rotatedist_new_->buffer = InvalidBuffer; \
	rotatedist_new_->next = (d); \
	(d) = rotatedist_new_; \
} while (0)
71
72
73 /*
74  * Create and return a temporary memory context for use by GiST. We
75  * _always_ invoke user-provided methods in a temporary memory
76  * context, so that memory leaks in those functions cannot cause
77  * problems. Also, we use some additional temporary contexts in the
78  * GiST code itself, to avoid the need to do some awkward manual
79  * memory management.
80  */
81 MemoryContext
82 createTempGistContext(void)
83 {
84         return AllocSetContextCreate(CurrentMemoryContext,
85                                                                  "GiST temporary context",
86                                                                  ALLOCSET_DEFAULT_MINSIZE,
87                                                                  ALLOCSET_DEFAULT_INITSIZE,
88                                                                  ALLOCSET_DEFAULT_MAXSIZE);
89 }
90
91 /*
92  * Routine to build an index.  Basically calls insert over and over.
93  *
94  * XXX: it would be nice to implement some sort of bulk-loading
95  * algorithm, but it is not clear how to do that.
96  */
97 Datum
98 gistbuild(PG_FUNCTION_ARGS)
99 {
100         Relation        heap = (Relation) PG_GETARG_POINTER(0);
101         Relation        index = (Relation) PG_GETARG_POINTER(1);
102         IndexInfo  *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
103         IndexBuildResult *result;
104         double          reltuples;
105         GISTBuildState buildstate;
106         Buffer          buffer;
107         Page            page;
108
109         /*
110          * We expect to be called exactly once for any index relation. If that's
111          * not the case, big trouble's what we have.
112          */
113         if (RelationGetNumberOfBlocks(index) != 0)
114                 elog(ERROR, "index \"%s\" already contains data",
115                          RelationGetRelationName(index));
116
117         /* no locking is needed */
118         initGISTstate(&buildstate.giststate, index);
119
120         /* initialize the root page */
121         buffer = gistNewBuffer(index);
122         Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
123         page = BufferGetPage(buffer);
124
125         START_CRIT_SECTION();
126
127         GISTInitBuffer(buffer, F_LEAF);
128
129         MarkBufferDirty(buffer);
130
131         if (RelationNeedsWAL(index))
132         {
133                 XLogRecPtr      recptr;
134                 XLogRecData rdata;
135
136                 rdata.data = (char *) &(index->rd_node);
137                 rdata.len = sizeof(RelFileNode);
138                 rdata.buffer = InvalidBuffer;
139                 rdata.next = NULL;
140
141                 recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX, &rdata);
142                 PageSetLSN(page, recptr);
143                 PageSetTLI(page, ThisTimeLineID);
144         }
145         else
146                 PageSetLSN(page, GetXLogRecPtrForTemp());
147
148         UnlockReleaseBuffer(buffer);
149
150         END_CRIT_SECTION();
151
152         /* build the index */
153         buildstate.numindexattrs = indexInfo->ii_NumIndexAttrs;
154         buildstate.indtuples = 0;
155
156         /*
157          * create a temporary memory context that is reset once for each tuple
158          * inserted into the index
159          */
160         buildstate.tmpCtx = createTempGistContext();
161
162         /* do the heap scan */
163         reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
164                                                                    gistbuildCallback, (void *) &buildstate);
165
166         /* okay, all heap tuples are indexed */
167         MemoryContextDelete(buildstate.tmpCtx);
168
169         freeGISTstate(&buildstate.giststate);
170
171         /*
172          * Return statistics
173          */
174         result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
175
176         result->heap_tuples = reltuples;
177         result->index_tuples = buildstate.indtuples;
178
179         PG_RETURN_POINTER(result);
180 }
181
182 /*
183  * Per-tuple callback from IndexBuildHeapScan
184  */
185 static void
186 gistbuildCallback(Relation index,
187                                   HeapTuple htup,
188                                   Datum *values,
189                                   bool *isnull,
190                                   bool tupleIsAlive,
191                                   void *state)
192 {
193         GISTBuildState *buildstate = (GISTBuildState *) state;
194         IndexTuple      itup;
195         MemoryContext oldCtx;
196
197         oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
198
199         /* form an index tuple and point it at the heap tuple */
200         itup = gistFormTuple(&buildstate->giststate, index,
201                                                  values, isnull, true /* size is currently bogus */ );
202         itup->t_tid = htup->t_self;
203
204         /*
205          * Since we already have the index relation locked, we call gistdoinsert
206          * directly.  Normal access method calls dispatch through gistinsert,
207          * which locks the relation for write.  This is the right thing to do if
208          * you're inserting single tups, but not when you're initializing the
209          * whole index at once.
210          *
211          * In this path we respect the fillfactor setting, whereas insertions
212          * after initial build do not.
213          */
214         gistdoinsert(index, itup,
215                           RelationGetTargetPageFreeSpace(index, GIST_DEFAULT_FILLFACTOR),
216                                  &buildstate->giststate);
217
218         buildstate->indtuples += 1;
219         MemoryContextSwitchTo(oldCtx);
220         MemoryContextReset(buildstate->tmpCtx);
221 }
222
223 /*
224  *      gistbuildempty() -- build an empty gist index in the initialization fork
225  */
226 Datum
227 gistbuildempty(PG_FUNCTION_ARGS)
228 {
229         ereport(ERROR,
230                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
231                          errmsg("unlogged GiST indexes are not supported")));
232
233         PG_RETURN_VOID();
234 }
235
236 /*
237  *      gistinsert -- wrapper for GiST tuple insertion.
238  *
239  *        This is the public interface routine for tuple insertion in GiSTs.
240  *        It doesn't do any work; just locks the relation and passes the buck.
241  */
242 Datum
243 gistinsert(PG_FUNCTION_ARGS)
244 {
245         Relation        r = (Relation) PG_GETARG_POINTER(0);
246         Datum      *values = (Datum *) PG_GETARG_POINTER(1);
247         bool       *isnull = (bool *) PG_GETARG_POINTER(2);
248         ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
249
250 #ifdef NOT_USED
251         Relation        heapRel = (Relation) PG_GETARG_POINTER(4);
252         IndexUniqueCheck checkUnique = (IndexUniqueCheck) PG_GETARG_INT32(5);
253 #endif
254         IndexTuple      itup;
255         GISTSTATE       giststate;
256         MemoryContext oldCtx;
257         MemoryContext insertCtx;
258
259         insertCtx = createTempGistContext();
260         oldCtx = MemoryContextSwitchTo(insertCtx);
261
262         initGISTstate(&giststate, r);
263
264         itup = gistFormTuple(&giststate, r,
265                                                  values, isnull, true /* size is currently bogus */ );
266         itup->t_tid = *ht_ctid;
267
268         gistdoinsert(r, itup, 0, &giststate);
269
270         /* cleanup */
271         freeGISTstate(&giststate);
272         MemoryContextSwitchTo(oldCtx);
273         MemoryContextDelete(insertCtx);
274
275         PG_RETURN_BOOL(false);
276 }
277
278
279 /*
280  * Place tuples from 'itup' to 'buffer'. If 'oldoffnum' is valid, the tuple
281  * at that offset is atomically removed along with inserting the new tuples.
282  * This is used to replace a tuple with a new one.
283  *
284  * If 'leftchildbuf' is valid, we're inserting the downlink for the page
285  * to the right of 'leftchildbuf', or updating the downlink for 'leftchildbuf'.
286  * F_FOLLOW_RIGHT flag on 'leftchildbuf' is cleared and NSN is set.
287  *
288  * If there is not enough room on the page, it is split. All the split
289  * pages are kept pinned and locked and returned in *splitinfo, the caller
290  * is responsible for inserting the downlinks for them. However, if
291  * 'buffer' is the root page and it needs to be split, gistplacetopage()
292  * performs the split as one atomic operation, and *splitinfo is set to NIL.
293  * In that case, we continue to hold the root page locked, and the child
294  * pages are released; note that new tuple(s) are *not* on the root page
295  * but in one of the new child pages.
296  */
297 static bool
298 gistplacetopage(GISTInsertState *state, GISTSTATE *giststate,
299                                 Buffer buffer,
300                                 IndexTuple *itup, int ntup, OffsetNumber oldoffnum,
301                                 Buffer leftchildbuf,
302                                 List **splitinfo)
303 {
304         Page            page = BufferGetPage(buffer);
305         bool            is_leaf = (GistPageIsLeaf(page)) ? true : false;
306         XLogRecPtr      recptr;         /* LSN stamped on the touched pages; assigned in both branches below */
307         int                     i;
308         bool            is_split;       /* return value: true if the page had to be split */
309
310         /*
311          * Refuse to modify a page that's incompletely split. This should not
312          * happen because we finish any incomplete splits while we walk down the
313          * tree. However, it's remotely possible that another concurrent inserter
314          * splits a parent page, and errors out before completing the split. We
315          * will just throw an error in that case, and leave any split we had in
316          * progress unfinished too. The next insert that comes along will clean up
317          * the mess.
318          */
319         if (GistFollowRight(page))
320                 elog(ERROR, "concurrent GiST page split was incomplete");
321
322         *splitinfo = NIL;       /* default result: no split pages to hand back */
323
324         /*
325          * if isupdate, remove old key: This node's key has been modified, either
326          * because a child split occurred or because we needed to adjust our key
327          * for an insert in a child node. Therefore, remove the old version of
328          * this node's key.
329          *
330          * for WAL replay, in the non-split case we handle this by setting up a
331          * one-element todelete array; in the split case, it's handled implicitly
332          * because the tuple vector passed to gistSplit won't include this tuple.
          *
          * The result of gistnospace() selects between the two paths below: true
          * means there is no room for the insertion and the page must be split.
333          */
334         is_split = gistnospace(page, itup, ntup, oldoffnum, state->freespace);
335         if (is_split)
336         {
337                 /* no space for insertion */
338                 IndexTuple *itvec;
339                 int                     tlen;
340                 SplitedPageLayout *dist = NULL,
341                                    *ptr;
342                 BlockNumber oldrlink = InvalidBlockNumber;      /* stays invalid for a root split */
343                 GistNSN         oldnsn = {0, 0};                /* stays zero for a root split */
344                 SplitedPageLayout rootpg;       /* only filled in for a root split */
345                 BlockNumber blkno = BufferGetBlockNumber(buffer);
346                 bool            is_rootsplit;
347
348                 is_rootsplit = (blkno == GIST_ROOT_BLKNO);
349
350                 /*
351                  * Form index tuples vector to split. If we're replacing an old tuple,
352                  * remove the old version from the vector.
353                  */
354                 itvec = gistextractpage(page, &tlen);
355                 if (OffsetNumberIsValid(oldoffnum))
356                 {
357                         /* on inner page we should remove old tuple */
358                         int                     pos = oldoffnum - FirstOffsetNumber;    /* zero-based index into itvec */
359
360                         tlen--;
                         /* close the gap, unless the removed entry was the last one */
361                         if (pos != tlen)
362                                 memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
363                 }
364                 itvec = gistjoinvector(itvec, &tlen, itup, ntup);
365                 dist = gistSplit(state->r, page, itvec, tlen, giststate);
366
367                 /*
368                  * Set up pages to work with. Allocate new buffers for all but the
369                  * leftmost page. The original page becomes the new leftmost page, and
370                  * is just replaced with the new contents.
371                  *
372                  * For a root-split, allocate new buffers for all child pages, the
373                  * original page is overwritten with new root page containing
374                  * downlinks to the new child pages.
375                  */
376                 ptr = dist;
377                 if (!is_rootsplit)
378                 {
379                         /* save old rightlink and NSN */
380                         oldrlink = GistPageGetOpaque(page)->rightlink;
381                         oldnsn = GistPageGetOpaque(page)->nsn;
382
383                         dist->buffer = buffer;
384                         dist->block.blkno = BufferGetBlockNumber(buffer);
385                         dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer));
386
387                         /* clean all flags except F_LEAF */
388                         GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
389
390                         ptr = ptr->next;        /* first element is done; new buffers start with the second */
391                 }
392                 for (; ptr; ptr = ptr->next)
393                 {
394                         /* Allocate new page */
395                         ptr->buffer = gistNewBuffer(state->r);
396                         GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
397                         ptr->page = BufferGetPage(ptr->buffer);
398                         ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
399                 }
400
401                 /*
402                  * Now that we know which blocks the new pages go to, set up downlink
403                  * tuples to point to them.
404                  */
405                 for (ptr = dist; ptr; ptr = ptr->next)
406                 {
407                         ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
408                         GistTupleSetValid(ptr->itup);
409                 }
410
411                 /*
412                  * If this is a root split, we construct the new root page with the
413                  * downlinks here directly, instead of requiring the caller to insert
414                  * them. Add the new root page to the list along with the child pages.
415                  */
416                 if (is_rootsplit)
417                 {
418                         IndexTuple *downlinks;
419                         int                     ndownlinks = 0;
420                         int                     i;      /* NOTE(review): shadows the function-level 'i' */
421
422                         rootpg.buffer = buffer;
423                         rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer));
424                         GistPageGetOpaque(rootpg.page)->flags = 0;      /* all flags cleared, F_LEAF included */
425
426                         /* Prepare a vector of all the downlinks */
427                         for (ptr = dist; ptr; ptr = ptr->next)
428                                 ndownlinks++;
429                         downlinks = palloc(sizeof(IndexTuple) * ndownlinks);
430                         for (i = 0, ptr = dist; ptr; ptr = ptr->next)
431                                 downlinks[i++] = ptr->itup;
432
433                         rootpg.block.blkno = GIST_ROOT_BLKNO;
434                         rootpg.block.num = ndownlinks;
435                         rootpg.list = gistfillitupvec(downlinks, ndownlinks,
436                                                                                   &(rootpg.lenlist));
437                         rootpg.itup = NULL;
438
                         /* push the root page onto the front of the chain */
439                         rootpg.next = dist;
440                         dist = &rootpg;
441                 }
442                 else
443                 {
444                         /* Prepare split-info to be returned to caller */
445                         for (ptr = dist; ptr; ptr = ptr->next)
446                         {
447                                 GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
448
449                                 si->buf = ptr->buffer;
450                                 si->downlink = ptr->itup;
451                                 *splitinfo = lappend(*splitinfo, si);
452                         }
453                 }
454
455                 /*
456                  * Fill all pages. All the pages are new, ie. freshly allocated empty
457                  * pages, or a temporary copy of the old page.
458                  */
459                 for (ptr = dist; ptr; ptr = ptr->next)
460                 {
461                         char       *data = (char *) (ptr->list);        /* walks the tuples stored back to back in ptr->list */
462
463                         for (i = 0; i < ptr->block.num; i++)
464                         {
465                                 if (PageAddItem(ptr->page, (Item) data, IndexTupleSize((IndexTuple) data), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
466                                         elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(state->r));
467                                 data += IndexTupleSize((IndexTuple) data);
468                         }
469
470                         /*
                          * Set up rightlinks: every page except the root page and the
                          * last page in the chain points at its successor; those two
                          * get the saved rightlink of the original page instead.
                          */
471                         if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO)
472                                 GistPageGetOpaque(ptr->page)->rightlink =
473                                         ptr->next->block.blkno;
474                         else
475                                 GistPageGetOpaque(ptr->page)->rightlink = oldrlink;
476
                         /* F_FOLLOW_RIGHT goes on all but the last page, and never in a root split */
477                         if (ptr->next && !is_rootsplit)
478                                 GistMarkFollowRight(ptr->page);
479                         else
480                                 GistClearFollowRight(ptr->page);
481
482                         /*
483                          * Copy the NSN of the original page to all pages. The
484                          * F_FOLLOW_RIGHT flags ensure that scans will follow the
485                          * rightlinks until the downlinks are inserted.
486                          */
487                         GistPageGetOpaque(ptr->page)->nsn = oldnsn;
488                 }
489
                 /* this critical section is closed by the END_CRIT_SECTION() at the bottom of the function */
490                 START_CRIT_SECTION();
491
492                 /*
493                  * Must mark buffers dirty before XLogInsert, even though we'll still
494                  * be changing their opaque fields below.
495                  */
496                 for (ptr = dist; ptr; ptr = ptr->next)
497                         MarkBufferDirty(ptr->buffer);
498                 if (BufferIsValid(leftchildbuf))
499                         MarkBufferDirty(leftchildbuf);
500
501                 /*
502                  * The first page in the chain was a temporary working copy meant to
503                  * replace the old page. Copy it over the old page.
504                  */
505                 PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
506                 dist->page = BufferGetPage(dist->buffer);
507
508                 /* Write the WAL record */
509                 if (RelationNeedsWAL(state->r))
510                         recptr = gistXLogSplit(state->r->rd_node, blkno, is_leaf,
511                                                                    dist, oldrlink, oldnsn, leftchildbuf);
512                 else
513                         recptr = GetXLogRecPtrForTemp();
514
515                 for (ptr = dist; ptr; ptr = ptr->next)
516                 {
517                         PageSetLSN(ptr->page, recptr);
518                         PageSetTLI(ptr->page, ThisTimeLineID);
519                 }
520
521                 /*
522                  * Return the new child buffers to the caller.
523                  *
524                  * If this was a root split, we've already inserted the downlink
525                  * pointers, in the form of a new root page. Therefore we can release
526                  * all the new buffers, and keep just the root page locked.
                  *
                  * Otherwise nothing is released here; the buffers were already
                  * recorded in *splitinfo above.
527                  */
528                 if (is_rootsplit)
529                 {
                         /* dist is the root page itself, so start releasing at dist->next */
530                         for (ptr = dist->next; ptr; ptr = ptr->next)
531                                 UnlockReleaseBuffer(ptr->buffer);
532                 }
533         }
534         else
535         {
536                 /*
537                  * Enough space. We also get here if ntup==0.
538                  */
539                 START_CRIT_SECTION();
540
541                 if (OffsetNumberIsValid(oldoffnum))
542                         PageIndexTupleDelete(page, oldoffnum);
543                 gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
544
545                 MarkBufferDirty(buffer);
546
547                 if (BufferIsValid(leftchildbuf))
548                         MarkBufferDirty(leftchildbuf);
549
550                 if (RelationNeedsWAL(state->r))
551                 {
                         /* at most one offset is ever deleted here: the replaced tuple */
552                         OffsetNumber ndeloffs = 0,
553                                                 deloffs[1];
554
555                         if (OffsetNumberIsValid(oldoffnum))
556                         {
557                                 deloffs[0] = oldoffnum;
558                                 ndeloffs = 1;
559                         }
560
561                         recptr = gistXLogUpdate(state->r->rd_node, buffer,
562                                                                         deloffs, ndeloffs, itup, ntup,
563                                                                         leftchildbuf);
564
565                         PageSetLSN(page, recptr);
566                         PageSetTLI(page, ThisTimeLineID);
567                 }
568                 else
569                 {
                         /* not WAL-logged: only the LSN is set in this branch, not the TLI */
570                         recptr = GetXLogRecPtrForTemp();
571                         PageSetLSN(page, recptr);
572                 }
573
                 /* NOTE(review): repeats the assignment made near the top of the function */
574                 *splitinfo = NIL;
575         }
576
577         /*
578          * If we inserted the downlink for a child page, set NSN and clear
579          * F_FOLLOW_RIGHT flag on the left child, so that concurrent scans know to
580          * follow the rightlink if and only if they looked at the parent page
581          * before we inserted the downlink.
582          *
583          * Note that we do this *after* writing the WAL record. That means that
584          * the possible full page image in the WAL record does not include these
585          * changes, and they must be replayed even if the page is restored from
586          * the full page image. There's a chicken-and-egg problem: if we updated
587          * the child pages first, we wouldn't know the recptr of the WAL record
588          * we're about to write.
589          */
590         if (BufferIsValid(leftchildbuf))
591         {
592                 Page            leftpg = BufferGetPage(leftchildbuf);
593
594                 GistPageGetOpaque(leftpg)->nsn = recptr;
595                 GistClearFollowRight(leftpg);
596
597                 PageSetLSN(leftpg, recptr);
598                 PageSetTLI(leftpg, ThisTimeLineID);
599         }
600
         /* closes the critical section opened in whichever branch ran above */
601         END_CRIT_SECTION();
602
         /* true if the page was split, false if the tuples simply fit */
603         return is_split;
604 }
605
606 /*
607  * Workhorse routine for doing insertion into a GiST index. Note that
608  * this routine assumes it is invoked in a short-lived memory context,
609  * so it does not bother releasing palloc'd allocations.
610  */
611 static void
612 gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
613 {
614         ItemId          iid;
615         IndexTuple      idxtuple;
616         GISTInsertStack firststack;
617         GISTInsertStack *stack;
618         GISTInsertState state;
619         bool            xlocked = false;
620
621         memset(&state, 0, sizeof(GISTInsertState));
622         state.freespace = freespace;
623         state.r = r;
624
625         /* Start from the root */
626         firststack.blkno = GIST_ROOT_BLKNO;
627         firststack.lsn.xrecoff = 0;
628         firststack.parent = NULL;
629         state.stack = stack = &firststack;
630
631         /*
632          * Walk down along the path of smallest penalty, updating the parent
633          * pointers with the key we're inserting as we go. If we crash in the
634          * middle, the tree is consistent, although the possible parent updates
635          * were a waste.
636          */
637         for (;;)
638         {
639                 if (XLogRecPtrIsInvalid(stack->lsn))
640                         stack->buffer = ReadBuffer(state.r, stack->blkno);
641
642                 /*
643                  * Be optimistic and grab shared lock first. Swap it for an exclusive
644                  * lock later if we need to update the page.
645                  */
646                 if (!xlocked)
647                 {
648                         LockBuffer(stack->buffer, GIST_SHARE);
649                         gistcheckpage(state.r, stack->buffer);
650                 }
651
652                 stack->page = (Page) BufferGetPage(stack->buffer);
653                 stack->lsn = PageGetLSN(stack->page);
654                 Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn));
655
656                 /*
657                  * If this page was split but the downlink was never inserted to the
658                  * parent because the inserting backend crashed before doing that, fix
659                  * that now.
660                  */
661                 if (GistFollowRight(stack->page))
662                 {
663                         if (!xlocked)
664                         {
665                                 LockBuffer(stack->buffer, GIST_UNLOCK);
666                                 LockBuffer(stack->buffer, GIST_EXCLUSIVE);
667                                 xlocked = true;
668                                 /* someone might've completed the split when we unlocked */
669                                 if (!GistFollowRight(stack->page))
670                                         continue;
671                         }
672                         gistfixsplit(&state, giststate);
673
674                         UnlockReleaseBuffer(stack->buffer);
675                         xlocked = false;
676                         state.stack = stack = stack->parent;
677                         continue;
678                 }
679
680                 if (stack->blkno != GIST_ROOT_BLKNO &&
681                         XLByteLT(stack->parent->lsn,
682                                          GistPageGetOpaque(stack->page)->nsn))
683                 {
684                         /*
685                          * Concurrent split detected. There's no guarantee that the
686                          * downlink for this page is consistent with the tuple we're
687                          * inserting anymore, so go back to parent and rechoose the best
688                          * child.
689                          */
690                         UnlockReleaseBuffer(stack->buffer);
691                         xlocked = false;
692                         state.stack = stack = stack->parent;
693                         continue;
694                 }
695
696                 if (!GistPageIsLeaf(stack->page))
697                 {
698                         /*
699                          * This is an internal page so continue to walk down the tree.
700                          * Find the child node that has the minimum insertion penalty.
701                          */
702                         BlockNumber childblkno;
703                         IndexTuple      newtup;
704                         GISTInsertStack *item;
705
706                         stack->childoffnum = gistchoose(state.r, stack->page, itup, giststate);
707                         iid = PageGetItemId(stack->page, stack->childoffnum);
708                         idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
709                         childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
710
711                         /*
712                          * Check that it's not a leftover invalid tuple from pre-9.1
713                          */
714                         if (GistTupleIsInvalid(idxtuple))
715                                 ereport(ERROR,
716                                                 (errmsg("index \"%s\" contains an inner tuple marked as invalid",
717                                                                 RelationGetRelationName(r)),
718                                                  errdetail("This is caused by an incomplete page split at crash recovery before upgrading to PostgreSQL 9.1."),
719                                                  errhint("Please REINDEX it.")));
720
721                         /*
722                          * Check that the key representing the target child node is
723                          * consistent with the key we're inserting. Update it if it's not.
724                          */
725                         newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
726                         if (newtup)
727                         {
728                                 /*
729                                  * Swap shared lock for an exclusive one. Beware, the page may
730                                  * change while we unlock/lock the page...
731                                  */
732                                 if (!xlocked)
733                                 {
734                                         LockBuffer(stack->buffer, GIST_UNLOCK);
735                                         LockBuffer(stack->buffer, GIST_EXCLUSIVE);
736                                         xlocked = true;
737                                         stack->page = (Page) BufferGetPage(stack->buffer);
738
739                                         if (!XLByteEQ(PageGetLSN(stack->page), stack->lsn))
740                                         {
741                                                 /* the page was changed while we unlocked it, retry */
742                                                 continue;
743                                         }
744                                 }
745
746                                 /*
747                                  * Update the tuple.
748                                  *
749                                  * We still hold the lock after gistinserttuples(), but it
750                                  * might have to split the page to make the updated tuple fit.
751                                  * In that case the updated tuple might migrate to the other
752                                  * half of the split, so we have to go back to the parent and
753                                  * descend back to the half that's a better fit for the new
754                                  * tuple.
755                                  */
756                                 if (gistinserttuples(&state, stack, giststate, &newtup, 1,
757                                                                          stack->childoffnum, InvalidBuffer))
758                                 {
759                                         /*
760                                          * If this was a root split, the root page continues to be
761                                          * the parent and the updated tuple went to one of the
762                                          * child pages, so we just need to retry from the root
763                                          * page.
764                                          */
765                                         if (stack->blkno != GIST_ROOT_BLKNO)
766                                         {
767                                                 UnlockReleaseBuffer(stack->buffer);
768                                                 xlocked = false;
769                                                 state.stack = stack = stack->parent;
770                                         }
771                                         continue;
772                                 }
773                         }
774                         LockBuffer(stack->buffer, GIST_UNLOCK);
775                         xlocked = false;
776
777                         /* descend to the chosen child */
778                         item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
779                         item->blkno = childblkno;
780                         item->parent = stack;
781                         state.stack = stack = item;
782                 }
783                 else
784                 {
785                         /*
786                          * Leaf page. Insert the new key. We've already updated all the
787                          * parents on the way down, but we might have to split the page if
788                          * it doesn't fit. gistinserthere() will take care of that.
789                          */
790
791                         /*
792                          * Swap shared lock for an exclusive one. Be careful, the page may
793                          * change while we unlock/lock the page...
794                          */
795                         if (!xlocked)
796                         {
797                                 LockBuffer(stack->buffer, GIST_UNLOCK);
798                                 LockBuffer(stack->buffer, GIST_EXCLUSIVE);
799                                 xlocked = true;
800                                 stack->page = (Page) BufferGetPage(stack->buffer);
801                                 stack->lsn = PageGetLSN(stack->page);
802
803                                 if (stack->blkno == GIST_ROOT_BLKNO)
804                                 {
805                                         /*
806                                          * the only page that can become inner instead of leaf is
807                                          * the root page, so for root we should recheck it
808                                          */
809                                         if (!GistPageIsLeaf(stack->page))
810                                         {
811                                                 /*
812                                                  * very rare situation: during unlock/lock index with
813                                                  * number of pages = 1 was increased
814                                                  */
815                                                 LockBuffer(stack->buffer, GIST_UNLOCK);
816                                                 xlocked = false;
817                                                 continue;
818                                         }
819
820                                         /*
821                                          * we don't need to check root split, because checking
822                                          * leaf/inner is enough to recognize split for root
823                                          */
824                                 }
825                                 else if (GistFollowRight(stack->page) ||
826                                                  XLByteLT(stack->parent->lsn,
827                                                                   GistPageGetOpaque(stack->page)->nsn))
828                                 {
829                                         /*
830                                          * The page was split while we momentarily unlocked the
831                                          * page. Go back to parent.
832                                          */
833                                         UnlockReleaseBuffer(stack->buffer);
834                                         xlocked = false;
835                                         state.stack = stack = stack->parent;
836                                         continue;
837                                 }
838                         }
839
840                         /* now state.stack->(page, buffer and blkno) points to leaf page */
841
842                         gistinserttuples(&state, stack, giststate, &itup, 1,
843                                                          InvalidOffsetNumber, InvalidBuffer);
844                         LockBuffer(stack->buffer, GIST_UNLOCK);
845
846                         /* Release any pins we might still hold before exiting */
847                         for (; stack; stack = stack->parent)
848                                 ReleaseBuffer(stack->buffer);
849                         break;
850                 }
851         }
852 }
853
854 /*
855  * Traverse the tree to find path from root page to specified "child" block.
856  *
857  * returns from the beginning of closest parent;
858  *
859  * To prevent deadlocks, this should lock only one page at a time.
860  */
861 GISTInsertStack *
862 gistFindPath(Relation r, BlockNumber child)
863 {
864         Page            page;
865         Buffer          buffer;
866         OffsetNumber i,
867                                 maxoff;
868         ItemId          iid;
869         IndexTuple      idxtuple;
870         GISTInsertStack *top,
871                            *tail,
872                            *ptr;
873         BlockNumber blkno;
874
875         top = tail = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
876         top->blkno = GIST_ROOT_BLKNO;
877
878         while (top && top->blkno != child)
879         {
880                 buffer = ReadBuffer(r, top->blkno);
881                 LockBuffer(buffer, GIST_SHARE);
882                 gistcheckpage(r, buffer);
883                 page = (Page) BufferGetPage(buffer);
884
885                 if (GistPageIsLeaf(page))
886                 {
887                         /* we can safety go away, follows only leaf pages */
888                         UnlockReleaseBuffer(buffer);
889                         return NULL;
890                 }
891
892                 top->lsn = PageGetLSN(page);
893
894                 /*
895                  * If F_FOLLOW_RIGHT is set, the page to the right doesn't have a
896                  * downlink. This should not normally happen..
897                  */
898                 if (GistFollowRight(page))
899                         elog(ERROR, "concurrent GiST page split was incomplete");
900
901                 if (top->parent && XLByteLT(top->parent->lsn, GistPageGetOpaque(page)->nsn) &&
902                         GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
903                 {
904                         /* page splited while we thinking of... */
905                         ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
906                         ptr->blkno = GistPageGetOpaque(page)->rightlink;
907                         ptr->childoffnum = InvalidOffsetNumber;
908                         ptr->parent = top;
909                         ptr->next = NULL;
910                         tail->next = ptr;
911                         tail = ptr;
912                 }
913
914                 maxoff = PageGetMaxOffsetNumber(page);
915
916                 for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
917                 {
918                         iid = PageGetItemId(page, i);
919                         idxtuple = (IndexTuple) PageGetItem(page, iid);
920                         blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
921                         if (blkno == child)
922                         {
923                                 OffsetNumber poff = InvalidOffsetNumber;
924
925                                 /* make childs links */
926                                 ptr = top;
927                                 while (ptr->parent)
928                                 {
929                                         /* move childoffnum.. */
930                                         if (ptr == top)
931                                         {
932                                                 /* first iteration */
933                                                 poff = ptr->parent->childoffnum;
934                                                 ptr->parent->childoffnum = ptr->childoffnum;
935                                         }
936                                         else
937                                         {
938                                                 OffsetNumber tmp = ptr->parent->childoffnum;
939
940                                                 ptr->parent->childoffnum = poff;
941                                                 poff = tmp;
942                                         }
943                                         ptr = ptr->parent;
944                                 }
945                                 top->childoffnum = i;
946                                 UnlockReleaseBuffer(buffer);
947                                 return top;
948                         }
949                         else
950                         {
951                                 /* Install next inner page to the end of stack */
952                                 ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
953                                 ptr->blkno = blkno;
954                                 ptr->childoffnum = i;   /* set offsetnumber of child to child
955                                                                                  * !!! */
956                                 ptr->parent = top;
957                                 ptr->next = NULL;
958                                 tail->next = ptr;
959                                 tail = ptr;
960                         }
961                 }
962
963                 UnlockReleaseBuffer(buffer);
964                 top = top->next;
965         }
966
967         return NULL;
968 }
969
970 /*
971  * Updates the stack so that child->parent is the correct parent of the
972  * child. child->parent must be exclusively locked on entry, and will
973  * remain so at exit, but it might not be the same page anymore.
974  */
975 static void
976 gistFindCorrectParent(Relation r, GISTInsertStack *child)
977 {
978         GISTInsertStack *parent = child->parent;
979
980         gistcheckpage(r, parent->buffer);
981         parent->page = (Page) BufferGetPage(parent->buffer);
982
983         /* here we don't need to distinguish between split and page update */
984         if (parent->childoffnum == InvalidOffsetNumber || !XLByteEQ(parent->lsn, PageGetLSN(parent->page)))
985         {
986                 /* parent is changed, look child in right links until found */
987                 OffsetNumber i,
988                                         maxoff;
989                 ItemId          iid;
990                 IndexTuple      idxtuple;
991                 GISTInsertStack *ptr;
992
993                 while (true)
994                 {
995                         maxoff = PageGetMaxOffsetNumber(parent->page);
996                         for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
997                         {
998                                 iid = PageGetItemId(parent->page, i);
999                                 idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
1000                                 if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno)
1001                                 {
1002                                         /* yes!!, found */
1003                                         parent->childoffnum = i;
1004                                         return;
1005                                 }
1006                         }
1007
1008                         parent->blkno = GistPageGetOpaque(parent->page)->rightlink;
1009                         UnlockReleaseBuffer(parent->buffer);
1010                         if (parent->blkno == InvalidBlockNumber)
1011
1012                                 /*
1013                                  * end of chain and still didn't found parent, It's very-very
1014                                  * rare situation when root splited
1015                                  */
1016                                 break;
1017                         parent->buffer = ReadBuffer(r, parent->blkno);
1018                         LockBuffer(parent->buffer, GIST_EXCLUSIVE);
1019                         gistcheckpage(r, parent->buffer);
1020                         parent->page = (Page) BufferGetPage(parent->buffer);
1021                 }
1022
1023                 /*
1024                  * awful!!, we need search tree to find parent ... , but before we
1025                  * should release all old parent
1026                  */
1027
1028                 ptr = child->parent->parent;    /* child->parent already released
1029                                                                                  * above */
1030                 while (ptr)
1031                 {
1032                         ReleaseBuffer(ptr->buffer);
1033                         ptr = ptr->parent;
1034                 }
1035
1036                 /* ok, find new path */
1037                 ptr = parent = gistFindPath(r, child->blkno);
1038                 Assert(ptr != NULL);
1039
1040                 /* read all buffers as expected by caller */
1041                 /* note we don't lock them or gistcheckpage them here! */
1042                 while (ptr)
1043                 {
1044                         ptr->buffer = ReadBuffer(r, ptr->blkno);
1045                         ptr->page = (Page) BufferGetPage(ptr->buffer);
1046                         ptr = ptr->parent;
1047                 }
1048
1049                 /* install new chain of parents to stack */
1050                 child->parent = parent;
1051
1052                 /* make recursive call to normal processing */
1053                 LockBuffer(child->parent->buffer, GIST_EXCLUSIVE);
1054                 gistFindCorrectParent(r, child);
1055         }
1056
1057         return;
1058 }
1059
1060 /*
1061  * Form a downlink pointer for the page in 'buf'.
1062  */
1063 static IndexTuple
1064 gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate,
1065                                  GISTInsertStack *stack)
1066 {
1067         Page            page = BufferGetPage(buf);
1068         OffsetNumber maxoff;
1069         OffsetNumber offset;
1070         IndexTuple      downlink = NULL;
1071
1072         maxoff = PageGetMaxOffsetNumber(page);
1073         for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
1074         {
1075                 IndexTuple      ituple = (IndexTuple)
1076                 PageGetItem(page, PageGetItemId(page, offset));
1077
1078                 if (downlink == NULL)
1079                         downlink = CopyIndexTuple(ituple);
1080                 else
1081                 {
1082                         IndexTuple      newdownlink;
1083
1084                         newdownlink = gistgetadjusted(rel, downlink, ituple,
1085                                                                                   giststate);
1086                         if (newdownlink)
1087                                 downlink = newdownlink;
1088                 }
1089         }
1090
1091         /*
1092          * If the page is completely empty, we can't form a meaningful downlink
1093          * for it. But we have to insert a downlink for the page. Any key will do,
1094          * as long as its consistent with the downlink of parent page, so that we
1095          * can legally insert it to the parent. A minimal one that matches as few
1096          * scans as possible would be best, to keep scans from doing useless work,
1097          * but we don't know how to construct that. So we just use the downlink of
1098          * the original page that was split - that's as far from optimal as it can
1099          * get but will do..
1100          */
1101         if (!downlink)
1102         {
1103                 ItemId          iid;
1104
1105                 LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
1106                 gistFindCorrectParent(rel, stack);
1107                 iid = PageGetItemId(stack->parent->page, stack->parent->childoffnum);
1108                 downlink = (IndexTuple) PageGetItem(stack->parent->page, iid);
1109                 downlink = CopyIndexTuple(downlink);
1110                 LockBuffer(stack->parent->buffer, GIST_UNLOCK);
1111         }
1112
1113         ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf));
1114         GistTupleSetValid(downlink);
1115
1116         return downlink;
1117 }
1118
1119
1120 /*
1121  * Complete the incomplete split of state->stack->page.
1122  */
1123 static void
1124 gistfixsplit(GISTInsertState *state, GISTSTATE *giststate)
1125 {
1126         GISTInsertStack *stack = state->stack;
1127         Buffer          buf;
1128         Page            page;
1129         List       *splitinfo = NIL;
1130
1131         elog(LOG, "fixing incomplete split in index \"%s\", block %u",
1132                  RelationGetRelationName(state->r), stack->blkno);
1133
1134         Assert(GistFollowRight(stack->page));
1135         Assert(OffsetNumberIsValid(stack->parent->childoffnum));
1136
1137         buf = stack->buffer;
1138
1139         /*
1140          * Read the chain of split pages, following the rightlinks. Construct a
1141          * downlink tuple for each page.
1142          */
1143         for (;;)
1144         {
1145                 GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
1146                 IndexTuple      downlink;
1147
1148                 page = BufferGetPage(buf);
1149
1150                 /* Form the new downlink tuples to insert to parent */
1151                 downlink = gistformdownlink(state->r, buf, giststate, stack);
1152
1153                 si->buf = buf;
1154                 si->downlink = downlink;
1155
1156                 splitinfo = lappend(splitinfo, si);
1157
1158                 if (GistFollowRight(page))
1159                 {
1160                         /* lock next page */
1161                         buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink);
1162                         LockBuffer(buf, GIST_EXCLUSIVE);
1163                 }
1164                 else
1165                         break;
1166         }
1167
1168         /* Insert the downlinks */
1169         gistfinishsplit(state, stack, giststate, splitinfo);
1170 }
1171
1172 /*
1173  * Insert tuples to stack->buffer. If 'oldoffnum' is valid, the new tuples
1174  * replace an old tuple at oldoffnum. The caller must hold an exclusive lock
1175  * on the page.
1176  *
1177  * If leftchild is valid, we're inserting/updating the downlink for the
1178  * page to the right of leftchild. We clear the F_FOLLOW_RIGHT flag and
1179  * update NSN on leftchild, atomically with the insertion of the downlink.
1180  *
1181  * Returns 'true' if the page had to be split. On return, we will continue
1182  * to hold an exclusive lock on state->stack->buffer, but if we had to split
1183  * the page, it might not contain the tuple we just inserted/updated.
1184  */
1185 static bool
1186 gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
1187                                  GISTSTATE *giststate,
1188                                  IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
1189                                  Buffer leftchild)
1190 {
1191         List       *splitinfo;
1192         bool            is_split;
1193
1194         is_split = gistplacetopage(state, giststate, stack->buffer,
1195                                                            tuples, ntup, oldoffnum,
1196                                                            leftchild,
1197                                                            &splitinfo);
1198         if (splitinfo)
1199                 gistfinishsplit(state, stack, giststate, splitinfo);
1200
1201         return is_split;
1202 }
1203
1204 /*
1205  * Finish an incomplete split by inserting/updating the downlinks in
1206  * parent page. 'splitinfo' contains all the child pages, exclusively-locked,
1207  * involved in the split, from left-to-right.
1208  */
1209 static void
1210 gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
1211                                 GISTSTATE *giststate, List *splitinfo)
1212 {
1213         ListCell   *lc;
1214         List       *reversed;
1215         GISTPageSplitInfo *right;
1216         GISTPageSplitInfo *left;
1217         IndexTuple      tuples[2];
1218
1219         /* A split always contains at least two halves */
1220         Assert(list_length(splitinfo) >= 2);
1221
1222         /*
1223          * We need to insert downlinks for each new page, and update the downlink
1224          * for the original (leftmost) page in the split. Begin at the rightmost
1225          * page, inserting one downlink at a time until there's only two pages
1226          * left. Finally insert the downlink for the last new page and update the
1227          * downlink for the original page as one operation.
1228          */
1229
1230         /* for convenience, create a copy of the list in reverse order */
1231         reversed = NIL;
1232         foreach(lc, splitinfo)
1233         {
1234                 reversed = lcons(lfirst(lc), reversed);
1235         }
1236
1237         LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
1238         gistFindCorrectParent(state->r, stack);
1239
1240         while (list_length(reversed) > 2)
1241         {
1242                 right = (GISTPageSplitInfo *) linitial(reversed);
1243                 left = (GISTPageSplitInfo *) lsecond(reversed);
1244
1245                 if (gistinserttuples(state, stack->parent, giststate,
1246                                                          &right->downlink, 1,
1247                                                          InvalidOffsetNumber,
1248                                                          left->buf))
1249                 {
1250                         /*
1251                          * If the parent page was split, need to relocate the original
1252                          * parent pointer.
1253                          */
1254                         gistFindCorrectParent(state->r, stack);
1255                 }
1256                 UnlockReleaseBuffer(right->buf);
1257                 reversed = list_delete_first(reversed);
1258         }
1259
1260         right = (GISTPageSplitInfo *) linitial(reversed);
1261         left = (GISTPageSplitInfo *) lsecond(reversed);
1262
1263         /*
1264          * Finally insert downlink for the remaining right page and update the
1265          * downlink for the original page to not contain the tuples that were
1266          * moved to the new pages.
1267          */
1268         tuples[0] = left->downlink;
1269         tuples[1] = right->downlink;
1270         gistinserttuples(state, stack->parent, giststate,
1271                                          tuples, 2,
1272                                          stack->parent->childoffnum,
1273                                          left->buf);
1274         LockBuffer(stack->parent->buffer, GIST_UNLOCK);
1275         UnlockReleaseBuffer(right->buf);
1276         Assert(left->buf == stack->buffer);
1277 }
1278
1279 /*
1280  * gistSplit -- split a page in the tree and fill struct
1281  * used for XLOG and real writes buffers. Function is recursive, ie
1282  * it will split page until keys will fit in every page.
1283  */
1284 SplitedPageLayout *
1285 gistSplit(Relation r,
1286                   Page page,
1287                   IndexTuple *itup,             /* contains compressed entry */
1288                   int len,
1289                   GISTSTATE *giststate)
1290 {
1291         IndexTuple *lvectup,
1292                            *rvectup;
1293         GistSplitVector v;
1294         GistEntryVector *entryvec;
1295         int                     i;
1296         SplitedPageLayout *res = NULL;
1297
1298         /* generate the item array */
1299         entryvec = palloc(GEVHDRSZ + (len + 1) * sizeof(GISTENTRY));
1300         entryvec->n = len + 1;
1301
1302         memset(v.spl_lisnull, TRUE, sizeof(bool) * giststate->tupdesc->natts);
1303         memset(v.spl_risnull, TRUE, sizeof(bool) * giststate->tupdesc->natts);
1304         gistSplitByKey(r, page, itup, len, giststate,
1305                                    &v, entryvec, 0);
1306
1307         /* form left and right vector */
1308         lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
1309         rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
1310
1311         for (i = 0; i < v.splitVector.spl_nleft; i++)
1312                 lvectup[i] = itup[v.splitVector.spl_left[i] - 1];
1313
1314         for (i = 0; i < v.splitVector.spl_nright; i++)
1315                 rvectup[i] = itup[v.splitVector.spl_right[i] - 1];
1316
1317         /* finalize splitting (may need another split) */
1318         if (!gistfitpage(rvectup, v.splitVector.spl_nright))
1319         {
1320                 res = gistSplit(r, page, rvectup, v.splitVector.spl_nright, giststate);
1321         }
1322         else
1323         {
1324                 ROTATEDIST(res);
1325                 res->block.num = v.splitVector.spl_nright;
1326                 res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
1327                 res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
1328         }
1329
1330         if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
1331         {
1332                 SplitedPageLayout *resptr,
1333                                    *subres;
1334
1335                 resptr = subres = gistSplit(r, page, lvectup, v.splitVector.spl_nleft, giststate);
1336
1337                 /* install on list's tail */
1338                 while (resptr->next)
1339                         resptr = resptr->next;
1340
1341                 resptr->next = res;
1342                 res = subres;
1343         }
1344         else
1345         {
1346                 ROTATEDIST(res);
1347                 res->block.num = v.splitVector.spl_nleft;
1348                 res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
1349                 res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
1350         }
1351
1352         return res;
1353 }
1354
1355 /*
1356  * Fill a GISTSTATE with information about the index
1357  */
1358 void
1359 initGISTstate(GISTSTATE *giststate, Relation index)
1360 {
1361         int                     i;
1362
1363         if (index->rd_att->natts > INDEX_MAX_KEYS)
1364                 elog(ERROR, "numberOfAttributes %d > %d",
1365                          index->rd_att->natts, INDEX_MAX_KEYS);
1366
1367         giststate->tupdesc = index->rd_att;
1368
1369         for (i = 0; i < index->rd_att->natts; i++)
1370         {
1371                 fmgr_info_copy(&(giststate->consistentFn[i]),
1372                                            index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC),
1373                                            CurrentMemoryContext);
1374                 fmgr_info_copy(&(giststate->unionFn[i]),
1375                                            index_getprocinfo(index, i + 1, GIST_UNION_PROC),
1376                                            CurrentMemoryContext);
1377                 fmgr_info_copy(&(giststate->compressFn[i]),
1378                                            index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC),
1379                                            CurrentMemoryContext);
1380                 fmgr_info_copy(&(giststate->decompressFn[i]),
1381                                            index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC),
1382                                            CurrentMemoryContext);
1383                 fmgr_info_copy(&(giststate->penaltyFn[i]),
1384                                            index_getprocinfo(index, i + 1, GIST_PENALTY_PROC),
1385                                            CurrentMemoryContext);
1386                 fmgr_info_copy(&(giststate->picksplitFn[i]),
1387                                            index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC),
1388                                            CurrentMemoryContext);
1389                 fmgr_info_copy(&(giststate->equalFn[i]),
1390                                            index_getprocinfo(index, i + 1, GIST_EQUAL_PROC),
1391                                            CurrentMemoryContext);
1392                 /* opclasses are not required to provide a Distance method */
1393                 if (OidIsValid(index_getprocid(index, i + 1, GIST_DISTANCE_PROC)))
1394                         fmgr_info_copy(&(giststate->distanceFn[i]),
1395                                                  index_getprocinfo(index, i + 1, GIST_DISTANCE_PROC),
1396                                                    CurrentMemoryContext);
1397                 else
1398                         giststate->distanceFn[i].fn_oid = InvalidOid;
1399
1400                 /*
1401                  * If the index column has a specified collation, we should honor that
1402                  * while doing comparisons.  However, we may have a collatable storage
1403                  * type for a noncollatable indexed data type.  If there's no index
1404                  * collation then specify default collation in case the support
1405                  * functions need collation.  This is harmless if the support
1406                  * functions don't care about collation, so we just do it
1407                  * unconditionally.  (We could alternatively call get_typcollation,
1408                  * but that seems like expensive overkill --- there aren't going to be
1409                  * any cases where a GiST storage type has a nondefault collation.)
1410                  */
1411                 if (OidIsValid(index->rd_indcollation[i]))
1412                         giststate->supportCollation[i] = index->rd_indcollation[i];
1413                 else
1414                         giststate->supportCollation[i] = DEFAULT_COLLATION_OID;
1415         }
1416 }
1417
1418 void
1419 freeGISTstate(GISTSTATE *giststate)
1420 {
1421         /* no work */
1422 }