OSDN Git Service

Duplicates handling...
authorVadim B. Mikheev <vadim4o@yahoo.com>
Tue, 10 Jun 1997 07:28:50 +0000 (07:28 +0000)
committerVadim B. Mikheev <vadim4o@yahoo.com>
Tue, 10 Jun 1997 07:28:50 +0000 (07:28 +0000)
src/backend/access/nbtree/nbtinsert.c
src/backend/access/nbtree/nbtsearch.c

index 890a921..b1aa734 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.15 1997/06/06 03:11:42 vadim Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.16 1997/06/10 07:28:47 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
 #endif
 
 static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, BTItem afteritem);
-static Buffer _bt_split(Relation rel, Buffer buf, BTItem hiRightItem);
+static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright);
 static OffsetNumber _bt_findsplitloc(Relation rel, Page page, OffsetNumber start, OffsetNumber maxoff, Size llimit);
 static void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
 static OffsetNumber _bt_pgaddtup(Relation rel, Buffer buf, int keysz, ScanKey itup_scankey, Size itemsize, BTItem btitem, BTItem afteritem);
 static bool _bt_goesonpg(Relation rel, Buffer buf, Size keysz, ScanKey scankey, BTItem afteritem);
 static void _bt_updateitem(Relation rel, Size keysz, Buffer buf, BTItem oldItem, BTItem newItem);
 static bool _bt_isequal (TupleDesc itupdesc, Page page, OffsetNumber offnum, int keysz, ScanKey scankey);
+#if 0
 static InsertIndexResult _bt_shift (Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, BTItem hikey);
+#endif
 
 /*
  *  _bt_doinsert() -- Handle insertion of a single btitem in the tree.
@@ -52,14 +54,13 @@ _bt_doinsert(Relation rel, BTItem btitem, bool index_is_unique, Relation heapRel
     BTStack stack;
     Buffer buf;
     BlockNumber blkno;
-    int natts;
+    int natts = rel->rd_rel->relnatts;
     InsertIndexResult res;
     
     itup = &(btitem->bti_itup);
     
     /* we need a scan key to do our search, so build one */
     itup_scankey = _bt_mkscankey(rel, itup);
-    natts = rel->rd_rel->relnatts;
     
     /* find the page containing this key */
     stack = _bt_search(rel, natts, itup_scankey, &buf);
