OSDN Git Service

Fix duplicates handling.
authorVadim B. Mikheev <vadim4o@yahoo.com>
Fri, 30 May 1997 18:35:40 +0000 (18:35 +0000)
committerVadim B. Mikheev <vadim4o@yahoo.com>
Fri, 30 May 1997 18:35:40 +0000 (18:35 +0000)
src/backend/access/nbtree/nbtinsert.c
src/backend/access/nbtree/nbtpage.c
src/backend/access/nbtree/nbtsearch.c
src/backend/access/nbtree/nbtsort.c

index 743583b..5349277 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.12 1997/04/16 01:48:11 vadim Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.13 1997/05/30 18:35:31 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
 #endif
 
 static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, BTItem afteritem);
-static Buffer _bt_split(Relation rel, Buffer buf);
+static Buffer _bt_split(Relation rel, Buffer buf, BTItem hiRightItem);
 static OffsetNumber _bt_findsplitloc(Relation rel, Page page, OffsetNumber start, OffsetNumber maxoff, Size llimit);
 static void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
 static OffsetNumber _bt_pgaddtup(Relation rel, Buffer buf, int keysz, ScanKey itup_scankey, Size itemsize, BTItem btitem, BTItem afteritem);
 static bool _bt_goesonpg(Relation rel, Buffer buf, Size keysz, ScanKey scankey, BTItem afteritem);
 static void _bt_updateitem(Relation rel, Size keysz, Buffer buf, BTItem oldItem, BTItem newItem);
 static bool _bt_isequal (TupleDesc itupdesc, Page page, OffsetNumber offnum, int keysz, ScanKey scankey);
+static InsertIndexResult _bt_shift (Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, BTItem hikey);
 
 /*
  *  _bt_doinsert() -- Handle insertion of a single btitem in the tree.
@@ -225,31 +226,152 @@ _bt_insertonpg(Relation rel,
     Buffer rbuf;
     Buffer pbuf;
     Page rpage;
-    ScanKey newskey;
     BTItem ritem;
+    BTPageOpaque lpageop;
     BTPageOpaque rpageop;
     BlockNumber rbknum, itup_blkno;
     OffsetNumber itup_off;
     int itemsz;
-    InsertIndexResult newres;
-    BTItem new_item = (BTItem) NULL;
-    BTItem lowLeftItem;
-    OffsetNumber leftmost_offset;
     Page ppage;
     BTPageOpaque ppageop;
-    BlockNumber bknum;
     
     page = BufferGetPage(buf);
+    lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
     itemsz = IndexTupleDSize(btitem->bti_itup)
        + (sizeof(BTItemData) - sizeof(IndexTupleData));
 
     itemsz = DOUBLEALIGN(itemsz);      /* be safe, PageAddItem will do this
                                           but we need to be consistent */
+    /*
+     * If we have to insert item on the leftmost page which is the first
+     * page in the chain of duplicates then:
+     * 1. if scankey == hikey (i.e. - new duplicate item) then
+     *    insert it here;
+     * 2. if scankey < hikey then we grab new page, copy current page
+     *    content there and insert new item on the current page.
+     */
+    if ( lpageop->btpo_flags & BTP_CHAIN )
+    {
+       OffsetNumber maxoff = PageGetMaxOffsetNumber (page);
+       ItemId hitemid;
+       BTItem hitem;
+       
+       Assert ( !P_RIGHTMOST(lpageop) );
+       hitemid = PageGetItemId(page, P_HIKEY);
+       hitem = (BTItem) PageGetItem(page, hitemid);
+       if ( maxoff > P_HIKEY && 
+               !_bt_itemcmp (rel, keysz, hitem, 
+                  (BTItem) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY)),
+                           BTEqualStrategyNumber) )
+           elog (FATAL, "btree: bad key on the page in the chain of duplicates");
+
+       if ( !_bt_skeycmp (rel, keysz, scankey, page, hitemid, 
+                                       BTEqualStrategyNumber) )
+       {
+           if ( !P_LEFTMOST(lpageop) )
+               elog (FATAL, "btree: attempt to insert bad key on the non-leftmost page in the chain of duplicates");
+           if ( !_bt_skeycmp (rel, keysz, scankey, page, hitemid, 
+                                       BTLessStrategyNumber) )
+               elog (FATAL, "btree: attempt to insert higher key on the leftmost page in the chain of duplicates");
+           return (_bt_shift(rel, buf, stack, keysz, scankey, btitem, hitem));
+       }
+    }
+
     
