1 /*-------------------------------------------------------------------------
4 * interface routines for the postgres GiST index access method.
7 * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
11 * src/backend/access/gist/gist.c
13 *-------------------------------------------------------------------------
17 #include "access/genam.h"
18 #include "access/gist_private.h"
19 #include "catalog/index.h"
20 #include "catalog/pg_collation.h"
21 #include "miscadmin.h"
22 #include "storage/bufmgr.h"
23 #include "storage/indexfsm.h"
24 #include "utils/memutils.h"
25 #include "utils/rel.h"
27 /* Working state for gistbuild and its callback */
/* A List of these is used to represent a split-in-progress. */
39 Buffer buf; /* the split page "half" */
40 IndexTuple downlink; /* downlink for this half. */
43 /* non-export function prototypes */
44 static void gistbuildCallback(Relation index,
50 static void gistdoinsert(Relation r,
53 GISTSTATE *GISTstate);
54 static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate);
55 static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
57 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
59 static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
60 GISTSTATE *giststate, List *splitinfo);
63 #define ROTATEDIST(d) do { \
64 SplitedPageLayout *tmp=(SplitedPageLayout*)palloc(sizeof(SplitedPageLayout)); \
65 memset(tmp,0,sizeof(SplitedPageLayout)); \
66 tmp->block.blkno = InvalidBlockNumber; \
67 tmp->buffer = InvalidBuffer; \
74 * Create and return a temporary memory context for use by GiST. We
75 * _always_ invoke user-provided methods in a temporary memory
76 * context, so that memory leaks in those functions cannot cause
77 * problems. Also, we use some additional temporary contexts in the
78 * GiST code itself, to avoid the need to do some awkward manual
82 createTempGistContext(void)
84 return AllocSetContextCreate(CurrentMemoryContext,
85 "GiST temporary context",
86 ALLOCSET_DEFAULT_MINSIZE,
87 ALLOCSET_DEFAULT_INITSIZE,
88 ALLOCSET_DEFAULT_MAXSIZE);
92 * Routine to build an index. Basically calls insert over and over.
94 * XXX: it would be nice to implement some sort of bulk-loading
95 * algorithm, but it is not clear how to do that.
98 gistbuild(PG_FUNCTION_ARGS)
100 Relation heap = (Relation) PG_GETARG_POINTER(0);
101 Relation index = (Relation) PG_GETARG_POINTER(1);
102 IndexInfo *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
103 IndexBuildResult *result;
105 GISTBuildState buildstate;
110 * We expect to be called exactly once for any index relation. If that's
111 * not the case, big trouble's what we have.
113 if (RelationGetNumberOfBlocks(index) != 0)
114 elog(ERROR, "index \"%s\" already contains data",
115 RelationGetRelationName(index));
117 /* no locking is needed */
118 initGISTstate(&buildstate.giststate, index);
120 /* initialize the root page */
121 buffer = gistNewBuffer(index);
122 Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
123 page = BufferGetPage(buffer);
125 START_CRIT_SECTION();
127 GISTInitBuffer(buffer, F_LEAF);
129 MarkBufferDirty(buffer);
131 if (RelationNeedsWAL(index))
136 rdata.data = (char *) &(index->rd_node);
137 rdata.len = sizeof(RelFileNode);
138 rdata.buffer = InvalidBuffer;
141 recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX, &rdata);
142 PageSetLSN(page, recptr);
143 PageSetTLI(page, ThisTimeLineID);
146 PageSetLSN(page, GetXLogRecPtrForTemp());
148 UnlockReleaseBuffer(buffer);
152 /* build the index */
153 buildstate.numindexattrs = indexInfo->ii_NumIndexAttrs;
154 buildstate.indtuples = 0;
157 * create a temporary memory context that is reset once for each tuple
158 * inserted into the index
160 buildstate.tmpCtx = createTempGistContext();
162 /* do the heap scan */
163 reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
164 gistbuildCallback, (void *) &buildstate);
166 /* okay, all heap tuples are indexed */
167 MemoryContextDelete(buildstate.tmpCtx);
169 freeGISTstate(&buildstate.giststate);
174 result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
176 result->heap_tuples = reltuples;
177 result->index_tuples = buildstate.indtuples;
179 PG_RETURN_POINTER(result);
183 * Per-tuple callback from IndexBuildHeapScan
186 gistbuildCallback(Relation index,
193 GISTBuildState *buildstate = (GISTBuildState *) state;
195 MemoryContext oldCtx;
197 oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
199 /* form an index tuple and point it at the heap tuple */
200 itup = gistFormTuple(&buildstate->giststate, index,
201 values, isnull, true /* size is currently bogus */ );
202 itup->t_tid = htup->t_self;
205 * Since we already have the index relation locked, we call gistdoinsert
206 * directly. Normal access method calls dispatch through gistinsert,
207 * which locks the relation for write. This is the right thing to do if
208 * you're inserting single tups, but not when you're initializing the
209 * whole index at once.
211 * In this path we respect the fillfactor setting, whereas insertions
212 * after initial build do not.
214 gistdoinsert(index, itup,
215 RelationGetTargetPageFreeSpace(index, GIST_DEFAULT_FILLFACTOR),
216 &buildstate->giststate);
218 buildstate->indtuples += 1;
219 MemoryContextSwitchTo(oldCtx);
220 MemoryContextReset(buildstate->tmpCtx);
224 * gistbuildempty() -- build an empty gist index in the initialization fork
227 gistbuildempty(PG_FUNCTION_ARGS)
230 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
231 errmsg("unlogged GiST indexes are not supported")));
237 * gistinsert -- wrapper for GiST tuple insertion.
239 * This is the public interface routine for tuple insertion in GiSTs.
240 * It doesn't do any work; just locks the relation and passes the buck.
243 gistinsert(PG_FUNCTION_ARGS)
245 Relation r = (Relation) PG_GETARG_POINTER(0);
246 Datum *values = (Datum *) PG_GETARG_POINTER(1);
247 bool *isnull = (bool *) PG_GETARG_POINTER(2);
248 ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
251 Relation heapRel = (Relation) PG_GETARG_POINTER(4);
252 IndexUniqueCheck checkUnique = (IndexUniqueCheck) PG_GETARG_INT32(5);
256 MemoryContext oldCtx;
257 MemoryContext insertCtx;
259 insertCtx = createTempGistContext();
260 oldCtx = MemoryContextSwitchTo(insertCtx);
262 initGISTstate(&giststate, r);
264 itup = gistFormTuple(&giststate, r,
265 values, isnull, true /* size is currently bogus */ );
266 itup->t_tid = *ht_ctid;
268 gistdoinsert(r, itup, 0, &giststate);
271 freeGISTstate(&giststate);
272 MemoryContextSwitchTo(oldCtx);
273 MemoryContextDelete(insertCtx);
275 PG_RETURN_BOOL(false);
280 * Place tuples from 'itup' to 'buffer'. If 'oldoffnum' is valid, the tuple
281 * at that offset is atomically removed along with inserting the new tuples.
282 * This is used to replace a tuple with a new one.
284 * If 'leftchildbuf' is valid, we're inserting the downlink for the page
285 * to the right of 'leftchildbuf', or updating the downlink for 'leftchildbuf'.
286 * F_FOLLOW_RIGHT flag on 'leftchildbuf' is cleared and NSN is set.
288 * If there is not enough room on the page, it is split. All the split
289 * pages are kept pinned and locked and returned in *splitinfo, the caller
290 * is responsible for inserting the downlinks for them. However, if
291 * 'buffer' is the root page and it needs to be split, gistplacetopage()
292 * performs the split as one atomic operation, and *splitinfo is set to NIL.
293 * In that case, we continue to hold the root page locked, and the child
294 * pages are released; note that new tuple(s) are *not* on the root page
295 * but in one of the new child pages.
298 gistplacetopage(GISTInsertState *state, GISTSTATE *giststate,
300 IndexTuple *itup, int ntup, OffsetNumber oldoffnum,
304 Page page = BufferGetPage(buffer);
305 bool is_leaf = (GistPageIsLeaf(page)) ? true : false;
311 * Refuse to modify a page that's incompletely split. This should not
312 * happen because we finish any incomplete splits while we walk down the
313 * tree. However, it's remotely possible that another concurrent inserter
314 * splits a parent page, and errors out before completing the split. We
315 * will just throw an error in that case, and leave any split we had in
316 * progress unfinished too. The next insert that comes along will clean up
319 if (GistFollowRight(page))
320 elog(ERROR, "concurrent GiST page split was incomplete");
325 * if isupdate, remove old key: This node's key has been modified, either
326 * because a child split occurred or because we needed to adjust our key
327 * for an insert in a child node. Therefore, remove the old version of
330 * for WAL replay, in the non-split case we handle this by setting up a
331 * one-element todelete array; in the split case, it's handled implicitly
332 * because the tuple vector passed to gistSplit won't include this tuple.
334 is_split = gistnospace(page, itup, ntup, oldoffnum, state->freespace);
337 /* no space for insertion */
340 SplitedPageLayout *dist = NULL,
342 BlockNumber oldrlink = InvalidBlockNumber;
343 GistNSN oldnsn = {0, 0};
344 SplitedPageLayout rootpg;
345 BlockNumber blkno = BufferGetBlockNumber(buffer);
348 is_rootsplit = (blkno == GIST_ROOT_BLKNO);
351 * Form index tuples vector to split. If we're replacing an old tuple,
352 * remove the old version from the vector.
354 itvec = gistextractpage(page, &tlen);
355 if (OffsetNumberIsValid(oldoffnum))
357 /* on inner page we should remove old tuple */
358 int pos = oldoffnum - FirstOffsetNumber;
362 memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
364 itvec = gistjoinvector(itvec, &tlen, itup, ntup);
365 dist = gistSplit(state->r, page, itvec, tlen, giststate);
368 * Set up pages to work with. Allocate new buffers for all but the
369 * leftmost page. The original page becomes the new leftmost page, and
370 * is just replaced with the new contents.
372 * For a root-split, allocate new buffers for all child pages, the
373 * original page is overwritten with new root page containing
374 * downlinks to the new child pages.
379 /* save old rightlink and NSN */
380 oldrlink = GistPageGetOpaque(page)->rightlink;
381 oldnsn = GistPageGetOpaque(page)->nsn;
383 dist->buffer = buffer;
384 dist->block.blkno = BufferGetBlockNumber(buffer);
385 dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer));
387 /* clean all flags except F_LEAF */
388 GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
392 for (; ptr; ptr = ptr->next)
394 /* Allocate new page */
395 ptr->buffer = gistNewBuffer(state->r);
396 GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
397 ptr->page = BufferGetPage(ptr->buffer);
398 ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
402 * Now that we know whick blocks the new pages go to, set up downlink
403 * tuples to point to them.
405 for (ptr = dist; ptr; ptr = ptr->next)
407 ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
408 GistTupleSetValid(ptr->itup);
412 * If this is a root split, we construct the new root page with the
413 * downlinks here directly, instead of requiring the caller to insert
414 * them. Add the new root page to the list along with the child pages.
418 IndexTuple *downlinks;
422 rootpg.buffer = buffer;
423 rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer));
424 GistPageGetOpaque(rootpg.page)->flags = 0;
426 /* Prepare a vector of all the downlinks */
427 for (ptr = dist; ptr; ptr = ptr->next)
429 downlinks = palloc(sizeof(IndexTuple) * ndownlinks);
430 for (i = 0, ptr = dist; ptr; ptr = ptr->next)
431 downlinks[i++] = ptr->itup;
433 rootpg.block.blkno = GIST_ROOT_BLKNO;
434 rootpg.block.num = ndownlinks;
435 rootpg.list = gistfillitupvec(downlinks, ndownlinks,
444 /* Prepare split-info to be returned to caller */
445 for (ptr = dist; ptr; ptr = ptr->next)
447 GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
449 si->buf = ptr->buffer;
450 si->downlink = ptr->itup;
451 *splitinfo = lappend(*splitinfo, si);
456 * Fill all pages. All the pages are new, ie. freshly allocated empty
457 * pages, or a temporary copy of the old page.
459 for (ptr = dist; ptr; ptr = ptr->next)
461 char *data = (char *) (ptr->list);
463 for (i = 0; i < ptr->block.num; i++)
465 if (PageAddItem(ptr->page, (Item) data, IndexTupleSize((IndexTuple) data), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
466 elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(state->r));
467 data += IndexTupleSize((IndexTuple) data);
470 /* Set up rightlinks */
471 if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO)
472 GistPageGetOpaque(ptr->page)->rightlink =
473 ptr->next->block.blkno;
475 GistPageGetOpaque(ptr->page)->rightlink = oldrlink;
477 if (ptr->next && !is_rootsplit)
478 GistMarkFollowRight(ptr->page);
480 GistClearFollowRight(ptr->page);
483 * Copy the NSN of the original page to all pages. The
484 * F_FOLLOW_RIGHT flags ensure that scans will follow the
485 * rightlinks until the downlinks are inserted.
487 GistPageGetOpaque(ptr->page)->nsn = oldnsn;
490 START_CRIT_SECTION();
493 * Must mark buffers dirty before XLogInsert, even though we'll still
494 * be changing their opaque fields below.
496 for (ptr = dist; ptr; ptr = ptr->next)
497 MarkBufferDirty(ptr->buffer);
498 if (BufferIsValid(leftchildbuf))
499 MarkBufferDirty(leftchildbuf);
502 * The first page in the chain was a temporary working copy meant to
503 * replace the old page. Copy it over the old page.
505 PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
506 dist->page = BufferGetPage(dist->buffer);
508 /* Write the WAL record */
509 if (RelationNeedsWAL(state->r))
510 recptr = gistXLogSplit(state->r->rd_node, blkno, is_leaf,
511 dist, oldrlink, oldnsn, leftchildbuf);
513 recptr = GetXLogRecPtrForTemp();
515 for (ptr = dist; ptr; ptr = ptr->next)
517 PageSetLSN(ptr->page, recptr);
518 PageSetTLI(ptr->page, ThisTimeLineID);
522 * Return the new child buffers to the caller.
524 * If this was a root split, we've already inserted the downlink
525 * pointers, in the form of a new root page. Therefore we can release
526 * all the new buffers, and keep just the root page locked.
530 for (ptr = dist->next; ptr; ptr = ptr->next)
531 UnlockReleaseBuffer(ptr->buffer);
537 * Enough space. We also get here if ntuples==0.
539 START_CRIT_SECTION();
541 if (OffsetNumberIsValid(oldoffnum))
542 PageIndexTupleDelete(page, oldoffnum);
543 gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
545 MarkBufferDirty(buffer);
547 if (BufferIsValid(leftchildbuf))
548 MarkBufferDirty(leftchildbuf);
550 if (RelationNeedsWAL(state->r))
552 OffsetNumber ndeloffs = 0,
555 if (OffsetNumberIsValid(oldoffnum))
557 deloffs[0] = oldoffnum;
561 recptr = gistXLogUpdate(state->r->rd_node, buffer,
562 deloffs, ndeloffs, itup, ntup,
565 PageSetLSN(page, recptr);
566 PageSetTLI(page, ThisTimeLineID);
570 recptr = GetXLogRecPtrForTemp();
571 PageSetLSN(page, recptr);
578 * If we inserted the downlink for a child page, set NSN and clear
579 * F_FOLLOW_RIGHT flag on the left child, so that concurrent scans know to
580 * follow the rightlink if and only if they looked at the parent page
581 * before we inserted the downlink.
583 * Note that we do this *after* writing the WAL record. That means that
584 * the possible full page image in the WAL record does not include these
585 * changes, and they must be replayed even if the page is restored from
586 * the full page image. There's a chicken-and-egg problem: if we updated
587 * the child pages first, we wouldn't know the recptr of the WAL record
588 * we're about to write.
590 if (BufferIsValid(leftchildbuf))
592 Page leftpg = BufferGetPage(leftchildbuf);
594 GistPageGetOpaque(leftpg)->nsn = recptr;
595 GistClearFollowRight(leftpg);
597 PageSetLSN(leftpg, recptr);
598 PageSetTLI(leftpg, ThisTimeLineID);
607 * Workhouse routine for doing insertion into a GiST index. Note that
608 * this routine assumes it is invoked in a short-lived memory context,
609 * so it does not bother releasing palloc'd allocations.
612 gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
616 GISTInsertStack firststack;
617 GISTInsertStack *stack;
618 GISTInsertState state;
619 bool xlocked = false;
621 memset(&state, 0, sizeof(GISTInsertState));
622 state.freespace = freespace;
625 /* Start from the root */
626 firststack.blkno = GIST_ROOT_BLKNO;
627 firststack.lsn.xrecoff = 0;
628 firststack.parent = NULL;
629 state.stack = stack = &firststack;
632 * Walk down along the path of smallest penalty, updating the parent
633 * pointers with the key we're inserting as we go. If we crash in the
634 * middle, the tree is consistent, although the possible parent updates
639 if (XLogRecPtrIsInvalid(stack->lsn))
640 stack->buffer = ReadBuffer(state.r, stack->blkno);
643 * Be optimistic and grab shared lock first. Swap it for an exclusive
644 * lock later if we need to update the page.
648 LockBuffer(stack->buffer, GIST_SHARE);
649 gistcheckpage(state.r, stack->buffer);
652 stack->page = (Page) BufferGetPage(stack->buffer);
653 stack->lsn = PageGetLSN(stack->page);
654 Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn));
657 * If this page was split but the downlink was never inserted to the
658 * parent because the inserting backend crashed before doing that, fix
661 if (GistFollowRight(stack->page))
665 LockBuffer(stack->buffer, GIST_UNLOCK);
666 LockBuffer(stack->buffer, GIST_EXCLUSIVE);
668 /* someone might've completed the split when we unlocked */
669 if (!GistFollowRight(stack->page))
672 gistfixsplit(&state, giststate);
674 UnlockReleaseBuffer(stack->buffer);
676 state.stack = stack = stack->parent;
680 if (stack->blkno != GIST_ROOT_BLKNO &&
681 XLByteLT(stack->parent->lsn,
682 GistPageGetOpaque(stack->page)->nsn))
685 * Concurrent split detected. There's no guarantee that the
686 * downlink for this page is consistent with the tuple we're
687 * inserting anymore, so go back to parent and rechoose the best
690 UnlockReleaseBuffer(stack->buffer);
692 state.stack = stack = stack->parent;
696 if (!GistPageIsLeaf(stack->page))
699 * This is an internal page so continue to walk down the tree.
700 * Find the child node that has the minimum insertion penalty.
702 BlockNumber childblkno;
704 GISTInsertStack *item;
706 stack->childoffnum = gistchoose(state.r, stack->page, itup, giststate);
707 iid = PageGetItemId(stack->page, stack->childoffnum);
708 idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
709 childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
712 * Check that it's not a leftover invalid tuple from pre-9.1
714 if (GistTupleIsInvalid(idxtuple))
716 (errmsg("index \"%s\" contains an inner tuple marked as invalid",
717 RelationGetRelationName(r)),
718 errdetail("This is caused by an incomplete page split at crash recovery before upgrading to PostgreSQL 9.1."),
719 errhint("Please REINDEX it.")));
722 * Check that the key representing the target child node is
723 * consistent with the key we're inserting. Update it if it's not.
725 newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
729 * Swap shared lock for an exclusive one. Beware, the page may
730 * change while we unlock/lock the page...
734 LockBuffer(stack->buffer, GIST_UNLOCK);
735 LockBuffer(stack->buffer, GIST_EXCLUSIVE);
737 stack->page = (Page) BufferGetPage(stack->buffer);
739 if (!XLByteEQ(PageGetLSN(stack->page), stack->lsn))
741 /* the page was changed while we unlocked it, retry */
749 * We still hold the lock after gistinserttuples(), but it
750 * might have to split the page to make the updated tuple fit.
751 * In that case the updated tuple might migrate to the other
752 * half of the split, so we have to go back to the parent and
753 * descend back to the half that's a better fit for the new
756 if (gistinserttuples(&state, stack, giststate, &newtup, 1,
757 stack->childoffnum, InvalidBuffer))
760 * If this was a root split, the root page continues to be
761 * the parent and the updated tuple went to one of the
762 * child pages, so we just need to retry from the root
765 if (stack->blkno != GIST_ROOT_BLKNO)
767 UnlockReleaseBuffer(stack->buffer);
769 state.stack = stack = stack->parent;
774 LockBuffer(stack->buffer, GIST_UNLOCK);
777 /* descend to the chosen child */
778 item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
779 item->blkno = childblkno;
780 item->parent = stack;
781 state.stack = stack = item;
786 * Leaf page. Insert the new key. We've already updated all the
787 * parents on the way down, but we might have to split the page if
788 * it doesn't fit. gistinserthere() will take care of that.
792 * Swap shared lock for an exclusive one. Be careful, the page may
793 * change while we unlock/lock the page...
797 LockBuffer(stack->buffer, GIST_UNLOCK);
798 LockBuffer(stack->buffer, GIST_EXCLUSIVE);
800 stack->page = (Page) BufferGetPage(stack->buffer);
801 stack->lsn = PageGetLSN(stack->page);
803 if (stack->blkno == GIST_ROOT_BLKNO)
806 * the only page that can become inner instead of leaf is
807 * the root page, so for root we should recheck it
809 if (!GistPageIsLeaf(stack->page))
812 * very rare situation: during unlock/lock index with
813 * number of pages = 1 was increased
815 LockBuffer(stack->buffer, GIST_UNLOCK);
821 * we don't need to check root split, because checking
822 * leaf/inner is enough to recognize split for root
825 else if (GistFollowRight(stack->page) ||
826 XLByteLT(stack->parent->lsn,
827 GistPageGetOpaque(stack->page)->nsn))
830 * The page was split while we momentarily unlocked the
831 * page. Go back to parent.
833 UnlockReleaseBuffer(stack->buffer);
835 state.stack = stack = stack->parent;
840 /* now state.stack->(page, buffer and blkno) points to leaf page */
842 gistinserttuples(&state, stack, giststate, &itup, 1,
843 InvalidOffsetNumber, InvalidBuffer);
844 LockBuffer(stack->buffer, GIST_UNLOCK);
846 /* Release any pins we might still hold before exiting */
847 for (; stack; stack = stack->parent)
848 ReleaseBuffer(stack->buffer);
855 * Traverse the tree to find path from root page to specified "child" block.
857 * returns from the beginning of closest parent;
859 * To prevent deadlocks, this should lock only one page at a time.
862 gistFindPath(Relation r, BlockNumber child)
870 GISTInsertStack *top,
875 top = tail = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
876 top->blkno = GIST_ROOT_BLKNO;
878 while (top && top->blkno != child)
880 buffer = ReadBuffer(r, top->blkno);
881 LockBuffer(buffer, GIST_SHARE);
882 gistcheckpage(r, buffer);
883 page = (Page) BufferGetPage(buffer);
885 if (GistPageIsLeaf(page))
887 /* we can safety go away, follows only leaf pages */
888 UnlockReleaseBuffer(buffer);
892 top->lsn = PageGetLSN(page);
895 * If F_FOLLOW_RIGHT is set, the page to the right doesn't have a
896 * downlink. This should not normally happen..
898 if (GistFollowRight(page))
899 elog(ERROR, "concurrent GiST page split was incomplete");
901 if (top->parent && XLByteLT(top->parent->lsn, GistPageGetOpaque(page)->nsn) &&
902 GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
904 /* page splited while we thinking of... */
905 ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
906 ptr->blkno = GistPageGetOpaque(page)->rightlink;
907 ptr->childoffnum = InvalidOffsetNumber;
914 maxoff = PageGetMaxOffsetNumber(page);
916 for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
918 iid = PageGetItemId(page, i);
919 idxtuple = (IndexTuple) PageGetItem(page, iid);
920 blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
923 OffsetNumber poff = InvalidOffsetNumber;
925 /* make childs links */
929 /* move childoffnum.. */
932 /* first iteration */
933 poff = ptr->parent->childoffnum;
934 ptr->parent->childoffnum = ptr->childoffnum;
938 OffsetNumber tmp = ptr->parent->childoffnum;
940 ptr->parent->childoffnum = poff;
945 top->childoffnum = i;
946 UnlockReleaseBuffer(buffer);
951 /* Install next inner page to the end of stack */
952 ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
954 ptr->childoffnum = i; /* set offsetnumber of child to child
963 UnlockReleaseBuffer(buffer);
971 * Updates the stack so that child->parent is the correct parent of the
972 * child. child->parent must be exclusively locked on entry, and will
973 * remain so at exit, but it might not be the same page anymore.
976 gistFindCorrectParent(Relation r, GISTInsertStack *child)
978 GISTInsertStack *parent = child->parent;
980 gistcheckpage(r, parent->buffer);
981 parent->page = (Page) BufferGetPage(parent->buffer);
983 /* here we don't need to distinguish between split and page update */
984 if (parent->childoffnum == InvalidOffsetNumber || !XLByteEQ(parent->lsn, PageGetLSN(parent->page)))
986 /* parent is changed, look child in right links until found */
991 GISTInsertStack *ptr;
995 maxoff = PageGetMaxOffsetNumber(parent->page);
996 for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
998 iid = PageGetItemId(parent->page, i);
999 idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
1000 if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno)
1003 parent->childoffnum = i;
1008 parent->blkno = GistPageGetOpaque(parent->page)->rightlink;
1009 UnlockReleaseBuffer(parent->buffer);
1010 if (parent->blkno == InvalidBlockNumber)
1013 * end of chain and still didn't found parent, It's very-very
1014 * rare situation when root splited
1017 parent->buffer = ReadBuffer(r, parent->blkno);
1018 LockBuffer(parent->buffer, GIST_EXCLUSIVE);
1019 gistcheckpage(r, parent->buffer);
1020 parent->page = (Page) BufferGetPage(parent->buffer);
1024 * awful!!, we need search tree to find parent ... , but before we
1025 * should release all old parent
1028 ptr = child->parent->parent; /* child->parent already released
1032 ReleaseBuffer(ptr->buffer);
1036 /* ok, find new path */
1037 ptr = parent = gistFindPath(r, child->blkno);
1038 Assert(ptr != NULL);
1040 /* read all buffers as expected by caller */
1041 /* note we don't lock them or gistcheckpage them here! */
1044 ptr->buffer = ReadBuffer(r, ptr->blkno);
1045 ptr->page = (Page) BufferGetPage(ptr->buffer);
1049 /* install new chain of parents to stack */
1050 child->parent = parent;
1052 /* make recursive call to normal processing */
1053 LockBuffer(child->parent->buffer, GIST_EXCLUSIVE);
1054 gistFindCorrectParent(r, child);
1061 * Form a downlink pointer for the page in 'buf'.
1064 gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate,
1065 GISTInsertStack *stack)
1067 Page page = BufferGetPage(buf);
1068 OffsetNumber maxoff;
1069 OffsetNumber offset;
1070 IndexTuple downlink = NULL;
1072 maxoff = PageGetMaxOffsetNumber(page);
1073 for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
1075 IndexTuple ituple = (IndexTuple)
1076 PageGetItem(page, PageGetItemId(page, offset));
1078 if (downlink == NULL)
1079 downlink = CopyIndexTuple(ituple);
1082 IndexTuple newdownlink;
1084 newdownlink = gistgetadjusted(rel, downlink, ituple,
1087 downlink = newdownlink;
1092 * If the page is completely empty, we can't form a meaningful downlink
1093 * for it. But we have to insert a downlink for the page. Any key will do,
1094 * as long as its consistent with the downlink of parent page, so that we
1095 * can legally insert it to the parent. A minimal one that matches as few
1096 * scans as possible would be best, to keep scans from doing useless work,
1097 * but we don't know how to construct that. So we just use the downlink of
1098 * the original page that was split - that's as far from optimal as it can
1105 LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
1106 gistFindCorrectParent(rel, stack);
1107 iid = PageGetItemId(stack->parent->page, stack->parent->childoffnum);
1108 downlink = (IndexTuple) PageGetItem(stack->parent->page, iid);
1109 downlink = CopyIndexTuple(downlink);
1110 LockBuffer(stack->parent->buffer, GIST_UNLOCK);
1113 ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf));
1114 GistTupleSetValid(downlink);
1121 * Complete the incomplete split of state->stack->page.
1124 gistfixsplit(GISTInsertState *state, GISTSTATE *giststate)
1126 GISTInsertStack *stack = state->stack;
1129 List *splitinfo = NIL;
1131 elog(LOG, "fixing incomplete split in index \"%s\", block %u",
1132 RelationGetRelationName(state->r), stack->blkno);
1134 Assert(GistFollowRight(stack->page));
1135 Assert(OffsetNumberIsValid(stack->parent->childoffnum));
1137 buf = stack->buffer;
1140 * Read the chain of split pages, following the rightlinks. Construct a
1141 * downlink tuple for each page.
1145 GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
1146 IndexTuple downlink;
1148 page = BufferGetPage(buf);
1150 /* Form the new downlink tuples to insert to parent */
1151 downlink = gistformdownlink(state->r, buf, giststate, stack);
1154 si->downlink = downlink;
1156 splitinfo = lappend(splitinfo, si);
1158 if (GistFollowRight(page))
1160 /* lock next page */
1161 buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink);
1162 LockBuffer(buf, GIST_EXCLUSIVE);
1168 /* Insert the downlinks */
1169 gistfinishsplit(state, stack, giststate, splitinfo);
1173 * Insert tuples to stack->buffer. If 'oldoffnum' is valid, the new tuples
1174 * replace an old tuple at oldoffnum. The caller must hold an exclusive lock
1177 * If leftchild is valid, we're inserting/updating the downlink for the
1178 * page to the right of leftchild. We clear the F_FOLLOW_RIGHT flag and
1179 * update NSN on leftchild, atomically with the insertion of the downlink.
1181 * Returns 'true' if the page had to be split. On return, we will continue
1182 * to hold an exclusive lock on state->stack->buffer, but if we had to split
1183 * the page, it might not contain the tuple we just inserted/updated.
1186 gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
1187 GISTSTATE *giststate,
1188 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
1194 is_split = gistplacetopage(state, giststate, stack->buffer,
1195 tuples, ntup, oldoffnum,
1199 gistfinishsplit(state, stack, giststate, splitinfo);
1205 * Finish an incomplete split by inserting/updating the downlinks in
1206 * parent page. 'splitinfo' contains all the child pages, exclusively-locked,
1207 * involved in the split, from left-to-right.
1210 gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
1211 GISTSTATE *giststate, List *splitinfo)
/* current pair of split halves being processed (from the reversed list) */
1215 GISTPageSplitInfo *right;
1216 GISTPageSplitInfo *left;
/* scratch array for the final two-downlink insert/replace operation */
1217 IndexTuple tuples[2];
1219 /* A split always contains at least two halves */
1220 Assert(list_length(splitinfo) >= 2);
1223 * We need to insert downlinks for each new page, and update the downlink
1224 * for the original (leftmost) page in the split. Begin at the rightmost
1225 * page, inserting one downlink at a time until there's only two pages
1226 * left. Finally insert the downlink for the last new page and update the
1227 * downlink for the original page as one operation.
1230 /* for convenience, create a copy of the list in reverse order */
1232 foreach(lc, splitinfo)
1234 reversed = lcons(lfirst(lc), reversed);
/*
 * Exclusively lock the parent, then re-locate the entry pointing to our
 * child: the downlink may have moved since 'stack' was built (e.g. if the
 * parent itself was split by a concurrent or earlier operation).
 */
