OSDN Git Service

Fix 'all at one page bug' in picksplit method of R-tree emulation. Add defense
authorTeodor Sigaev <teodor@sigaev.ru>
Mon, 6 Apr 2009 14:27:27 +0000 (14:27 +0000)
committerTeodor Sigaev <teodor@sigaev.ru>
Mon, 6 Apr 2009 14:27:27 +0000 (14:27 +0000)
from buggy user-defined picksplit to GiST.

src/backend/access/gist/gistproc.c
src/backend/access/gist/gistsplit.c

index 39a597a..396b93a 100644 (file)
@@ -10,7 +10,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *     $PostgreSQL: pgsql/src/backend/access/gist/gistproc.c,v 1.15 2009/01/01 17:23:35 momjian Exp $
+ *     $PostgreSQL: pgsql/src/backend/access/gist/gistproc.c,v 1.16 2009/04/06 14:27:27 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -273,6 +273,69 @@ chooseLR(GIST_SPLITVEC *v,
 }
 
 /*
+ * Trivial split: half of entries will be placed on one page
+ * and another half - to another
+ */
+static void
+fallbackSplit(GistEntryVector *entryvec, GIST_SPLITVEC *v)
+{
+       OffsetNumber    i, 
+                                       maxoff;
+       BOX                        *unionL = NULL,
+                                  *unionR = NULL;
+       int                             nbytes;
+
+       maxoff = entryvec->n - 1;
+
+       nbytes = (maxoff + 2) * sizeof(OffsetNumber);
+       v->spl_left = (OffsetNumber *) palloc(nbytes);
+       v->spl_right = (OffsetNumber *) palloc(nbytes);
+       v->spl_nleft = v->spl_nright = 0;
+
+       for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+       {
+               BOX * cur = DatumGetBoxP(entryvec->vector[i].key);
+
+               if (i <= (maxoff - FirstOffsetNumber + 1) / 2)
+               {
+                       v->spl_left[v->spl_nleft] = i;
+                       if (unionL == NULL)
+                       {
+                               unionL = (BOX *) palloc(sizeof(BOX));
+                               *unionL = *cur;
+                       }
+                       else
+                               adjustBox(unionL, cur);
+
+                       v->spl_nleft++;
+               }
+               else
+               {
+                       v->spl_right[v->spl_nright] = i;
+                       if (unionR == NULL)
+                       {
+                               unionR = (BOX *) palloc(sizeof(BOX));
+                               *unionR = *cur;
+                       }
+                       else
+                               adjustBox(unionR, cur);
+
+                       v->spl_nright++;
+               }
+       }
+
+       if (v->spl_ldatum_exists)
+               adjustBox(unionL, DatumGetBoxP(v->spl_ldatum));
+       v->spl_ldatum = BoxPGetDatum(unionL);
+
+       if (v->spl_rdatum_exists)
+               adjustBox(unionR, DatumGetBoxP(v->spl_rdatum));
+       v->spl_rdatum = BoxPGetDatum(unionR);
+
+       v->spl_ldatum_exists = v->spl_rdatum_exists = false;
+}
+
+/*
  * The GiST PickSplit method
  *
  * New linear algorithm, see 'New Linear Node Splitting Algorithm for R-tree',
@@ -324,52 +387,22 @@ gist_box_picksplit(PG_FUNCTION_ARGS)
                adjustBox(&pageunion, cur);
        }
 
-       nbytes = (maxoff + 2) * sizeof(OffsetNumber);
-       listL = (OffsetNumber *) palloc(nbytes);
-       listR = (OffsetNumber *) palloc(nbytes);
-       unionL = (BOX *) palloc(sizeof(BOX));
-       unionR = (BOX *) palloc(sizeof(BOX));
        if (allisequal)
        {
-               cur = DatumGetBoxP(entryvec->vector[OffsetNumberNext(FirstOffsetNumber)].key);
-               if (memcmp((void *) cur, (void *) &pageunion, sizeof(BOX)) == 0)
-               {
-                       v->spl_left = listL;
-                       v->spl_right = listR;
-                       v->spl_nleft = v->spl_nright = 0;
-                       memcpy((void *) unionL, (void *) &pageunion, sizeof(BOX));
-                       memcpy((void *) unionR, (void *) &pageunion, sizeof(BOX));
-
-                       for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
-                       {
-                               if (i <= (maxoff - FirstOffsetNumber + 1) / 2)
-                               {
-                                       v->spl_left[v->spl_nleft] = i;
-                                       v->spl_nleft++;
-                               }
-                               else
-                               {
-                                       v->spl_right[v->spl_nright] = i;
-                                       v->spl_nright++;
-                               }
-                       }
-
-                       if (v->spl_ldatum_exists)
-                               adjustBox(unionL, DatumGetBoxP(v->spl_ldatum));
-                       v->spl_ldatum = BoxPGetDatum(unionL);
-
-                       if (v->spl_rdatum_exists)
-                               adjustBox(unionR, DatumGetBoxP(v->spl_rdatum));
-                       v->spl_rdatum = BoxPGetDatum(unionR);
-
-                       v->spl_ldatum_exists = v->spl_rdatum_exists = false;
-
-                       PG_RETURN_POINTER(v);
-               }
+               /*
+                * All entries are the same
+                */
+               fallbackSplit(entryvec, v);
+               PG_RETURN_POINTER(v);
        }
 