-    if (PageGetFreeSpace(page) < itemsz) {
+    if (PageGetFreeSpace(page) < itemsz)
+    {
+       BlockNumber bknum = BufferGetBlockNumber(buf);
+       BTItem lowLeftItem;
+       BTItem hiRightItem = NULL;
+       
+       /* 
+        * If we have to split leaf page in the chain of duplicates
+        * then we try to move righter to avoid splitting.
+        */
+       if ( ( lpageop->btpo_flags & BTP_CHAIN ) &&
+                       ( lpageop->btpo_flags & BTP_LEAF ) )
+       {
+           bool use_left = true;
+               
+           for ( ; ; )
+           {
+               bool keys_equal = false;
+                   
+               rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE);
+               rpage = BufferGetPage(rbuf);
+               rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
+               if ( P_RIGHTMOST (rpageop) )
+               {
+                   Assert ( !( rpageop->btpo_flags & BTP_CHAIN ) );
+                   use_left = false;
+                   break;
+               }
+               /*
+                * If we have the same hikey here then it's
+                * yet another page in chain and we may move
+                * even righter.
+                */
+               if ( _bt_skeycmp (rel, keysz, scankey, rpage, 
+                                       PageGetItemId(rpage, P_HIKEY), 
+                                       BTEqualStrategyNumber) )
+               {
+                   if ( !( rpageop->btpo_flags & BTP_CHAIN ) )
+                       elog (FATAL, "btree: lost page in the chain of duplicates");
+                   keys_equal = true;
+               }
+               else if ( _bt_skeycmp (rel, keysz, scankey, rpage, 
+                                       PageGetItemId(rpage, P_HIKEY), 
+                                       BTGreaterStrategyNumber) )
+                   elog (FATAL, "btree: hikey is out of order");
+               /*
+                * If hikey > scankey and BTP_CHAIN is ON 
+                * then it's first page of the chain of higher keys:
+                * our left sibling hikey was lying! We can't add new 
+                * item here, but we can turn BTP_CHAIN off on our
+                * left page and overwrite its hikey.
+                */
+               if ( !keys_equal && ( rpageop->btpo_flags & BTP_CHAIN ) )
+               {
+                   BTItem tmp;
+                   
+                   lpageop->btpo_flags &= ~BTP_CHAIN;
+                   tmp = (BTItem) PageGetItem(rpage, 
+                                       PageGetItemId(rpage, P_HIKEY));
+                   hiRightItem = _bt_formitem(&(tmp->bti_itup));
+                   break;
+               }
+               /* 
+                * if there is room here or hikey > scankey (so it's our
+                * last page in the chain and we can't move righter) 
+                * we have to use this page .
+                */
+               if ( PageGetFreeSpace (rpage) > itemsz || !keys_equal )
+               {
+                   use_left = false;
+                   break;
+               }
+               /* try to move righter */
+               _bt_relbuf(rel, buf, BT_WRITE);
+               buf = rbuf;
+               page = rpage;
+               lpageop = rpageop;
+           }
+           if ( !use_left )    /* insert on the right page */
+           {
+               _bt_relbuf(rel, buf, BT_WRITE);
+               return ( _bt_insertonpg(rel, rbuf, stack, keysz, 
+                                       scankey, btitem, afteritem) );
+           }
+           _bt_relbuf(rel, rbuf, BT_WRITE);
+           bknum = BufferGetBlockNumber(buf);
+       }
        
        /* split the buffer into left and right halves */
-       rbuf = _bt_split(rel, buf);
+       rbuf = _bt_split(rel, buf, hiRightItem);
+       
+       if ( hiRightItem != (BTItem) NULL )
+           pfree (hiRightItem);
        
        /* which new page (left half or right half) gets the tuple? */
        if (_bt_goesonpg(rel, buf, keysz, scankey, afteritem)) {
@@ -264,6 +386,14 @@ _bt_insertonpg(Relation rel,
            itup_blkno = BufferGetBlockNumber(rbuf);
        }
        
+       lowLeftItem = (BTItem) PageGetItem(page,
+                                        PageGetItemId(page, P_FIRSTKEY));
+       
+       if ( _bt_itemcmp (rel, keysz, lowLeftItem, 
+               (BTItem) PageGetItem(page, PageGetItemId(page, P_HIKEY)), 
+                               BTEqualStrategyNumber) ) 
+           lpageop->btpo_flags |= BTP_CHAIN;
+
        /*
         *  By here,
         *
@@ -287,6 +417,11 @@ _bt_insertonpg(Relation rel,
            _bt_relbuf(rel, rbuf, BT_WRITE);
            
        } else {
+           ScanKey newskey;
+           InsertIndexResult newres;
+           BTItem new_item;
+           OffsetNumber upditem_offset = P_HIKEY;
+           bool do_update = false;
 
            /* form a index tuple that points at the new right page */
            rbknum = BufferGetBlockNumber(rbuf);
@@ -294,27 +429,43 @@ _bt_insertonpg(Relation rel,
            rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
            
            /*
-            *  By convention, the first entry (0) on every
+            *  By convention, the first entry (1) on every
             *  non-rightmost page is the high key for that page.  In
             *  order to get the lowest key on the new right page, we
-            *  actually look at its second (1) entry.
+            *  actually look at its second (2) entry.
             */
            
-           if (! P_RIGHTMOST(rpageop)) {
+           if (! P_RIGHTMOST(rpageop))
+           {
                ritem = (BTItem) PageGetItem(rpage,
                                             PageGetItemId(rpage, P_FIRSTKEY));
-           } else {
+               if ( _bt_itemcmp (rel, keysz, ritem, 
+                       (BTItem) PageGetItem(rpage, 
+                                       PageGetItemId(rpage, P_HIKEY)), 
+                               BTEqualStrategyNumber) ) 
+               rpageop->btpo_flags |= BTP_CHAIN;
+           }
+           else
                ritem = (BTItem) PageGetItem(rpage,
                                             PageGetItemId(rpage, P_HIKEY));
-           }
            
            /* get a unique btitem for this key */
            new_item = _bt_formitem(&(ritem->bti_itup));
            
            ItemPointerSet(&(new_item->bti_itup.t_tid), rbknum, P_HIKEY);
            
-           /* find the parent buffer */
+           /* 
+            * Find the parent buffer and get the parent page.
+            *
+            * Oops - if we were moved right then we need to
+            * change stack item! We want to find parent pointing to
+            * where we are, right ?    - vadim 05/27/97
+            */
+           ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), 
+                              bknum, P_HIKEY);
            pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
+           ppage = BufferGetPage(pbuf);
+           ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage);
            
            /*
             *  If the key of new_item is < than the key of the item
@@ -330,29 +481,59 @@ _bt_insertonpg(Relation rel,
             *  key spills over to our new right page, we get an
             *  inconsistency if we don't update the left key in the
             *  parent page.
+            *
+            *  Also, new duplicates handling code require us to update
+            *  parent item if some smaller items left on the left page 
+            *  (which is possible in splitting leftmost page) and 
+            *  current parent item == new_item.        - vadim 05/27/97
             */
