1 /*-------------------------------------------------------------------------
4 * interface routines for the postgres GiST index access method.
7 * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
11 * src/backend/access/gist/gist.c
13 *-------------------------------------------------------------------------
17 #include "access/genam.h"
18 #include "access/gist_private.h"
19 #include "catalog/index.h"
20 #include "catalog/pg_collation.h"
21 #include "miscadmin.h"
22 #include "storage/bufmgr.h"
23 #include "storage/indexfsm.h"
24 #include "utils/memutils.h"
26 /* Working state for gistbuild and its callback */
35 /* A List of these is used to represent a split-in-progress. */
38 Buffer buf; /* the split page "half" */
39 IndexTuple downlink; /* downlink for this half. */
42 /* non-export function prototypes */
43 static void gistbuildCallback(Relation index,
49 static void gistdoinsert(Relation r,
52 GISTSTATE *GISTstate);
53 static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate);
54 static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
56 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
58 static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
59 GISTSTATE *giststate, List *splitinfo);
62 #define ROTATEDIST(d) do { \
63 SplitedPageLayout *tmp=(SplitedPageLayout*)palloc(sizeof(SplitedPageLayout)); \
64 memset(tmp,0,sizeof(SplitedPageLayout)); \
65 tmp->block.blkno = InvalidBlockNumber; \
66 tmp->buffer = InvalidBuffer; \
73 * Create and return a temporary memory context for use by GiST. We
74 * _always_ invoke user-provided methods in a temporary memory
75 * context, so that memory leaks in those functions cannot cause
76 * problems. Also, we use some additional temporary contexts in the
77 * GiST code itself, to avoid the need to do some awkward manual
81 createTempGistContext(void)
/* Child of the caller's CurrentMemoryContext; callers reset/delete it
 * (see gistbuildCallback / gistinsert) rather than freeing retail. */
83 return AllocSetContextCreate(CurrentMemoryContext,
84 "GiST temporary context",
/* Standard allocation-set sizing; no GiST-specific tuning needed. */
85 ALLOCSET_DEFAULT_MINSIZE,
86 ALLOCSET_DEFAULT_INITSIZE,
87 ALLOCSET_DEFAULT_MAXSIZE);
91 * Routine to build an index. Basically calls insert over and over.
93 * XXX: it would be nice to implement some sort of bulk-loading
94 * algorithm, but it is not clear how to do that.
/*
 * gistbuild -- entry point for building a new GiST index.
 *
 * fcinfo args: 0 = heap relation, 1 = index relation, 2 = IndexInfo.
 * Returns a palloc'd IndexBuildResult (heap and index tuple counts).
 * Scans the entire heap via IndexBuildHeapScan, inserting one tuple at a
 * time through gistbuildCallback.
 */
97 gistbuild(PG_FUNCTION_ARGS)
99 Relation heap = (Relation) PG_GETARG_POINTER(0);
100 Relation index = (Relation) PG_GETARG_POINTER(1);
101 IndexInfo *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
102 IndexBuildResult *result;
104 GISTBuildState buildstate;
109 * We expect to be called exactly once for any index relation. If that's
110 * not the case, big trouble's what we have.
112 if (RelationGetNumberOfBlocks(index) != 0)
113 elog(ERROR, "index \"%s\" already contains data",
114 RelationGetRelationName(index));
116 /* no locking is needed */
117 initGISTstate(&buildstate.giststate, index);
119 /* initialize the root page */
120 buffer = gistNewBuffer(index);
121 Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
122 page = BufferGetPage(buffer);
/* Root-page initialization must be atomic w.r.t. crash recovery. */
124 START_CRIT_SECTION();
126 GISTInitBuffer(buffer, F_LEAF);
128 MarkBufferDirty(buffer);
/* WAL-log creation of the (empty, leaf) root page for logged relations. */
130 if (RelationNeedsWAL(index))
135 rdata.data = (char *) &(index->rd_node);
136 rdata.len = sizeof(RelFileNode);
137 rdata.buffer = InvalidBuffer;
140 recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX, &rdata);
141 PageSetLSN(page, recptr);
142 PageSetTLI(page, ThisTimeLineID);
/* Unlogged path: fake an LSN so later LSN comparisons still work. */
145 PageSetLSN(page, GetXLogRecPtrForTemp());
147 UnlockReleaseBuffer(buffer);
151 /* build the index */
152 buildstate.numindexattrs = indexInfo->ii_NumIndexAttrs;
153 buildstate.indtuples = 0;
156 * create a temporary memory context that is reset once for each tuple
157 * inserted into the index
159 buildstate.tmpCtx = createTempGistContext();
161 /* do the heap scan */
162 reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
163 gistbuildCallback, (void *) &buildstate);
165 /* okay, all heap tuples are indexed */
166 MemoryContextDelete(buildstate.tmpCtx);
168 freeGISTstate(&buildstate.giststate);
/* Report tuple counts back to the caller (index_build). */
173 result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
175 result->heap_tuples = reltuples;
176 result->index_tuples = buildstate.indtuples;
178 PG_RETURN_POINTER(result);
182 * Per-tuple callback from IndexBuildHeapScan
/*
 * Per-tuple callback invoked by IndexBuildHeapScan during gistbuild.
 * Forms an index tuple for the heap tuple and inserts it directly via
 * gistdoinsert; 'state' is the GISTBuildState set up by gistbuild.
 */
