OSDN Git Service

pgindent run for 8.3.
[pg-rex/syncrep.git] / src / backend / access / gin / gindatapage.c
1 /*-------------------------------------------------------------------------
2  *
3  * gindatapage.c
4  *        page utilities routines for the postgres inverted index access method.
5  *
6  *
7  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                      $PostgreSQL: pgsql/src/backend/access/gin/gindatapage.c,v 1.8 2007/11/15 21:14:31 momjian Exp $
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres.h"
16 #include "access/gin.h"
17
18 int
19 compareItemPointers(ItemPointer a, ItemPointer b)
20 {
21         if (GinItemPointerGetBlockNumber(a) == GinItemPointerGetBlockNumber(b))
22         {
23                 if (GinItemPointerGetOffsetNumber(a) == GinItemPointerGetOffsetNumber(b))
24                         return 0;
25                 return (GinItemPointerGetOffsetNumber(a) > GinItemPointerGetOffsetNumber(b)) ? 1 : -1;
26         }
27
28         return (GinItemPointerGetBlockNumber(a) > GinItemPointerGetBlockNumber(b)) ? 1 : -1;
29 }
30
31 /*
32  * Merge two ordered array of itempointer
33  */
34 void
35 MergeItemPointers(ItemPointerData *dst, ItemPointerData *a, uint32 na, ItemPointerData *b, uint32 nb)
36 {
37         ItemPointerData *dptr = dst;
38         ItemPointerData *aptr = a,
39                            *bptr = b;
40
41         while (aptr - a < na && bptr - b < nb)
42         {
43                 if (compareItemPointers(aptr, bptr) > 0)
44                         *dptr++ = *bptr++;
45                 else
46                         *dptr++ = *aptr++;
47         }
48
49         while (aptr - a < na)
50                 *dptr++ = *aptr++;
51
52         while (bptr - b < nb)
53                 *dptr++ = *bptr++;
54 }
55
56 /*
57  * Checks, should we move to right link...
58  * Compares inserting itemp pointer with right bound of current page
59  */
60 static bool
61 dataIsMoveRight(GinBtree btree, Page page)
62 {
63         ItemPointer iptr = GinDataPageGetRightBound(page);
64
65         if (GinPageRightMost(page))
66                 return FALSE;
67
68         return (compareItemPointers(btree->items + btree->curitem, iptr) > 0) ? TRUE : FALSE;
69 }
70
71 /*
72  * Find correct PostingItem in non-leaf page. It supposed that page
73  * correctly chosen and searching value SHOULD be on page
74  */
75 static BlockNumber
76 dataLocateItem(GinBtree btree, GinBtreeStack *stack)
77 {
78         OffsetNumber low,
79                                 high,
80                                 maxoff;
81         PostingItem *pitem = NULL;
82         int                     result;
83         Page            page = BufferGetPage(stack->buffer);
84
85         Assert(!GinPageIsLeaf(page));
86         Assert(GinPageIsData(page));
87
88         if (btree->fullScan)
89         {
90                 stack->off = FirstOffsetNumber;
91                 stack->predictNumber *= GinPageGetOpaque(page)->maxoff;
92                 return btree->getLeftMostPage(btree, page);
93         }
94
95         low = FirstOffsetNumber;
96         maxoff = high = GinPageGetOpaque(page)->maxoff;
97         Assert(high >= low);
98
99         high++;
100
101         while (high > low)
102         {
103                 OffsetNumber mid = low + ((high - low) / 2);
104
105                 pitem = (PostingItem *) GinDataPageGetItem(page, mid);
106
107                 if (mid == maxoff)
108
109                         /*
110                          * Right infinity, page already correctly chosen with a help of
111                          * dataIsMoveRight
112                          */
113                         result = -1;
114                 else
115                 {
116                         pitem = (PostingItem *) GinDataPageGetItem(page, mid);
117                         result = compareItemPointers(btree->items + btree->curitem, &(pitem->key));
118                 }
119
120                 if (result == 0)
121                 {
122                         stack->off = mid;
123                         return PostingItemGetBlockNumber(pitem);
124                 }
125                 else if (result > 0)
126                         low = mid + 1;
127                 else
128                         high = mid;
129         }
130
131         Assert(high >= FirstOffsetNumber && high <= maxoff);
132
133         stack->off = high;
134         pitem = (PostingItem *) GinDataPageGetItem(page, high);
135         return PostingItemGetBlockNumber(pitem);
136 }
137
138 /*
139  * Searches correct position for value on leaf page.
140  * Page should be correctly chosen.
141  * Returns true if value found on page.
142  */
143 static bool
144 dataLocateLeafItem(GinBtree btree, GinBtreeStack *stack)
145 {
146         Page            page = BufferGetPage(stack->buffer);
147         OffsetNumber low,
148                                 high;
149         int                     result;
150
151         Assert(GinPageIsLeaf(page));
152         Assert(GinPageIsData(page));
153
154         if (btree->fullScan)
155         {
156                 stack->off = FirstOffsetNumber;
157                 return TRUE;
158         }
159
160         low = FirstOffsetNumber;
161         high = GinPageGetOpaque(page)->maxoff;
162
163         if (high < low)
164         {
165                 stack->off = FirstOffsetNumber;
166                 return false;
167         }
168
169         high++;
170
171         while (high > low)
172         {
173                 OffsetNumber mid = low + ((high - low) / 2);
174
175                 result = compareItemPointers(btree->items + btree->curitem, (ItemPointer) GinDataPageGetItem(page, mid));
176
177                 if (result == 0)
178                 {
179                         stack->off = mid;
180                         return true;
181                 }
182                 else if (result > 0)
183                         low = mid + 1;
184                 else
185                         high = mid;
186         }
187
188         stack->off = high;
189         return false;
190 }
191
192 /*
193  * Finds links to blkno on non-leaf page, returns
194  * offset of PostingItem
195  */
196 static OffsetNumber
197 dataFindChildPtr(GinBtree btree, Page page, BlockNumber blkno, OffsetNumber storedOff)
198 {
199         OffsetNumber i,
200                                 maxoff = GinPageGetOpaque(page)->maxoff;
201         PostingItem *pitem;
202
203         Assert(!GinPageIsLeaf(page));
204         Assert(GinPageIsData(page));
205
206         /* if page isn't changed, we returns storedOff */
207         if (storedOff >= FirstOffsetNumber && storedOff <= maxoff)
208         {
209                 pitem = (PostingItem *) GinDataPageGetItem(page, storedOff);
210                 if (PostingItemGetBlockNumber(pitem) == blkno)
211                         return storedOff;
212
213                 /*
214                  * we hope, that needed pointer goes to right. It's true if there
215                  * wasn't a deletion
216                  */
217                 for (i = storedOff + 1; i <= maxoff; i++)
218                 {
219                         pitem = (PostingItem *) GinDataPageGetItem(page, i);
220                         if (PostingItemGetBlockNumber(pitem) == blkno)
221                                 return i;
222                 }
223
224                 maxoff = storedOff - 1;
225         }
226
227         /* last chance */
228         for (i = FirstOffsetNumber; i <= maxoff; i++)
229         {
230                 pitem = (PostingItem *) GinDataPageGetItem(page, i);
231                 if (PostingItemGetBlockNumber(pitem) == blkno)
232                         return i;
233         }
234
235         return InvalidOffsetNumber;
236 }
237
238 /*
239  * returns blkno of leftmost child
240  */
241 static BlockNumber
242 dataGetLeftMostPage(GinBtree btree, Page page)
243 {
244         PostingItem *pitem;
245
246         Assert(!GinPageIsLeaf(page));
247         Assert(GinPageIsData(page));
248         Assert(GinPageGetOpaque(page)->maxoff >= FirstOffsetNumber);
249
250         pitem = (PostingItem *) GinDataPageGetItem(page, FirstOffsetNumber);
251         return PostingItemGetBlockNumber(pitem);
252 }
253
254 /*
255  * add ItemPointer or PostingItem to page. data should point to
256  * correct value! depending on leaf or non-leaf page
257  */
258 void
259 GinDataPageAddItem(Page page, void *data, OffsetNumber offset)
260 {
261         OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
262         char       *ptr;
263
264         if (offset == InvalidOffsetNumber)
265         {
266                 ptr = GinDataPageGetItem(page, maxoff + 1);
267         }
268         else
269         {
270                 ptr = GinDataPageGetItem(page, offset);
271                 if (maxoff + 1 - offset != 0)
272                         memmove(ptr + GinSizeOfItem(page), ptr, (maxoff - offset + 1) * GinSizeOfItem(page));
273         }
274         memcpy(ptr, data, GinSizeOfItem(page));
275
276         GinPageGetOpaque(page)->maxoff++;
277 }
278
279 /*
280  * Deletes posting item from non-leaf page
281  */
282 void
283 PageDeletePostingItem(Page page, OffsetNumber offset)
284 {
285         OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
286
287         Assert(!GinPageIsLeaf(page));
288         Assert(offset >= FirstOffsetNumber && offset <= maxoff);
289
290         if (offset != maxoff)
291                 memmove(GinDataPageGetItem(page, offset), GinDataPageGetItem(page, offset + 1),
292                                 sizeof(PostingItem) * (maxoff - offset));
293
294         GinPageGetOpaque(page)->maxoff--;
295 }
296
297 /*
298  * checks space to install new value,
299  * item pointer never deletes!
300  */
301 static bool
302 dataIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off)
303 {
304         Page            page = BufferGetPage(buf);
305
306         Assert(GinPageIsData(page));
307         Assert(!btree->isDelete);
308
309         if (GinPageIsLeaf(page))
310         {
311                 if (GinPageRightMost(page) && off > GinPageGetOpaque(page)->maxoff)
312                 {
313                         if ((btree->nitem - btree->curitem) * sizeof(ItemPointerData) <= GinDataPageGetFreeSpace(page))
314                                 return true;
315                 }
316                 else if (sizeof(ItemPointerData) <= GinDataPageGetFreeSpace(page))
317                         return true;
318         }
319         else if (sizeof(PostingItem) <= GinDataPageGetFreeSpace(page))
320                 return true;
321
322         return false;
323 }
324
325 /*
326  * In case of previous split update old child blkno to
327  * new right page
328  * item pointer never deletes!
329  */
330 static BlockNumber
331 dataPrepareData(GinBtree btree, Page page, OffsetNumber off)
332 {
333         BlockNumber ret = InvalidBlockNumber;
334
335         Assert(GinPageIsData(page));
336
337         if (!GinPageIsLeaf(page) && btree->rightblkno != InvalidBlockNumber)
338         {
339                 PostingItem *pitem = (PostingItem *) GinDataPageGetItem(page, off);
340
341                 PostingItemSetBlockNumber(pitem, btree->rightblkno);
342                 ret = btree->rightblkno;
343         }
344
345         btree->rightblkno = InvalidBlockNumber;
346
347         return ret;
348 }
349
350 /*
351  * Places keys to page and fills WAL record. In case leaf page and
352  * build mode puts all ItemPointers to page.
353  */
354 static void
355 dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, XLogRecData **prdata)
356 {
357         Page            page = BufferGetPage(buf);
358         static XLogRecData rdata[3];
359         int                     sizeofitem = GinSizeOfItem(page);
360         static ginxlogInsert data;
361         int                     cnt = 0;
362
363         *prdata = rdata;
364         Assert(GinPageIsData(page));
365
366         data.updateBlkno = dataPrepareData(btree, page, off);
367
368         data.node = btree->index->rd_node;
369         data.blkno = BufferGetBlockNumber(buf);
370         data.offset = off;
371         data.nitem = 1;
372         data.isDelete = FALSE;
373         data.isData = TRUE;
374         data.isLeaf = GinPageIsLeaf(page) ? TRUE : FALSE;
375
376         /*
377          * Prevent full page write if child's split occurs. That is needed to
378          * remove incomplete splits while replaying WAL
379          *
380          * data.updateBlkno contains new block number (of newly created right
381          * page) for recently splited page.
382          */
383         if (data.updateBlkno == InvalidBlockNumber)
384         {
385                 rdata[0].buffer = buf;
386                 rdata[0].buffer_std = FALSE;
387                 rdata[0].data = NULL;
388                 rdata[0].len = 0;
389                 rdata[0].next = &rdata[1];
390                 cnt++;
391         }
392
393         rdata[cnt].buffer = InvalidBuffer;
394         rdata[cnt].data = (char *) &data;
395         rdata[cnt].len = sizeof(ginxlogInsert);
396         rdata[cnt].next = &rdata[cnt + 1];
397         cnt++;
398
399         rdata[cnt].buffer = InvalidBuffer;
400         rdata[cnt].data = (GinPageIsLeaf(page)) ? ((char *) (btree->items + btree->curitem)) : ((char *) &(btree->pitem));
401         rdata[cnt].len = sizeofitem;
402         rdata[cnt].next = NULL;
403
404         if (GinPageIsLeaf(page))
405         {
406                 if (GinPageRightMost(page) && off > GinPageGetOpaque(page)->maxoff)
407                 {
408                         /* usually, create index... */
409                         uint32          savedPos = btree->curitem;
410
411                         while (btree->curitem < btree->nitem)
412                         {
413                                 GinDataPageAddItem(page, btree->items + btree->curitem, off);
414                                 off++;
415                                 btree->curitem++;
416                         }
417                         data.nitem = btree->curitem - savedPos;
418                         rdata[cnt].len = sizeofitem * data.nitem;
419                 }
420                 else
421                 {
422                         GinDataPageAddItem(page, btree->items + btree->curitem, off);
423                         btree->curitem++;
424                 }
425         }
426         else
427                 GinDataPageAddItem(page, &(btree->pitem), off);
428 }
429
430 /*
431  * split page and fills WAL record. original buffer(lbuf) leaves untouched,
432  * returns shadow page of lbuf filled new data. In leaf page and build mode puts all
433  * ItemPointers to pages. Also, in build mode splits data by way to full fulled
434  * left page
435  */
436 static Page
437 dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRecData **prdata)
438 {
439         static ginxlogSplit data;
440         static XLogRecData rdata[4];
441         static char vector[2 * BLCKSZ];
442         char       *ptr;
443         OffsetNumber separator;
444         ItemPointer bound;
445         Page            lpage = GinPageGetCopyPage(BufferGetPage(lbuf));
446         ItemPointerData oldbound = *GinDataPageGetRightBound(lpage);
447         int                     sizeofitem = GinSizeOfItem(lpage);
448         OffsetNumber maxoff = GinPageGetOpaque(lpage)->maxoff;
449         Page            rpage = BufferGetPage(rbuf);
450         Size            pageSize = PageGetPageSize(lpage);
451         Size            freeSpace;
452         uint32          nCopied = 1;
453
454         GinInitPage(rpage, GinPageGetOpaque(lpage)->flags, pageSize);
455         freeSpace = GinDataPageGetFreeSpace(rpage);
456
457         *prdata = rdata;
458         data.leftChildBlkno = (GinPageIsLeaf(lpage)) ?
459                 InvalidOffsetNumber : PostingItemGetBlockNumber(&(btree->pitem));
460         data.updateBlkno = dataPrepareData(btree, lpage, off);
461
462         memcpy(vector, GinDataPageGetItem(lpage, FirstOffsetNumber),
463                    maxoff * sizeofitem);
464
465         if (GinPageIsLeaf(lpage) && GinPageRightMost(lpage) && off > GinPageGetOpaque(lpage)->maxoff)
466         {
467                 nCopied = 0;
468                 while (btree->curitem < btree->nitem && maxoff * sizeof(ItemPointerData) < 2 * (freeSpace - sizeof(ItemPointerData)))
469                 {
470                         memcpy(vector + maxoff * sizeof(ItemPointerData), btree->items + btree->curitem,
471                                    sizeof(ItemPointerData));
472                         maxoff++;
473                         nCopied++;
474                         btree->curitem++;
475                 }
476         }
477         else
478         {
479                 ptr = vector + (off - 1) * sizeofitem;
480                 if (maxoff + 1 - off != 0)
481                         memmove(ptr + sizeofitem, ptr, (maxoff - off + 1) * sizeofitem);
482                 if (GinPageIsLeaf(lpage))
483                 {
484                         memcpy(ptr, btree->items + btree->curitem, sizeofitem);
485                         btree->curitem++;
486                 }
487                 else
488                         memcpy(ptr, &(btree->pitem), sizeofitem);
489
490                 maxoff++;
491         }
492
493         /*
494          * we suppose that during index creation table scaned from begin to end,
495          * so ItemPointers are monotonically increased..
496          */
497         if (btree->isBuild && GinPageRightMost(lpage))
498                 separator = freeSpace / sizeofitem;
499         else
500                 separator = maxoff / 2;
501
502         GinInitPage(rpage, GinPageGetOpaque(lpage)->flags, pageSize);
503         GinInitPage(lpage, GinPageGetOpaque(rpage)->flags, pageSize);
504
505         memcpy(GinDataPageGetItem(lpage, FirstOffsetNumber), vector, separator * sizeofitem);
506         GinPageGetOpaque(lpage)->maxoff = separator;
507         memcpy(GinDataPageGetItem(rpage, FirstOffsetNumber),
508                  vector + separator * sizeofitem, (maxoff - separator) * sizeofitem);
509         GinPageGetOpaque(rpage)->maxoff = maxoff - separator;
510
511         PostingItemSetBlockNumber(&(btree->pitem), BufferGetBlockNumber(lbuf));
512         if (GinPageIsLeaf(lpage))
513                 btree->pitem.key = *(ItemPointerData *) GinDataPageGetItem(lpage,
514                                                                                         GinPageGetOpaque(lpage)->maxoff);
515         else
516                 btree->pitem.key = ((PostingItem *) GinDataPageGetItem(lpage,
517                                                                           GinPageGetOpaque(lpage)->maxoff))->key;
518         btree->rightblkno = BufferGetBlockNumber(rbuf);
519
520         /* set up right bound for left page */
521         bound = GinDataPageGetRightBound(lpage);
522         *bound = btree->pitem.key;
523
524         /* set up right bound for right page */
525         bound = GinDataPageGetRightBound(rpage);
526         *bound = oldbound;
527
528         data.node = btree->index->rd_node;
529         data.rootBlkno = InvalidBlockNumber;
530         data.lblkno = BufferGetBlockNumber(lbuf);
531         data.rblkno = BufferGetBlockNumber(rbuf);
532         data.separator = separator;
533         data.nitem = maxoff;
534         data.isData = TRUE;
535         data.isLeaf = GinPageIsLeaf(lpage) ? TRUE : FALSE;
536         data.isRootSplit = FALSE;
537         data.rightbound = oldbound;
538
539         rdata[0].buffer = InvalidBuffer;
540         rdata[0].data = (char *) &data;
541         rdata[0].len = sizeof(ginxlogSplit);
542         rdata[0].next = &rdata[1];
543
544         rdata[1].buffer = InvalidBuffer;
545         rdata[1].data = vector;
546         rdata[1].len = MAXALIGN(maxoff * sizeofitem);
547         rdata[1].next = NULL;
548
549         return lpage;
550 }
551
552 /*
553  * Fills new root by right bound values from child.
554  * Also called from ginxlog, should not use btree
555  */
556 void
557 dataFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf)
558 {
559         Page            page = BufferGetPage(root),
560                                 lpage = BufferGetPage(lbuf),
561                                 rpage = BufferGetPage(rbuf);
562         PostingItem li,
563                                 ri;
564
565         li.key = *GinDataPageGetRightBound(lpage);
566         PostingItemSetBlockNumber(&li, BufferGetBlockNumber(lbuf));
567         GinDataPageAddItem(page, &li, InvalidOffsetNumber);
568
569         ri.key = *GinDataPageGetRightBound(rpage);
570         PostingItemSetBlockNumber(&ri, BufferGetBlockNumber(rbuf));
571         GinDataPageAddItem(page, &ri, InvalidOffsetNumber);
572 }
573
574 void
575 prepareDataScan(GinBtree btree, Relation index)
576 {
577         memset(btree, 0, sizeof(GinBtreeData));
578         btree->index = index;
579         btree->isMoveRight = dataIsMoveRight;
580         btree->findChildPage = dataLocateItem;
581         btree->findItem = dataLocateLeafItem;
582         btree->findChildPtr = dataFindChildPtr;
583         btree->getLeftMostPage = dataGetLeftMostPage;
584         btree->isEnoughSpace = dataIsEnoughSpace;
585         btree->placeToPage = dataPlaceToPage;
586         btree->splitPage = dataSplitPage;
587         btree->fillRoot = dataFillRoot;
588
589         btree->searchMode = FALSE;
590         btree->isDelete = FALSE;
591         btree->fullScan = FALSE;
592         btree->isBuild = FALSE;
593 }
594
595 GinPostingTreeScan *
596 prepareScanPostingTree(Relation index, BlockNumber rootBlkno, bool searchMode)
597 {
598         GinPostingTreeScan *gdi = (GinPostingTreeScan *) palloc0(sizeof(GinPostingTreeScan));
599
600         prepareDataScan(&gdi->btree, index);
601
602         gdi->btree.searchMode = searchMode;
603         gdi->btree.fullScan = searchMode;
604
605         gdi->stack = ginPrepareFindLeafPage(&gdi->btree, rootBlkno);
606
607         return gdi;
608 }
609
610 /*
611  * Inserts array of item pointers, may execute several tree scan (very rare)
612  */
613 void
614 insertItemPointer(GinPostingTreeScan *gdi, ItemPointerData *items, uint32 nitem)
615 {
616         BlockNumber rootBlkno = gdi->stack->blkno;
617
618         gdi->btree.items = items;
619         gdi->btree.nitem = nitem;
620         gdi->btree.curitem = 0;
621
622         while (gdi->btree.curitem < gdi->btree.nitem)
623         {
624                 if (!gdi->stack)
625                         gdi->stack = ginPrepareFindLeafPage(&gdi->btree, rootBlkno);
626
627                 gdi->stack = ginFindLeafPage(&gdi->btree, gdi->stack);
628
629                 if (gdi->btree.findItem(&(gdi->btree), gdi->stack))
630                         elog(ERROR, "item pointer (%u,%d) already exists",
631                         ItemPointerGetBlockNumber(gdi->btree.items + gdi->btree.curitem),
632                                  ItemPointerGetOffsetNumber(gdi->btree.items + gdi->btree.curitem));
633
634                 ginInsertValue(&(gdi->btree), gdi->stack);
635
636                 gdi->stack = NULL;
637         }
638 }
639
640 Buffer
641 scanBeginPostingTree(GinPostingTreeScan *gdi)
642 {
643         gdi->stack = ginFindLeafPage(&gdi->btree, gdi->stack);
644         return gdi->stack->buffer;
645 }