-           
-           if (_bt_itemcmp(rel, keysz, stack->bts_btitem, new_item,
-                           BTGreaterStrategyNumber)) {
-               ppageop = (BTPageOpaque) PageGetSpecialPointer(page);
-               Assert (P_LEFTMOST(ppageop));
-               lowLeftItem =
-                   (BTItem) PageGetItem(page,
-                                        PageGetItemId(page, P_FIRSTKEY));
-               
-               /* this method does not work--_bt_updateitem tries to     */
-               /* overwrite an entry with another entry that might be    */
-               /* bigger.  if lowLeftItem is bigger, it corrupts the     */
-               /* parent page.  instead, we have to delete the original  */
-               /* leftmost item from the parent, and insert the new one  */
-               /* with a regular _bt_insertonpg (it could cause a split  */
-               /* because it's bigger than what was there before).       */
-                /*                                  --djm 8/21/96         */
-
+           if ( _bt_itemcmp (rel, keysz, stack->bts_btitem, new_item,
+                                               BTGreaterStrategyNumber) ||
+                       ( _bt_itemcmp(rel, keysz, stack->bts_btitem, 
+                                       new_item, BTEqualStrategyNumber) &&
+                         _bt_itemcmp(rel, keysz, lowLeftItem, 
+                                       new_item, BTLessStrategyNumber) ) )
+           {
+               do_update = true;
                /* 
-                * but it works for items with the same size and so why don't
-                * use it for them ? - vadim 12/05/96
+                * figure out which key is leftmost (if the parent page
+                * is rightmost, too, it must be the root)
                 */
+               if(P_RIGHTMOST(ppageop))
+                   upditem_offset = P_HIKEY;
+               else
+                   upditem_offset = P_FIRSTKEY;
+               if ( !P_LEFTMOST(lpageop) || 
+                       stack->bts_offset != upditem_offset )
+                   elog (FATAL, "btree: items are out of order");
+           }
+           /*
+            * There was bug caused by deletion all minimum keys (K1) from 
+            * an index page and insertion there (up to page splitting) 
+            * higher duplicate keys (K2): after it parent item for left
+            * page contained K1 and the next item (for new right page) - K2, 
+            * - and scan for the key = K2 lost items on the left page.
+            * So, we have to update parent item if its key < minimum
+            * key on the left and minimum keys on the left and on the right
+            * are equal. It would be nice to update hikey on the previous
+            * page of the left one too, but we may get deadlock here
+            * (read comments in _bt_split), so we leave previous page
+            * hikey _inconsistent_, but there should to be BTP_CHAIN flag
+            * on it, which privents _bt_moveright from dangerous movings 
+            * from there.      - vadim 05/27/97
+            */
+           else if ( _bt_itemcmp (rel, keysz, stack->bts_btitem, 
+                                       lowLeftItem, BTLessStrategyNumber) && 
+                       _bt_itemcmp (rel, keysz, new_item, 
+                                       lowLeftItem, BTEqualStrategyNumber) ) 
+           {
+               do_update = true;
+               upditem_offset = stack->bts_offset;
+           }
+           
+           if ( do_update )
+           {
+               /* Try to update in place. */
                if ( DOUBLEALIGN (IndexTupleDSize (lowLeftItem->bti_itup)) ==
                                DOUBLEALIGN (IndexTupleDSize (stack->bts_btitem->bti_itup)) ) 
                        {
@@ -363,33 +544,16 @@ _bt_insertonpg(Relation rel,
                }
                else
                {
-                   /* get the parent page */
-                   ppage = BufferGetPage(pbuf);
-                   ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage);
-
-                   /* 
-                    * figure out which key is leftmost (if the parent page
-                    * is rightmost, too, it must be the root)
-                    */
-                   if(P_RIGHTMOST(ppageop)) {
-                       leftmost_offset = P_HIKEY;
-                   } else {
-                       leftmost_offset = P_FIRSTKEY;
-                   }
-                           PageIndexTupleDelete(ppage, leftmost_offset);
+                           PageIndexTupleDelete(ppage, upditem_offset);
                
                   /* 
                    * don't write anything out yet--we still have the write
                    * lock, and now we call another _bt_insertonpg to
-                   * insert the correct leftmost key
+                   * insert the correct key.
+                   * First, make a new item, using the tuple data from
+                   * lowLeftItem. Point it to the left child.
+                   * Update it on the stack at the same time.
                    */
-
-                   /* 
-                    * make a new leftmost item, using the tuple data from
-                    * lowLeftItem.  point it to the left child.
-                    * update it on the stack at the same time.
-                    */
-                   bknum = BufferGetBlockNumber(buf);
                    pfree(stack->bts_btitem);
                    stack->bts_btitem = _bt_formitem(&(lowLeftItem->bti_itup));
                    ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), 
@@ -400,9 +564,10 @@ _bt_insertonpg(Relation rel,
                    _bt_relbuf(rel, rbuf, BT_WRITE);
                
                    /* 
-                    * a regular _bt_binsrch should find the right place to
-                    * put the new entry, since it should be lower than any
-                    * other key on the page, therefore set afteritem to NULL
+                    * A regular _bt_binsrch should find the right place to
+                    * put the new entry, since it should be either lower 
+                    * than any other key on the page or unique. 
+                    * Therefore set afteritem to NULL.
                     */
                    newskey = _bt_mkscankey(rel, &(stack->bts_btitem->bti_itup));
                    newres = _bt_insertonpg(rel, pbuf, stack->bts_parent,
@@ -458,7 +623,7 @@ _bt_insertonpg(Relation rel,
  *     pin and lock on buf are maintained.
  */
 static Buffer
-_bt_split(Relation rel, Buffer buf)
+_bt_split(Relation rel, Buffer buf, BTItem hiRightItem)
 {
     Buffer rbuf;
     Page origpage;
@@ -492,6 +657,7 @@ _bt_split(Relation rel, Buffer buf)
     
     /* if we're splitting this page, it won't be the root when we're done */
     oopaque->btpo_flags &= ~BTP_ROOT;
+    oopaque->btpo_flags &= ~BTP_CHAIN;
     lopaque->btpo_flags = ropaque->btpo_flags = oopaque->btpo_flags;
     lopaque->btpo_prev = oopaque->btpo_prev;
     ropaque->btpo_prev = BufferGetBlockNumber(buf);
@@ -516,10 +682,23 @@ _bt_split(Relation rel, Buffer buf)
        /* splitting a non-rightmost page, start at the first data item */
        start = P_FIRSTKEY;
 
-       /* copy the original high key to the new page */
-       itemid = PageGetItemId(origpage, P_HIKEY);
-       itemsz = ItemIdGetLength(itemid);
-       item = (BTItem) PageGetItem(origpage, itemid);
+       /* 
+        * Copy the original high key to the new page if high key
+        * was not passed by caller.
+        */
+       if ( hiRightItem == NULL )
+       {
+           itemid = PageGetItemId(origpage, P_HIKEY);
+           itemsz = ItemIdGetLength(itemid);
+           item = (BTItem) PageGetItem(origpage, itemid);
+       }
+       else
+       {
+           item = hiRightItem;
+           itemsz = IndexTupleDSize(hiRightItem->bti_itup)
+                       + (sizeof(BTItemData) - sizeof(IndexTupleData));
+           itemsz = DOUBLEALIGN(itemsz);
+       }
        (void) PageAddItem(rightpage, (Item) item, itemsz, P_HIKEY, LP_USED);
        rightoff = P_FIRSTKEY;
     } else {
@@ -744,7 +923,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
     itemsz = ItemIdGetLength(itemid);
     item = (BTItem) PageGetItem(lpage, itemid);
     new_item = _bt_formitem(&(item->bti_itup));
-    ItemPointerSet(&(new_item->bti_itup.t_tid), lbkno, P_FIRSTKEY);
+    ItemPointerSet(&(new_item->bti_itup.t_tid), lbkno, P_HIKEY);
     
     /*
      * insert the left page pointer into the new root page.  the root
@@ -1098,3 +1277,137 @@ _bt_isequal (TupleDesc itupdesc, Page page, OffsetNumber offnum,
     /* by here, the keys are equal */
     return (true);
 }
+
+/*
+ * _bt_shift - insert btitem on the passed page after shifting page 
+ *            to the right in the tree.
+ *
+ * NOTE: tested for shifting leftmost page only, having btitem < hikey.
+ */
+static InsertIndexResult 
+_bt_shift (Relation rel, Buffer buf, BTStack stack, int keysz, 
+                       ScanKey scankey, BTItem btitem, BTItem hikey)
+{
+    InsertIndexResult res;
+    int itemsz;
+    Page page;
+    BlockNumber bknum;
+    BTPageOpaque pageop;
+    Buffer rbuf;
+    Page rpage;
+    BTPageOpaque rpageop;
+    Buffer pbuf;
+    Page ppage;
+    BTPageOpaque ppageop;
+    Buffer nbuf;
+    Page npage;
+    BTPageOpaque npageop;
+    BlockNumber nbknum;
+    BTItem nitem;
+    OffsetNumber afteroff;
+    
+    btitem = _bt_formitem(&(btitem->bti_itup));
+    hikey = _bt_formitem(&(hikey->bti_itup));
+
+    page = BufferGetPage(buf);
+
+    /* grab new page */
+    nbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
+    nbknum = BufferGetBlockNumber(nbuf);
+    npage = BufferGetPage(nbuf);
+    _bt_pageinit(npage, BufferGetPageSize(nbuf));
+    npageop = (BTPageOpaque) PageGetSpecialPointer(npage);
+    
+    /* copy content of the passed page */
+    memmove ((char *) npage, (char *) page, BufferGetPageSize(buf));
+    
+    /* re-init old (passed) page */
+    _bt_pageinit(page, BufferGetPageSize(buf));
+    pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+    /* init old page opaque */
+    pageop->btpo_flags = npageop->btpo_flags;  /* restore flags */
+    pageop->btpo_flags &= ~BTP_CHAIN;
+    if ( _bt_itemcmp (rel, keysz, hikey, btitem, BTEqualStrategyNumber) ) 
+       pageop->btpo_flags |= BTP_CHAIN;
+    pageop->btpo_prev = npageop->btpo_prev;    /* restore prev */
+    pageop->btpo_next = nbknum;                        /* next points to the new page */
+
+    /* init shifted page opaque */
+    npageop->btpo_prev = bknum = BufferGetBlockNumber(buf);
+    
+    /* shifted page is ok, populate old page */
+
+    /* add passed hikey */
+    itemsz = IndexTupleDSize(hikey->bti_itup)
+       + (sizeof(BTItemData) - sizeof(IndexTupleData));
+    itemsz = DOUBLEALIGN(itemsz);
+    (void) PageAddItem(page, (Item) hikey, itemsz, P_HIKEY, LP_USED);
+    pfree (hikey);
+
+    /* add btitem */
+    itemsz = IndexTupleDSize(btitem->bti_itup)
+       + (sizeof(BTItemData) - sizeof(IndexTupleData));
+    itemsz = DOUBLEALIGN(itemsz);
+    (void) PageAddItem(page, (Item) btitem, itemsz, P_FIRSTKEY, LP_USED);
+    pfree (btitem);
+    nitem = (BTItem) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY));
+    btitem = _bt_formitem(&(nitem->bti_itup));
+    ItemPointerSet(&(btitem->bti_itup.t_tid), bknum, P_HIKEY);
+    
+    /* ok, write them out */
+    _bt_wrtnorelbuf(rel, nbuf);
+    _bt_wrtnorelbuf(rel, buf);
+
+    /* fix btpo_prev on right sibling of old page */
+    if ( !P_RIGHTMOST (npageop) )
+    {
+       rbuf = _bt_getbuf(rel, npageop->btpo_next, BT_WRITE);
+       rpage = BufferGetPage(rbuf);
+       rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
+       rpageop->btpo_prev = nbknum;
+       _bt_wrtbuf(rel, rbuf);
+    }
+
+    /* get parent pointing to the old page */
+    ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), 
+                              bknum, P_HIKEY);
+    pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
+    ppage = BufferGetPage(pbuf);
+    ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage);
+
+    _bt_relbuf(rel, nbuf, BT_WRITE);
+    _bt_relbuf(rel, buf, BT_WRITE);
+    
+    /* re-set parent' pointer - we shifted our page to the right ! */
+    nitem = (BTItem) PageGetItem (ppage, 
+                               PageGetItemId (ppage, stack->bts_offset));
+    ItemPointerSet(&(nitem->bti_itup.t_tid), nbknum, P_HIKEY);
+    ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), nbknum, P_HIKEY);
+    _bt_wrtnorelbuf(rel, pbuf);
+
+    /* 
+     * Now we want insert into the parent pointer to our old page. It has to
+     * be inserted before the pointer to new page. You may get problems here
+     * (in the _bt_goesonpg and/or _bt_pgaddtup), but may be not - I don't 
+     * know. It works if old page is leftmost (nitem is NULL) and 
+     * btitem < hikey and it's all what we need currently. - vadim 05/30/97
+     */
+    nitem = NULL;
+    afteroff = P_FIRSTKEY;
+    if ( !P_RIGHTMOST (ppageop) )
+       afteroff = OffsetNumberNext (afteroff);
+    if ( stack->bts_offset >= afteroff )
+    {
+       afteroff = OffsetNumberPrev (stack->bts_offset);
+       nitem = (BTItem) PageGetItem (ppage, PageGetItemId (ppage, afteroff));
+       nitem = _bt_formitem(&(nitem->bti_itup));
+    }
+    res = _bt_insertonpg(rel, pbuf, stack->bts_parent,
+                                   keysz, scankey, btitem, nitem);
+    pfree (btitem);
+               
+    ItemPointerSet(&(res->pointerData), nbknum, P_HIKEY);
+    
+    return (res);
+}
index 440a118..93b420f 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.7 1997/04/16 01:48:15 vadim Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.8 1997/05/30 18:35:33 vadim Exp $
  *
  *  NOTES
  *     Postgres btree pages look like ordinary relation pages.  The opaque