185 gistbuildCallback(Relation index,
192 GISTBuildState *buildstate = (GISTBuildState *) state;
194 MemoryContext oldCtx;
/* Do all per-tuple work in the short-lived context; reset below. */
196 oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
198 /* form an index tuple and point it at the heap tuple */
199 itup = gistFormTuple(&buildstate->giststate, index,
200 values, isnull, true /* size is currently bogus */ );
201 itup->t_tid = htup->t_self;
204 * Since we already have the index relation locked, we call gistdoinsert
205 * directly. Normal access method calls dispatch through gistinsert,
206 * which locks the relation for write. This is the right thing to do if
207 * you're inserting single tups, but not when you're initializing the
208 * whole index at once.
210 * In this path we respect the fillfactor setting, whereas insertions
211 * after initial build do not.
213 gistdoinsert(index, itup,
214 RelationGetTargetPageFreeSpace(index, GIST_DEFAULT_FILLFACTOR),
215 &buildstate->giststate);
217 buildstate->indtuples += 1;
/* Release everything palloc'd for this tuple (itup included). */
218 MemoryContextSwitchTo(oldCtx);
219 MemoryContextReset(buildstate->tmpCtx);
223 * gistbuildempty() -- build an empty gist index in the initialization fork
/* Always fails: this GiST version has no WAL-less init-fork support. */
226 gistbuildempty(PG_FUNCTION_ARGS)
229 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
230 errmsg("unlogged GiST indexes are not supported")));
236 * gistinsert -- wrapper for GiST tuple insertion.
238 * This is the public interface routine for tuple insertion in GiSTs.
239 * It doesn't do any work; just locks the relation and passes the buck.
/*
 * gistinsert -- insert one index tuple on behalf of the executor.
 * Builds a GISTSTATE and a throwaway memory context per call, then
 * delegates to gistdoinsert with freespace = 0 (no fillfactor slack).
 * Always returns false: GiST does not support unique-index checking.
 */
242 gistinsert(PG_FUNCTION_ARGS)
244 Relation r = (Relation) PG_GETARG_POINTER(0);
245 Datum *values = (Datum *) PG_GETARG_POINTER(1);
246 bool *isnull = (bool *) PG_GETARG_POINTER(2);
247 ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
/* heapRel/checkUnique are part of the am API but unused here. */
250 Relation heapRel = (Relation) PG_GETARG_POINTER(4);
251 IndexUniqueCheck checkUnique = (IndexUniqueCheck) PG_GETARG_INT32(5);
255 MemoryContext oldCtx;
256 MemoryContext insertCtx;
/* All palloc'd scratch data lives in insertCtx and dies with it. */
258 insertCtx = createTempGistContext();
259 oldCtx = MemoryContextSwitchTo(insertCtx);
261 initGISTstate(&giststate, r);
263 itup = gistFormTuple(&giststate, r,
264 values, isnull, true /* size is currently bogus */ );
265 itup->t_tid = *ht_ctid;
267 gistdoinsert(r, itup, 0, &giststate);
270 freeGISTstate(&giststate);
271 MemoryContextSwitchTo(oldCtx);
272 MemoryContextDelete(insertCtx);
274 PG_RETURN_BOOL(false);
279 * Place tuples from 'itup' to 'buffer'. If 'oldoffnum' is valid, the tuple
280 * at that offset is atomically removed along with inserting the new tuples.
281 * This is used to replace a tuple with a new one.
283 * If 'leftchildbuf' is valid, we're inserting the downlink for the page
284 * to the right of 'leftchildbuf', or updating the downlink for 'leftchildbuf'.
285 * F_FOLLOW_RIGHT flag on 'leftchildbuf' is cleared and NSN is set.
287 * If there is not enough room on the page, it is split. All the split
288 * pages are kept pinned and locked and returned in *splitinfo, the caller
289 * is responsible for inserting the downlinks for them. However, if
290 * 'buffer' is the root page and it needs to be split, gistplacetopage()
291 * performs the split as one atomic operation, and *splitinfo is set to NIL.
292 * In that case, we continue to hold the root page locked, and the child
293 * pages are released; note that new tuple(s) are *not* on the root page
294 * but in one of the new child pages.
/*
 * Core page-modification routine: add 'itup'[ntup] to 'buffer', optionally
 * replacing the tuple at 'oldoffnum', splitting the page if necessary.
 * See the comment block above for the full contract (splitinfo, root-split
 * and leftchildbuf/F_FOLLOW_RIGHT semantics).
 */
