OSDN Git Service

Simplify gistSplit() and some refactoring related code.
authorTeodor Sigaev <teodor@sigaev.ru>
Fri, 19 May 2006 16:15:17 +0000 (16:15 +0000)
committerTeodor Sigaev <teodor@sigaev.ru>
Fri, 19 May 2006 16:15:17 +0000 (16:15 +0000)
src/backend/access/gist/gist.c
src/backend/access/gist/gistutil.c
src/backend/access/gist/gistxlog.c
src/include/access/gist_private.h

index d207b7e..cb10cbc 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.135 2006/05/17 16:34:59 teodor Exp $
+ *       $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.136 2006/05/19 16:15:17 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -936,31 +936,6 @@ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate)
                gistxlogInsertCompletion(state->r->rd_node, &(state->key), 1);
 }
 
-static void
-gistToRealOffset(OffsetNumber *arr, int len, OffsetNumber *reasloffset)
-{
-       int                     i;
-
-       for (i = 0; i < len; i++)
-               arr[i] = reasloffset[arr[i]];
-}
-
-static IndexTupleData *
-gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) {
-       char *ptr, *ret = palloc(BLCKSZ);
-       int i;
-
-       ptr = ret;
-       for (i = 0; i < veclen; i++) {
-               memcpy(ptr, vec[i], IndexTupleSize(vec[i]));
-               ptr += IndexTupleSize(vec[i]);
-       }
-
-       *memlen = ptr - ret;
-       Assert( *memlen < BLCKSZ );
-       return (IndexTupleData*)ret;
-}
-
 /*
  *     gistSplit -- split a page in the tree.
  */
@@ -975,100 +950,70 @@ gistSplit(Relation r,
                           *rvectup;
        GIST_SPLITVEC v;
        GistEntryVector *entryvec;
-       int                     i,
-                               fakeoffset;
-       OffsetNumber *realoffset;
-       IndexTuple *cleaneditup = itup;
-       int                     lencleaneditup = len;
+       int                     i;
+       OffsetNumber offInvTuples[ MaxOffsetNumber ];
+       int                      nOffInvTuples = 0;
        SplitedPageLayout       *res = NULL;
 
        /* generate the item array */
-       realoffset = palloc((len + 1) * sizeof(OffsetNumber));
        entryvec = palloc(GEVHDRSZ + (len + 1) * sizeof(GISTENTRY));
        entryvec->n = len + 1;
 
-       fakeoffset = FirstOffsetNumber;
        for (i = 1; i <= len; i++)
        {
                Datum           datum;
                bool            IsNull;
 
                if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1]))
-               {
-                       entryvec->n--;
                        /* remember position of invalid tuple */
-                       realoffset[entryvec->n] = i;
-                       continue;
-               }
+                       offInvTuples[ nOffInvTuples++ ] = i;                    
+
+               if ( nOffInvTuples > 0 )
+                       /* we can safely do not decompress other keys, because 
+                          we will do splecial processing, but
+                          it's needed to find another invalid tuples */
+                       continue;       
 
                datum = index_getattr(itup[i - 1], 1, giststate->tupdesc, &IsNull);
-               gistdentryinit(giststate, 0, &(entryvec->vector[fakeoffset]),
+               gistdentryinit(giststate, 0, &(entryvec->vector[i]),
                                           datum, r, page, i,
                                           ATTSIZE(datum, giststate->tupdesc, 1, IsNull),
                                           FALSE, IsNull);
-               realoffset[fakeoffset] = i;
-               fakeoffset++;
        }
 
        /*
-        * if it was invalid tuple then we need special processing. If it's
-        * possible, we move all invalid tuples on right page. We should remember,
-        * that union with invalid tuples is a invalid tuple.
+        * if it was invalid tuple then we need special processing.
+        * We move all invalid tuples on right page. 
+        *
+        * if there is no place on left page, gistSplit will be called one more 
+        * time for left page.
+        *
+        * Normally, we never exec this code, but after crash replay it's possible
+        * to get 'invalid' tuples (probability is low enough)
         */