+       nbytes = (maxoff + 2) * sizeof(OffsetNumber);
+       listL = (OffsetNumber *) palloc(nbytes);
+       listR = (OffsetNumber *) palloc(nbytes);
        listB = (OffsetNumber *) palloc(nbytes);
        listT = (OffsetNumber *) palloc(nbytes);
+       unionL = (BOX *) palloc(sizeof(BOX));
+       unionR = (BOX *) palloc(sizeof(BOX));
        unionB = (BOX *) palloc(sizeof(BOX));
        unionT = (BOX *) palloc(sizeof(BOX));
 
@@ -452,6 +485,12 @@ gist_box_picksplit(PG_FUNCTION_ARGS)
                        else
                                ADDLIST(listT, unionT, posT, i);
                }
+
+               if (IS_BADRATIO(posR, posL) && IS_BADRATIO(posT, posB))
+               {
+                       fallbackSplit(entryvec, v);
+                       PG_RETURN_POINTER(v);
+               }
        }
 
        /* which split more optimal? */
index f707c72..f969e1e 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/access/gist/gistsplit.c,v 1.7 2009/01/01 17:23:35 momjian Exp $
+ *       $PostgreSQL: pgsql/src/backend/access/gist/gistsplit.c,v 1.8 2009/04/06 14:27:27 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -281,6 +281,63 @@ supportSecondarySplit(Relation r, GISTSTATE *giststate, int attno, GIST_SPLITVEC
 }
 
 /*
+ * Trivial picksplit implementaion. Function called only 
+ * if user-defined picksplit puts all keys to the one page.
+ * That is a bug of user-defined picksplit but we'd like
+ * to "fix" that.
+ */
+static void
+genericPickSplit(GISTSTATE *giststate, GistEntryVector *entryvec, GIST_SPLITVEC *v, int attno)
+{
+       OffsetNumber     i,
+                                        maxoff;
+       int                              nbytes;
+       GistEntryVector *evec;
+
+       maxoff = entryvec->n - 1;
+
+       nbytes = (maxoff + 2) * sizeof(OffsetNumber);
+
+       v->spl_left = (OffsetNumber *) palloc(nbytes);
+       v->spl_right = (OffsetNumber *) palloc(nbytes);
+       v->spl_nleft = v->spl_nright = 0;
+
+       for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+       {
+               if (i <= (maxoff - FirstOffsetNumber + 1) / 2)
+               {
+                       v->spl_left[v->spl_nleft] = i;
+                       v->spl_nleft++;
+               }
+               else
+               {
+                       v->spl_right[v->spl_nright] = i;
+                       v->spl_nright++;
+               }
+       }
+
+       /*
+        * Form unions of each page
+        */
+
+       evec = palloc( sizeof(GISTENTRY) * entryvec->n + GEVHDRSZ );
+
+       evec->n = v->spl_nleft;
+       memcpy(evec->vector, entryvec->vector + FirstOffsetNumber, 
+                                                sizeof(GISTENTRY) * evec->n);
+    v->spl_ldatum = FunctionCall2(&giststate->unionFn[attno],
+                                                                       PointerGetDatum(evec),
+                                                                       PointerGetDatum(&nbytes));
+
+       evec->n = v->spl_nright;
+       memcpy(evec->vector, entryvec->vector + FirstOffsetNumber + v->spl_nleft, 
+                                                sizeof(GISTENTRY) * evec->n);
+    v->spl_rdatum = FunctionCall2(&giststate->unionFn[attno],
+                                                                       PointerGetDatum(evec),
+                                                                       PointerGetDatum(&nbytes));
+}
+
+/*
  * Calls user picksplit method for attno columns to split vector to
  * two vectors. May use attno+n columns data to
  * get better split.
@@ -296,7 +353,7 @@ gistUserPicksplit(Relation r, GistEntryVector *entryvec, int attno, GistSplitVec
 
        /*
         * now let the user-defined picksplit function set up the split vector; in
-        * entryvec have no null value!!
+        * entryvec there is no null value!!
         */
 
        sv->spl_ldatum_exists = (v->spl_lisnull[attno]) ? false : true;