297 gistplacetopage(GISTInsertState *state, GISTSTATE *giststate,
299 IndexTuple *itup, int ntup, OffsetNumber oldoffnum,
303 Page page = BufferGetPage(buffer);
304 bool is_leaf = (GistPageIsLeaf(page)) ? true : false;
310 * Refuse to modify a page that's incompletely split. This should not
311 * happen because we finish any incomplete splits while we walk down the
312 * tree. However, it's remotely possible that another concurrent inserter
313 * splits a parent page, and errors out before completing the split. We
314 * will just throw an error in that case, and leave any split we had in
315 * progress unfinished too. The next insert that comes along will clean up
318 if (GistFollowRight(page))
319 elog(ERROR, "concurrent GiST page split was incomplete");
324 * if isupdate, remove old key: This node's key has been modified, either
325 * because a child split occurred or because we needed to adjust our key
326 * for an insert in a child node. Therefore, remove the old version of
329 * for WAL replay, in the non-split case we handle this by setting up a
330 * one-element todelete array; in the split case, it's handled implicitly
331 * because the tuple vector passed to gistSplit won't include this tuple.
333 is_split = gistnospace(page, itup, ntup, oldoffnum, state->freespace);
336 /* no space for insertion */
339 SplitedPageLayout *dist = NULL,
341 BlockNumber oldrlink = InvalidBlockNumber;
342 GistNSN oldnsn = {0, 0};
343 SplitedPageLayout rootpg;
344 BlockNumber blkno = BufferGetBlockNumber(buffer);
347 is_rootsplit = (blkno == GIST_ROOT_BLKNO);
350 * Form index tuples vector to split. If we're replacing an old tuple,
351 * remove the old version from the vector.
353 itvec = gistextractpage(page, &tlen);
354 if (OffsetNumberIsValid(oldoffnum))
356 /* on inner page we should remove old tuple */
357 int pos = oldoffnum - FirstOffsetNumber;
361 memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
363 itvec = gistjoinvector(itvec, &tlen, itup, ntup);
364 dist = gistSplit(state->r, page, itvec, tlen, giststate);
367 * Set up pages to work with. Allocate new buffers for all but the
368 * leftmost page. The original page becomes the new leftmost page, and
369 * is just replaced with the new contents.
371 * For a root-split, allocate new buffers for all child pages, the
372 * original page is overwritten with new root page containing
373 * downlinks to the new child pages.
378 /* save old rightlink and NSN */
379 oldrlink = GistPageGetOpaque(page)->rightlink;
380 oldnsn = GistPageGetOpaque(page)->nsn;
/* Leftmost half reuses the original buffer, but builds its content in
 * a temp page so the real page is only overwritten in the critical
 * section (PageRestoreTempPage below). */
382 dist->buffer = buffer;
383 dist->block.blkno = BufferGetBlockNumber(buffer);
384 dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer));
386 /* clean all flags except F_LEAF */
387 GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
391 for (; ptr; ptr = ptr->next)
393 /* Allocate new page */
394 ptr->buffer = gistNewBuffer(state->r);
395 GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
396 ptr->page = BufferGetPage(ptr->buffer);
397 ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
401 * Now that we know which blocks the new pages go to, set up downlink
402 * tuples to point to them.
404 for (ptr = dist; ptr; ptr = ptr->next)
406 ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
407 GistTupleSetValid(ptr->itup);
411 * If this is a root split, we construct the new root page with the
412 * downlinks here directly, instead of requiring the caller to insert
413 * them. Add the new root page to the list along with the child pages.
417 IndexTuple *downlinks;
421 rootpg.buffer = buffer;
422 rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer));
423 GistPageGetOpaque(rootpg.page)->flags = 0;
425 /* Prepare a vector of all the downlinks */
426 for (ptr = dist; ptr; ptr = ptr->next)
428 downlinks = palloc(sizeof(IndexTuple) * ndownlinks);
429 for (i = 0, ptr = dist; ptr; ptr = ptr->next)
430 downlinks[i++] = ptr->itup;
432 rootpg.block.blkno = GIST_ROOT_BLKNO;
433 rootpg.block.num = ndownlinks;
434 rootpg.list = gistfillitupvec(downlinks, ndownlinks,
443 /* Prepare split-info to be returned to caller */
444 for (ptr = dist; ptr; ptr = ptr->next)
446 GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
448 si->buf = ptr->buffer;
449 si->downlink = ptr->itup;
450 *splitinfo = lappend(*splitinfo, si);
455 * Fill all pages. All the pages are new, ie. freshly allocated empty
456 * pages, or a temporary copy of the old page.
458 for (ptr = dist; ptr; ptr = ptr->next)
460 char *data = (char *) (ptr->list);
462 for (i = 0; i < ptr->block.num; i++)
464 if (PageAddItem(ptr->page, (Item) data, IndexTupleSize((IndexTuple) data), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
465 elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(state->r));
466 data += IndexTupleSize((IndexTuple) data);
469 /* Set up rightlinks */
470 if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO)
471 GistPageGetOpaque(ptr->page)->rightlink =
472 ptr->next->block.blkno;
/* Rightmost half inherits the original page's old rightlink. */
474 GistPageGetOpaque(ptr->page)->rightlink = oldrlink;
476 if (ptr->next && !is_rootsplit)
477 GistMarkFollowRight(ptr->page);
479 GistClearFollowRight(ptr->page);
482 * Copy the NSN of the original page to all pages. The
483 * F_FOLLOW_RIGHT flags ensure that scans will follow the
484 * rightlinks until the downlinks are inserted.
486 GistPageGetOpaque(ptr->page)->nsn = oldnsn;
489 START_CRIT_SECTION();
492 * Must mark buffers dirty before XLogInsert, even though we'll still
493 * be changing their opaque fields below.
495 for (ptr = dist; ptr; ptr = ptr->next)
496 MarkBufferDirty(ptr->buffer);
497 if (BufferIsValid(leftchildbuf))
498 MarkBufferDirty(leftchildbuf);
501 * The first page in the chain was a temporary working copy meant to
502 * replace the old page. Copy it over the old page.
504 PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
505 dist->page = BufferGetPage(dist->buffer);
507 /* Write the WAL record */
508 if (RelationNeedsWAL(state->r))
509 recptr = gistXLogSplit(state->r->rd_node, blkno, is_leaf,
510 dist, oldrlink, oldnsn, leftchildbuf);
512 recptr = GetXLogRecPtrForTemp();
514 for (ptr = dist; ptr; ptr = ptr->next)
516 PageSetLSN(ptr->page, recptr);
517 PageSetTLI(ptr->page, ThisTimeLineID);
521 * Return the new child buffers to the caller.
523 * If this was a root split, we've already inserted the downlink
524 * pointers, in the form of a new root page. Therefore we can release
525 * all the new buffers, and keep just the root page locked.
529 for (ptr = dist->next; ptr; ptr = ptr->next)
530 UnlockReleaseBuffer(ptr->buffer);
536 * Enough space. We also get here if ntuples==0.
538 START_CRIT_SECTION();
540 if (OffsetNumberIsValid(oldoffnum))
541 PageIndexTupleDelete(page, oldoffnum);
542 gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
544 MarkBufferDirty(buffer);
546 if (BufferIsValid(leftchildbuf))
547 MarkBufferDirty(leftchildbuf);
549 if (RelationNeedsWAL(state->r))
551 OffsetNumber ndeloffs = 0,
/* See the "one-element todelete array" comment above. */
554 if (OffsetNumberIsValid(oldoffnum))
556 deloffs[0] = oldoffnum;
560 recptr = gistXLogUpdate(state->r->rd_node, buffer,
561 deloffs, ndeloffs, itup, ntup,
564 PageSetLSN(page, recptr);
565 PageSetTLI(page, ThisTimeLineID);
569 recptr = GetXLogRecPtrForTemp();
570 PageSetLSN(page, recptr);
577 * If we inserted the downlink for a child page, set NSN and clear
578 * F_FOLLOW_RIGHT flag on the left child, so that concurrent scans know to
579 * follow the rightlink if and only if they looked at the parent page
580 * before we inserted the downlink.
582 * Note that we do this *after* writing the WAL record. That means that
583 * the possible full page image in the WAL record does not include these
584 * changes, and they must be replayed even if the page is restored from
585 * the full page image. There's a chicken-and-egg problem: if we updated
586 * the child pages first, we wouldn't know the recptr of the WAL record
587 * we're about to write.
589 if (BufferIsValid(leftchildbuf))
591 Page leftpg = BufferGetPage(leftchildbuf);
593 GistPageGetOpaque(leftpg)->nsn = recptr;
594 GistClearFollowRight(leftpg);
596 PageSetLSN(leftpg, recptr);
597 PageSetTLI(leftpg, ThisTimeLineID);
606 * Workhorse routine for doing insertion into a GiST index. Note that
607 * this routine assumes it is invoked in a short-lived memory context,
608 * so it does not bother releasing palloc'd allocations.
611 gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
615 GISTInsertStack firststack;
616 GISTInsertStack *stack;
617 GISTInsertState state;
/* true while we hold GIST_EXCLUSIVE (vs. shared) on stack->buffer */
618 bool xlocked = false;
620 memset(&state, 0, sizeof(GISTInsertState));
621 state.freespace = freespace;
624 /* Start from the root */
625 firststack.blkno = GIST_ROOT_BLKNO;
/* invalid LSN == "buffer not read yet" for the loop below */
626 firststack.lsn.xrecoff = 0;
627 firststack.parent = NULL;
628 state.stack = stack = &firststack;
631 * Walk down along the path of smallest penalty, updating the parent
632 * pointers with the key we're inserting as we go. If we crash in the
633 * middle, the tree is consistent, although the possible parent updates
638 if (XLogRecPtrIsInvalid(stack->lsn))
639 stack->buffer = ReadBuffer(state.r, stack->blkno);
642 * Be optimistic and grab shared lock first. Swap it for an exclusive
643 * lock later if we need to update the page.
647 LockBuffer(stack->buffer, GIST_SHARE);
648 gistcheckpage(state.r, stack->buffer);
651 stack->page = (Page) BufferGetPage(stack->buffer);
652 stack->lsn = PageGetLSN(stack->page);
653 Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn));
656 * If this page was split but the downlink was never inserted to the
657 * parent because the inserting backend crashed before doing that, fix
660 if (GistFollowRight(stack->page))
/* need exclusive lock to repair; re-check after the lock swap */
664 LockBuffer(stack->buffer, GIST_UNLOCK);
665 LockBuffer(stack->buffer, GIST_EXCLUSIVE);
667 /* someone might've completed the split when we unlocked */
668 if (!GistFollowRight(stack->page))
671 gistfixsplit(&state, giststate);
673 UnlockReleaseBuffer(stack->buffer);
/* restart the descent from the parent after fixing the split */
675 state.stack = stack = stack->parent;
679 if (stack->blkno != GIST_ROOT_BLKNO &&
680 XLByteLT(stack->parent->lsn,
681 GistPageGetOpaque(stack->page)->nsn))
684 * Concurrent split detected. There's no guarantee that the
685 * downlink for this page is consistent with the tuple we're
686 * inserting anymore, so go back to parent and rechoose the best
689 UnlockReleaseBuffer(stack->buffer);
691 state.stack = stack = stack->parent;
695 if (!GistPageIsLeaf(stack->page))
698 * This is an internal page so continue to walk down the tree.
699 * Find the child node that has the minimum insertion penalty.
701 BlockNumber childblkno;
703 GISTInsertStack *item;
705 stack->childoffnum = gistchoose(state.r, stack->page, itup, giststate);
706 iid = PageGetItemId(stack->page, stack->childoffnum);
707 idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
708 childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
711 * Check that it's not a leftover invalid tuple from pre-9.1
713 if (GistTupleIsInvalid(idxtuple))
715 (errmsg("index \"%s\" contains an inner tuple marked as invalid",
716 RelationGetRelationName(r)),
717 errdetail("This is caused by an incomplete page split at crash recovery before upgrading to PostgreSQL 9.1."),
718 errhint("Please REINDEX it.")));
721 * Check that the key representing the target child node is
722 * consistent with the key we're inserting. Update it if it's not.
724 newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
728 * Swap shared lock for an exclusive one. Beware, the page may
729 * change while we unlock/lock the page...
733 LockBuffer(stack->buffer, GIST_UNLOCK);
734 LockBuffer(stack->buffer, GIST_EXCLUSIVE);
736 stack->page = (Page) BufferGetPage(stack->buffer);
/* LSN comparison detects any modification during the lock swap */
738 if (!XLByteEQ(PageGetLSN(stack->page), stack->lsn))
740 /* the page was changed while we unlocked it, retry */
748 * We still hold the lock after gistinserttuples(), but it
749 * might have to split the page to make the updated tuple fit.
750 * In that case the updated tuple might migrate to the other
751 * half of the split, so we have to go back to the parent and
752 * descend back to the half that's a better fit for the new
755 if (gistinserttuples(&state, stack, giststate, &newtup, 1,
756 stack->childoffnum, InvalidBuffer))
759 * If this was a root split, the root page continues to be
760 * the parent and the updated tuple went to one of the
761 * child pages, so we just need to retry from the root
764 if (stack->blkno != GIST_ROOT_BLKNO)
766 UnlockReleaseBuffer(stack->buffer);
768 state.stack = stack = stack->parent;
773 LockBuffer(stack->buffer, GIST_UNLOCK);
776 /* descend to the chosen child */
777 item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
778 item->blkno = childblkno;
779 item->parent = stack;
780 state.stack = stack = item;
785 * Leaf page. Insert the new key. We've already updated all the
786 * parents on the way down, but we might have to split the page if
787 * it doesn't fit. gistinserthere() will take care of that.
791 * Swap shared lock for an exclusive one. Be careful, the page may
792 * change while we unlock/lock the page...
796 LockBuffer(stack->buffer, GIST_UNLOCK);
797 LockBuffer(stack->buffer, GIST_EXCLUSIVE);
799 stack->page = (Page) BufferGetPage(stack->buffer);
800 stack->lsn = PageGetLSN(stack->page);
802 if (stack->blkno == GIST_ROOT_BLKNO)
805 * the only page that can become inner instead of leaf is
806 * the root page, so for root we should recheck it
808 if (!GistPageIsLeaf(stack->page))
811 * very rare situation: during unlock/lock index with
812 * number of pages = 1 was increased
814 LockBuffer(stack->buffer, GIST_UNLOCK);
820 * we don't need to check root split, because checking
821 * leaf/inner is enough to recognize split for root
824 else if (GistFollowRight(stack->page) ||
825 XLByteLT(stack->parent->lsn,
826 GistPageGetOpaque(stack->page)->nsn))
829 * The page was split while we momentarily unlocked the
830 * page. Go back to parent.
832 UnlockReleaseBuffer(stack->buffer);
834 state.stack = stack = stack->parent;
839 /* now state.stack->(page, buffer and blkno) points to leaf page */
841 gistinserttuples(&state, stack, giststate, &itup, 1,
842 InvalidOffsetNumber, InvalidBuffer);
843 LockBuffer(stack->buffer, GIST_UNLOCK);
845 /* Release any pins we might still hold before exiting */
846 for (; stack; stack = stack->parent)
847 ReleaseBuffer(stack->buffer);
854 * Traverse the tree to find path from root page to specified "child" block.
856 * returns from the beginning of closest parent;
858 * To prevent deadlocks, this should lock only one page at a time.
/*
 * Walk from the root looking for the page whose downlink points at
 * 'child', building a GISTInsertStack chain as we go (breadth-first
 * over a work list headed by 'top'). Locks only one page at a time to
 * avoid deadlocks.
 */