@@ -223,17 +224,13 @@ _bt_insertonpg(Relation rel,
 {
     InsertIndexResult res;
     Page page;
-    Buffer rbuf;
-    Buffer pbuf;
-    Page rpage;
-    BTItem ritem;
     BTPageOpaque lpageop;
-    BTPageOpaque rpageop;
-    BlockNumber rbknum, itup_blkno;
+    BlockNumber itup_blkno;
     OffsetNumber itup_off;
+    OffsetNumber firstright = InvalidOffsetNumber;
     int itemsz;
-    Page ppage;
-    BTPageOpaque ppageop;
+    bool do_split = false;
+    bool keys_equal = false;
     
     page = BufferGetPage(buf);
     lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
@@ -248,8 +245,9 @@ _bt_insertonpg(Relation rel,
      * page in the chain of duplicates then:
      * 1. if scankey == hikey (i.e. - new duplicate item) then
      *    insert it here;
-     * 2. if scankey < hikey then we grab new page, copy current page
-     *    content there and insert new item on the current page.
+     * 2. if scankey < hikey then:
+     *    2.a if there is duplicate key(s) here - we force splitting;
+     *    2.b else - we may "eat" this page from duplicates chain.
      */
     if ( lpageop->btpo_flags & BTP_CHAIN )
     {
@@ -274,26 +272,105 @@ _bt_insertonpg(Relation rel,
            if ( !_bt_skeycmp (rel, keysz, scankey, page, hitemid, 
                                        BTLessStrategyNumber) )
                elog (FATAL, "btree: attempt to insert higher key on the leftmost page in the chain of duplicates");
-           return (_bt_shift(rel, buf, stack, keysz, scankey, btitem, hitem));
+           if ( maxoff > P_HIKEY )     /* have duplicate(s) */
+           {
+               firstright = P_FIRSTKEY;
+               do_split = true;
+           }
+           else                        /* "eat" page */
+           {
+               Buffer pbuf;
+               Page ppage;
+               
+               itup_blkno = BufferGetBlockNumber(buf);
+               itup_off = PageAddItem(page, (Item) btitem, itemsz, 
+                                               P_FIRSTKEY, LP_USED);
+               if ( itup_off == InvalidOffsetNumber )
+                   elog (FATAL, "btree: failed to add item");
+               lpageop->btpo_flags &= ~BTP_CHAIN;
+               pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
+               ppage = BufferGetPage(pbuf);
+                       PageIndexTupleDelete(ppage, stack->bts_offset);
+               pfree(stack->bts_btitem);
+               stack->bts_btitem = _bt_formitem(&(btitem->bti_itup));
+               ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), 
+                              itup_blkno, P_HIKEY);
+               _bt_wrtbuf(rel, buf);
+               res = _bt_insertonpg(rel, pbuf, stack->bts_parent,
+                                       keysz, scankey, stack->bts_btitem,
+                                       NULL);
+               ItemPointerSet(&(res->pointerData), itup_blkno, itup_off);
+               return (res);
+           }
        }
+       else
+       {
+           keys_equal = true;
+           if ( PageGetFreeSpace(page) < itemsz )
+               do_split = true;
+       }
+    }
+    else if ( PageGetFreeSpace(page) < itemsz )
+       do_split = true;
+    else if ( PageGetFreeSpace(page) < 3*itemsz + 2*sizeof(ItemIdData) )
+    {
+       OffsetNumber offnum = (P_RIGHTMOST(lpageop)) ? P_HIKEY : P_FIRSTKEY;
+       OffsetNumber maxoff = PageGetMaxOffsetNumber (page);
+       ItemId itid;
+       BTItem previtem, chkitem;
+       Size maxsize;
+       Size currsize;
+       
+       itid = PageGetItemId(page, offnum);
+       previtem = (BTItem) PageGetItem(page, itid);
+       maxsize = currsize = (ItemIdGetLength(itid) + sizeof(ItemIdData));
+       for (offnum = OffsetNumberNext(offnum);
+                       offnum <= maxoff; offnum = OffsetNumberNext(offnum) )
+       {
+           itid = PageGetItemId(page, offnum);
+           chkitem = (BTItem) PageGetItem(page, itid);
+           if ( !_bt_itemcmp (rel, keysz, previtem, chkitem,
+                                               BTEqualStrategyNumber) )
+           {
+               if ( currsize > maxsize )
+                   maxsize = currsize;
+               currsize = 0;
+               previtem = chkitem;
+           }
+           currsize += (ItemIdGetLength(itid) + sizeof(ItemIdData));
+       }
+       if ( currsize > maxsize )
+           maxsize = currsize;
+       maxsize += sizeof (PageHeaderData) + 
+                       DOUBLEALIGN (sizeof (BTPageOpaqueData));
+       if ( maxsize >= PageGetPageSize (page) / 2 )
+           do_split = true;
     }
-
     
-    if (PageGetFreeSpace(page) < itemsz)
+    if ( do_split )
     {
+       Buffer rbuf;
+       Page rpage;
+       BTItem ritem;
+       BlockNumber rbknum;
+       BTPageOpaque rpageop;
+       Buffer pbuf;
+       Page ppage;
+       BTPageOpaque ppageop;
        BlockNumber bknum = BufferGetBlockNumber(buf);
        BTItem lowLeftItem;
-       BTItem hiRightItem = NULL;
+       OffsetNumber maxoff;
+       bool shifted = false;
+       bool left_chained = ( lpageop->btpo_flags & BTP_CHAIN ) ? true : false;
        
        /* 
-        * If we have to split leaf page in the chain of duplicates
-        * then we try to look at our right sibling first.
+        * If we have to split leaf page in the chain of duplicates by
+        * new duplicate then we try to look at our right sibling first.
         */
        if ( ( lpageop->btpo_flags & BTP_CHAIN ) &&
-                       ( lpageop->btpo_flags & BTP_LEAF ) )
+               ( lpageop->btpo_flags & BTP_LEAF ) && keys_equal )
        {
            bool use_left = true;
-           bool keys_equal = false;
                
            rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE);
            rpage = BufferGetPage(rbuf);
@@ -309,29 +386,20 @@ _bt_insertonpg(Relation rel,
                {
                    if ( !( rpageop->btpo_flags & BTP_CHAIN ) )
                        elog (FATAL, "btree: lost page in the chain of duplicates");
-                   keys_equal = true;
                }
                else if ( _bt_skeycmp (rel, keysz, scankey, rpage, 
                                PageGetItemId(rpage, P_HIKEY), 
                                BTGreaterStrategyNumber) )
                    elog (FATAL, "btree: hikey is out of order");
-               /*
-                * If hikey > scankey and BTP_CHAIN is ON 
-                * then it's first page of the chain of higher keys:
-                * our left sibling hikey was lying! We can't add new 
-                * item here, but we can turn BTP_CHAIN off on our
-                * left page and overwrite its hikey.
+               else if ( rpageop->btpo_flags & BTP_CHAIN )
+               /* 
+                * If hikey > scankey then it's last page in chain and
+                * BTP_CHAIN must be OFF 
                 */
-               if ( !keys_equal && ( rpageop->btpo_flags & BTP_CHAIN ) )
-               {
-                   BTItem tmp;
-                   
-                   lpageop->btpo_flags &= ~BTP_CHAIN;
-                   tmp = (BTItem) PageGetItem(rpage, PageGetItemId(rpage, P_HIKEY));
-                   hiRightItem = _bt_formitem(&(tmp->bti_itup));
-               }
+                   elog (FATAL, "btree: lost last page in the chain of duplicates");
+               
                /* if there is room here then we use this page. */
-               else if ( PageGetFreeSpace (rpage) > itemsz )
+               if ( PageGetFreeSpace (rpage) > itemsz )
                    use_left = false;
            }
            else        /* rightmost page */
@@ -349,12 +417,70 @@ _bt_insertonpg(Relation rel,
            }
            _bt_relbuf(rel, rbuf, BT_WRITE);
        }