@@ -441,6 +441,9 @@ _bt_metaproot(Relation rel, BlockNumber rootbknum, int level)
  *     This is possible because we save a bit image of the last item
  *     we looked at in the parent, and the update algorithm guarantees
  *     that if items above us in the tree move, they only move right.
+ *
+ *     Also, re-set bts_blkno & bts_offset if changed and 
+ *     bts_btitem (it may be changed - see _bt_insertonpg).
  */
 Buffer
 _bt_getstackbuf(Relation rel, BTStack stack, int access)
@@ -453,6 +456,8 @@ _bt_getstackbuf(Relation rel, BTStack stack, int access)
     ItemId itemid;
     BTItem item;
     BTPageOpaque opaque;
+    BTItem item_save;
+    int item_nbytes;
     
     blkno = stack->bts_blkno;
     buf = _bt_getbuf(rel, blkno, access);
@@ -466,7 +471,14 @@ _bt_getstackbuf(Relation rel, BTStack stack, int access)
        
        /* if the item is where we left it, we're done */
        if ( BTItemSame (item, stack->bts_btitem) )
+       {
+           pfree(stack->bts_btitem);
+           item_nbytes = ItemIdGetLength(itemid);
+           item_save = (BTItem) palloc(item_nbytes);
+           memmove((char *) item_save, (char *) item, item_nbytes);
+           stack->bts_btitem = item_save;
            return (buf);
+       }
        
        /* if the item has just moved right on this page, we're done */
        for (i = OffsetNumberNext(stack->bts_offset);
@@ -477,7 +489,15 @@ _bt_getstackbuf(Relation rel, BTStack stack, int access)
            
            /* if the item is where we left it, we're done */
            if ( BTItemSame (item, stack->bts_btitem) )
+           {
+               stack->bts_offset = i;
+               pfree(stack->bts_btitem);
+               item_nbytes = ItemIdGetLength(itemid);
+               item_save = (BTItem) palloc(item_nbytes);
+               memmove((char *) item_save, (char *) item, item_nbytes);
+               stack->bts_btitem = item_save;
                return (buf);
+           }
        }
     }
     