-       if (entryvec->n != len + 1)
+       if (nOffInvTuples > 0)
        {
-               lencleaneditup = entryvec->n - 1;
-               cleaneditup = (IndexTuple *) palloc(lencleaneditup * sizeof(IndexTuple));
-               for (i = 1; i < entryvec->n; i++)
-                       cleaneditup[i - 1] = itup[realoffset[i] - 1];
-
-               if (!gistfitpage(cleaneditup, lencleaneditup))
-               {
-                       /* no space on left to put all good tuples, so picksplit */
-                       gistUserPicksplit(r, entryvec, &v, cleaneditup, lencleaneditup, giststate);
-                       v.spl_leftvalid = true;
-                       v.spl_rightvalid = false;
-                       gistToRealOffset(v.spl_left, v.spl_nleft, realoffset);
-                       gistToRealOffset(v.spl_right, v.spl_nright, realoffset);
-               }
-               else
-               {
-                       /* we can try to store all valid tuples on one page */
-                       v.spl_right = (OffsetNumber *) palloc(entryvec->n * sizeof(OffsetNumber));
-                       v.spl_left = (OffsetNumber *) palloc(entryvec->n * sizeof(OffsetNumber));
-
-                       if (lencleaneditup == 0)
-                       {
-                               /* all tuples are invalid, so moves half of its to right */
-                               v.spl_leftvalid = v.spl_rightvalid = false;
-                               v.spl_nright = 0;
-                               v.spl_nleft = 0;
-                               for (i = 1; i <= len; i++)
-                                       if (i - 1 < len / 2)
-                                               v.spl_left[v.spl_nleft++] = i;
-                                       else
-                                               v.spl_right[v.spl_nright++] = i;
-                       }
-                       else
-                       {
-                               /*
-                                * we will not call gistUserPicksplit, just put good tuples on
-                                * left and invalid on right
-                                */
-                               v.spl_nleft = lencleaneditup;
-                               v.spl_nright = 0;
-                               for (i = 1; i < entryvec->n; i++)
-                                       v.spl_left[i - 1] = i;
-                               gistToRealOffset(v.spl_left, v.spl_nleft, realoffset);
-                               v.spl_lattr[0] = v.spl_ldatum = (Datum) 0;
-                               v.spl_rattr[0] = v.spl_rdatum = (Datum) 0;
-                               v.spl_lisnull[0] = true;
-                               v.spl_risnull[0] = true;
-                               gistunionsubkey(r, giststate, itup, &v, true);
-                               v.spl_leftvalid = true;
-                               v.spl_rightvalid = false;
-                       }
-               }
+               GistSplitVec    gsvp;
+                               
+               v.spl_right = offInvTuples;
+               v.spl_nright = nOffInvTuples;
+               v.spl_rightvalid = false;
+
+               v.spl_left = (OffsetNumber *) palloc(entryvec->n * sizeof(OffsetNumber));
+               v.spl_nleft = 0;
+               for(i = 1; i <= len; i++) 
+                       if ( !GistTupleIsInvalid(itup[i - 1]) )
+                               v.spl_left[ v.spl_nleft++ ] = i;
+               v.spl_leftvalid = true;
+               
+               gsvp.idgrp = NULL;
+               gsvp.attrsize = v.spl_lattrsize;
+               gsvp.attr = v.spl_lattr;
+               gsvp.len = v.spl_nleft;
+               gsvp.entries = v.spl_left;
+               gsvp.isnull = v.spl_lisnull;
+
+               gistunionsubkeyvec(giststate, itup, &gsvp, true);
        }
        else
        {
@@ -1088,12 +1033,6 @@ gistSplit(Relation r,
        for (i = 0; i < v.spl_nright; i++)
                rvectup[i] = itup[v.spl_right[i] - 1];
 
-       /* place invalid tuples on right page if itsn't done yet */
-       for (fakeoffset = entryvec->n; fakeoffset < len + 1 && lencleaneditup; fakeoffset++)
-       {
-               rvectup[v.spl_nright++] = itup[realoffset[fakeoffset] - 1];
-       }
-
        /* finalyze splitting (may need another split) */
        if (!gistfitpage(rvectup, v.spl_nright))
        {
index ca5a9d6..92798a2 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *                     $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.12 2006/05/17 16:34:59 teodor Exp $
+ *                     $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.13 2006/05/19 16:15:17 teodor Exp $
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -140,6 +140,30 @@ gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
 }
 
 /*
+ * make plain IndexTupleVector
+ */
+
+IndexTupleData *
+gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) {
+       char *ptr, *ret;
+       int i;
+
+       *memlen=0;
+                                       
+       for (i = 0; i < veclen; i++)
+               *memlen += IndexTupleSize(vec[i]);
+
+       ptr = ret = palloc(*memlen);
+
+       for (i = 0; i < veclen; i++) { 
+               memcpy(ptr, vec[i], IndexTupleSize(vec[i]));
+               ptr += IndexTupleSize(vec[i]);
+       }
+
+       return (IndexTupleData*)ret;
+}
+
+/*
  * Return an IndexTuple containing the result of applying the "union"
  * method to the specified IndexTuple vector.
  */
@@ -313,101 +337,102 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis
        return newtup;
 }
 