+       /*
+        * If after splitting un-chained page we'll got chain of pages 
+        * with duplicates then we want to know 
+        * 1. on which of two pages new btitem will go (current
+        *    _bt_findsplitloc is quite bad);
+        * 2. what parent (if there's one) thinking about it
+        *    (remember about deletions)
+        */
+       else if ( !( lpageop->btpo_flags & BTP_CHAIN ) )
+       {
+           OffsetNumber start = ( P_RIGHTMOST(lpageop) ) ? P_HIKEY : P_FIRSTKEY;
+           Size llimit;
+           
+           maxoff = PageGetMaxOffsetNumber (page);
+           llimit = PageGetPageSize(page) - sizeof (PageHeaderData) - 
+                       DOUBLEALIGN (sizeof (BTPageOpaqueData)) 
+                       + sizeof(ItemIdData);
+           llimit /= 2;
+           firstright = _bt_findsplitloc(rel, page, start, maxoff, llimit);
+           
+           if ( _bt_itemcmp (rel, keysz, 
+               (BTItem) PageGetItem(page, PageGetItemId(page, start)),
+               (BTItem) PageGetItem(page, PageGetItemId(page, firstright)), 
+                               BTEqualStrategyNumber) )
+           {
+               if ( _bt_skeycmp (rel, keysz, scankey, page, 
+                                       PageGetItemId(page, firstright), 
+                                       BTLessStrategyNumber) )
+               /* 
+                * force moving current items to the new page:
+                * new item will go on the current page.
+                */
+                   firstright = start;
+               else
+               /* 
+                * new btitem >= firstright, start item == firstright -
+                * new chain of duplicates: if this non-leftmost leaf
+                * page and parent item < start item then force moving
+                * all items to the new page - current page will be
+                * "empty" after it.
+                */
+               {
+                   if ( !P_LEFTMOST (lpageop) && 
+                       ( lpageop->btpo_flags & BTP_LEAF ) )
+                   {
+                       ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), 
+                              bknum, P_HIKEY);
+                       pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
+                       if ( _bt_itemcmp (rel, keysz, stack->bts_btitem, 
+                                       (BTItem) PageGetItem(page, 
+                                               PageGetItemId(page, start)),
+                                       BTLessStrategyNumber) )
+                       {
+                           firstright = start;
+                           shifted = true;
+                       }
+                       _bt_relbuf(rel, pbuf, BT_WRITE);
+                   }
+               }
+           }   /* else - no new chain if start item < firstright one */
+       }
        
        /* split the buffer into left and right halves */
