From: Teodor Sigaev Date: Fri, 19 May 2006 16:15:17 +0000 (+0000) Subject: Simplify gistSplit() and some refactoring related code. X-Git-Tag: REL9_0_0~7921 X-Git-Url: http://git.osdn.net/view?a=commitdiff_plain;h=420cbff8813dd5b1390a1602331b57fbc4fbb899;p=pg-rex%2Fsyncrep.git Simplify gistSplit() and some refactoring related code. --- diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index d207b7ecfa..cb10cbc35b 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.135 2006/05/17 16:34:59 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.136 2006/05/19 16:15:17 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -936,31 +936,6 @@ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) gistxlogInsertCompletion(state->r->rd_node, &(state->key), 1); } -static void -gistToRealOffset(OffsetNumber *arr, int len, OffsetNumber *reasloffset) -{ - int i; - - for (i = 0; i < len; i++) - arr[i] = reasloffset[arr[i]]; -} - -static IndexTupleData * -gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) { - char *ptr, *ret = palloc(BLCKSZ); - int i; - - ptr = ret; - for (i = 0; i < veclen; i++) { - memcpy(ptr, vec[i], IndexTupleSize(vec[i])); - ptr += IndexTupleSize(vec[i]); - } - - *memlen = ptr - ret; - Assert( *memlen < BLCKSZ ); - return (IndexTupleData*)ret; -} - /* * gistSplit -- split a page in the tree. */ @@ -975,100 +950,70 @@ gistSplit(Relation r, *rvectup; GIST_SPLITVEC v; GistEntryVector *entryvec; - int i, - fakeoffset; - OffsetNumber *realoffset; - IndexTuple *cleaneditup = itup; - int lencleaneditup = len; + int i; + OffsetNumber offInvTuples[ MaxOffsetNumber ]; + int nOffInvTuples = 0; SplitedPageLayout *res = NULL; /* generate the item array */ - realoffset = palloc((len + 1) * sizeof(OffsetNumber)); entryvec = palloc(GEVHDRSZ + (len + 1) * sizeof(GISTENTRY)); entryvec->n = len + 1; - fakeoffset = FirstOffsetNumber; for (i = 1; i <= len; i++) { Datum datum; bool IsNull; if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1])) - { - entryvec->n--; /* remember position of invalid tuple */ - realoffset[entryvec->n] = i; - continue; - } + offInvTuples[ nOffInvTuples++ ] = i; + + if ( nOffInvTuples > 0 ) + /* we can safely do not decompress other keys, because + we will do splecial processing, but + it's needed to find another invalid tuples */ + continue; datum = index_getattr(itup[i - 1], 1, giststate->tupdesc, &IsNull); - gistdentryinit(giststate, 0, &(entryvec->vector[fakeoffset]), + gistdentryinit(giststate, 0, &(entryvec->vector[i]), datum, r, page, i, ATTSIZE(datum, giststate->tupdesc, 1, IsNull), FALSE, IsNull); - realoffset[fakeoffset] = i; - fakeoffset++; } /* - * if it was invalid tuple then we need special processing. If it's - * possible, we move all invalid tuples on right page. We should remember, - * that union with invalid tuples is a invalid tuple. + * if it was invalid tuple then we need special processing. + * We move all invalid tuples on right page. + * + * if there is no place on left page, gistSplit will be called one more + * time for left page. + * + * Normally, we never exec this code, but after crash replay it's possible + * to get 'invalid' tuples (probability is low enough) */ - if (entryvec->n != len + 1) + if (nOffInvTuples > 0) { - lencleaneditup = entryvec->n - 1; - cleaneditup = (IndexTuple *) palloc(lencleaneditup * sizeof(IndexTuple)); - for (i = 1; i < entryvec->n; i++) - cleaneditup[i - 1] = itup[realoffset[i] - 1]; - - if (!gistfitpage(cleaneditup, lencleaneditup)) - { - /* no space on left to put all good tuples, so picksplit */ - gistUserPicksplit(r, entryvec, &v, cleaneditup, lencleaneditup, giststate); - v.spl_leftvalid = true; - v.spl_rightvalid = false; - gistToRealOffset(v.spl_left, v.spl_nleft, realoffset); - gistToRealOffset(v.spl_right, v.spl_nright, realoffset); - } - else - { - /* we can try to store all valid tuples on one page */ - v.spl_right = (OffsetNumber *) palloc(entryvec->n * sizeof(OffsetNumber)); - v.spl_left = (OffsetNumber *) palloc(entryvec->n * sizeof(OffsetNumber)); - - if (lencleaneditup == 0) - { - /* all tuples are invalid, so moves half of its to right */ - v.spl_leftvalid = v.spl_rightvalid = false; - v.spl_nright = 0; - v.spl_nleft = 0; - for (i = 1; i <= len; i++) - if (i - 1 < len / 2) - v.spl_left[v.spl_nleft++] = i; - else - v.spl_right[v.spl_nright++] = i; - } - else - { - /* - * we will not call gistUserPicksplit, just put good tuples on - * left and invalid on right - */ - v.spl_nleft = lencleaneditup; - v.spl_nright = 0; - for (i = 1; i < entryvec->n; i++) - v.spl_left[i - 1] = i; - gistToRealOffset(v.spl_left, v.spl_nleft, realoffset); - v.spl_lattr[0] = v.spl_ldatum = (Datum) 0; - v.spl_rattr[0] = v.spl_rdatum = (Datum) 0; - v.spl_lisnull[0] = true; - v.spl_risnull[0] = true; - gistunionsubkey(r, giststate, itup, &v, true); - v.spl_leftvalid = true; - v.spl_rightvalid = false; - } - } + GistSplitVec gsvp; + + v.spl_right = offInvTuples; + v.spl_nright = nOffInvTuples; + v.spl_rightvalid = false; + + v.spl_left = (OffsetNumber *) palloc(entryvec->n * sizeof(OffsetNumber)); + v.spl_nleft = 0; + for(i = 1; i <= len; i++) + if ( !GistTupleIsInvalid(itup[i - 1]) ) + v.spl_left[ v.spl_nleft++ ] = i; + v.spl_leftvalid = true; + + gsvp.idgrp = NULL; + gsvp.attrsize = v.spl_lattrsize; + gsvp.attr = v.spl_lattr; + gsvp.len = v.spl_nleft; + gsvp.entries = v.spl_left; + gsvp.isnull = v.spl_lisnull; + + gistunionsubkeyvec(giststate, itup, &gsvp, true); } else { @@ -1088,12 +1033,6 @@ gistSplit(Relation r, for (i = 0; i < v.spl_nright; i++) rvectup[i] = itup[v.spl_right[i] - 1]; - /* place invalid tuples on right page if itsn't done yet */ - for (fakeoffset = entryvec->n; fakeoffset < len + 1 && lencleaneditup; fakeoffset++) - { - rvectup[v.spl_nright++] = itup[realoffset[fakeoffset] - 1]; - } - /* finalyze splitting (may need another split) */ if (!gistfitpage(rvectup, v.spl_nright)) { diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c index ca5a9d652d..92798a27d3 100644 --- a/src/backend/access/gist/gistutil.c +++ b/src/backend/access/gist/gistutil.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.12 2006/05/17 16:34:59 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.13 2006/05/19 16:15:17 teodor Exp $ *------------------------------------------------------------------------- */ #include "postgres.h" @@ -140,6 +140,30 @@ gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen) } /* + * make plain IndexTupleVector + */ + +IndexTupleData * +gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) { + char *ptr, *ret; + int i; + + *memlen=0; + + for (i = 0; i < veclen; i++) + *memlen += IndexTupleSize(vec[i]); + + ptr = ret = palloc(*memlen); + + for (i = 0; i < veclen; i++) { + memcpy(ptr, vec[i], IndexTupleSize(vec[i])); + ptr += IndexTupleSize(vec[i]); + } + + return (IndexTupleData*)ret; +} + +/* * Return an IndexTuple containing the result of applying the "union" * method to the specified IndexTuple vector. */ @@ -313,101 +337,102 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis return newtup; } -void -gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl, bool isall) -{ - int lr; +void +gistunionsubkeyvec(GISTSTATE *giststate, IndexTuple *itvec, + GistSplitVec *gsvp, bool isall) { + int i; + GistEntryVector *evec; - for (lr = 0; lr < 2; lr++) - { - OffsetNumber *entries; - int i; - Datum *attr; - int len, - *attrsize; - bool *isnull; - GistEntryVector *evec; - - if (lr) - { - attrsize = spl->spl_lattrsize; - attr = spl->spl_lattr; - len = spl->spl_nleft; - entries = spl->spl_left; - isnull = spl->spl_lisnull; - } - else - { - attrsize = spl->spl_rattrsize; - attr = spl->spl_rattr; - len = spl->spl_nright; - entries = spl->spl_right; - isnull = spl->spl_risnull; - } + evec = palloc(((gsvp->len < 2) ? 2 : gsvp->len) * sizeof(GISTENTRY) + GEVHDRSZ); - evec = palloc(((len < 2) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ); + for (i = (isall) ? 0 : 1; i < giststate->tupdesc->natts; i++) + { + int j; + Datum datum; + int datumsize; + int real_len; - for (i = (isall) ? 0 : 1; i < r->rd_att->natts; i++) + real_len = 0; + for (j = 0; j < gsvp->len; j++) { - int j; - Datum datum; - int datumsize; - int real_len; + bool IsNull; - real_len = 0; - for (j = 0; j < len; j++) - { - bool IsNull; + if ( gsvp->idgrp && gsvp->idgrp[gsvp->entries[j]]) + continue; - if (spl->spl_idgrp[entries[j]]) - continue; - datum = index_getattr(itvec[entries[j] - 1], i + 1, + datum = index_getattr(itvec[gsvp->entries[j] - 1], i + 1, giststate->tupdesc, &IsNull); - if (IsNull) - continue; - gistdentryinit(giststate, i, - &(evec->vector[real_len]), - datum, - NULL, NULL, (OffsetNumber) 0, + if (IsNull) + continue; + gistdentryinit(giststate, i, + &(evec->vector[real_len]), + datum, + NULL, NULL, (OffsetNumber) 0, ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull), - FALSE, IsNull); - real_len++; + FALSE, IsNull); + real_len++; - } + } - if (real_len == 0) + if (real_len == 0) + { + datum = (Datum) 0; + datumsize = 0; + gsvp->isnull[i] = true; + } + else + { + /* + * evec->vector[0].bytes may be not defined, so form union + * with itself + */ + if (real_len == 1) { - datum = (Datum) 0; - datumsize = 0; - isnull[i] = true; + evec->n = 2; + memcpy(&(evec->vector[1]), &(evec->vector[0]), + sizeof(GISTENTRY)); } else - { - /* - * evec->vector[0].bytes may be not defined, so form union - * with itself - */ - if (real_len == 1) - { - evec->n = 2; - memcpy(&(evec->vector[1]), &(evec->vector[0]), - sizeof(GISTENTRY)); - } - else - evec->n = real_len; - datum = FunctionCall2(&giststate->unionFn[i], - PointerGetDatum(evec), - PointerGetDatum(&datumsize)); - isnull[i] = false; - } - - attr[i] = datum; - attrsize[i] = datumsize; + evec->n = real_len; + datum = FunctionCall2(&giststate->unionFn[i], + PointerGetDatum(evec), + PointerGetDatum(&datumsize)); + gsvp->isnull[i] = false; } + + gsvp->attr[i] = datum; + gsvp->attrsize[i] = datumsize; } } /* + * unions subkey for after user picksplit over first column + */ +static void +gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl) +{ + GistSplitVec gsvp; + + gsvp.idgrp = spl->spl_idgrp; + + gsvp.attrsize = spl->spl_lattrsize; + gsvp.attr = spl->spl_lattr; + gsvp.len = spl->spl_nleft; + gsvp.entries = spl->spl_left; + gsvp.isnull = spl->spl_lisnull; + + gistunionsubkeyvec(giststate, itvec, &gsvp, false); + + gsvp.attrsize = spl->spl_rattrsize; + gsvp.attr = spl->spl_rattr; + gsvp.len = spl->spl_nright; + gsvp.entries = spl->spl_right; + gsvp.isnull = spl->spl_risnull; + + gistunionsubkeyvec(giststate, itvec, &gsvp, false); +} + +/* * find group in vector with equal value */ int @@ -840,7 +865,7 @@ gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v, * if index is multikey, then we must to try get smaller bounding box for * subkey(s) */ - if (r->rd_att->natts > 1) + if (giststate->tupdesc->natts > 1) { int MaxGrpId; @@ -851,7 +876,7 @@ gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v, MaxGrpId = gistfindgroup(giststate, entryvec->vector, v); /* form union of sub keys for each page (l,p) */ - gistunionsubkey(r, giststate, itup, v, false); + gistunionsubkey(giststate, itup, v); /* * if possible, we insert equivalent tuples with control by penalty diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index 1126727cd9..aef2056a34 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.18 2006/05/19 11:10:25 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.19 2006/05/19 16:15:17 teodor Exp $ *------------------------------------------------------------------------- */ #include "postgres.h" @@ -557,28 +557,16 @@ gistMakePageLayout(Buffer *buffers, int nbuffers) { while( nbuffers-- > 0 ) { Page page = BufferGetPage( buffers[ nbuffers ] ); - IndexTuple idxtup; - OffsetNumber i; - char *ptr; + IndexTuple* vec; + int veclen; resptr = (SplitedPageLayout*)palloc0( sizeof(SplitedPageLayout) ); resptr->block.blkno = BufferGetBlockNumber( buffers[ nbuffers ] ); resptr->block.num = PageGetMaxOffsetNumber( page ); - for(i=FirstOffsetNumber; i<= PageGetMaxOffsetNumber( page ); i++) { - idxtup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i)); - resptr->lenlist += IndexTupleSize(idxtup); - } - - resptr->list = (IndexTupleData*)palloc( resptr->lenlist ); - ptr = (char*)(resptr->list); - - for(i=FirstOffsetNumber; i<= PageGetMaxOffsetNumber( page ); i++) { - idxtup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i)); - memcpy( ptr, idxtup, IndexTupleSize(idxtup) ); - ptr += IndexTupleSize(idxtup); - } + vec = gistextractpage( page, &veclen ); + resptr->list = gistfillitupvec( vec, veclen, &(resptr->lenlist) ); resptr->next = res; res = resptr; diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h index a866277fe9..f08d49dbf9 100644 --- a/src/include/access/gist_private.h +++ b/src/include/access/gist_private.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.14 2006/05/17 16:34:59 teodor Exp $ + * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.15 2006/05/19 16:15:17 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -283,6 +283,8 @@ extern IndexTuple *gistextractpage(Page page, int *len /* out */ ); extern IndexTuple *gistjoinvector( IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen); +extern IndexTupleData* gistfillitupvec(IndexTuple *vec, int veclen, int *memlen); + extern IndexTuple gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate); extern IndexTuple gistgetadjusted(Relation r, @@ -308,8 +310,19 @@ extern void gistcentryinit(GISTSTATE *giststate, int nkey, extern void gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p, OffsetNumber o, GISTENTRY *attdata, bool *isnull); -extern void gistunionsubkey(Relation r, GISTSTATE *giststate, - IndexTuple *itvec, GIST_SPLITVEC *spl, bool isall); + +typedef struct { + int *attrsize; + Datum *attr; + int len; + OffsetNumber *entries; + bool *isnull; + int *idgrp; +} GistSplitVec; + +extern void gistunionsubkeyvec(GISTSTATE *giststate, + IndexTuple *itvec, GistSplitVec *gsvp, bool isall); + extern void GISTInitBuffer(Buffer b, uint32 f); extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, Datum k, Relation r, Page pg, OffsetNumber o,