-void
-gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl, bool isall)
-{
-       int                     lr;
+void 
+gistunionsubkeyvec(GISTSTATE *giststate,  IndexTuple *itvec, 
+                                                       GistSplitVec *gsvp, bool isall) {
+       int                     i;
+       GistEntryVector *evec;
 
-       for (lr = 0; lr < 2; lr++)
-       {
-               OffsetNumber *entries;
-               int                     i;
-               Datum      *attr;
-               int                     len,
-                                  *attrsize;
-               bool       *isnull;
-               GistEntryVector *evec;
-
-               if (lr)
-               {
-                       attrsize = spl->spl_lattrsize;
-                       attr = spl->spl_lattr;
-                       len = spl->spl_nleft;
-                       entries = spl->spl_left;
-                       isnull = spl->spl_lisnull;
-               }
-               else
-               {
-                       attrsize = spl->spl_rattrsize;
-                       attr = spl->spl_rattr;
-                       len = spl->spl_nright;
-                       entries = spl->spl_right;
-                       isnull = spl->spl_risnull;
-               }
+       evec = palloc(((gsvp->len < 2) ? 2 : gsvp->len) * sizeof(GISTENTRY) + GEVHDRSZ);
 
-               evec = palloc(((len < 2) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
+       for (i = (isall) ? 0 : 1; i < giststate->tupdesc->natts; i++)
+       {
+               int                     j;
+               Datum           datum;
+               int                     datumsize;
+               int                     real_len;
 
-               for (i = (isall) ? 0 : 1; i < r->rd_att->natts; i++)
+               real_len = 0;
+               for (j = 0; j < gsvp->len; j++)
                {
-                       int                     j;
-                       Datum           datum;
-                       int                     datumsize;
-                       int                     real_len;
+                       bool            IsNull;
 
-                       real_len = 0;
-                       for (j = 0; j < len; j++)
-                       {
-                               bool            IsNull;
+                       if ( gsvp->idgrp && gsvp->idgrp[gsvp->entries[j]])
+                               continue;
 
-                               if (spl->spl_idgrp[entries[j]])
-                                       continue;
-                               datum = index_getattr(itvec[entries[j] - 1], i + 1,
+                       datum = index_getattr(itvec[gsvp->entries[j] - 1], i + 1,
                                                                          giststate->tupdesc, &IsNull);
-                               if (IsNull)
-                                       continue;
-                               gistdentryinit(giststate, i,
-                                                          &(evec->vector[real_len]),
-                                                          datum,
-                                                          NULL, NULL, (OffsetNumber) 0,
+                       if (IsNull)
+                               continue;
+                       gistdentryinit(giststate, i,
+                                                  &(evec->vector[real_len]),
+                                                  datum,
+                                                  NULL, NULL, (OffsetNumber) 0,
                                                   ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
-                                                          FALSE, IsNull);
-                               real_len++;
+                                                  FALSE, IsNull);
+                       real_len++;
 
-                       }
+               }
 
-                       if (real_len == 0)
+               if (real_len == 0)
+               {
+                       datum = (Datum) 0;
+                       datumsize = 0;
+                       gsvp->isnull[i] = true;
+               }
+               else
+               {
+                       /*
+                        * evec->vector[0].bytes may be not defined, so form union
+                        * with itself
+                        */
+                       if (real_len == 1)
                        {
-                               datum = (Datum) 0;
-                               datumsize = 0;
-                               isnull[i] = true;
+                               evec->n = 2;
+                               memcpy(&(evec->vector[1]), &(evec->vector[0]),
+                                          sizeof(GISTENTRY));
                        }
                        else
-                       {
-                               /*
-                                * evec->vector[0].bytes may be not defined, so form union
-                                * with itself
-                                */
-                               if (real_len == 1)
-                               {
-                                       evec->n = 2;
-                                       memcpy(&(evec->vector[1]), &(evec->vector[0]),
-                                                  sizeof(GISTENTRY));
-                               }
-                               else
-                                       evec->n = real_len;
-                               datum = FunctionCall2(&giststate->unionFn[i],
-                                                                         PointerGetDatum(evec),
-                                                                         PointerGetDatum(&datumsize));
-                               isnull[i] = false;
-                       }
-
-                       attr[i] = datum;
-                       attrsize[i] = datumsize;
+                               evec->n = real_len;
+                       datum = FunctionCall2(&giststate->unionFn[i],
+                                                                 PointerGetDatum(evec),
+                                                                 PointerGetDatum(&datumsize));
+                       gsvp->isnull[i] = false;
                }
+
+               gsvp->attr[i] = datum;
+               gsvp->attrsize[i] = datumsize;
        }
 }
 
 /*
+ * unions subkey for after user picksplit over first column
+ */
+static void
+gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl)
+{
+       GistSplitVec    gsvp;
+
+       gsvp.idgrp = spl->spl_idgrp;
+
+       gsvp.attrsize = spl->spl_lattrsize;
+       gsvp.attr = spl->spl_lattr;
+       gsvp.len = spl->spl_nleft;
+       gsvp.entries = spl->spl_left;
+       gsvp.isnull = spl->spl_lisnull;
+
+       gistunionsubkeyvec(giststate, itvec, &gsvp, false);
+
+       gsvp.attrsize = spl->spl_rattrsize;
+       gsvp.attr = spl->spl_rattr;
+       gsvp.len = spl->spl_nright;
+       gsvp.entries = spl->spl_right;
+       gsvp.isnull = spl->spl_risnull;
+
+       gistunionsubkeyvec(giststate, itvec, &gsvp, false);
+}
+
+/*
  * find group in vector with equal value
  */
 int
@@ -840,7 +865,7 @@ gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v,
         * if index is multikey, then we must to try get smaller bounding box for
         * subkey(s)
         */
-       if (r->rd_att->natts > 1)
+       if (giststate->tupdesc->natts > 1)
        {
                int                     MaxGrpId;
 
@@ -851,7 +876,7 @@ gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v,
                MaxGrpId = gistfindgroup(giststate, entryvec->vector, v);
 
                /* form union of sub keys for each page (l,p) */
-               gistunionsubkey(r, giststate, itup, v, false);
+               gistunionsubkey(giststate, itup, v);
 
                /*
                 * if possible, we insert equivalent tuples with control by penalty
index 1126727..aef2056 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *                      $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.18 2006/05/19 11:10:25 teodor Exp $
+ *                      $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.19 2006/05/19 16:15:17 teodor Exp $
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -557,28 +557,16 @@ gistMakePageLayout(Buffer *buffers, int nbuffers) {
 
        while( nbuffers-- > 0 ) {
                Page page = BufferGetPage( buffers[ nbuffers ] );
-               IndexTuple      idxtup;
-               OffsetNumber    i;
-               char *ptr;
+               IndexTuple*     vec;
+               int     veclen;
 
                resptr = (SplitedPageLayout*)palloc0( sizeof(SplitedPageLayout) );
 
                resptr->block.blkno = BufferGetBlockNumber( buffers[ nbuffers ] );
                resptr->block.num = PageGetMaxOffsetNumber( page );
 
-               for(i=FirstOffsetNumber; i<= PageGetMaxOffsetNumber( page ); i++) {
-                       idxtup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
-                       resptr->lenlist += IndexTupleSize(idxtup);
-               }
-
-               resptr->list = (IndexTupleData*)palloc( resptr->lenlist );
-               ptr = (char*)(resptr->list);
-
-               for(i=FirstOffsetNumber; i<= PageGetMaxOffsetNumber( page ); i++) {
-                       idxtup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
-                       memcpy( ptr, idxtup, IndexTupleSize(idxtup) );
-                       ptr += IndexTupleSize(idxtup);
-               }
+               vec = gistextractpage( page, &veclen ); 
+               resptr->list = gistfillitupvec( vec, veclen, &(resptr->lenlist) );
 
                resptr->next = res;
                res = resptr;
index a866277..f08d49d 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.14 2006/05/17 16:34:59 teodor Exp $
+ * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.15 2006/05/19 16:15:17 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -283,6 +283,8 @@ extern IndexTuple *gistextractpage(Page page, int *len /* out */ );
 extern IndexTuple *gistjoinvector(
                           IndexTuple *itvec, int *len,
                           IndexTuple *additvec, int addlen);
+extern IndexTupleData* gistfillitupvec(IndexTuple *vec, int veclen, int *memlen);
+
 extern IndexTuple gistunion(Relation r, IndexTuple *itvec,
                  int len, GISTSTATE *giststate);
 extern IndexTuple gistgetadjusted(Relation r,
@@ -308,8 +310,19 @@ extern void gistcentryinit(GISTSTATE *giststate, int nkey,
 extern void gistDeCompressAtt(GISTSTATE *giststate, Relation r,
                                  IndexTuple tuple, Page p, OffsetNumber o,
                                  GISTENTRY *attdata, bool *isnull);
-extern void gistunionsubkey(Relation r, GISTSTATE *giststate,
-                               IndexTuple *itvec, GIST_SPLITVEC *spl, bool isall);
+
+typedef struct {
+       int             *attrsize;
+       Datum   *attr;
+       int             len;
+       OffsetNumber *entries;
+       bool    *isnull;
+       int             *idgrp;
+} GistSplitVec;
+
+extern void gistunionsubkeyvec(GISTSTATE *giststate, 
+       IndexTuple *itvec, GistSplitVec *gsvp,  bool isall);
+
 extern void GISTInitBuffer(Buffer b, uint32 f);
 extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
                           Datum k, Relation r, Page pg, OffsetNumber o,