-       rbuf = _bt_split(rel, buf, hiRightItem);
-       
-       if ( hiRightItem != (BTItem) NULL )
-           pfree (hiRightItem);
+       rbuf = _bt_split(rel, buf, firstright);
        
        /* which new page (left half or right half) gets the tuple? */
        if (_bt_goesonpg(rel, buf, keysz, scankey, afteritem)) {
@@ -369,13 +495,24 @@ _bt_insertonpg(Relation rel,
            itup_blkno = BufferGetBlockNumber(rbuf);
        }
        
-       lowLeftItem = (BTItem) PageGetItem(page,
+       maxoff = PageGetMaxOffsetNumber (page);
+       if ( shifted )
+       {
+           if ( maxoff > P_FIRSTKEY )
+               elog (FATAL, "btree: shifted page is not empty");
+           lowLeftItem = (BTItem) NULL;
+       }
+       else
+       {
+           if ( maxoff < P_FIRSTKEY )
+               elog (FATAL, "btree: un-shifted page is empty");
+           lowLeftItem = (BTItem) PageGetItem(page,
                                         PageGetItemId(page, P_FIRSTKEY));
-       
-       if ( _bt_itemcmp (rel, keysz, lowLeftItem, 
+           if ( _bt_itemcmp (rel, keysz, lowLeftItem, 
                (BTItem) PageGetItem(page, PageGetItemId(page, P_HIKEY)), 
                                BTEqualStrategyNumber) ) 
-           lpageop->btpo_flags |= BTP_CHAIN;
+               lpageop->btpo_flags |= BTP_CHAIN;
+       }
 
        /*
         *  By here,
@@ -405,6 +542,8 @@ _bt_insertonpg(Relation rel,
            BTItem new_item;
            OffsetNumber upditem_offset = P_HIKEY;
            bool do_update = false;
+           bool update_in_place = true;
+           bool parent_chained;
 
            /* form a index tuple that points at the new right page */
            rbknum = BufferGetBlockNumber(rbuf);
@@ -449,6 +588,10 @@ _bt_insertonpg(Relation rel,
            pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
            ppage = BufferGetPage(pbuf);
            ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage);
+           parent_chained = (( ppageop->btpo_flags & BTP_CHAIN )) ? true : false;
+           
+           if ( parent_chained && !left_chained )
+               elog (FATAL, "nbtree: unexpected chained parent of unchained page");
            
            /*
             *  If the key of new_item is < than the key of the item
@@ -472,7 +615,8 @@ _bt_insertonpg(Relation rel,
             */
            if ( _bt_itemcmp (rel, keysz, stack->bts_btitem, new_item,
                                                BTGreaterStrategyNumber) ||
-                       ( _bt_itemcmp(rel, keysz, stack->bts_btitem, 
+                       ( !shifted && 
+                         _bt_itemcmp(rel, keysz, stack->bts_btitem, 
                                        new_item, BTEqualStrategyNumber) &&
                          _bt_itemcmp(rel, keysz, lowLeftItem, 
                                        new_item, BTLessStrategyNumber) ) )
@@ -491,34 +635,17 @@ _bt_insertonpg(Relation rel,
                    elog (FATAL, "btree: items are out of order (leftmost %d, stack %u, update %u)",
                        P_LEFTMOST(lpageop), stack->bts_offset, upditem_offset);
            }
-           /*
-            * There was bug caused by deletion all minimum keys (K1) from 
-            * an index page and insertion there (up to page splitting) 
-            * higher duplicate keys (K2): after it parent item for left
-            * page contained K1 and the next item (for new right page) - K2, 
-            * - and scan for the key = K2 lost items on the left page.
-            * So, we have to update parent item if its key < minimum
-            * key on the left and minimum keys on the left and on the right
-            * are equal. It would be nice to update hikey on the previous
-            * page of the left one too, but we may get deadlock here
-            * (read comments in _bt_split), so we leave previous page
-            * hikey _inconsistent_, but there should to be BTP_CHAIN flag
-            * on it, which privents _bt_moveright from dangerous movings 
-            * from there.      - vadim 05/27/97
-            */
-           else if ( _bt_itemcmp (rel, keysz, stack->bts_btitem, 
-                                       lowLeftItem, BTLessStrategyNumber) && 
-                       _bt_itemcmp (rel, keysz, new_item, 
-                                       lowLeftItem, BTEqualStrategyNumber) ) 
-           {
-               do_update = true;
-               upditem_offset = stack->bts_offset;
-           }
            
            if ( do_update )
            {
-               /* Try to update in place. */
-               if ( DOUBLEALIGN (IndexTupleDSize (lowLeftItem->bti_itup)) ==
+               if ( shifted )
+                   elog (FATAL, "btree: attempt to update parent for shifted page");
+               /* 
+                * Try to update in place. If out parent page is chained
+                * then we must forse insertion.
+                */
+               if ( !parent_chained && 
+                       DOUBLEALIGN (IndexTupleDSize (lowLeftItem->bti_itup)) ==
                                DOUBLEALIGN (IndexTupleDSize (stack->bts_btitem->bti_itup)) ) 
                        {
                    _bt_updateitem(rel, keysz, pbuf, 
@@ -528,6 +655,7 @@ _bt_insertonpg(Relation rel,
                }
                else
                {
+                   update_in_place = false;
                            PageIndexTupleDelete(ppage, upditem_offset);
                
                   /* 
@@ -543,14 +671,18 @@ _bt_insertonpg(Relation rel,
                    ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), 
                               bknum, P_HIKEY);
                
-                   /* unlock the children before doing this */
+                   /* 
+                    * Unlock the children before doing this
+                    *
+                    * Mmm ... I foresee problems here. - vadim 06/10/97
+                    */
                    _bt_relbuf(rel, buf, BT_WRITE);
                    _bt_relbuf(rel, rbuf, BT_WRITE);
                
                    /* 
                     * A regular _bt_binsrch should find the right place to
-                    * put the new entry, since it should be either lower 
-                    * than any other key on the page or unique. 
+                    * put the new entry, since it should be lower than any 
+                    * other key on the page. 
                     * Therefore set afteritem to NULL.
                     */
                    newskey = _bt_mkscankey(rel, &(stack->bts_btitem->bti_itup));
@@ -575,9 +707,49 @@ _bt_insertonpg(Relation rel,
            }
            
            newskey = _bt_mkscankey(rel, &(new_item->bti_itup));
+           
+           afteritem = stack->bts_btitem;
+           if ( parent_chained && !update_in_place )
+           {
+               ppage = BufferGetPage(pbuf);
+               ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage);
+               if ( ppageop->btpo_flags & BTP_CHAIN )
+                   elog (FATAL, "btree: unexpected BTP_CHAIN flag in parent after update");
+               if ( P_RIGHTMOST (ppageop) )
+                   elog (FATAL, "btree: chained parent is RIGHTMOST after update");
+               maxoff = PageGetMaxOffsetNumber (ppage);
+               if ( maxoff != P_FIRSTKEY )
+                   elog (FATAL, "btree: FIRSTKEY was unexpected in parent after update");
+               if ( _bt_skeycmp (rel, keysz, newskey, ppage, 
+                                       PageGetItemId(ppage, P_FIRSTKEY), 
+                                       BTLessEqualStrategyNumber) )
+                   elog (FATAL, "btree: parent FIRSTKEY is >= duplicate key after update");
+               if ( !_bt_skeycmp (rel, keysz, newskey, ppage, 
+                                       PageGetItemId(ppage, P_HIKEY), 
+                                       BTEqualStrategyNumber) )
+                   elog (FATAL, "btree: parent HIGHKEY is not equal duplicate key after update");
+               afteritem = (BTItem) NULL;
+           }
+           else if ( left_chained && !update_in_place )
+           {
+               ppage = BufferGetPage(pbuf);
+               ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage);
+               if ( !P_RIGHTMOST (ppageop) &&
+                       _bt_skeycmp (rel, keysz, newskey, ppage, 
+                                       PageGetItemId(ppage, P_HIKEY), 
+                                       BTGreaterStrategyNumber) )
+                   afteritem = (BTItem) NULL;
+           }
+           if ( afteritem == (BTItem) NULL)
+           {
+               rbuf = _bt_getbuf(rel, ppageop->btpo_next, BT_WRITE);
+               _bt_relbuf(rel, pbuf, BT_WRITE);
+               pbuf = rbuf;
+           }
+           
            newres = _bt_insertonpg(rel, pbuf, stack->bts_parent,
                                    keysz, newskey, new_item,
-                                   stack->bts_btitem);
+                                   afteritem);
            
            /* be tidy */
            pfree(newres);
@@ -607,7 +779,7 @@ _bt_insertonpg(Relation rel,
  *     pin and lock on buf are maintained.
  */
 static Buffer
-_bt_split(Relation rel, Buffer buf, BTItem hiRightItem)
+_bt_split(Relation rel, Buffer buf, OffsetNumber firstright)
 {
     Buffer rbuf;
     Page origpage;
@@ -622,9 +794,7 @@ _bt_split(Relation rel, Buffer buf, BTItem hiRightItem)
     OffsetNumber leftoff, rightoff;
     OffsetNumber start;
     OffsetNumber maxoff;
-    OffsetNumber firstright;
     OffsetNumber i;
-    Size llimit;
     
     rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
     origpage = BufferGetPage(buf);
@@ -666,23 +836,9 @@ _bt_split(Relation rel, Buffer buf, BTItem hiRightItem)
        /* splitting a non-rightmost page, start at the first data item */
        start = P_FIRSTKEY;
 
-       /* 
-        * Copy the original high key to the new page if high key
-        * was not passed by caller.
-        */
-       if ( hiRightItem == NULL )
-       {
-           itemid = PageGetItemId(origpage, P_HIKEY);
-           itemsz = ItemIdGetLength(itemid);
-           item = (BTItem) PageGetItem(origpage, itemid);
-       }
-       else
-       {
-           item = hiRightItem;
-           itemsz = IndexTupleDSize(hiRightItem->bti_itup)
-                       + (sizeof(BTItemData) - sizeof(IndexTupleData));
-           itemsz = DOUBLEALIGN(itemsz);
-       }
+       itemid = PageGetItemId(origpage, P_HIKEY);
+       itemsz = ItemIdGetLength(itemid);
+       item = (BTItem) PageGetItem(origpage, itemid);
        if ( PageAddItem(rightpage, (Item) item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber )
            elog (FATAL, "btree: failed to add hikey to the right sibling");
        rightoff = P_FIRSTKEY;
@@ -694,8 +850,11 @@ _bt_split(Relation rel, Buffer buf, BTItem hiRightItem)
        rightoff = P_HIKEY;
     }
     maxoff = PageGetMaxOffsetNumber(origpage);
-    llimit = PageGetFreeSpace(leftpage) / 2;
-    firstright = _bt_findsplitloc(rel, origpage, start, maxoff, llimit);
+    if ( firstright == InvalidOffsetNumber )
+    {
+       Size llimit = PageGetFreeSpace(leftpage) / 2;
+       firstright = _bt_findsplitloc(rel, origpage, start, maxoff, llimit);
+    }
     
     for (i = start; i <= maxoff; i = OffsetNumberNext(i)) {
        itemid = PageGetItemId(origpage, i);
@@ -814,6 +973,9 @@ _bt_findsplitloc(Relation rel,
     Size nbytes;
     int natts;
     
+    if ( start >= maxoff )
+       elog (FATAL, "btree: cannot split if start (%d) >= maxoff (%d)",
+                       start, maxoff);
     natts = rel->rd_rel->relnatts;
     saferight = start;
     safeitemid = PageGetItemId(page, saferight);
@@ -822,8 +984,8 @@ _bt_findsplitloc(Relation rel,
     
     i = OffsetNumberNext(start);
     
-    while (nbytes < llimit) {
-       
+    while (nbytes < llimit)
+    {
        /* check the next item on the page */
        nxtitemid = PageGetItemId(page, i);
        nbytes += (ItemIdGetLength(nxtitemid) + sizeof(ItemIdData));
@@ -840,7 +1002,10 @@ _bt_findsplitloc(Relation rel,
            safeitem = nxtitem;
            saferight = i;
        }
-       i = OffsetNumberNext(i);
+       if ( i < maxoff )
+           i = OffsetNumberNext(i);
+       else
+           break;
     }
     
     /*
@@ -851,6 +1016,9 @@ _bt_findsplitloc(Relation rel,
     if (saferight == start)
        saferight = i;
     
+    if ( saferight == maxoff  && ( maxoff - start ) > 1 )
+       saferight = start + ( maxoff - start ) / 2;
+    
     return (saferight);
 }
 
@@ -1051,10 +1219,22 @@ _bt_goesonpg(Relation rel,
      *  If we have no adjacency information, and the item is equal to the
      *  high key on the page (by here it is), then the item does not belong
      *  on this page.
+     *
+     *  Now it's not true in all cases.                - vadim 06/10/97
      */
     
     if (afteritem == (BTItem) NULL)
+    {
+       if ( opaque->btpo_flags & BTP_LEAF )
+           return (false);
+       if ( opaque->btpo_flags & BTP_CHAIN )
+           return (true);
+       if ( _bt_skeycmp (rel, keysz, scankey, page, 
+                               PageGetItemId(page, P_FIRSTKEY), 
+                               BTEqualStrategyNumber) )
+           return (true);
        return (false);
+    }
     
     /* damn, have to work for it.  i hate that. */
     maxoff = PageGetMaxOffsetNumber(page);
@@ -1269,6 +1449,7 @@ _bt_isequal (TupleDesc itupdesc, Page page, OffsetNumber offnum,
     return (true);
 }
 
+#if 0
 /*
  * _bt_shift - insert btitem on the passed page after shifting page 
  *            to the right in the tree.
@@ -1404,3 +1585,4 @@ _bt_shift (Relation rel, Buffer buf, BTStack stack, int keysz,
     
     return (res);
 }
+#endif
index 45f4f76..a78acd0 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.20 1997/05/30 18:35:37 vadim Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.21 1997/06/10 07:28:50 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -159,6 +159,7 @@ _bt_moveright(Relation rel,
     BTPageOpaque opaque;
     ItemId hikey;
     BlockNumber rblkno;
+    int natts = rel->rd_rel->relnatts;
         
     page = BufferGetPage(buf);
     opaque = (BTPageOpaque) PageGetSpecialPointer(page);
@@ -195,26 +196,9 @@ _bt_moveright(Relation rel,
             *  on this page to do not lose "good" tuples if number
             *  of attrs > keysize. Example: (2,0) - last items on 
             *  this page, (2,1) - first item on next page (hikey), 
-            *  our scankey is x = 2. Scankey >= (2,1) because of 
+            *  our scankey is x = 2. Scankey == (2,1) because of 
             *  we compare first attrs only, but we shouldn't to move
             *  right of here.          - vadim 04/15/97
-            *
-            *  XXX
-            *  This code changed again! Actually, we break our
-            *  duplicates handling in single case: if we insert 
-            *  new minimum key into leftmost page with duplicates
-            *  and splitting doesn't occure then _bt_insertonpg doesn't 
-            *  worry about duplicates-rule. Fix _bt_insertonpg ?
-            *  But I don't see why don't compare scankey with _last_
-            *  item on the page instead of first one, in any cases.
-            *  So - we do it in that way now.  - vadim 05/26/97
-            *
-            *  Also, if we are on an "pseudo-empty" leaf page (i.e. there is
-            *  only hikey here) and scankey == hikey then we don't move
-            *  right! It's fix for bug described in _bt_insertonpg(). It's 
-            *  right - at least till index cleanups are perfomed by vacuum 
-            *  in exclusive mode: so, though this page may be just splitted, 
-            *  it may not be "emptied" before we got here. - vadim 05/27/97
             */
            
            if ( _bt_skeycmp (rel, keysz, scankey, page, hikey, 
@@ -227,14 +211,27 @@ _bt_moveright(Relation rel,
                }
                if ( offmax > P_HIKEY )
                {
-                   if ( _bt_skeycmp (rel, keysz, scankey, page, 
+                   if ( natts == keysz )       /* sanity checks */
+                   {
+                       if ( _bt_skeycmp (rel, keysz, scankey, page, 
+                                       PageGetItemId (page, P_FIRSTKEY),
+                                       BTEqualStrategyNumber) )
+                           elog (FATAL, "btree: BTP_CHAIN flag was expected");
+                       if ( _bt_skeycmp (rel, keysz, scankey, page, 
+                                       PageGetItemId (page, offmax),
+                                       BTEqualStrategyNumber) )
+                           elog (FATAL, "btree: unexpected equal last item");
+                       if ( _bt_skeycmp (rel, keysz, scankey, page, 
+                                       PageGetItemId (page, offmax),
+                                       BTLessStrategyNumber) )
+                           elog (FATAL, "btree: unexpected greater last item");
+                       /* move right */
+                   }
+                   else if ( _bt_skeycmp (rel, keysz, scankey, page, 
                                PageGetItemId (page, offmax),
                                BTLessEqualStrategyNumber) )
                        break;
                }
-               else if ( offmax == P_HIKEY && 
-                       ( opaque->btpo_flags & BTP_LEAF ) )
-                   break;
            }
            
            /* step right one page */