OSDN Git Service

1a623698f57c34711c62225384e7c73feb510281
[pg-rex/syncrep.git] / src / backend / access / nbtree / nbtpage.c
1 /*-------------------------------------------------------------------------
2  *
3  * nbtpage.c
4  *        BTree-specific page management code for the Postgres btree access
5  *        method.
6  *
7  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  *
11  * IDENTIFICATION
12  *        $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.36 2000/04/12 17:14:49 momjian Exp $
13  *
14  *      NOTES
15  *         Postgres btree pages look like ordinary relation pages.      The opaque
16  *         data at high addresses includes pointers to left and right siblings
17  *         and flag data describing page state.  The first page in a btree, page
18  *         zero, is special -- it stores meta-information describing the tree.
19  *         Pages one and higher store the actual tree data.
20  *
21  *-------------------------------------------------------------------------
22  */
23 #include <time.h>
24
25 #include "postgres.h"
26
27 #include "access/nbtree.h"
28 #include "miscadmin.h"
29
30 #define BTREE_METAPAGE  0
31 #define BTREE_MAGIC             0x053162
32
33 #define BTREE_VERSION   1
34
35 typedef struct BTMetaPageData
36 {
37         uint32          btm_magic;
38         uint32          btm_version;
39         BlockNumber btm_root;
40         int32           btm_level;
41 } BTMetaPageData;
42
43 #define BTPageGetMeta(p) \
44         ((BTMetaPageData *) &((PageHeader) p)->pd_linp[0])
45
46
47 /*
48  *      We use high-concurrency locking on btrees.      There are two cases in
49  *      which we don't do locking.  One is when we're building the btree.
50  *      Since the creating transaction has not committed, no one can see
51  *      the index, and there's no reason to share locks.  The second case
52  *      is when we're just starting up the database system.  We use some
53  *      special-purpose initialization code in the relation cache manager
54  *      (see utils/cache/relcache.c) to allow us to do indexed scans on
55  *      the system catalogs before we'd normally be able to.  This happens
56  *      before the lock table is fully initialized, so we can't use it.
57  *      Strictly speaking, this violates 2pl, but we don't do 2pl on the
58  *      system catalogs anyway, so I declare this to be okay.
59  */
60
61 #define USELOCKING              (!BuildingBtree && !IsInitProcessingMode())
62
63 /*
64  *      _bt_metapinit() -- Initialize the metadata page of a btree.
65  */
66 void
67 _bt_metapinit(Relation rel)
68 {
69         Buffer          buf;
70         Page            pg;
71         int                     nblocks;
72         BTMetaPageData metad;
73         BTPageOpaque op;
74
75         /* can't be sharing this with anyone, now... */
76         if (USELOCKING)
77                 LockRelation(rel, AccessExclusiveLock);
78
79         if ((nblocks = RelationGetNumberOfBlocks(rel)) != 0)
80         {
81                 elog(ERROR, "Cannot initialize non-empty btree %s",
82                          RelationGetRelationName(rel));
83         }
84
85         buf = ReadBuffer(rel, P_NEW);
86         pg = BufferGetPage(buf);
87         _bt_pageinit(pg, BufferGetPageSize(buf));
88
89         metad.btm_magic = BTREE_MAGIC;
90         metad.btm_version = BTREE_VERSION;
91         metad.btm_root = P_NONE;
92         metad.btm_level = 0;
93         memmove((char *) BTPageGetMeta(pg), (char *) &metad, sizeof(metad));
94
95         op = (BTPageOpaque) PageGetSpecialPointer(pg);
96         op->btpo_flags = BTP_META;
97
98         WriteBuffer(buf);
99
100         /* all done */
101         if (USELOCKING)
102                 UnlockRelation(rel, AccessExclusiveLock);
103 }
104
105 #ifdef NOT_USED
106 /*
107  *      _bt_checkmeta() -- Verify that the metadata stored in a btree are
108  *                                         reasonable.
109  */
110 void
111 _bt_checkmeta(Relation rel)
112 {
113         Buffer          metabuf;
114         Page            metap;
115         BTMetaPageData *metad;
116         BTPageOpaque op;
117         int                     nblocks;
118
119         /* if the relation is empty, this is init time; don't complain */
120         if ((nblocks = RelationGetNumberOfBlocks(rel)) == 0)
121                 return;
122
123         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ);
124         metap = BufferGetPage(metabuf);
125         op = (BTPageOpaque) PageGetSpecialPointer(metap);
126         if (!(op->btpo_flags & BTP_META))
127         {
128                 elog(ERROR, "Invalid metapage for index %s",
129                          RelationGetRelationName(rel));
130         }
131         metad = BTPageGetMeta(metap);
132
133         if (metad->btm_magic != BTREE_MAGIC)
134         {
135                 elog(ERROR, "Index %s is not a btree",
136                          RelationGetRelationName(rel));
137         }
138
139         if (metad->btm_version != BTREE_VERSION)
140         {
141                 elog(ERROR, "Version mismatch on %s:  version %d file, version %d code",
142                          RelationGetRelationName(rel),
143                          metad->btm_version, BTREE_VERSION);
144         }
145
146         _bt_relbuf(rel, metabuf, BT_READ);
147 }
148
149 #endif
150
151 /*
152  *      _bt_getroot() -- Get the root page of the btree.
153  *
154  *              Since the root page can move around the btree file, we have to read
155  *              its location from the metadata page, and then read the root page
156  *              itself.  If no root page exists yet, we have to create one.  The
157  *              standard class of race conditions exists here; I think I covered
158  *              them all in the Hopi Indian rain dance of lock requests below.
159  *
160  *              We pass in the access type (BT_READ or BT_WRITE), and return the
161  *              root page's buffer with the appropriate lock type set.  Reference
162  *              count on the root page gets bumped by ReadBuffer.  The metadata
163  *              page is unlocked and unreferenced by this process when this routine
164  *              returns.
165  */
166 Buffer
167 _bt_getroot(Relation rel, int access)
168 {
169         Buffer          metabuf;
170         Page            metapg;
171         BTPageOpaque metaopaque;
172         Buffer          rootbuf;
173         Page            rootpg;
174         BTPageOpaque rootopaque;
175         BlockNumber rootblkno;
176         BTMetaPageData *metad;
177
178         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ);
179         metapg = BufferGetPage(metabuf);
180         metaopaque = (BTPageOpaque) PageGetSpecialPointer(metapg);
181         Assert(metaopaque->btpo_flags & BTP_META);
182         metad = BTPageGetMeta(metapg);
183
184         if (metad->btm_magic != BTREE_MAGIC)
185         {
186                 elog(ERROR, "Index %s is not a btree",
187                          RelationGetRelationName(rel));
188         }
189
190         if (metad->btm_version != BTREE_VERSION)
191         {
192                 elog(ERROR, "Version mismatch on %s:  version %d file, version %d code",
193                          RelationGetRelationName(rel),
194                          metad->btm_version, BTREE_VERSION);
195         }
196
197         /* if no root page initialized yet, do it */
198         if (metad->btm_root == P_NONE)
199         {
200
201                 /* turn our read lock in for a write lock */
202                 _bt_relbuf(rel, metabuf, BT_READ);
203                 metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
204                 metapg = BufferGetPage(metabuf);
205                 metaopaque = (BTPageOpaque) PageGetSpecialPointer(metapg);
206                 Assert(metaopaque->btpo_flags & BTP_META);
207                 metad = BTPageGetMeta(metapg);
208
209                 /*
210                  * Race condition:      if someone else initialized the metadata
211                  * between the time we released the read lock and acquired the
212                  * write lock, above, we want to avoid doing it again.
213                  */
214
215                 if (metad->btm_root == P_NONE)
216                 {
217
218                         /*
219                          * Get, initialize, write, and leave a lock of the appropriate
220                          * type on the new root page.  Since this is the first page in
221                          * the tree, it's a leaf.
222                          */
223
224                         rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
225                         rootblkno = BufferGetBlockNumber(rootbuf);
226                         rootpg = BufferGetPage(rootbuf);
227                         metad->btm_root = rootblkno;
228                         metad->btm_level = 1;
229                         _bt_pageinit(rootpg, BufferGetPageSize(rootbuf));
230                         rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpg);
231                         rootopaque->btpo_flags |= (BTP_LEAF | BTP_ROOT);
232                         _bt_wrtnorelbuf(rel, rootbuf);
233
234                         /* swap write lock for read lock, if appropriate */
235                         if (access != BT_WRITE)
236                         {
237                                 LockBuffer(rootbuf, BUFFER_LOCK_UNLOCK);
238                                 LockBuffer(rootbuf, BT_READ);
239                         }
240
241                         /* okay, metadata is correct */
242                         _bt_wrtbuf(rel, metabuf);
243                 }
244                 else
245                 {
246
247                         /*
248                          * Metadata initialized by someone else.  In order to
249                          * guarantee no deadlocks, we have to release the metadata
250                          * page and start all over again.
251                          */
252
253                         _bt_relbuf(rel, metabuf, BT_WRITE);
254                         return _bt_getroot(rel, access);
255                 }
256         }
257         else
258         {
259                 rootblkno = metad->btm_root;
260                 _bt_relbuf(rel, metabuf, BT_READ);              /* done with the meta page */
261
262                 rootbuf = _bt_getbuf(rel, rootblkno, access);
263         }
264
265         /*
266          * Race condition:      If the root page split between the time we looked
267          * at the metadata page and got the root buffer, then we got the wrong
268          * buffer.
269          */
270
271         rootpg = BufferGetPage(rootbuf);
272         rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpg);
273         if (!(rootopaque->btpo_flags & BTP_ROOT))
274         {
275
276                 /* it happened, try again */
277                 _bt_relbuf(rel, rootbuf, access);
278                 return _bt_getroot(rel, access);
279         }
280
281         /*
282          * By here, we have a correct lock on the root block, its reference
283          * count is correct, and we have no lock set on the metadata page.
284          * Return the root block.
285          */
286
287         return rootbuf;
288 }
289
290 /*
291  *      _bt_getbuf() -- Get a buffer by block number for read or write.
292  *
293  *              When this routine returns, the appropriate lock is set on the
294  *              requested buffer its reference count is correct.
295  */
296 Buffer
297 _bt_getbuf(Relation rel, BlockNumber blkno, int access)
298 {
299         Buffer          buf;
300         Page            page;
301
302         if (blkno != P_NEW)
303         {
304                 buf = ReadBuffer(rel, blkno);
305                 LockBuffer(buf, access);
306         }
307         else
308         {
309
310                 /*
311                  * Extend bufmgr code is unclean and so we have to use locking
312                  * here.
313                  */
314                 LockPage(rel, 0, ExclusiveLock);
315                 buf = ReadBuffer(rel, blkno);
316                 UnlockPage(rel, 0, ExclusiveLock);
317                 blkno = BufferGetBlockNumber(buf);
318                 page = BufferGetPage(buf);
319                 _bt_pageinit(page, BufferGetPageSize(buf));
320                 LockBuffer(buf, access);
321         }
322
323         /* ref count and lock type are correct */
324         return buf;
325 }
326
327 /*
328  *      _bt_relbuf() -- release a locked buffer.
329  */
330 void
331 _bt_relbuf(Relation rel, Buffer buf, int access)
332 {
333         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
334         ReleaseBuffer(buf);
335 }
336
337 /*
338  *      _bt_wrtbuf() -- write a btree page to disk.
339  *
340  *              This routine releases the lock held on the buffer and our reference
341  *              to it.  It is an error to call _bt_wrtbuf() without a write lock
342  *              or a reference to the buffer.
343  */
344 void
345 _bt_wrtbuf(Relation rel, Buffer buf)
346 {
347         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
348         WriteBuffer(buf);
349 }
350
351 /*
352  *      _bt_wrtnorelbuf() -- write a btree page to disk, but do not release
353  *                                               our reference or lock.
354  *
355  *              It is an error to call _bt_wrtnorelbuf() without a write lock
356  *              or a reference to the buffer.
357  */
358 void
359 _bt_wrtnorelbuf(Relation rel, Buffer buf)
360 {
361         WriteNoReleaseBuffer(buf);
362 }
363
364 /*
365  *      _bt_pageinit() -- Initialize a new page.
366  */
367 void
368 _bt_pageinit(Page page, Size size)
369 {
370
371         /*
372          * Cargo_cult programming -- don't really need this to be zero, but
373          * creating new pages is an infrequent occurrence and it makes me feel
374          * good when I know they're empty.
375          */
376
377         MemSet(page, 0, size);
378
379         PageInit(page, size, sizeof(BTPageOpaqueData));
380         ((BTPageOpaque) PageGetSpecialPointer(page))->btpo_parent =
381                 InvalidBlockNumber;
382 }
383
384 /*
385  *      _bt_metaproot() -- Change the root page of the btree.
386  *
387  *              Lehman and Yao require that the root page move around in order to
388  *              guarantee deadlock-free short-term, fine-granularity locking.  When
389  *              we split the root page, we record the new parent in the metadata page
390  *              for the relation.  This routine does the work.
391  *
392  *              No direct preconditions, but if you don't have the a write lock on
393  *              at least the old root page when you call this, you're making a big
394  *              mistake.  On exit, metapage data is correct and we no longer have
395  *              a reference to or lock on the metapage.
396  */
397 void
398 _bt_metaproot(Relation rel, BlockNumber rootbknum, int level)
399 {
400         Buffer          metabuf;
401         Page            metap;
402         BTPageOpaque metaopaque;
403         BTMetaPageData *metad;
404
405         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
406         metap = BufferGetPage(metabuf);
407         metaopaque = (BTPageOpaque) PageGetSpecialPointer(metap);
408         Assert(metaopaque->btpo_flags & BTP_META);
409         metad = BTPageGetMeta(metap);
410         metad->btm_root = rootbknum;
411         if (level == 0)                         /* called from _do_insert */
412                 metad->btm_level += 1;
413         else
414                 metad->btm_level = level;               /* called from btsort */
415         _bt_wrtbuf(rel, metabuf);
416 }
417
418 /*
419  *      _bt_getstackbuf() -- Walk back up the tree one step, and find the item
420  *                                               we last looked at in the parent.
421  *
422  *              This is possible because we save a bit image of the last item
423  *              we looked at in the parent, and the update algorithm guarantees
424  *              that if items above us in the tree move, they only move right.
425  *
426  *              Also, re-set bts_blkno & bts_offset if changed and
427  *              bts_btitem (it may be changed - see _bt_insertonpg).
428  */
429 Buffer
430 _bt_getstackbuf(Relation rel, BTStack stack, int access)
431 {
432         Buffer          buf;
433         BlockNumber blkno;
434         OffsetNumber start,
435                                 offnum,
436                                 maxoff;
437         OffsetNumber i;
438         Page            page;
439         ItemId          itemid;
440         BTItem          item;
441         BTPageOpaque opaque;
442         BTItem          item_save;
443         int                     item_nbytes;
444
445         blkno = stack->bts_blkno;
446         buf = _bt_getbuf(rel, blkno, access);
447         page = BufferGetPage(buf);
448         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
449         maxoff = PageGetMaxOffsetNumber(page);
450
451         if (stack->bts_offset == InvalidOffsetNumber ||
452                 maxoff >= stack->bts_offset)
453         {
454
455                 /*
456                  * _bt_insertonpg set bts_offset to InvalidOffsetNumber in the
457                  * case of concurrent ROOT page split
458                  */
459                 if (stack->bts_offset == InvalidOffsetNumber)
460                         i = P_RIGHTMOST(opaque) ? P_HIKEY : P_FIRSTKEY;
461                 else
462                 {
463                         itemid = PageGetItemId(page, stack->bts_offset);
464                         item = (BTItem) PageGetItem(page, itemid);
465
466                         /* if the item is where we left it, we're done */
467                         if (BTItemSame(item, stack->bts_btitem))
468                         {
469                                 pfree(stack->bts_btitem);
470                                 item_nbytes = ItemIdGetLength(itemid);
471                                 item_save = (BTItem) palloc(item_nbytes);
472                                 memmove((char *) item_save, (char *) item, item_nbytes);
473                                 stack->bts_btitem = item_save;
474                                 return buf;
475                         }
476                         i = OffsetNumberNext(stack->bts_offset);
477                 }
478
479                 /* if the item has just moved right on this page, we're done */
480                 for (;
481                          i <= maxoff;
482                          i = OffsetNumberNext(i))
483                 {
484                         itemid = PageGetItemId(page, i);
485                         item = (BTItem) PageGetItem(page, itemid);
486
487                         /* if the item is where we left it, we're done */
488                         if (BTItemSame(item, stack->bts_btitem))
489                         {
490                                 stack->bts_offset = i;
491                                 pfree(stack->bts_btitem);
492                                 item_nbytes = ItemIdGetLength(itemid);
493                                 item_save = (BTItem) palloc(item_nbytes);
494                                 memmove((char *) item_save, (char *) item, item_nbytes);
495                                 stack->bts_btitem = item_save;
496                                 return buf;
497                         }
498                 }
499         }
500
501         /* by here, the item we're looking for moved right at least one page */
502         for (;;)
503         {
504                 blkno = opaque->btpo_next;
505                 if (P_RIGHTMOST(opaque))
506                         elog(FATAL, "my bits moved right off the end of the world!\
507 \n\tRecreate index %s.", RelationGetRelationName(rel));
508
509                 _bt_relbuf(rel, buf, access);
510                 buf = _bt_getbuf(rel, blkno, access);
511                 page = BufferGetPage(buf);
512                 maxoff = PageGetMaxOffsetNumber(page);
513                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
514
515                 /* if we have a right sibling, step over the high key */
516                 start = P_RIGHTMOST(opaque) ? P_HIKEY : P_FIRSTKEY;
517
518                 /* see if it's on this page */
519                 for (offnum = start;
520                          offnum <= maxoff;
521                          offnum = OffsetNumberNext(offnum))
522                 {
523                         itemid = PageGetItemId(page, offnum);
524                         item = (BTItem) PageGetItem(page, itemid);
525                         if (BTItemSame(item, stack->bts_btitem))
526                         {
527                                 stack->bts_offset = offnum;
528                                 stack->bts_blkno = blkno;
529                                 pfree(stack->bts_btitem);
530                                 item_nbytes = ItemIdGetLength(itemid);
531                                 item_save = (BTItem) palloc(item_nbytes);
532                                 memmove((char *) item_save, (char *) item, item_nbytes);
533                                 stack->bts_btitem = item_save;
534                                 return buf;
535                         }
536                 }
537         }
538 }
539
540 void
541 _bt_pagedel(Relation rel, ItemPointer tid)
542 {
543         Buffer          buf;
544         Page            page;
545         BlockNumber blkno;
546         OffsetNumber offno;
547
548         blkno = ItemPointerGetBlockNumber(tid);
549         offno = ItemPointerGetOffsetNumber(tid);
550
551         buf = _bt_getbuf(rel, blkno, BT_WRITE);
552         page = BufferGetPage(buf);
553
554         PageIndexTupleDelete(page, offno);
555
556         /* write the buffer and release the lock */
557         _bt_wrtbuf(rel, buf);
558 }