@@ -503,7 +523,16 @@ _bt_getstackbuf(Relation rel, BTStack stack, int access)
            itemid = PageGetItemId(page, offnum);
            item = (BTItem) PageGetItem(page, itemid);
            if ( BTItemSame (item, stack->bts_btitem) )
+           {
+               stack->bts_offset = offnum;
+               stack->bts_blkno = blkno;
+               pfree(stack->bts_btitem);
+               item_nbytes = ItemIdGetLength(itemid);
+               item_save = (BTItem) palloc(item_nbytes);
+               memmove((char *) item_save, (char *) item, item_nbytes);
+               stack->bts_btitem = item_save;
                return (buf);
+           }
        }
     }
 }
index fec7392..45f4f76 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.19 1997/05/05 03:41:19 vadim Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.20 1997/05/30 18:35:37 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -158,9 +158,7 @@ _bt_moveright(Relation rel,
     Page page;
     BTPageOpaque opaque;
     ItemId hikey;
-    ItemId itemid;
     BlockNumber rblkno;
-    int natts = rel->rd_rel->relnatts;
         
     page = BufferGetPage(buf);
     opaque = (BTPageOpaque) PageGetSpecialPointer(page);
@@ -184,7 +182,7 @@ _bt_moveright(Relation rel,
        /* move right as long as we need to */
        do
        {
-           OffsetNumber offmax;
+           OffsetNumber offmax = PageGetMaxOffsetNumber(page);
            /*
             *  If this page consists of all duplicate keys (hikey and first
             *  key on the page have the same value), then we don't need to
@@ -200,22 +198,43 @@ _bt_moveright(Relation rel,
             *  our scankey is x = 2. Scankey >= (2,1) because of 
             *  we compare first attrs only, but we shouldn't to move
             *  right of here.          - vadim 04/15/97
+            *
+            *  XXX
+            *  This code changed again! Actually, we break our
+            *  duplicates handling in single case: if we insert 
+            *  new minimum key into leftmost page with duplicates
+            *  and splitting doesn't occure then _bt_insertonpg doesn't 
+            *  worry about duplicates-rule. Fix _bt_insertonpg ?
+            *  But I don't see why don't compare scankey with _last_
+            *  item on the page instead of first one, in any cases.
+            *  So - we do it in that way now.  - vadim 05/26/97
+            *
+            *  Also, if we are on an "pseudo-empty" leaf page (i.e. there is
+            *  only hikey here) and scankey == hikey then we don't move
+            *  right! It's fix for bug described in _bt_insertonpg(). It's 
+            *  right - at least till index cleanups are perfomed by vacuum 
+            *  in exclusive mode: so, though this page may be just splitted, 
+            *  it may not be "emptied" before we got here. - vadim 05/27/97
             */
-           if ( (offmax = PageGetMaxOffsetNumber(page)) > P_HIKEY)
+           
+           if ( _bt_skeycmp (rel, keysz, scankey, page, hikey, 
+                                       BTEqualStrategyNumber) )
            {
-               itemid = PageGetItemId(page, P_FIRSTKEY);
-               if (_bt_skeycmp(rel, keysz, scankey, page, itemid,
-                               BTEqualStrategyNumber)) {
-                   /* break is for the "move right" while loop */
-                   break;
-               }
-               else if ( natts > keysz )
-               {
-                   itemid = PageGetItemId(page, offmax);
-                   if (_bt_skeycmp(rel, keysz, scankey, page, itemid,
-                               BTLessEqualStrategyNumber))
+               if ( opaque->btpo_flags & BTP_CHAIN )
+               {
+                   Assert ( ( opaque->btpo_flags & BTP_LEAF ) || offmax > P_HIKEY );
+                   break;
+               }
+               if ( offmax > P_HIKEY )
+               {
+                   if ( _bt_skeycmp (rel, keysz, scankey, page, 
+                               PageGetItemId (page, offmax),
+                               BTLessEqualStrategyNumber) )
                        break;
-               }
+               }
+               else if ( offmax == P_HIKEY && 
+                       ( opaque->btpo_flags & BTP_LEAF ) )
+                   break;
            }
            
            /* step right one page */
@@ -371,27 +390,37 @@ _bt_binsrch(Relation rel,
     int natts = rel->rd_rel->relnatts;
     int result;
     
+    itupdesc = RelationGetTupleDescriptor(rel);
     page = BufferGetPage(buf);
     opaque = (BTPageOpaque) PageGetSpecialPointer(page);
     
-    /* by convention, item 0 on any non-rightmost page is the high key */
+    /* by convention, item 1 on any non-rightmost page is the high key */
     low = mid = P_RIGHTMOST(opaque) ? P_HIKEY : P_FIRSTKEY;
     
     high = PageGetMaxOffsetNumber(page);
     
     /*
-     *  Since for non-rightmost pages, the zeroeth item on the page is the
+     *  Since for non-rightmost pages, the first item on the page is the
      *  high key, there are two notions of emptiness.  One is if nothing
      *  appears on the page.  The other is if nothing but the high key does.
      *  The reason we test high <= low, rather than high == low, is that
      *  after vacuuming there may be nothing *but* the high key on a page.
-     *  In that case, given the scheme above, low = 1 and high = 0.
+     *  In that case, given the scheme above, low = 2 and high = 1.
      */
     
-    if (PageIsEmpty(page) || (! P_RIGHTMOST(opaque) && high <= low))
+    if ( PageIsEmpty (page) )
        return (low);
-    
-    itupdesc = RelationGetTupleDescriptor(rel);
+    if ( (! P_RIGHTMOST(opaque) && high <= low))
+    {
+       if ( high < low || 
+               (srchtype == BT_DESCENT && !(opaque->btpo_flags & BTP_LEAF)) )
+           return (low);
+       /* It's insertion and high == low == 2 */
+       result = _bt_compare(rel, itupdesc, page, keysz, scankey, low);
+       if ( result > 0 )
+           return ( OffsetNumberNext (low) );
+       return (low);
+    }
     
     while ((high - low) > 1) {
        mid = low + ((high - low) / 2);
@@ -736,6 +765,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
     TupleDesc itupdesc;
     Buffer buf;
     Page page;
+    BTPageOpaque pop;
     BTStack stack;
     OffsetNumber offnum, maxoff;
     bool offGmax = false;
@@ -803,11 +833,10 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
     
     stack = _bt_search(rel, 1, &skdata, &buf);
     _bt_freestack(stack);
-    
-    /* find the nearest match to the manufactured scan key on the page */
-    offnum = _bt_binsrch(rel, buf, 1, &skdata, BT_DESCENT);
+
+    blkno = BufferGetBlockNumber(buf);
     page = BufferGetPage(buf);
-    
+
     /*
      *  This will happen if the tree we're searching is entirely empty,
      *  or if we're doing a search for a key that would appear on an
@@ -821,8 +850,39 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
        _bt_relbuf(rel, buf, BT_READ);
        return ((RetrieveIndexResult) NULL);
     }
-    
     maxoff = PageGetMaxOffsetNumber(page);
+    pop = (BTPageOpaque) PageGetSpecialPointer(page);
+    
+    /*
+     * Now _bt_moveright doesn't move from non-rightmost leaf page
+     * if scankey == hikey and there is only hikey there. It's
+     * good for insertion, but we need to do work for scan here.
+     *                 - vadim 05/27/97
+     */
+    
+    while ( maxoff == P_HIKEY && !P_RIGHTMOST(pop) &&
+               _bt_skeycmp(rel, 1, &skdata, page, 
+                       PageGetItemId(page, P_HIKEY),
+                       BTGreaterEqualStrategyNumber) )
+    {
+       /* step right one page */
+       blkno = pop->btpo_next;
+       _bt_relbuf(rel, buf, BT_READ);
+       buf = _bt_getbuf(rel, blkno, BT_READ);
+       page = BufferGetPage(buf);
+       if (PageIsEmpty(page)) {
+           ItemPointerSetInvalid(current);
+           so->btso_curbuf = InvalidBuffer;
+           _bt_relbuf(rel, buf, BT_READ);
+           return ((RetrieveIndexResult) NULL);
+       }
+       maxoff = PageGetMaxOffsetNumber(page);
+       pop = (BTPageOpaque) PageGetSpecialPointer(page);
+    }
+
+
+    /* find the nearest match to the manufactured scan key on the page */
+    offnum = _bt_binsrch(rel, buf, 1, &skdata, BT_DESCENT);
     
     if (offnum > maxoff)
     {
@@ -830,7 +890,6 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
        offGmax = true;
     }
     
-    blkno = BufferGetBlockNumber(buf);
     ItemPointerSet(current, blkno, offnum);
     
     /*
@@ -889,7 +948,32 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
        break;
        
     case BTGreaterEqualStrategyNumber:
-       if (result < 0) {
+       if ( offGmax )
+       {
+           if (result < 0)
+           {
+               Assert ( !P_RIGHTMOST(pop) && maxoff == P_HIKEY );
+               if ( !_bt_step(scan, &buf, ForwardScanDirection) )
+               {
+                   _bt_relbuf(scan->relation, buf, BT_READ);
+                   so->btso_curbuf = InvalidBuffer;
+                   ItemPointerSetInvalid(&(scan->currentItemData));
+                   return ((RetrieveIndexResult) NULL);
+               }
+           }
+           else if (result > 0)
+           {   /*
+                * Just remember:  _bt_binsrch() returns the OffsetNumber of 
+                * the first matching key on the page, or the OffsetNumber at 
+                * which the matching key WOULD APPEAR IF IT WERE on this page.
+                * No key on this page, but offnum from _bt_binsrch() greater
+                * maxoff - have to move right. - vadim 12/06/96
+                */
+               (void) _bt_twostep(scan, &buf, ForwardScanDirection);
+           }
+       }
+       else if (result < 0)
+       {
            do {
                if (!_bt_twostep(scan, &buf, BackwardScanDirection))
                    break;
@@ -902,16 +986,6 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
            if (result > 0)
                (void) _bt_twostep(scan, &buf, ForwardScanDirection);
        }
-       else if ( offGmax && result > 0 )
-       {   /*
-            * Just remember:  _bt_binsrch() returns the OffsetNumber of 
-            * the first matching key on the page, or the OffsetNumber at 
-            * which the matching key WOULD APPEAR IF IT WERE on this page.
-            * No key on this page, but offnum from _bt_binsrch() greater
-            * maxoff - have to move right. - vadim 12/06/96
-            */
-           (void) _bt_twostep(scan, &buf, ForwardScanDirection);
-       }
        break;
        
     case BTGreaterStrategyNumber:
index 978e73d..db298d5 100644 (file)
@@ -5,7 +5,7 @@
  *
  *
  * IDENTIFICATION
- *    $Id: nbtsort.c,v 1.15 1997/04/18 03:37:57 vadim Exp $
+ *    $Id: nbtsort.c,v 1.16 1997/05/30 18:35:40 vadim Exp $
  *
  * NOTES
  *
@@ -983,6 +983,12 @@ _bt_buildadd(Relation index, void *pstate, BTItem bti, int flags)
            oopaque->btpo_next = BufferGetBlockNumber(nbuf);
            nopaque->btpo_prev = BufferGetBlockNumber(obuf);
            nopaque->btpo_next = P_NONE;
+
+           if ( _bt_itemcmp(index, _bt_nattr, 
+                  (BTItem) PageGetItem(opage, PageGetItemId(opage, P_HIKEY)), 
+                  (BTItem) PageGetItem(opage, PageGetItemId(opage, P_FIRSTKEY)), 
+                       BTEqualStrategyNumber) )
+               oopaque->btpo_flags |= BTP_CHAIN;
        }
 
        /*