OSDN Git Service

pgindent run for 8.2.
[pg-rex/syncrep.git] / src / backend / access / gist / gistsplit.c
1 /*-------------------------------------------------------------------------
2  *
3  * gistsplit.c
4  *        Split page algorithm
5  *
6  *
7  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/access/gist/gistsplit.c,v 1.3 2006/10/04 00:29:48 momjian Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/gist_private.h"
18
19 typedef struct
20 {
21         Datum      *attr;
22         int                     len;
23         OffsetNumber *entries;
24         bool       *isnull;
25         bool       *equiv;
26 } GistSplitUnion;
27
28
29 /*
30  * Forms unions of subkeys after page split, but
31  * uses only tuples aren't in groups of equalent tuples
32  */
33 static void
34 gistunionsubkeyvec(GISTSTATE *giststate, IndexTuple *itvec,
35                                    GistSplitUnion *gsvp, int startkey)
36 {
37         IndexTuple *cleanedItVec;
38         int                     i,
39                                 cleanedLen = 0;
40
41         cleanedItVec = (IndexTuple *) palloc(sizeof(IndexTuple) * gsvp->len);
42
43         for (i = 0; i < gsvp->len; i++)
44         {
45                 if (gsvp->equiv && gsvp->equiv[gsvp->entries[i]])
46                         continue;
47
48                 cleanedItVec[cleanedLen++] = itvec[gsvp->entries[i] - 1];
49         }
50
51         gistMakeUnionItVec(giststate, cleanedItVec, cleanedLen, startkey,
52                                            gsvp->attr, gsvp->isnull);
53
54         pfree(cleanedItVec);
55 }
56
57 /*
58  * unions subkeys for after user picksplit over attno-1 column
59  */
60 static void
61 gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GistSplitVector *spl, int attno)
62 {
63         GistSplitUnion gsvp;
64
65         gsvp.equiv = spl->spl_equiv;
66
67         gsvp.attr = spl->spl_lattr;
68         gsvp.len = spl->splitVector.spl_nleft;
69         gsvp.entries = spl->splitVector.spl_left;
70         gsvp.isnull = spl->spl_lisnull;
71
72         gistunionsubkeyvec(giststate, itvec, &gsvp, attno);
73
74         gsvp.attr = spl->spl_rattr;
75         gsvp.len = spl->splitVector.spl_nright;
76         gsvp.entries = spl->splitVector.spl_right;
77         gsvp.isnull = spl->spl_risnull;
78
79         gistunionsubkeyvec(giststate, itvec, &gsvp, attno);
80 }
81
82 /*
83  * find group in vector with equivalent value
84  */
85 static int
86 gistfindgroup(Relation r, GISTSTATE *giststate, GISTENTRY *valvec, GistSplitVector *spl, int attno)
87 {
88         int                     i;
89         GISTENTRY       entry;
90         int                     len = 0;
91
92         /*
93          * attno key is always not null (see gistSplitByKey), so we may not check
94          * for nulls
95          */
96         gistentryinit(entry, spl->splitVector.spl_rdatum, r, NULL, (OffsetNumber) 0, FALSE);
97         for (i = 0; i < spl->splitVector.spl_nleft; i++)
98         {
99                 float           penalty = gistpenalty(giststate, attno, &entry, false,
100                                                            &valvec[spl->splitVector.spl_left[i]], false);
101
102                 if (penalty == 0.0)
103                 {
104                         spl->spl_equiv[spl->splitVector.spl_left[i]] = true;
105                         len++;
106                 }
107         }
108
109         gistentryinit(entry, spl->splitVector.spl_ldatum, r, NULL, (OffsetNumber) 0, FALSE);
110         for (i = 0; i < spl->splitVector.spl_nright; i++)
111         {
112                 float           penalty = gistpenalty(giststate, attno, &entry, false,
113                                                           &valvec[spl->splitVector.spl_right[i]], false);
114
115                 if (penalty == 0.0)
116                 {
117                         spl->spl_equiv[spl->splitVector.spl_right[i]] = true;
118                         len++;
119                 }
120         }
121
122         return len;
123 }
124
125 static void
126 cleanupOffsets(OffsetNumber *a, int *len, bool *equiv, int *LenEquiv)
127 {
128         int                     curlen,
129                                 i;
130         OffsetNumber *curwpos;
131
132         curlen = *len;
133         curwpos = a;
134         for (i = 0; i < *len; i++)
135         {
136                 if (equiv[a[i]] == FALSE)
137                 {
138                         *curwpos = a[i];
139                         curwpos++;
140                 }
141                 else
142                 {
143                         /* corner case: we shouldn't make void array */
144                         if (curlen == 1)
145                         {
146                                 equiv[a[i]] = FALSE;    /* mark item as non-equivalent */
147                                 i--;                    /* redo the same */
148                                 *LenEquiv -= 1;
149                                 continue;
150                         }
151                         else
152                                 curlen--;
153                 }
154         }
155
156         *len = curlen;
157 }
158
159 static void
160 placeOne(Relation r, GISTSTATE *giststate, GistSplitVector *v, IndexTuple itup, OffsetNumber off, int attno)
161 {
162         GISTENTRY       identry[INDEX_MAX_KEYS];
163         bool            isnull[INDEX_MAX_KEYS];
164         bool            toLeft = true;
165
166         gistDeCompressAtt(giststate, r, itup, NULL, (OffsetNumber) 0, identry, isnull);
167
168         for (; attno < giststate->tupdesc->natts; attno++)
169         {
170                 float           lpenalty,
171                                         rpenalty;
172                 GISTENTRY       entry;
173
174                 gistentryinit(entry, v->spl_lattr[attno], r, NULL, 0, FALSE);
175                 lpenalty = gistpenalty(giststate, attno, &entry, v->spl_lisnull[attno], identry + attno, isnull[attno]);
176                 gistentryinit(entry, v->spl_rattr[attno], r, NULL, 0, FALSE);
177                 rpenalty = gistpenalty(giststate, attno, &entry, v->spl_risnull[attno], identry + attno, isnull[attno]);
178
179                 if (lpenalty != rpenalty)
180                 {
181                         if (lpenalty > rpenalty)
182                                 toLeft = false;
183                         break;
184                 }
185         }
186
187         if (toLeft)
188                 v->splitVector.spl_left[v->splitVector.spl_nleft++] = off;
189         else
190                 v->splitVector.spl_right[v->splitVector.spl_nright++] = off;
191 }
192
193 #define SWAPVAR( s, d, t ) \
194 do {    \
195         (t) = (s); \
196         (s) = (d); \
197         (d) = (t); \
198 } while(0)
199
200 /*
201  * adjust left and right unions according to splits by previous
202  * split by firsts columns. This function is called only in case
203  * when pickSplit doesn't support subspplit.
204  */
205
206 static void
207 supportSecondarySplit(Relation r, GISTSTATE *giststate, int attno, GIST_SPLITVEC *sv, Datum oldL, Datum oldR)
208 {
209         bool            leaveOnLeft = true,
210                                 tmpBool;
211         GISTENTRY       entryL,
212                                 entryR,
213                                 entrySL,
214                                 entrySR;
215
216         gistentryinit(entryL, oldL, r, NULL, 0, FALSE);
217         gistentryinit(entryR, oldR, r, NULL, 0, FALSE);
218         gistentryinit(entrySL, sv->spl_ldatum, r, NULL, 0, FALSE);
219         gistentryinit(entrySR, sv->spl_rdatum, r, NULL, 0, FALSE);
220
221         if (sv->spl_ldatum_exists && sv->spl_rdatum_exists)
222         {
223                 float           penalty1,
224                                         penalty2;
225
226                 penalty1 = gistpenalty(giststate, attno, &entryL, false, &entrySL, false) +
227                         gistpenalty(giststate, attno, &entryR, false, &entrySR, false);
228                 penalty2 = gistpenalty(giststate, attno, &entryL, false, &entrySR, false) +
229                         gistpenalty(giststate, attno, &entryR, false, &entrySL, false);
230
231                 if (penalty1 > penalty2)
232                         leaveOnLeft = false;
233
234         }
235         else
236         {
237                 GISTENTRY  *entry1 = (sv->spl_ldatum_exists) ? &entryL : &entryR;
238                 float           penalty1,
239                                         penalty2;
240
241                 /*
242                  * there is only one previously defined union, so we just choose swap
243                  * or not by lowest penalty
244                  */
245
246                 penalty1 = gistpenalty(giststate, attno, entry1, false, &entrySL, false);
247                 penalty2 = gistpenalty(giststate, attno, entry1, false, &entrySR, false);
248
249                 if (penalty1 < penalty2)
250                         leaveOnLeft = (sv->spl_ldatum_exists) ? true : false;
251                 else
252                         leaveOnLeft = (sv->spl_rdatum_exists) ? true : false;
253         }
254
255         if (leaveOnLeft == false)
256         {
257                 /*
258                  * swap left and right
259                  */
260                 OffsetNumber *off,
261                                         noff;
262                 Datum           datum;
263
264                 SWAPVAR(sv->spl_left, sv->spl_right, off);
265                 SWAPVAR(sv->spl_nleft, sv->spl_nright, noff);
266                 SWAPVAR(sv->spl_ldatum, sv->spl_rdatum, datum);
267                 gistentryinit(entrySL, sv->spl_ldatum, r, NULL, 0, FALSE);
268                 gistentryinit(entrySR, sv->spl_rdatum, r, NULL, 0, FALSE);
269         }
270
271         if (sv->spl_ldatum_exists)
272                 gistMakeUnionKey(giststate, attno, &entryL, false, &entrySL, false,
273                                                  &sv->spl_ldatum, &tmpBool);
274
275         if (sv->spl_rdatum_exists)
276                 gistMakeUnionKey(giststate, attno, &entryR, false, &entrySR, false,
277                                                  &sv->spl_rdatum, &tmpBool);
278
279         sv->spl_ldatum_exists = sv->spl_rdatum_exists = false;
280 }
281
282 /*
283  * Calls user picksplit method for attno columns to split vector to
284  * two vectors. May use attno+n columns data to
285  * get better split.
286  * Returns TRUE and v->spl_equiv = NULL if left and right unions of attno columns are the same,
287  * so caller may find better split
288  * Returns TRUE and v->spl_equiv != NULL if there is tuples which may be freely moved
289  */
290 static bool
291 gistUserPicksplit(Relation r, GistEntryVector *entryvec, int attno, GistSplitVector *v,
292                                   IndexTuple *itup, int len, GISTSTATE *giststate)
293 {
294         GIST_SPLITVEC *sv = &v->splitVector;
295
296         /*
297          * now let the user-defined picksplit function set up the split vector; in
298          * entryvec have no null value!!
299          */
300
301         sv->spl_ldatum_exists = (v->spl_lisnull[attno]) ? false : true;
302         sv->spl_rdatum_exists = (v->spl_risnull[attno]) ? false : true;
303         sv->spl_ldatum = v->spl_lattr[attno];
304         sv->spl_rdatum = v->spl_rattr[attno];
305
306         FunctionCall2(&giststate->picksplitFn[attno],
307                                   PointerGetDatum(entryvec),
308                                   PointerGetDatum(sv));
309
310         /* compatibility with old code */
311         if (sv->spl_left[sv->spl_nleft - 1] == InvalidOffsetNumber)
312                 sv->spl_left[sv->spl_nleft - 1] = (OffsetNumber) (entryvec->n - 1);
313         if (sv->spl_right[sv->spl_nright - 1] == InvalidOffsetNumber)
314                 sv->spl_right[sv->spl_nright - 1] = (OffsetNumber) (entryvec->n - 1);
315
316         if (sv->spl_ldatum_exists || sv->spl_rdatum_exists)
317         {
318                 elog(LOG, "PickSplit method of %d columns of index '%s' doesn't support secondary split",
319                          attno + 1, RelationGetRelationName(r));
320
321                 supportSecondarySplit(r, giststate, attno, sv, v->spl_lattr[attno], v->spl_rattr[attno]);
322         }
323
324         v->spl_lattr[attno] = sv->spl_ldatum;
325         v->spl_rattr[attno] = sv->spl_rdatum;
326         v->spl_lisnull[attno] = false;
327         v->spl_risnull[attno] = false;
328
329         /*
330          * if index is multikey, then we must to try get smaller bounding box for
331          * subkey(s)
332          */
333         v->spl_equiv = NULL;
334
335         if (giststate->tupdesc->natts > 1 && attno + 1 != giststate->tupdesc->natts)
336         {
337                 if (gistKeyIsEQ(giststate, attno, sv->spl_ldatum, sv->spl_rdatum))
338                 {
339                         /*
340                          * Left and right key's unions are equial, so we can get better
341                          * split by following columns. Note, unions for attno columns are
342                          * already done.
343                          */
344
345                         return true;
346                 }
347                 else
348                 {
349                         int                     LenEquiv;
350
351                         v->spl_equiv = (bool *) palloc0(sizeof(bool) * (entryvec->n + 1));
352
353                         LenEquiv = gistfindgroup(r, giststate, entryvec->vector, v, attno);
354
355                         /*
356                          * if possible, we should distribute equivalent tuples
357                          */
358                         if (LenEquiv == 0)
359                         {
360                                 gistunionsubkey(giststate, itup, v, attno + 1);
361                         }
362                         else
363                         {
364                                 cleanupOffsets(sv->spl_left, &sv->spl_nleft, v->spl_equiv, &LenEquiv);
365                                 cleanupOffsets(sv->spl_right, &sv->spl_nright, v->spl_equiv, &LenEquiv);
366
367                                 gistunionsubkey(giststate, itup, v, attno + 1);
368                                 if (LenEquiv == 1)
369                                 {
370                                         /*
371                                          * In case with one tuple we just choose left-right by
372                                          * penalty. It's simplify user-defined pickSplit
373                                          */
374                                         OffsetNumber toMove = InvalidOffsetNumber;
375
376                                         for (toMove = FirstOffsetNumber; toMove < entryvec->n; toMove++)
377                                                 if (v->spl_equiv[toMove])
378                                                         break;
379                                         Assert(toMove < entryvec->n);
380
381                                         placeOne(r, giststate, v, itup[toMove - 1], toMove, attno + 1);
382
383                                         /*
384                                          * redo gistunionsubkey(): it will not degradate
385                                          * performance, because it's very rarely
386                                          */
387                                         v->spl_equiv = NULL;
388                                         gistunionsubkey(giststate, itup, v, attno + 1);
389
390                                         return false;
391                                 }
392                                 else if (LenEquiv > 1)
393                                         return true;
394                         }
395                 }
396         }
397
398         return false;
399 }
400
401 /*
402  * simple split page
403  */
404 static void
405 gistSplitHalf(GIST_SPLITVEC *v, int len)
406 {
407         int                     i;
408
409         v->spl_nright = v->spl_nleft = 0;
410         v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
411         v->spl_right = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
412         for (i = 1; i <= len; i++)
413                 if (i < len / 2)
414                         v->spl_right[v->spl_nright++] = i;
415                 else
416                         v->spl_left[v->spl_nleft++] = i;
417 }
418
419 /*
420  * if it was invalid tuple then we need special processing.
421  * We move all invalid tuples on right page.
422  *
423  * if there is no place on left page, gistSplit will be called one more
424  * time for left page.
425  *
426  * Normally, we never exec this code, but after crash replay it's possible
427  * to get 'invalid' tuples (probability is low enough)
428  */
429 static void
430 gistSplitByInvalid(GISTSTATE *giststate, GistSplitVector *v, IndexTuple *itup, int len)
431 {
432         int                     i;
433         static OffsetNumber offInvTuples[MaxOffsetNumber];
434         int                     nOffInvTuples = 0;
435
436         for (i = 1; i <= len; i++)
437                 if (GistTupleIsInvalid(itup[i - 1]))
438                         offInvTuples[nOffInvTuples++] = i;
439
440         if (nOffInvTuples == len)
441         {
442                 /* corner case, all tuples are invalid */
443                 v->spl_rightvalid = v->spl_leftvalid = false;
444                 gistSplitHalf(&v->splitVector, len);
445         }
446         else
447         {
448                 GistSplitUnion gsvp;
449
450                 v->splitVector.spl_right = offInvTuples;
451                 v->splitVector.spl_nright = nOffInvTuples;
452                 v->spl_rightvalid = false;
453
454                 v->splitVector.spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
455                 v->splitVector.spl_nleft = 0;
456                 for (i = 1; i <= len; i++)
457                         if (!GistTupleIsInvalid(itup[i - 1]))
458                                 v->splitVector.spl_left[v->splitVector.spl_nleft++] = i;
459                 v->spl_leftvalid = true;
460
461                 gsvp.equiv = NULL;
462                 gsvp.attr = v->spl_lattr;
463                 gsvp.len = v->splitVector.spl_nleft;
464                 gsvp.entries = v->splitVector.spl_left;
465                 gsvp.isnull = v->spl_lisnull;
466
467                 gistunionsubkeyvec(giststate, itup, &gsvp, 0);
468         }
469 }
470
471 /*
472  * trys to split page by attno key, in a case of null
473  * values move its to separate page.
474  */
475 void
476 gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *giststate,
477                            GistSplitVector *v, GistEntryVector *entryvec, int attno)
478 {
479         int                     i;
480         static OffsetNumber offNullTuples[MaxOffsetNumber];
481         int                     nOffNullTuples = 0;
482
483         for (i = 1; i <= len; i++)
484         {
485                 Datum           datum;
486                 bool            IsNull;
487
488                 if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1]))
489                 {
490                         gistSplitByInvalid(giststate, v, itup, len);
491                         return;
492                 }
493
494                 datum = index_getattr(itup[i - 1], attno + 1, giststate->tupdesc, &IsNull);
495                 gistdentryinit(giststate, attno, &(entryvec->vector[i]),
496                                            datum, r, page, i,
497                                            FALSE, IsNull);
498                 if (IsNull)
499                         offNullTuples[nOffNullTuples++] = i;
500         }
501
502         v->spl_leftvalid = v->spl_rightvalid = true;
503
504         if (nOffNullTuples == len)
505         {
506                 /*
507                  * Corner case: All keys in attno column are null, we should try to
508                  * split by keys in next column. It all keys in all columns are NULL
509                  * just split page half by half
510                  */
511                 v->spl_risnull[attno] = v->spl_lisnull[attno] = TRUE;
512
513                 if (attno + 1 == r->rd_att->natts)
514                         gistSplitHalf(&v->splitVector, len);
515                 else
516                         gistSplitByKey(r, page, itup, len, giststate, v, entryvec, attno + 1);
517         }
518         else if (nOffNullTuples > 0)
519         {
520                 int                     j = 0;
521
522                 /*
523                  * We don't want to mix NULLs and not-NULLs keys on one page, so move
524                  * nulls to right page
525                  */
526                 v->splitVector.spl_right = offNullTuples;
527                 v->splitVector.spl_nright = nOffNullTuples;
528                 v->spl_risnull[attno] = TRUE;
529
530                 v->splitVector.spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
531                 v->splitVector.spl_nleft = 0;
532                 for (i = 1; i <= len; i++)
533                         if (j < v->splitVector.spl_nright && offNullTuples[j] == i)
534                                 j++;
535                         else
536                                 v->splitVector.spl_left[v->splitVector.spl_nleft++] = i;
537
538                 v->spl_equiv = NULL;
539                 gistunionsubkey(giststate, itup, v, attno);
540         }
541         else
542         {
543                 /*
544                  * all keys are not-null
545                  */
546                 entryvec->n = len + 1;
547
548                 if (gistUserPicksplit(r, entryvec, attno, v, itup, len, giststate) && attno + 1 != r->rd_att->natts)
549                 {
550                         /*
551                          * Splitting on attno column is not optimized: there is a tuples
552                          * which can be freely left or right page, we will try to split
553                          * page by following columns
554                          */
555                         if (v->spl_equiv == NULL)
556                         {
557                                 /*
558                                  * simple case: left and right keys for attno column are
559                                  * equial
560                                  */
561                                 gistSplitByKey(r, page, itup, len, giststate, v, entryvec, attno + 1);
562                         }
563                         else
564                         {
565                                 /* we should clean up vector from already distributed tuples */
566                                 IndexTuple *newitup = (IndexTuple *) palloc((len + 1) * sizeof(IndexTuple));
567                                 OffsetNumber *map = (OffsetNumber *) palloc((len + 1) * sizeof(IndexTuple));
568                                 int                     newlen = 0;
569                                 GIST_SPLITVEC backupSplit = v->splitVector;
570
571                                 for (i = 0; i < len; i++)
572                                         if (v->spl_equiv[i + 1])
573                                         {
574                                                 map[newlen] = i + 1;
575                                                 newitup[newlen++] = itup[i];
576                                         }
577
578                                 Assert(newlen > 0);
579
580                                 backupSplit.spl_left = (OffsetNumber *) palloc(sizeof(OffsetNumber) * len);
581                                 memcpy(backupSplit.spl_left, v->splitVector.spl_left, sizeof(OffsetNumber) * v->splitVector.spl_nleft);
582                                 backupSplit.spl_right = (OffsetNumber *) palloc(sizeof(OffsetNumber) * len);
583                                 memcpy(backupSplit.spl_right, v->splitVector.spl_right, sizeof(OffsetNumber) * v->splitVector.spl_nright);
584
585                                 gistSplitByKey(r, page, newitup, newlen, giststate, v, entryvec, attno + 1);
586
587                                 /* merge result of subsplit */
588                                 for (i = 0; i < v->splitVector.spl_nleft; i++)
589                                         backupSplit.spl_left[backupSplit.spl_nleft++] = map[v->splitVector.spl_left[i] - 1];
590                                 for (i = 0; i < v->splitVector.spl_nright; i++)
591                                         backupSplit.spl_right[backupSplit.spl_nright++] = map[v->splitVector.spl_right[i] - 1];
592
593                                 v->splitVector = backupSplit;
594                                 /* reunion left and right datums */
595                                 gistunionsubkey(giststate, itup, v, attno);
596                         }
597                 }
598         }
599 }