861 gistFindPath(Relation r, BlockNumber child)
869 GISTInsertStack *top,
874 top = tail = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
875 top->blkno = GIST_ROOT_BLKNO;
877 while (top && top->blkno != child)
879 buffer = ReadBuffer(r, top->blkno);
880 LockBuffer(buffer, GIST_SHARE);
881 gistcheckpage(r, buffer);
882 page = (Page) BufferGetPage(buffer);
884 if (GistPageIsLeaf(page))
886 /* we can safely stop here; only leaf pages would follow */
887 UnlockReleaseBuffer(buffer);
891 top->lsn = PageGetLSN(page);
894 * If F_FOLLOW_RIGHT is set, the page to the right doesn't have a
895 * downlink. This should not normally happen..
897 if (GistFollowRight(page))
898 elog(ERROR, "concurrent GiST page split was incomplete");
900 if (top->parent && XLByteLT(top->parent->lsn, GistPageGetOpaque(page)->nsn) &&
901 GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
903 /* page was split while we were working; also visit the right sibling */
904 ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
905 ptr->blkno = GistPageGetOpaque(page)->rightlink;
906 ptr->childoffnum = InvalidOffsetNumber;
913 maxoff = PageGetMaxOffsetNumber(page);
/* scan every downlink on this inner page, looking for 'child' */
915 for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
917 iid = PageGetItemId(page, i);
918 idxtuple = (IndexTuple) PageGetItem(page, iid);
919 blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
922 OffsetNumber poff = InvalidOffsetNumber;
924 /* make child links */
928 /* move childoffnum.. */
931 /* first iteration */
932 poff = ptr->parent->childoffnum;
933 ptr->parent->childoffnum = ptr->childoffnum;
937 OffsetNumber tmp = ptr->parent->childoffnum;
939 ptr->parent->childoffnum = poff;
/* found it: remember which slot points at the child, and return */
944 top->childoffnum = i;
945 UnlockReleaseBuffer(buffer);
950 /* Install next inner page to the end of stack */
951 ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
953 ptr->childoffnum = i; /* set offsetnumber of child to child
962 UnlockReleaseBuffer(buffer);
970 * Updates the stack so that child->parent is the correct parent of the
971 * child. child->parent must be exclusively locked on entry, and will
972 * remain so at exit, but it might not be the same page anymore.
975 gistFindCorrectParent(Relation r, GISTInsertStack *child)
977 GISTInsertStack *parent = child->parent;
979 gistcheckpage(r, parent->buffer);
980 parent->page = (Page) BufferGetPage(parent->buffer);
982 /* here we don't need to distinguish between split and page update */
983 if (parent->childoffnum == InvalidOffsetNumber || !XLByteEQ(parent->lsn, PageGetLSN(parent->page)))
985 /* parent has changed; follow rightlinks until we find the child again */
990 GISTInsertStack *ptr;
994 maxoff = PageGetMaxOffsetNumber(parent->page);
995 for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
997 iid = PageGetItemId(parent->page, i);
998 idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
999 if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno)
/* found the downlink; record its slot and we're done */
1002 parent->childoffnum = i;
1007 parent->blkno = GistPageGetOpaque(parent->page)->rightlink;
1008 UnlockReleaseBuffer(parent->buffer);
1009 if (parent->blkno == InvalidBlockNumber)
1012 * reached the end of the chain and still didn't find the
1013 * parent; a very rare situation, when the root was split
1016 parent->buffer = ReadBuffer(r, parent->blkno);
1017 LockBuffer(parent->buffer, GIST_EXCLUSIVE);
1018 gistcheckpage(r, parent->buffer);
1019 parent->page = (Page) BufferGetPage(parent->buffer);
1023 * we need to search the tree to find the parent, but first we
1024 * must release all the old parent buffers
1027 ptr = child->parent->parent; /* child->parent already released
1031 ReleaseBuffer(ptr->buffer);
1035 /* ok, find new path */
1036 ptr = parent = gistFindPath(r, child->blkno);
1037 Assert(ptr != NULL);
1039 /* read all buffers as expected by caller */
1040 /* note we don't lock them or gistcheckpage them here! */
1043 ptr->buffer = ReadBuffer(r, ptr->blkno);
1044 ptr->page = (Page) BufferGetPage(ptr->buffer);
1048 /* install new chain of parents to stack */
1049 child->parent = parent;
1051 /* make recursive call to normal processing */
1052 LockBuffer(child->parent->buffer, GIST_EXCLUSIVE);
1053 gistFindCorrectParent(r, child);
1060 * Form a downlink pointer for the page in 'buf'.
/*
 * Form a downlink tuple for the page in 'buf': the union of all keys on
 * the page, built by repeatedly calling gistgetadjusted. 'stack' is used
 * only in the empty-page fallback below, to copy the parent's downlink.
 */
