OSDN Git Service

Restructure local-buffer handling per recent pghackers discussion.
[pg-rex/syncrep.git] / src / backend / access / nbtree / nbtpage.c
1 /*-------------------------------------------------------------------------
2  *
3  * nbtpage.c
4  *        BTree-specific page management code for the Postgres btree access
5  *        method.
6  *
7  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  *
11  * IDENTIFICATION
12  *        $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.58 2002/08/06 02:36:33 tgl Exp $
13  *
14  *      NOTES
15  *         Postgres btree pages look like ordinary relation pages.      The opaque
16  *         data at high addresses includes pointers to left and right siblings
17  *         and flag data describing page state.  The first page in a btree, page
18  *         zero, is special -- it stores meta-information describing the tree.
19  *         Pages one and higher store the actual tree data.
20  *
21  *-------------------------------------------------------------------------
22  */
23 #include "postgres.h"
24
25 #include <time.h>
26
27 #include "access/nbtree.h"
28 #include "miscadmin.h"
29 #include "storage/lmgr.h"
30
31 extern bool FixBTree;                   /* comments in nbtree.c */
32 extern Buffer _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release);
33
34 /*
35  *      We use high-concurrency locking on btrees.      There are two cases in
36  *      which we don't do locking.  One is when we're building the btree.
37  *      Since the creating transaction has not committed, no one can see
38  *      the index, and there's no reason to share locks.  The second case
39  *      is when we're just starting up the database system.  We use some
40  *      special-purpose initialization code in the relation cache manager
41  *      (see utils/cache/relcache.c) to allow us to do indexed scans on
42  *      the system catalogs before we'd normally be able to.  This happens
43  *      before the lock table is fully initialized, so we can't use it.
44  *      Strictly speaking, this violates 2pl, but we don't do 2pl on the
45  *      system catalogs anyway, so I declare this to be okay.
46  */
47
48 #define USELOCKING              (!BuildingBtree && !IsInitProcessingMode())
49
50 /*
51  *      _bt_metapinit() -- Initialize the metadata page of a btree.
52  */
53 void
54 _bt_metapinit(Relation rel)
55 {
56         Buffer          buf;
57         Page            pg;
58         BTMetaPageData metad;
59         BTPageOpaque op;
60
61         /* can't be sharing this with anyone, now... */
62         if (USELOCKING)
63                 LockRelation(rel, AccessExclusiveLock);
64
65         if (RelationGetNumberOfBlocks(rel) != 0)
66                 elog(ERROR, "Cannot initialize non-empty btree %s",
67                          RelationGetRelationName(rel));
68
69         buf = ReadBuffer(rel, P_NEW);
70         pg = BufferGetPage(buf);
71         _bt_pageinit(pg, BufferGetPageSize(buf));
72
73         metad.btm_magic = BTREE_MAGIC;
74         metad.btm_version = BTREE_VERSION;
75         metad.btm_root = P_NONE;
76         metad.btm_level = 0;
77         memcpy((char *) BTPageGetMeta(pg), (char *) &metad, sizeof(metad));
78
79         op = (BTPageOpaque) PageGetSpecialPointer(pg);
80         op->btpo_flags = BTP_META;
81
82         WriteBuffer(buf);
83
84         /* all done */
85         if (USELOCKING)
86                 UnlockRelation(rel, AccessExclusiveLock);
87 }
88
89 /*
90  *      _bt_getroot() -- Get the root page of the btree.
91  *
92  *              Since the root page can move around the btree file, we have to read
93  *              its location from the metadata page, and then read the root page
94  *              itself.  If no root page exists yet, we have to create one.  The
95  *              standard class of race conditions exists here; I think I covered
96  *              them all in the Hopi Indian rain dance of lock requests below.
97  *
98  *              The access type parameter (BT_READ or BT_WRITE) controls whether
99  *              a new root page will be created or not.  If access = BT_READ,
100  *              and no root page exists, we just return InvalidBuffer.  For
101  *              BT_WRITE, we try to create the root page if it doesn't exist.
102  *              NOTE that the returned root page will have only a read lock set
103  *              on it even if access = BT_WRITE!
104  *
105  *              On successful return, the root page is pinned and read-locked.
106  *              The metadata page is not locked or pinned on exit.
107  */
108 Buffer
109 _bt_getroot(Relation rel, int access)
110 {
111         Buffer          metabuf;
112         Page            metapg;
113         BTPageOpaque metaopaque;
114         Buffer          rootbuf;
115         Page            rootpage;
116         BTPageOpaque rootopaque;
117         BlockNumber rootblkno;
118         BTMetaPageData *metad;
119
120         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ);
121         metapg = BufferGetPage(metabuf);
122         metaopaque = (BTPageOpaque) PageGetSpecialPointer(metapg);
123         metad = BTPageGetMeta(metapg);
124
125         if (!(metaopaque->btpo_flags & BTP_META) ||
126                 metad->btm_magic != BTREE_MAGIC)
127                 elog(ERROR, "Index %s is not a btree",
128                          RelationGetRelationName(rel));
129
130         if (metad->btm_version != BTREE_VERSION)
131                 elog(ERROR, "Version mismatch on %s: version %d file, version %d code",
132                          RelationGetRelationName(rel),
133                          metad->btm_version, BTREE_VERSION);
134
135         /* if no root page initialized yet, do it */
136         if (metad->btm_root == P_NONE)
137         {
138                 /* If access = BT_READ, caller doesn't want us to create root yet */
139                 if (access == BT_READ)
140                 {
141                         _bt_relbuf(rel, metabuf);
142                         return InvalidBuffer;
143                 }
144
145                 /* trade in our read lock for a write lock */
146                 LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
147                 LockBuffer(metabuf, BT_WRITE);
148
149                 /*
150                  * Race condition:      if someone else initialized the metadata
151                  * between the time we released the read lock and acquired the
152                  * write lock, above, we must avoid doing it again.
153                  */
154                 if (metad->btm_root == P_NONE)
155                 {
156                         /*
157                          * Get, initialize, write, and leave a lock of the appropriate
158                          * type on the new root page.  Since this is the first page in
159                          * the tree, it's a leaf as well as the root.
160                          */
161                         rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
162                         rootblkno = BufferGetBlockNumber(rootbuf);
163                         rootpage = BufferGetPage(rootbuf);
164
165                         /* NO ELOG(ERROR) till meta is updated */
166                         START_CRIT_SECTION();
167
168                         metad->btm_root = rootblkno;
169                         metad->btm_level = 1;
170
171                         _bt_pageinit(rootpage, BufferGetPageSize(rootbuf));
172                         rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
173                         rootopaque->btpo_flags |= (BTP_LEAF | BTP_ROOT);
174
175                         /* XLOG stuff */
176                         if (!rel->rd_istemp)
177                         {
178                                 xl_btree_newroot xlrec;
179                                 XLogRecPtr      recptr;
180                                 XLogRecData rdata;
181
182                                 xlrec.node = rel->rd_node;
183                                 xlrec.level = 1;
184                                 BlockIdSet(&(xlrec.rootblk), rootblkno);
185                                 rdata.buffer = InvalidBuffer;
186                                 rdata.data = (char *) &xlrec;
187                                 rdata.len = SizeOfBtreeNewroot;
188                                 rdata.next = NULL;
189
190                                 recptr = XLogInsert(RM_BTREE_ID,
191                                                                         XLOG_BTREE_NEWROOT | XLOG_BTREE_LEAF,
192                                                                         &rdata);
193
194                                 PageSetLSN(rootpage, recptr);
195                                 PageSetSUI(rootpage, ThisStartUpID);
196                                 PageSetLSN(metapg, recptr);
197                                 PageSetSUI(metapg, ThisStartUpID);
198                         }
199
200                         END_CRIT_SECTION();
201
202                         _bt_wrtnorelbuf(rel, rootbuf);
203
204                         /* swap write lock for read lock */
205                         LockBuffer(rootbuf, BUFFER_LOCK_UNLOCK);
206                         LockBuffer(rootbuf, BT_READ);
207
208                         /* okay, metadata is correct, write and release it */
209                         _bt_wrtbuf(rel, metabuf);
210                 }
211                 else
212                 {
213                         /*
214                          * Metadata initialized by someone else.  In order to
215                          * guarantee no deadlocks, we have to release the metadata
216                          * page and start all over again.
217                          */
218                         _bt_relbuf(rel, metabuf);
219                         return _bt_getroot(rel, access);
220                 }
221         }
222         else
223         {
224                 rootblkno = metad->btm_root;
225                 _bt_relbuf(rel, metabuf);               /* done with the meta page */
226
227                 rootbuf = _bt_getbuf(rel, rootblkno, BT_READ);
228         }
229
230         /*
231          * Race condition:      If the root page split between the time we looked
232          * at the metadata page and got the root buffer, then we got the wrong
233          * buffer.      Release it and try again.
234          */
235         rootpage = BufferGetPage(rootbuf);
236         rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
237
238         if (!P_ISROOT(rootopaque))
239         {
240                 /*
241                  * It happened, but if root page splitter failed to create new
242                  * root page then we'll go in loop trying to call _bt_getroot
243                  * again and again.
244                  */
245                 if (FixBTree)
246                 {
247                         Buffer          newrootbuf;
248
249         check_parent:;
250                         if (BTreeInvalidParent(rootopaque)) /* unupdated! */
251                         {
252                                 LockBuffer(rootbuf, BUFFER_LOCK_UNLOCK);
253                                 LockBuffer(rootbuf, BT_WRITE);
254
255                                 /* handle concurrent fix of root page */
256                                 if (BTreeInvalidParent(rootopaque))             /* unupdated! */
257                                 {
258                                         elog(WARNING, "bt_getroot[%s]: fixing root page", RelationGetRelationName(rel));
259                                         newrootbuf = _bt_fixroot(rel, rootbuf, true);
260                                         LockBuffer(newrootbuf, BUFFER_LOCK_UNLOCK);
261                                         LockBuffer(newrootbuf, BT_READ);
262                                         rootbuf = newrootbuf;
263                                         rootpage = BufferGetPage(rootbuf);
264                                         rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
265                                         /* New root might be splitted while changing lock */
266                                         if (P_ISROOT(rootopaque))
267                                                 return (rootbuf);
268                                         /* rootbuf is read locked */
269                                         goto check_parent;
270                                 }
271                                 else
272                                 {
273                                         /* someone else already fixed root */
274                                         LockBuffer(rootbuf, BUFFER_LOCK_UNLOCK);
275                                         LockBuffer(rootbuf, BT_READ);
276                                 }
277                         }
278
279                         /*
280                          * Ok, here we have old root page with btpo_parent pointing to
281                          * upper level - check parent page because of there is good
282                          * chance that parent is root page.
283                          */
284                         newrootbuf = _bt_getbuf(rel, rootopaque->btpo_parent, BT_READ);
285                         _bt_relbuf(rel, rootbuf);
286                         rootbuf = newrootbuf;
287                         rootpage = BufferGetPage(rootbuf);
288                         rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
289                         if (P_ISROOT(rootopaque))
290                                 return (rootbuf);
291                         /* no luck -:( */
292                 }
293
294                 /* try again */
295                 _bt_relbuf(rel, rootbuf);
296                 return _bt_getroot(rel, access);
297         }
298
299         /*
300          * By here, we have a correct lock on the root block, its reference
301          * count is correct, and we have no lock set on the metadata page.
302          * Return the root block.
303          */
304         return rootbuf;
305 }
306
307 /*
308  *      _bt_getbuf() -- Get a buffer by block number for read or write.
309  *
310  *              When this routine returns, the appropriate lock is set on the
311  *              requested buffer and its reference count has been incremented
312  *              (ie, the buffer is "locked and pinned").
313  */
314 Buffer
315 _bt_getbuf(Relation rel, BlockNumber blkno, int access)
316 {
317         Buffer          buf;
318
319         if (blkno != P_NEW)
320         {
321                 /* Read an existing block of the relation */
322                 buf = ReadBuffer(rel, blkno);
323                 LockBuffer(buf, access);
324         }
325         else
326         {
327                 Page            page;
328
329                 /*
330                  * Extend the relation by one page.
331                  *
332                  * Extend bufmgr code is unclean and so we have to use extra locking
333                  * here.
334                  */
335                 LockPage(rel, 0, ExclusiveLock);
336                 buf = ReadBuffer(rel, blkno);
337                 LockBuffer(buf, access);
338                 UnlockPage(rel, 0, ExclusiveLock);
339
340                 /* Initialize the new page before returning it */
341                 page = BufferGetPage(buf);
342                 _bt_pageinit(page, BufferGetPageSize(buf));
343         }
344
345         /* ref count and lock type are correct */
346         return buf;
347 }
348
349 /*
350  *      _bt_relbuf() -- release a locked buffer.
351  *
352  * Lock and pin (refcount) are both dropped.  Note that either read or
353  * write lock can be dropped this way, but if we modified the buffer,
354  * this is NOT the right way to release a write lock.
355  */
356 void
357 _bt_relbuf(Relation rel, Buffer buf)
358 {
359         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
360         ReleaseBuffer(buf);
361 }
362
363 /*
364  *      _bt_wrtbuf() -- write a btree page to disk.
365  *
366  *              This routine releases the lock held on the buffer and our refcount
367  *              for it.  It is an error to call _bt_wrtbuf() without a write lock
368  *              and a pin on the buffer.
369  *
370  * NOTE: actually, the buffer manager just marks the shared buffer page
371  * dirty here, the real I/O happens later.      Since we can't persuade the
372  * Unix kernel to schedule disk writes in a particular order, there's not
373  * much point in worrying about this.  The most we can say is that all the
374  * writes will occur before commit.
375  */
376 void
377 _bt_wrtbuf(Relation rel, Buffer buf)
378 {
379         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
380         WriteBuffer(buf);
381 }
382
383 /*
384  *      _bt_wrtnorelbuf() -- write a btree page to disk, but do not release
385  *                                               our reference or lock.
386  *
387  *              It is an error to call _bt_wrtnorelbuf() without a write lock
388  *              and a pin on the buffer.
389  *
390  * See above NOTE.
391  */
392 void
393 _bt_wrtnorelbuf(Relation rel, Buffer buf)
394 {
395         WriteNoReleaseBuffer(buf);
396 }
397
398 /*
399  *      _bt_pageinit() -- Initialize a new page.
400  */
401 void
402 _bt_pageinit(Page page, Size size)
403 {
404         PageInit(page, size, sizeof(BTPageOpaqueData));
405         ((BTPageOpaque) PageGetSpecialPointer(page))->btpo_parent =
406                 InvalidBlockNumber;
407 }
408
409 /*
410  *      _bt_metaproot() -- Change the root page of the btree.
411  *
412  *              Lehman and Yao require that the root page move around in order to
413  *              guarantee deadlock-free short-term, fine-granularity locking.  When
414  *              we split the root page, we record the new parent in the metadata page
415  *              for the relation.  This routine does the work.
416  *
417  *              No direct preconditions, but if you don't have the write lock on
418  *              at least the old root page when you call this, you're making a big
419  *              mistake.  On exit, metapage data is correct and we no longer have
420  *              a pin or lock on the metapage.
421  */
422 void
423 _bt_metaproot(Relation rel, BlockNumber rootbknum, int level)
424 {
425         Buffer          metabuf;
426         Page            metap;
427         BTPageOpaque metaopaque;
428         BTMetaPageData *metad;
429
430         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
431         metap = BufferGetPage(metabuf);
432         metaopaque = (BTPageOpaque) PageGetSpecialPointer(metap);
433         Assert(metaopaque->btpo_flags & BTP_META);
434         metad = BTPageGetMeta(metap);
435         metad->btm_root = rootbknum;
436         if (level == 0)                         /* called from _do_insert */
437                 metad->btm_level += 1;
438         else
439                 metad->btm_level = level;               /* called from btsort */
440         _bt_wrtbuf(rel, metabuf);
441 }
442
443 /*
444  * Delete an item from a btree page.
445  *
446  * This routine assumes that the caller has pinned and locked the buffer,
447  * and will write the buffer afterwards.
448  */
449 void
450 _bt_itemdel(Relation rel, Buffer buf, ItemPointer tid)
451 {
452         Page            page = BufferGetPage(buf);
453         OffsetNumber offno;
454
455         offno = ItemPointerGetOffsetNumber(tid);
456
457         START_CRIT_SECTION();
458
459         PageIndexTupleDelete(page, offno);
460
461         /* XLOG stuff */
462         if (!rel->rd_istemp)
463         {
464                 xl_btree_delete xlrec;
465                 XLogRecPtr      recptr;
466                 XLogRecData rdata[2];
467
468                 xlrec.target.node = rel->rd_node;
469                 xlrec.target.tid = *tid;
470                 rdata[0].buffer = InvalidBuffer;
471                 rdata[0].data = (char *) &xlrec;
472                 rdata[0].len = SizeOfBtreeDelete;
473                 rdata[0].next = &(rdata[1]);
474
475                 rdata[1].buffer = buf;
476                 rdata[1].data = NULL;
477                 rdata[1].len = 0;
478                 rdata[1].next = NULL;
479
480                 recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_DELETE, rdata);
481
482                 PageSetLSN(page, recptr);
483                 PageSetSUI(page, ThisStartUpID);
484         }
485
486         END_CRIT_SECTION();
487 }