@@ -308,18 +365,43 @@ gistUserPicksplit(Relation r, GistEntryVector *entryvec, int attno, GistSplitVec
                                  PointerGetDatum(entryvec),
                                  PointerGetDatum(sv));
 
-       /* compatibility with old code */
-       if (sv->spl_left[sv->spl_nleft - 1] == InvalidOffsetNumber)
-               sv->spl_left[sv->spl_nleft - 1] = (OffsetNumber) (entryvec->n - 1);
-       if (sv->spl_right[sv->spl_nright - 1] == InvalidOffsetNumber)
-               sv->spl_right[sv->spl_nright - 1] = (OffsetNumber) (entryvec->n - 1);
+       if ( sv->spl_nleft == 0 || sv->spl_nright == 0 )
+       {
+               ereport(DEBUG1,
+                               (errcode(ERRCODE_INTERNAL_ERROR),
+                                errmsg("Picksplit method for %d column of index \"%s\" failed",
+                                                                                       attno+1, RelationGetRelationName(r)),
+                                errhint("Index is not optimal, to optimize it contact developer or try to use the column as a second one in create index command")));
 
-       if (sv->spl_ldatum_exists || sv->spl_rdatum_exists)
+               /*
+                * Reinit GIST_SPLITVEC. Although that fields are not used
+                * by genericPickSplit(), let us set up it for further processing 
+                */
+               sv->spl_ldatum_exists = (v->spl_lisnull[attno]) ? false : true;
+               sv->spl_rdatum_exists = (v->spl_risnull[attno]) ? false : true;
+               sv->spl_ldatum = v->spl_lattr[attno];
+               sv->spl_rdatum = v->spl_rattr[attno];
+
+               genericPickSplit(giststate, entryvec, sv, attno);
+
+               if (sv->spl_ldatum_exists || sv->spl_rdatum_exists)
+                       supportSecondarySplit(r, giststate, attno, sv, v->spl_lattr[attno], v->spl_rattr[attno]);
+       }
+       else
        {
-               elog(LOG, "PickSplit method of %d columns of index '%s' doesn't support secondary split",
-                        attno + 1, RelationGetRelationName(r));
+               /* compatibility with old code */
+               if (sv->spl_left[sv->spl_nleft - 1] == InvalidOffsetNumber)
+                       sv->spl_left[sv->spl_nleft - 1] = (OffsetNumber) (entryvec->n - 1);
+               if (sv->spl_right[sv->spl_nright - 1] == InvalidOffsetNumber)
+                       sv->spl_right[sv->spl_nright - 1] = (OffsetNumber) (entryvec->n - 1);
 
-               supportSecondarySplit(r, giststate, attno, sv, v->spl_lattr[attno], v->spl_rattr[attno]);
+               if (sv->spl_ldatum_exists || sv->spl_rdatum_exists)
+               {
+                       elog(LOG, "PickSplit method of %d columns of index '%s' doesn't support secondary split",
+                                attno + 1, RelationGetRelationName(r));
+
+                       supportSecondarySplit(r, giststate, attno, sv, v->spl_lattr[attno], v->spl_rattr[attno]);
+               }
        }
 
        v->spl_lattr[attno] = sv->spl_ldatum;