1063 gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate,
1064 GISTInsertStack *stack)
1066 Page page = BufferGetPage(buf);
1067 OffsetNumber maxoff;
1068 OffsetNumber offset;
1069 IndexTuple downlink = NULL;
1071 maxoff = PageGetMaxOffsetNumber(page);
1072 for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
1074 IndexTuple ituple = (IndexTuple)
1075 PageGetItem(page, PageGetItemId(page, offset));
/* first tuple seeds the union; later tuples widen it as needed */
1077 if (downlink == NULL)
1078 downlink = CopyIndexTuple(ituple);
1081 IndexTuple newdownlink;
1083 newdownlink = gistgetadjusted(rel, downlink, ituple,
1086 downlink = newdownlink;
1091 * If the page is completely empty, we can't form a meaningful downlink
1092 * for it. But we have to insert a downlink for the page. Any key will do,
1093 * as long as it's consistent with the downlink of parent page, so that we
1094 * can legally insert it to the parent. A minimal one that matches as few
1095 * scans as possible would be best, to keep scans from doing useless work,
1096 * but we don't know how to construct that. So we just use the downlink of
1097 * the original page that was split - that's as far from optimal as it can
1104 LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
1105 gistFindCorrectParent(rel, stack);
1106 iid = PageGetItemId(stack->parent->page, stack->parent->childoffnum);
1107 downlink = (IndexTuple) PageGetItem(stack->parent->page, iid);
1108 downlink = CopyIndexTuple(downlink);
1109 LockBuffer(stack->parent->buffer, GIST_UNLOCK);
/* point the downlink at this page and mark it valid (not pre-9.1) */
1112 ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf));
1113 GistTupleSetValid(downlink);
1120 * Complete the incomplete split of state->stack->page.
/*
 * Repair an incomplete split left behind by a crashed backend: walk the
 * F_FOLLOW_RIGHT chain starting at state->stack, form a downlink for each
 * half, then insert them into the parent via gistfinishsplit.
 * Caller must hold GIST_EXCLUSIVE on stack->buffer.
 */