1237 LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
1238 gistFindCorrectParent(state->r, stack);
/* Peel off the rightmost halves one at a time until only two remain. */
1240 while (list_length(reversed) > 2)
1242 right = (GISTPageSplitInfo *) linitial(reversed);
1243 left = (GISTPageSplitInfo *) lsecond(reversed);
/* Insert the downlink for this rightmost half into the parent. */
1245 if (gistinserttuples(state, stack->parent, giststate,
1246 &right->downlink, 1,
1247 InvalidOffsetNumber,
1251 * If the parent page was split, need to relocate the original
/* (re-find which parent page now holds the downlink for our child) */
1254 gistFindCorrectParent(state->r, stack);
/* This half's downlink is in place; we're done with its page. */
1256 UnlockReleaseBuffer(right->buf);
1257 reversed = list_delete_first(reversed);
/* Exactly two halves remain: the last new page and the original page. */
1260 right = (GISTPageSplitInfo *) linitial(reversed);
1261 left = (GISTPageSplitInfo *) lsecond(reversed);
1264 * Finally insert downlink for the remaining right page and update the
1265 * downlink for the original page to not contain the tuples that were
1266 * moved to the new pages.
1268 tuples[0] = left->downlink;
1269 tuples[1] = right->downlink;
/*
 * Replace the old downlink (at childoffnum in the parent) with the pair
 * of new downlinks in a single operation.
 */
1270 gistinserttuples(state, stack->parent, giststate,
1272 stack->parent->childoffnum,
/* Release the parent lock and the right half's page. */
1274 LockBuffer(stack->parent->buffer, GIST_UNLOCK)
1275 UnlockReleaseBuffer(right->buf);
/* The leftmost half must be the page the caller is still operating on. */
1276 Assert(left->buf == stack->buffer);
1280 * gistSplit -- split a page in the tree and fill struct
1281 * used for XLOG and real writes buffers. Function is recursive, ie
1282 * it will split page until keys will fit in every page.
1285 gistSplit(Relation r,
1287 IndexTuple *itup, /* contains compressed entry */
1289 GISTSTATE *giststate)
/* per-half tuple vectors built from the picksplit result */
1291 IndexTuple *lvectup,
1294 GistEntryVector *entryvec;
/* head of the resulting list of page layouts (one entry per page half) */
1296 SplitedPageLayout *res = NULL;
1298 /* generate the item array */
/* entry 0 of a GistEntryVector is unused, hence the len + 1 sizing */
1299 entryvec = palloc(GEVHDRSZ + (len + 1) * sizeof(GISTENTRY));
1300 entryvec->n = len + 1;
/* start with all-NULL union keys; gistSplitByKey fills in real values */
1302 memset(v.spl_lisnull, TRUE, sizeof(bool) * giststate->tupdesc->natts);
1303 memset(v.spl_risnull, TRUE, sizeof(bool) * giststate->tupdesc->natts);
/* invoke the opclass picksplit machinery to partition the tuples */
1304 gistSplitByKey(r, page, itup, len, giststate,
1307 /* form left and right vector */
1308 lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
1309 rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
/* spl_left/spl_right hold 1-based offsets into itup[], hence the - 1 */
1311 for (i = 0; i < v.splitVector.spl_nleft; i++)
1312 lvectup[i] = itup[v.splitVector.spl_left[i] - 1];
1314 for (i = 0; i < v.splitVector.spl_nright; i++)
1315 rvectup[i] = itup[v.splitVector.spl_right[i] - 1];
1317 /* finalize splitting (may need another split) */
/* if the right half still overflows a page, recurse to split it further */
1318 if (!gistfitpage(rvectup, v.splitVector.spl_nright))
1320 res = gistSplit(r, page, rvectup, v.splitVector.spl_nright, giststate);
/* right half fits: emit one layout entry carrying its tuples and key */
1325 res->block.num = v.splitVector.spl_nright;
1326 res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
1327 res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
/* same treatment for the left half: recurse if it doesn't fit */
1330 if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
1332 SplitedPageLayout *resptr,
1335 resptr = subres = gistSplit(r, page, lvectup, v.splitVector.spl_nleft, giststate);
1337 /* install on list's tail */
/* walk to the end of the sub-split's list so it can be chained onto res */
1338 while (resptr->next)
1339 resptr = resptr->next;
/* left half fits: emit its layout entry (becomes head of the list) */
1347 res->block.num = v.splitVector.spl_nleft;
1348 res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
1349 res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
1356 * Fill a GISTSTATE with information about the index
/*
 * Caches fmgr lookup data for every GiST support procedure of every index
 * column, so per-tuple operations need not repeat the catalog lookups.
 * The copies are made into CurrentMemoryContext; the caller is responsible
 * for ensuring that context lives as long as the GISTSTATE is used.
 */
1359 initGISTstate(GISTSTATE *giststate, Relation index)
/* sanity check: column count must fit the fixed-size arrays in GISTSTATE */
1363 if (index->rd_att->natts > INDEX_MAX_KEYS)
1364 elog(ERROR, "numberOfAttributes %d > %d",
1365 index->rd_att->natts, INDEX_MAX_KEYS);
1367 giststate->tupdesc = index->rd_att;
/* look up and cache the seven required support procs for each column */
1369 for (i = 0; i < index->rd_att->natts; i++)
1371 fmgr_info_copy(&(giststate->consistentFn[i]),
1372 index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC),
1373 CurrentMemoryContext);
1374 fmgr_info_copy(&(giststate->unionFn[i]),
1375 index_getprocinfo(index, i + 1, GIST_UNION_PROC),
1376 CurrentMemoryContext);
1377 fmgr_info_copy(&(giststate->compressFn[i]),
1378 index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC),
1379 CurrentMemoryContext);
1380 fmgr_info_copy(&(giststate->decompressFn[i]),
1381 index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC),
1382 CurrentMemoryContext);
1383 fmgr_info_copy(&(giststate->penaltyFn[i]),
1384 index_getprocinfo(index, i + 1, GIST_PENALTY_PROC),
1385 CurrentMemoryContext);
1386 fmgr_info_copy(&(giststate->picksplitFn[i]),
1387 index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC),
1388 CurrentMemoryContext);
1389 fmgr_info_copy(&(giststate->equalFn[i]),
1390 index_getprocinfo(index, i + 1, GIST_EQUAL_PROC),
1391 CurrentMemoryContext);
1392 /* opclasses are not required to provide a Distance method */
1393 if (OidIsValid(index_getprocid(index, i + 1, GIST_DISTANCE_PROC)))
1394 fmgr_info_copy(&(giststate->distanceFn[i]),
1395 index_getprocinfo(index, i + 1, GIST_DISTANCE_PROC),
1396 CurrentMemoryContext);
/* no distance proc: InvalidOid marks the slot as unusable for KNN scans */
1398 giststate->distanceFn[i].fn_oid = InvalidOid;
1401 * If the index column has a specified collation, we should honor that
1402 * while doing comparisons. However, we may have a collatable storage
1403 * type for a noncollatable indexed data type. If there's no index
1404 * collation then specify default collation in case the support
1405 * functions need collation. This is harmless if the support
1406 * functions don't care about collation, so we just do it
1407 * unconditionally. (We could alternatively call get_typcollation,
1408 * but that seems like expensive overkill --- there aren't going to be
1409 * any cases where a GiST storage type has a nondefault collation.)
1411 if (OidIsValid(index->rd_indcollation[i]))
1412 giststate->supportCollation[i] = index->rd_indcollation[i];
1414 giststate->supportCollation[i] = DEFAULT_COLLATION_OID;
1419 freeGISTstate(GISTSTATE *giststate)