1123 gistfixsplit(GISTInsertState *state, GISTSTATE *giststate)
1125 GISTInsertStack *stack = state->stack;
1128 List *splitinfo = NIL;
/* LOG rather than ERROR: this is recovery of someone else's crash */
1130 elog(LOG, "fixing incomplete split in index \"%s\", block %u",
1131 RelationGetRelationName(state->r), stack->blkno);
1133 Assert(GistFollowRight(stack->page));
1134 Assert(OffsetNumberIsValid(stack->parent->childoffnum));
1136 buf = stack->buffer;
1139 * Read the chain of split pages, following the rightlinks. Construct a
1140 * downlink tuple for each page.
1144 GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
1145 IndexTuple downlink;
1147 page = BufferGetPage(buf);
1149 /* Form the new downlink tuples to insert to parent */
1150 downlink = gistformdownlink(state->r, buf, giststate, stack);
1153 si->downlink = downlink;
1155 splitinfo = lappend(splitinfo, si);
/* F_FOLLOW_RIGHT still set means the chain continues to the right */
1157 if (GistFollowRight(page))
1159 /* lock next page */
1160 buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink);
1161 LockBuffer(buf, GIST_EXCLUSIVE);
1167 /* Insert the downlinks */
1168 gistfinishsplit(state, stack, giststate, splitinfo);
1172 * Insert tuples to stack->buffer. If 'oldoffnum' is valid, the new tuples
1173 * replace an old tuple at oldoffnum. The caller must hold an exclusive lock
1176 * If leftchild is valid, we're inserting/updating the downlink for the
1177 * page to the right of leftchild. We clear the F_FOLLOW_RIGHT flag and
1178 * update NSN on leftchild, atomically with the insertion of the downlink.
1180 * Returns 'true' if the page had to be split. On return, we will continue
1181 * to hold an exclusive lock on state->stack->buffer, but if we had to split
1182 * the page, it might not contain the tuple we just inserted/updated.
/*
 * Thin wrapper around gistplacetopage: place the tuples, and if that
 * caused a split, immediately insert the downlinks via gistfinishsplit.
 * Returns whether a split happened (see contract comment above).
 */
1185 gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
1186 GISTSTATE *giststate,
1187 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
1193 is_split = gistplacetopage(state, giststate, stack->buffer,
1194 tuples, ntup, oldoffnum,
1198 gistfinishsplit(state, stack, giststate, splitinfo);
1204 * Finish an incomplete split by inserting/updating the downlinks in
1205 * parent page. 'splitinfo' contains all the child pages, exclusively-locked,
1206 * involved in the split, from left-to-right.
1209 gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
1210 GISTSTATE *giststate, List *splitinfo)
1214 GISTPageSplitInfo *right;
1215 GISTPageSplitInfo *left;
1216 IndexTuple tuples[2];
1218 /* A split always contains at least two halves */
1219 Assert(list_length(splitinfo) >= 2);
1222 * We need to insert downlinks for each new page, and update the downlink
1223 * for the original (leftmost) page in the split. Begin at the rightmost
1224 * page, inserting one downlink at a time until there's only two pages
1225 * left. Finally insert the downlink for the last new page and update the
1226 * downlink for the original page as one operation.
1229 /* for convenience, create a copy of the list in reverse order */
1231 foreach(lc, splitinfo)
1233 reversed = lcons(lfirst(lc), reversed);
1236 LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
1237 gistFindCorrectParent(state->r, stack);
/* peel off rightmost halves one downlink at a time */
1239 while (list_length(reversed) > 2)
1241 right = (GISTPageSplitInfo *) linitial(reversed);
1242 left = (GISTPageSplitInfo *) lsecond(reversed);
1244 if (gistinserttuples(state, stack->parent, giststate,
1245 &right->downlink, 1,
1246 InvalidOffsetNumber,
1250 * If the parent page was split, need to relocate the original
1253 gistFindCorrectParent(state->r, stack);
/* this half's downlink is now in the parent; release the page */
1255 UnlockReleaseBuffer(right->buf);
1256 reversed = list_delete_first(reversed);
1259 right = (GISTPageSplitInfo *) linitial(reversed);
1260 left = (GISTPageSplitInfo *) lsecond(reversed);
1263 * Finally insert downlink for the remaining right page and update the
1264 * downlink for the original page to not contain the tuples that were
1265 * moved to the new pages.
1267 tuples[0] = left->downlink;
1268 tuples[1] = right->downlink;
1269 gistinserttuples(state, stack->parent, giststate,
1271 stack->parent->childoffnum,
1273 LockBuffer(stack->parent->buffer, GIST_UNLOCK);
1274 UnlockReleaseBuffer(right->buf);
/* the leftmost half is the original page the caller still holds */
1275 Assert(left->buf == stack->buffer);
1279 * gistSplit -- split a page in the tree and fill struct
1280 * used for XLOG and real writes buffers. Function is recursive, ie
1281 * it will split page until keys will fit in every page.
/*
 * Returns a linked list of SplitedPageLayout, one entry per resulting
 * page half; each entry carries the page's tuple list and the downlink
 * tuple (itup) to install in the parent. The right halves are produced
 * first, then the left halves are prepended/linked in front.
 */
1284 gistSplit(Relation r,
1286 IndexTuple *itup, /* contains compressed entry */
1288 GISTSTATE *giststate)
1290 IndexTuple *lvectup,
1293 GistEntryVector *entryvec;
1295 SplitedPageLayout *res = NULL;
1297 /* generate the item array */
/* entries are 1-based in GistEntryVector, hence the len + 1 sizing */
1298 entryvec = palloc(GEVHDRSZ + (len + 1) * sizeof(GISTENTRY));
1299 entryvec->n = len + 1;
/* Start with all-NULL union keys on both sides; gistSplitByKey fills them in */
1301 memset(v.spl_lisnull, TRUE, sizeof(bool) * giststate->tupdesc->natts);
1302 memset(v.spl_risnull, TRUE, sizeof(bool) * giststate->tupdesc->natts);
1303 gistSplitByKey(r, page, itup, len, giststate,
1306 /* form left and right vector */
1307 lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
1308 rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
/* spl_left/spl_right hold 1-based indexes into itup[], hence the - 1 */
1310 for (i = 0; i < v.splitVector.spl_nleft; i++)
1311 lvectup[i] = itup[v.splitVector.spl_left[i] - 1];
1313 for (i = 0; i < v.splitVector.spl_nright; i++)
1314 rvectup[i] = itup[v.splitVector.spl_right[i] - 1];
1316 /* finalize splitting (may need another split) */
/* Right half still too big for one page: recurse on its tuple set */
1317 if (!gistfitpage(rvectup, v.splitVector.spl_nright))
1319 res = gistSplit(r, page, rvectup, v.splitVector.spl_nright, giststate);
/* Right half fits: emit a single layout entry with its tuples and downlink */
1324 res->block.num = v.splitVector.spl_nright;
1325 res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
1326 res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
/* Left half too big as well: recurse, then splice its list ahead of res */
1329 if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
1331 SplitedPageLayout *resptr,
1334 resptr = subres = gistSplit(r, page, lvectup, v.splitVector.spl_nleft, giststate);
1336 /* install on list's tail */
1337 while (resptr->next)
1338 resptr = resptr->next;
/* Left half fits: prepend one layout entry for it */
1346 res->block.num = v.splitVector.spl_nleft;
1347 res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
1348 res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
1355 * Fill a GISTSTATE with information about the index
/*
 * Caches FmgrInfo for every per-column GiST support procedure so the
 * hot paths need not look them up in the catalogs on each call, and
 * records the collation to pass to those procedures.
 */
1358 initGISTstate(GISTSTATE *giststate, Relation index)
/* Sanity check: GISTSTATE's per-column arrays are sized for INDEX_MAX_KEYS */
1362 if (index->rd_att->natts > INDEX_MAX_KEYS)
1363 elog(ERROR, "numberOfAttributes %d > %d",
1364 index->rd_att->natts, INDEX_MAX_KEYS);
1366 giststate->tupdesc = index->rd_att;
/* Copy the seven mandatory support procs for each indexed column */
1368 for (i = 0; i < index->rd_att->natts; i++)
1370 fmgr_info_copy(&(giststate->consistentFn[i]),
1371 index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC),
1372 CurrentMemoryContext);
1373 fmgr_info_copy(&(giststate->unionFn[i]),
1374 index_getprocinfo(index, i + 1, GIST_UNION_PROC),
1375 CurrentMemoryContext);
1376 fmgr_info_copy(&(giststate->compressFn[i]),
1377 index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC),
1378 CurrentMemoryContext);
1379 fmgr_info_copy(&(giststate->decompressFn[i]),
1380 index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC),
1381 CurrentMemoryContext);
1382 fmgr_info_copy(&(giststate->penaltyFn[i]),
1383 index_getprocinfo(index, i + 1, GIST_PENALTY_PROC),
1384 CurrentMemoryContext);
1385 fmgr_info_copy(&(giststate->picksplitFn[i]),
1386 index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC),
1387 CurrentMemoryContext);
1388 fmgr_info_copy(&(giststate->equalFn[i]),
1389 index_getprocinfo(index, i + 1, GIST_EQUAL_PROC),
1390 CurrentMemoryContext);
1391 /* opclasses are not required to provide a Distance method */
1392 if (OidIsValid(index_getprocid(index, i + 1, GIST_DISTANCE_PROC)))
1393 fmgr_info_copy(&(giststate->distanceFn[i]),
1394 index_getprocinfo(index, i + 1, GIST_DISTANCE_PROC),
1395 CurrentMemoryContext);
/* else-branch: InvalidOid marks "no distance proc" for this column */
1397 giststate->distanceFn[i].fn_oid = InvalidOid;
1400 * If the index column has a specified collation, we should honor that
1401 * while doing comparisons. However, we may have a collatable storage
1402 * type for a noncollatable indexed data type. If there's no index
1403 * collation then specify default collation in case the support
1404 * functions need collation. This is harmless if the support
1405 * functions don't care about collation, so we just do it
1406 * unconditionally. (We could alternatively call get_typcollation,
1407 * but that seems like expensive overkill --- there aren't going to be
1408 * any cases where a GiST storage type has a nondefault collation.)
1410 if (OidIsValid(index->rd_indcollation[i]))
1411 giststate->supportCollation[i] = index->rd_indcollation[i];
1413 giststate->supportCollation[i] = DEFAULT_COLLATION_OID;
1418 freeGISTstate(GISTSTATE *giststate)