OSDN Git Service

Tweak indexscan and seqscan code to arrange that steps from one page to
[pg-rex/syncrep.git] / src / backend / access / nbtree / nbtpage.c
1 /*-------------------------------------------------------------------------
2  *
3  * nbtpage.c
4  *        BTree-specific page management code for the Postgres btree access
5  *        method.
6  *
7  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  *
11  * IDENTIFICATION
12  *        $PostgreSQL: pgsql/src/backend/access/nbtree/nbtpage.c,v 1.75 2004/04/21 18:24:25 tgl Exp $
13  *
14  *      NOTES
15  *         Postgres btree pages look like ordinary relation pages.      The opaque
16  *         data at high addresses includes pointers to left and right siblings
17  *         and flag data describing page state.  The first page in a btree, page
18  *         zero, is special -- it stores meta-information describing the tree.
19  *         Pages one and higher store the actual tree data.
20  *
21  *-------------------------------------------------------------------------
22  */
23 #include "postgres.h"
24
25 #include "access/nbtree.h"
26 #include "miscadmin.h"
27 #include "storage/freespace.h"
28 #include "storage/lmgr.h"
29
30
31 /*
32  *      _bt_metapinit() -- Initialize the metadata page of a new btree.
33  *
34  * If markvalid is true, the index is immediately marked valid, else it
35  * will be invalid until _bt_metaproot() is called.
36  *
37  * Note: there's no real need for any locking here.  Since the transaction
38  * creating the index hasn't committed yet, no one else can even see the index
39  * much less be trying to use it.  (In a REINDEX-in-place scenario, that's
40  * not true, but we assume the caller holds sufficient locks on the index.)
41  */
42 void
43 _bt_metapinit(Relation rel, bool markvalid)
44 {
45         Buffer          buf;
46         Page            pg;
47         BTMetaPageData *metad;
48         BTPageOpaque op;
49
50         if (RelationGetNumberOfBlocks(rel) != 0)
51                 elog(ERROR, "cannot initialize non-empty btree index \"%s\"",
52                          RelationGetRelationName(rel));
53
54         buf = ReadBuffer(rel, P_NEW);
55         Assert(BufferGetBlockNumber(buf) == BTREE_METAPAGE);
56         pg = BufferGetPage(buf);
57
58         /* NO ELOG(ERROR) from here till newmeta op is logged */
59         START_CRIT_SECTION();
60
61         _bt_pageinit(pg, BufferGetPageSize(buf));
62
63         metad = BTPageGetMeta(pg);
64         metad->btm_magic = markvalid ? BTREE_MAGIC : 0;
65         metad->btm_version = BTREE_VERSION;
66         metad->btm_root = P_NONE;
67         metad->btm_level = 0;
68         metad->btm_fastroot = P_NONE;
69         metad->btm_fastlevel = 0;
70
71         op = (BTPageOpaque) PageGetSpecialPointer(pg);
72         op->btpo_flags = BTP_META;
73
74         /* XLOG stuff */
75         if (!rel->rd_istemp)
76         {
77                 xl_btree_newmeta xlrec;
78                 XLogRecPtr      recptr;
79                 XLogRecData rdata[1];
80
81                 xlrec.node = rel->rd_node;
82                 xlrec.meta.root = metad->btm_root;
83                 xlrec.meta.level = metad->btm_level;
84                 xlrec.meta.fastroot = metad->btm_fastroot;
85                 xlrec.meta.fastlevel = metad->btm_fastlevel;
86
87                 rdata[0].buffer = InvalidBuffer;
88                 rdata[0].data = (char *) &xlrec;
89                 rdata[0].len = SizeOfBtreeNewmeta;
90                 rdata[0].next = NULL;
91
92                 recptr = XLogInsert(RM_BTREE_ID,
93                                                         markvalid ? XLOG_BTREE_NEWMETA : XLOG_BTREE_INVALIDMETA,
94                                                         rdata);
95
96                 PageSetLSN(pg, recptr);
97                 PageSetSUI(pg, ThisStartUpID);
98         }
99
100         END_CRIT_SECTION();
101
102         WriteBuffer(buf);
103 }
104
105 /*
106  *      _bt_getroot() -- Get the root page of the btree.
107  *
108  *              Since the root page can move around the btree file, we have to read
109  *              its location from the metadata page, and then read the root page
110  *              itself.  If no root page exists yet, we have to create one.  The
111  *              standard class of race conditions exists here; I think I covered
112  *              them all in the Hopi Indian rain dance of lock requests below.
113  *
114  *              The access type parameter (BT_READ or BT_WRITE) controls whether
115  *              a new root page will be created or not.  If access = BT_READ,
116  *              and no root page exists, we just return InvalidBuffer.  For
117  *              BT_WRITE, we try to create the root page if it doesn't exist.
118  *              NOTE that the returned root page will have only a read lock set
119  *              on it even if access = BT_WRITE!
120  *
121  *              The returned page is not necessarily the true root --- it could be
122  *              a "fast root" (a page that is alone in its level due to deletions).
123  *              Also, if the root page is split while we are "in flight" to it,
124  *              what we will return is the old root, which is now just the leftmost
125  *              page on a probably-not-very-wide level.  For most purposes this is
126  *              as good as or better than the true root, so we do not bother to
127  *              insist on finding the true root.  We do, however, guarantee to
128  *              return a live (not deleted or half-dead) page.
129  *
130  *              On successful return, the root page is pinned and read-locked.
131  *              The metadata page is not locked or pinned on exit.
132  */
133 Buffer
134 _bt_getroot(Relation rel, int access)
135 {
136         Buffer          metabuf;
137         Page            metapg;
138         BTPageOpaque metaopaque;
139         Buffer          rootbuf;
140         Page            rootpage;
141         BTPageOpaque rootopaque;
142         BlockNumber rootblkno;
143         uint32          rootlevel;
144         BTMetaPageData *metad;
145
146         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ);
147         metapg = BufferGetPage(metabuf);
148         metaopaque = (BTPageOpaque) PageGetSpecialPointer(metapg);
149         metad = BTPageGetMeta(metapg);
150
151         /* sanity-check the metapage */
152         if (!(metaopaque->btpo_flags & BTP_META) ||
153                 metad->btm_magic != BTREE_MAGIC)
154                 ereport(ERROR,
155                                 (errcode(ERRCODE_INDEX_CORRUPTED),
156                                  errmsg("index \"%s\" is not a btree",
157                                                 RelationGetRelationName(rel))));
158
159         if (metad->btm_version != BTREE_VERSION)
160                 ereport(ERROR,
161                                 (errcode(ERRCODE_INDEX_CORRUPTED),
162                                  errmsg("version mismatch in index \"%s\": file version %d, code version %d",
163                                                 RelationGetRelationName(rel),
164                                                 metad->btm_version, BTREE_VERSION)));
165
166         /* if no root page initialized yet, do it */
167         if (metad->btm_root == P_NONE)
168         {
169                 /* If access = BT_READ, caller doesn't want us to create root yet */
170                 if (access == BT_READ)
171                 {
172                         _bt_relbuf(rel, metabuf);
173                         return InvalidBuffer;
174                 }
175
176                 /* trade in our read lock for a write lock */
177                 LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
178                 LockBuffer(metabuf, BT_WRITE);
179
180                 /*
181                  * Race condition:      if someone else initialized the metadata
182                  * between the time we released the read lock and acquired the
183                  * write lock, we must avoid doing it again.
184                  */
185                 if (metad->btm_root != P_NONE)
186                 {
187                         /*
188                          * Metadata initialized by someone else.  In order to
189                          * guarantee no deadlocks, we have to release the metadata
190                          * page and start all over again.  (Is that really true? But
191                          * it's hardly worth trying to optimize this case.)
192                          */
193                         _bt_relbuf(rel, metabuf);
194                         return _bt_getroot(rel, access);
195                 }
196
197                 /*
198                  * Get, initialize, write, and leave a lock of the appropriate
199                  * type on the new root page.  Since this is the first page in the
200                  * tree, it's a leaf as well as the root.
201                  */
202                 rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
203                 rootblkno = BufferGetBlockNumber(rootbuf);
204                 rootpage = BufferGetPage(rootbuf);
205
206                 _bt_pageinit(rootpage, BufferGetPageSize(rootbuf));
207                 rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
208                 rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
209                 rootopaque->btpo_flags = (BTP_LEAF | BTP_ROOT);
210                 rootopaque->btpo.level = 0;
211
212                 /* NO ELOG(ERROR) till meta is updated */
213                 START_CRIT_SECTION();
214
215                 metad->btm_root = rootblkno;
216                 metad->btm_level = 0;
217                 metad->btm_fastroot = rootblkno;
218                 metad->btm_fastlevel = 0;
219
220                 /* XLOG stuff */
221                 if (!rel->rd_istemp)
222                 {
223                         xl_btree_newroot xlrec;
224                         XLogRecPtr      recptr;
225                         XLogRecData rdata;
226
227                         xlrec.node = rel->rd_node;
228                         xlrec.rootblk = rootblkno;
229                         xlrec.level = 0;
230
231                         rdata.buffer = InvalidBuffer;
232                         rdata.data = (char *) &xlrec;
233                         rdata.len = SizeOfBtreeNewroot;
234                         rdata.next = NULL;
235
236                         recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, &rdata);
237
238                         PageSetLSN(rootpage, recptr);
239                         PageSetSUI(rootpage, ThisStartUpID);
240                         PageSetLSN(metapg, recptr);
241                         PageSetSUI(metapg, ThisStartUpID);
242                 }
243
244                 END_CRIT_SECTION();
245
246                 _bt_wrtnorelbuf(rel, rootbuf);
247
248                 /*
249                  * swap root write lock for read lock.  There is no danger of
250                  * anyone else accessing the new root page while it's unlocked,
251                  * since no one else knows where it is yet.
252                  */
253                 LockBuffer(rootbuf, BUFFER_LOCK_UNLOCK);
254                 LockBuffer(rootbuf, BT_READ);
255
256                 /* okay, metadata is correct, write and release it */
257                 _bt_wrtbuf(rel, metabuf);
258         }
259         else
260         {
261                 rootblkno = metad->btm_fastroot;
262                 Assert(rootblkno != P_NONE);
263                 rootlevel = metad->btm_fastlevel;
264
265                 /*
266                  * We are done with the metapage; arrange to release it via
267                  * first _bt_relandgetbuf call
268                  */
269                 rootbuf = metabuf;
270
271                 for (;;)
272                 {
273                         rootbuf = _bt_relandgetbuf(rel, rootbuf, rootblkno, BT_READ);
274                         rootpage = BufferGetPage(rootbuf);
275                         rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
276
277                         if (!P_IGNORE(rootopaque))
278                                 break;
279
280                         /* it's dead, Jim.  step right one page */
281                         if (P_RIGHTMOST(rootopaque))
282                                 elog(ERROR, "no live root page found in \"%s\"",
283                                          RelationGetRelationName(rel));
284                         rootblkno = rootopaque->btpo_next;
285                 }
286
287                 /* Note: can't check btpo.level on deleted pages */
288                 if (rootopaque->btpo.level != rootlevel)
289                         elog(ERROR, "root page %u of \"%s\" has level %u, expected %u",
290                                  rootblkno, RelationGetRelationName(rel),
291                                  rootopaque->btpo.level, rootlevel);
292         }
293
294         /*
295          * By here, we have a pin and read lock on the root page, and no lock
296          * set on the metadata page.  Return the root page's buffer.
297          */
298         return rootbuf;
299 }
300
301 /*
302  *      _bt_gettrueroot() -- Get the true root page of the btree.
303  *
304  *              This is the same as the BT_READ case of _bt_getroot(), except
305  *              we follow the true-root link not the fast-root link.
306  *
307  * By the time we acquire lock on the root page, it might have been split and
308  * not be the true root anymore.  This is okay for the present uses of this
309  * routine; we only really need to be able to move up at least one tree level
310  * from whatever non-root page we were at.      If we ever do need to lock the
311  * one true root page, we could loop here, re-reading the metapage on each
312  * failure.  (Note that it wouldn't do to hold the lock on the metapage while
313  * moving to the root --- that'd deadlock against any concurrent root split.)
314  */
315 Buffer
316 _bt_gettrueroot(Relation rel)
317 {
318         Buffer          metabuf;
319         Page            metapg;
320         BTPageOpaque metaopaque;
321         Buffer          rootbuf;
322         Page            rootpage;
323         BTPageOpaque rootopaque;
324         BlockNumber rootblkno;
325         uint32          rootlevel;
326         BTMetaPageData *metad;
327
328         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ);
329         metapg = BufferGetPage(metabuf);
330         metaopaque = (BTPageOpaque) PageGetSpecialPointer(metapg);
331         metad = BTPageGetMeta(metapg);
332
333         if (!(metaopaque->btpo_flags & BTP_META) ||
334                 metad->btm_magic != BTREE_MAGIC)
335                 ereport(ERROR,
336                                 (errcode(ERRCODE_INDEX_CORRUPTED),
337                                  errmsg("index \"%s\" is not a btree",
338                                                 RelationGetRelationName(rel))));
339
340         if (metad->btm_version != BTREE_VERSION)
341                 ereport(ERROR,
342                                 (errcode(ERRCODE_INDEX_CORRUPTED),
343                                  errmsg("version mismatch in index \"%s\": file version %d, code version %d",
344                                                 RelationGetRelationName(rel),
345                                                 metad->btm_version, BTREE_VERSION)));
346
347         /* if no root page initialized yet, fail */
348         if (metad->btm_root == P_NONE)
349         {
350                 _bt_relbuf(rel, metabuf);
351                 return InvalidBuffer;
352         }
353
354         rootblkno = metad->btm_root;
355         rootlevel = metad->btm_level;
356
357         /*
358          * We are done with the metapage; arrange to release it via
359          * first _bt_relandgetbuf call
360          */
361         rootbuf = metabuf;
362
363         for (;;)
364         {
365                 rootbuf = _bt_relandgetbuf(rel, rootbuf, rootblkno, BT_READ);
366                 rootpage = BufferGetPage(rootbuf);
367                 rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
368
369                 if (!P_IGNORE(rootopaque))
370                         break;
371
372                 /* it's dead, Jim.  step right one page */
373                 if (P_RIGHTMOST(rootopaque))
374                         elog(ERROR, "no live root page found in \"%s\"",
375                                  RelationGetRelationName(rel));
376                 rootblkno = rootopaque->btpo_next;
377         }
378
379         /* Note: can't check btpo.level on deleted pages */
380         if (rootopaque->btpo.level != rootlevel)
381                 elog(ERROR, "root page %u of \"%s\" has level %u, expected %u",
382                          rootblkno, RelationGetRelationName(rel),
383                          rootopaque->btpo.level, rootlevel);
384
385         return rootbuf;
386 }
387
388 /*
389  *      _bt_getbuf() -- Get a buffer by block number for read or write.
390  *
391  *              blkno == P_NEW means to get an unallocated index page.
392  *
393  *              When this routine returns, the appropriate lock is set on the
394  *              requested buffer and its reference count has been incremented
395  *              (ie, the buffer is "locked and pinned").
396  */
397 Buffer
398 _bt_getbuf(Relation rel, BlockNumber blkno, int access)
399 {
400         Buffer          buf;
401
402         if (blkno != P_NEW)
403         {
404                 /* Read an existing block of the relation */
405                 buf = ReadBuffer(rel, blkno);
406                 LockBuffer(buf, access);
407         }
408         else
409         {
410                 bool            needLock;
411                 Page            page;
412
413                 Assert(access == BT_WRITE);
414
415                 /*
416                  * First see if the FSM knows of any free pages.
417                  *
418                  * We can't trust the FSM's report unreservedly; we have to check
419                  * that the page is still free.  (For example, an already-free
420                  * page could have been re-used between the time the last VACUUM
421                  * scanned it and the time the VACUUM made its FSM updates.)
422                  *
423                  * In fact, it's worse than that: we can't even assume that it's
424                  * safe to take a lock on the reported page.  If somebody else
425                  * has a lock on it, or even worse our own caller does, we could
426                  * deadlock.  (The own-caller scenario is actually not improbable.
427                  * Consider an index on a serial or timestamp column.  Nearly all
428                  * splits will be at the rightmost page, so it's entirely likely
429                  * that _bt_split will call us while holding a lock on the page most
430                  * recently acquired from FSM.  A VACUUM running concurrently with
431                  * the previous split could well have placed that page back in FSM.)
432                  *
433                  * To get around that, we ask for only a conditional lock on the
434                  * reported page.  If we fail, then someone else is using the page,
435                  * and we may reasonably assume it's not free.  (If we happen to be
436                  * wrong, the worst consequence is the page will be lost to use till
437                  * the next VACUUM, which is no big problem.)
438                  */
439                 for (;;)
440                 {
441                         blkno = GetFreeIndexPage(&rel->rd_node);
442                         if (blkno == InvalidBlockNumber)
443                                 break;
444                         buf = ReadBuffer(rel, blkno);
445                         if (ConditionalLockBuffer(buf))
446                         {
447                                 page = BufferGetPage(buf);
448                                 if (_bt_page_recyclable(page))
449                                 {
450                                         /* Okay to use page.  Re-initialize and return it */
451                                         _bt_pageinit(page, BufferGetPageSize(buf));
452                                         return buf;
453                                 }
454                                 elog(DEBUG2, "FSM returned nonrecyclable page");
455                                 _bt_relbuf(rel, buf);
456                         }
457                         else
458                         {
459                                 elog(DEBUG2, "FSM returned nonlockable page");
460                                 /* couldn't get lock, so just drop pin */
461                                 ReleaseBuffer(buf);
462                         }
463                 }
464
465                 /*
466                  * Extend the relation by one page.
467                  *
468                  * We have to use a lock to ensure no one else is extending the rel
469                  * at the same time, else we will both try to initialize the same
470                  * new page.  We can skip locking for new or temp relations,
471                  * however, since no one else could be accessing them.
472                  */
473                 needLock = !(rel->rd_isnew || rel->rd_istemp);
474
475                 if (needLock)
476                         LockPage(rel, 0, ExclusiveLock);
477
478                 buf = ReadBuffer(rel, P_NEW);
479
480                 /*
481                  * Release the file-extension lock; it's now OK for someone else
482                  * to extend the relation some more.
483                  */
484                 if (needLock)
485                         UnlockPage(rel, 0, ExclusiveLock);
486
487                 /* Acquire appropriate buffer lock on new page */
488                 LockBuffer(buf, access);
489
490                 /* Initialize the new page before returning it */
491                 page = BufferGetPage(buf);
492                 _bt_pageinit(page, BufferGetPageSize(buf));
493         }
494
495         /* ref count and lock type are correct */
496         return buf;
497 }
498
499 /*
500  *      _bt_relandgetbuf() -- release a locked buffer and get another one.
501  *
502  * This is equivalent to _bt_relbuf followed by _bt_getbuf, with the
503  * exception that blkno may not be P_NEW.  Also, if obuf is InvalidBuffer
504  * then it reduces to just _bt_getbuf; allowing this case simplifies some
505  * callers. The motivation for using this is to avoid two entries to the
506  * bufmgr when one will do.
507  */
508 Buffer
509 _bt_relandgetbuf(Relation rel, Buffer obuf, BlockNumber blkno, int access)
510 {
511         Buffer          buf;
512
513         Assert(blkno != P_NEW);
514         if (BufferIsValid(obuf))
515                 LockBuffer(obuf, BUFFER_LOCK_UNLOCK);
516         buf = ReleaseAndReadBuffer(obuf, rel, blkno);
517         LockBuffer(buf, access);
518         return buf;
519 }
520
521 /*
522  *      _bt_relbuf() -- release a locked buffer.
523  *
524  * Lock and pin (refcount) are both dropped.  Note that either read or
525  * write lock can be dropped this way, but if we modified the buffer,
526  * this is NOT the right way to release a write lock.
527  */
528 void
529 _bt_relbuf(Relation rel, Buffer buf)
530 {
531         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
532         ReleaseBuffer(buf);
533 }
534
535 /*
536  *      _bt_wrtbuf() -- write a btree page to disk.
537  *
538  *              This routine releases the lock held on the buffer and our refcount
539  *              for it.  It is an error to call _bt_wrtbuf() without a write lock
540  *              and a pin on the buffer.
541  *
542  * NOTE: actually, the buffer manager just marks the shared buffer page
543  * dirty here; the real I/O happens later.      This is okay since we are not
544  * relying on write ordering anyway.  The WAL mechanism is responsible for
545  * guaranteeing correctness after a crash.
546  */
547 void
548 _bt_wrtbuf(Relation rel, Buffer buf)
549 {
550         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
551         WriteBuffer(buf);
552 }
553
554 /*
555  *      _bt_wrtnorelbuf() -- write a btree page to disk, but do not release
556  *                                               our reference or lock.
557  *
558  *              It is an error to call _bt_wrtnorelbuf() without a write lock
559  *              and a pin on the buffer.
560  *
561  * See above NOTE.
562  */
563 void
564 _bt_wrtnorelbuf(Relation rel, Buffer buf)
565 {
566         WriteNoReleaseBuffer(buf);
567 }
568
569 /*
570  *      _bt_pageinit() -- Initialize a new page.
571  *
572  * On return, the page header is initialized; data space is empty;
573  * special space is zeroed out.
574  */
575 void
576 _bt_pageinit(Page page, Size size)
577 {
578         PageInit(page, size, sizeof(BTPageOpaqueData));
579 }
580
581 /*
582  *      _bt_page_recyclable() -- Is an existing page recyclable?
583  *
584  * This exists to make sure _bt_getbuf and btvacuumcleanup have the same
585  * policy about whether a page is safe to re-use.
586  */
587 bool
588 _bt_page_recyclable(Page page)
589 {
590         BTPageOpaque opaque;
591
592         /*
593          * It's possible to find an all-zeroes page in an index --- for
594          * example, a backend might successfully extend the relation one page
595          * and then crash before it is able to make a WAL entry for adding the
596          * page. If we find a zeroed page then reclaim it.
597          */
598         if (PageIsNew(page))
599                 return true;
600
601         /*
602          * Otherwise, recycle if deleted and too old to have any processes
603          * interested in it.
604          */
605         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
606         if (P_ISDELETED(opaque) &&
607                 TransactionIdPrecedesOrEquals(opaque->btpo.xact, RecentXmin))
608                 return true;
609         return false;
610 }
611
612 /*
613  *      _bt_metaproot() -- Change the root page of the btree.
614  *
615  *              Lehman and Yao require that the root page move around in order to
616  *              guarantee deadlock-free short-term, fine-granularity locking.  When
617  *              we split the root page, we record the new parent in the metadata page
618  *              for the relation.  This routine does the work.
619  *
620  *              No direct preconditions, but if you don't have the write lock on
621  *              at least the old root page when you call this, you're making a big
622  *              mistake.  On exit, metapage data is correct and we no longer have
623  *              a pin or lock on the metapage.
624  *
625  * Actually this is not used for splitting on-the-fly anymore.  It's only used
626  * in nbtsort.c at the completion of btree building, where we know we have
627  * sole access to the index anyway.
628  */
629 void
630 _bt_metaproot(Relation rel, BlockNumber rootbknum, uint32 level)
631 {
632         Buffer          metabuf;
633         Page            metap;
634         BTPageOpaque metaopaque;
635         BTMetaPageData *metad;
636
637         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
638         metap = BufferGetPage(metabuf);
639         metaopaque = (BTPageOpaque) PageGetSpecialPointer(metap);
640         Assert(metaopaque->btpo_flags & BTP_META);
641
642         /* NO ELOG(ERROR) from here till newmeta op is logged */
643         START_CRIT_SECTION();
644
645         metad = BTPageGetMeta(metap);
646         Assert(metad->btm_magic == BTREE_MAGIC || metad->btm_magic == 0);
647         metad->btm_magic = BTREE_MAGIC;         /* it's valid now for sure */
648         metad->btm_root = rootbknum;
649         metad->btm_level = level;
650         metad->btm_fastroot = rootbknum;
651         metad->btm_fastlevel = level;
652
653         /* XLOG stuff */
654         if (!rel->rd_istemp)
655         {
656                 xl_btree_newmeta xlrec;
657                 XLogRecPtr      recptr;
658                 XLogRecData rdata[1];
659
660                 xlrec.node = rel->rd_node;
661                 xlrec.meta.root = metad->btm_root;
662                 xlrec.meta.level = metad->btm_level;
663                 xlrec.meta.fastroot = metad->btm_fastroot;
664                 xlrec.meta.fastlevel = metad->btm_fastlevel;
665
666                 rdata[0].buffer = InvalidBuffer;
667                 rdata[0].data = (char *) &xlrec;
668                 rdata[0].len = SizeOfBtreeNewmeta;
669                 rdata[0].next = NULL;
670
671                 recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWMETA, rdata);
672
673                 PageSetLSN(metap, recptr);
674                 PageSetSUI(metap, ThisStartUpID);
675         }
676
677         END_CRIT_SECTION();
678
679         _bt_wrtbuf(rel, metabuf);
680 }
681
682 /*
683  * Delete item(s) from a btree page.
684  *
685  * This must only be used for deleting leaf items.      Deleting an item on a
686  * non-leaf page has to be done as part of an atomic action that includes
687  * deleting the page it points to.
688  *
689  * This routine assumes that the caller has pinned and locked the buffer,
690  * and will write the buffer afterwards.  Also, the given itemnos *must*
691  * appear in increasing order in the array.
692  */
693 void
694 _bt_delitems(Relation rel, Buffer buf,
695                          OffsetNumber *itemnos, int nitems)
696 {
697         Page            page = BufferGetPage(buf);
698         int                     i;
699
700         /* No ereport(ERROR) until changes are logged */
701         START_CRIT_SECTION();
702
703         /*
704          * Delete the items in reverse order so we don't have to think about
705          * adjusting item numbers for previous deletions.
706          */
707         for (i = nitems - 1; i >= 0; i--)
708                 PageIndexTupleDelete(page, itemnos[i]);
709
710         /* XLOG stuff */
711         if (!rel->rd_istemp)
712         {
713                 xl_btree_delete xlrec;
714                 XLogRecPtr      recptr;
715                 XLogRecData rdata[2];
716
717                 xlrec.node = rel->rd_node;
718                 xlrec.block = BufferGetBlockNumber(buf);
719
720                 rdata[0].buffer = InvalidBuffer;
721                 rdata[0].data = (char *) &xlrec;
722                 rdata[0].len = SizeOfBtreeDelete;
723                 rdata[0].next = &(rdata[1]);
724
725                 /*
726                  * The target-offsets array is not in the buffer, but pretend that
727                  * it is.  When XLogInsert stores the whole buffer, the offsets
728                  * array need not be stored too.
729                  */
730                 rdata[1].buffer = buf;
731                 if (nitems > 0)
732                 {
733                         rdata[1].data = (char *) itemnos;
734                         rdata[1].len = nitems * sizeof(OffsetNumber);
735                 }
736                 else
737                 {
738                         rdata[1].data = NULL;
739                         rdata[1].len = 0;
740                 }
741                 rdata[1].next = NULL;
742
743                 recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_DELETE, rdata);
744
745                 PageSetLSN(page, recptr);
746                 PageSetSUI(page, ThisStartUpID);
747         }
748
749         END_CRIT_SECTION();
750 }
751
752 /*
753  * _bt_pagedel() -- Delete a page from the b-tree.
754  *
755  * This action unlinks the page from the b-tree structure, removing all
756  * pointers leading to it --- but not touching its own left and right links.
757  * The page cannot be physically reclaimed right away, since other processes
758  * may currently be trying to follow links leading to the page; they have to
759  * be allowed to use its right-link to recover.  See nbtree/README.
760  *
761  * On entry, the target buffer must be pinned and read-locked.  This lock and
762  * pin will be dropped before exiting.
763  *
764  * Returns the number of pages successfully deleted (zero on failure; could
765  * be more than one if parent blocks were deleted).
766  *
767  * NOTE: this leaks memory.  Rather than trying to clean up everything
768  * carefully, it's better to run it in a temp context that can be reset
769  * frequently.
770  */
771 int
772 _bt_pagedel(Relation rel, Buffer buf, bool vacuum_full)
773 {
774         BlockNumber target,
775                                 leftsib,
776                                 rightsib,
777                                 parent;
778         OffsetNumber poffset,
779                                 maxoff;
780         uint32          targetlevel,
781                                 ilevel;
782         ItemId          itemid;
783         BTItem          targetkey,
784                                 btitem;
785         ScanKey         itup_scankey;
786         BTStack         stack;
787         Buffer          lbuf,
788                                 rbuf,
789                                 pbuf;
790         bool            parent_half_dead;
791         bool            parent_one_child;
792         bool            rightsib_empty;
793         Buffer          metabuf = InvalidBuffer;
794         Page            metapg = NULL;
795         BTMetaPageData *metad = NULL;
796         Page            page;
797         BTPageOpaque opaque;
798
799         /*
800          * We can never delete rightmost pages nor root pages.  While at it,
801          * check that page is not already deleted and is empty.
802          */
803         page = BufferGetPage(buf);
804         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
805         if (P_RIGHTMOST(opaque) || P_ISROOT(opaque) || P_ISDELETED(opaque) ||
806                 P_FIRSTDATAKEY(opaque) <= PageGetMaxOffsetNumber(page))
807         {
808                 _bt_relbuf(rel, buf);
809                 return 0;
810         }
811
812         /*
813          * Save info about page, including a copy of its high key (it must
814          * have one, being non-rightmost).
815          */
816         target = BufferGetBlockNumber(buf);
817         targetlevel = opaque->btpo.level;
818         leftsib = opaque->btpo_prev;
819         itemid = PageGetItemId(page, P_HIKEY);
820         targetkey = CopyBTItem((BTItem) PageGetItem(page, itemid));
821
822         /*
823          * We need to get an approximate pointer to the page's parent page.
824          * Use the standard search mechanism to search for the page's high
825          * key; this will give us a link to either the current parent or
826          * someplace to its left (if there are multiple equal high keys).  To
827          * avoid deadlocks, we'd better drop the target page lock first.
828          */
829         _bt_relbuf(rel, buf);
830         /* we need a scan key to do our search, so build one */
831         itup_scankey = _bt_mkscankey(rel, &(targetkey->bti_itup));
832         /* find the leftmost leaf page containing this key */
833         stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey, false,
834                                            &lbuf, BT_READ);
835         /* don't need a pin on that either */
836         _bt_relbuf(rel, lbuf);
837
838         /*
839          * If we are trying to delete an interior page, _bt_search did more
840          * than we needed.      Locate the stack item pointing to our parent
841          * level.
842          */
843         ilevel = 0;
844         for (;;)
845         {
846                 if (stack == NULL)
847                         elog(ERROR, "not enough stack items");
848                 if (ilevel == targetlevel)
849                         break;
850                 stack = stack->bts_parent;
851                 ilevel++;
852         }
853
854         /*
855          * We have to lock the pages we need to modify in the standard order:
856          * moving right, then up.  Else we will deadlock against other
857          * writers.
858          *
859          * So, we need to find and write-lock the current left sibling of the
860          * target page.  The sibling that was current a moment ago could have
861          * split, so we may have to move right.  This search could fail if
862          * either the sibling or the target page was deleted by someone else
863          * meanwhile; if so, give up.  (Right now, that should never happen,
864          * since page deletion is only done in VACUUM and there shouldn't be
865          * multiple VACUUMs concurrently on the same table.)
866          */
867         if (leftsib != P_NONE)
868         {
869                 lbuf = _bt_getbuf(rel, leftsib, BT_WRITE);
870                 page = BufferGetPage(lbuf);
871                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
872                 while (P_ISDELETED(opaque) || opaque->btpo_next != target)
873                 {
874                         /* step right one page */
875                         leftsib = opaque->btpo_next;
876                         _bt_relbuf(rel, lbuf);
877                         if (leftsib == P_NONE)
878                         {
879                                 elog(LOG, "no left sibling (concurrent deletion?)");
880                                 return 0;
881                         }
882                         lbuf = _bt_getbuf(rel, leftsib, BT_WRITE);
883                         page = BufferGetPage(lbuf);
884                         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
885                 }
886         }
887         else
888                 lbuf = InvalidBuffer;
889
890         /*
891          * Next write-lock the target page itself.      It should be okay to take
892          * just a write lock not a superexclusive lock, since no scans would
893          * stop on an empty page.
894          */
895         buf = _bt_getbuf(rel, target, BT_WRITE);
896         page = BufferGetPage(buf);
897         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
898
899         /*
900          * Check page is still empty etc, else abandon deletion.  The empty
901          * check is necessary since someone else might have inserted into it
902          * while we didn't have it locked; the others are just for paranoia's
903          * sake.
904          */
905         if (P_RIGHTMOST(opaque) || P_ISROOT(opaque) || P_ISDELETED(opaque) ||
906                 P_FIRSTDATAKEY(opaque) <= PageGetMaxOffsetNumber(page))
907         {
908                 _bt_relbuf(rel, buf);
909                 if (BufferIsValid(lbuf))
910                         _bt_relbuf(rel, lbuf);
911                 return 0;
912         }
913         if (opaque->btpo_prev != leftsib)
914                 elog(ERROR, "left link changed unexpectedly");
915
916         /*
917          * And next write-lock the (current) right sibling.
918          */
919         rightsib = opaque->btpo_next;
920         rbuf = _bt_getbuf(rel, rightsib, BT_WRITE);
921
922         /*
923          * Next find and write-lock the current parent of the target page.
924          * This is essentially the same as the corresponding step of
925          * splitting.
926          */
927         ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid),
928                                    target, P_HIKEY);
929         pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
930         if (pbuf == InvalidBuffer)
931                 elog(ERROR, "failed to re-find parent key in \"%s\"",
932                          RelationGetRelationName(rel));
933         parent = stack->bts_blkno;
934         poffset = stack->bts_offset;
935
936         /*
937          * If the target is the rightmost child of its parent, then we can't
938          * delete, unless it's also the only child --- in which case the
939          * parent changes to half-dead status.
940          */
941         page = BufferGetPage(pbuf);
942         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
943         maxoff = PageGetMaxOffsetNumber(page);
944         parent_half_dead = false;
945         parent_one_child = false;
946         if (poffset >= maxoff)
947         {
948                 if (poffset == P_FIRSTDATAKEY(opaque))
949                         parent_half_dead = true;
950                 else
951                 {
952                         _bt_relbuf(rel, pbuf);
953                         _bt_relbuf(rel, rbuf);
954                         _bt_relbuf(rel, buf);
955                         if (BufferIsValid(lbuf))
956                                 _bt_relbuf(rel, lbuf);
957                         return 0;
958                 }
959         }
960         else
961         {
962                 /* Will there be exactly one child left in this parent? */
963                 if (OffsetNumberNext(P_FIRSTDATAKEY(opaque)) == maxoff)
964                         parent_one_child = true;
965         }
966
967         /*
968          * If we are deleting the next-to-last page on the target's level,
969          * then the rightsib is a candidate to become the new fast root. (In
970          * theory, it might be possible to push the fast root even further
971          * down, but the odds of doing so are slim, and the locking
972          * considerations daunting.)
973          *
974          * We can safely acquire a lock on the metapage here --- see comments for
975          * _bt_newroot().
976          */
977         if (leftsib == P_NONE)
978         {
979                 page = BufferGetPage(rbuf);
980                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
981                 Assert(opaque->btpo.level == targetlevel);
982                 if (P_RIGHTMOST(opaque))
983                 {
984                         /* rightsib will be the only one left on the level */
985                         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
986                         metapg = BufferGetPage(metabuf);
987                         metad = BTPageGetMeta(metapg);
988
989                         /*
990                          * The expected case here is btm_fastlevel == targetlevel+1;
991                          * if the fastlevel is <= targetlevel, something is wrong, and
992                          * we choose to overwrite it to fix it.
993                          */
994                         if (metad->btm_fastlevel > targetlevel + 1)
995                         {
996                                 /* no update wanted */
997                                 _bt_relbuf(rel, metabuf);
998                                 metabuf = InvalidBuffer;
999                         }
1000                 }
1001         }
1002
1003         /*
1004          * Here we begin doing the deletion.
1005          */
1006
1007         /* No ereport(ERROR) until changes are logged */
1008         START_CRIT_SECTION();
1009
1010         /*
1011          * Update parent.  The normal case is a tad tricky because we want to
1012          * delete the target's downlink and the *following* key.  Easiest way
1013          * is to copy the right sibling's downlink over the target downlink,
1014          * and then delete the following item.
1015          */
1016         page = BufferGetPage(pbuf);
1017         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1018         if (parent_half_dead)
1019         {
1020                 PageIndexTupleDelete(page, poffset);
1021                 opaque->btpo_flags |= BTP_HALF_DEAD;
1022         }
1023         else
1024         {
1025                 OffsetNumber nextoffset;
1026
1027                 itemid = PageGetItemId(page, poffset);
1028                 btitem = (BTItem) PageGetItem(page, itemid);
1029                 Assert(ItemPointerGetBlockNumber(&(btitem->bti_itup.t_tid)) == target);
1030                 ItemPointerSet(&(btitem->bti_itup.t_tid), rightsib, P_HIKEY);
1031
1032                 nextoffset = OffsetNumberNext(poffset);
1033                 /* This part is just for double-checking */
1034                 itemid = PageGetItemId(page, nextoffset);
1035                 btitem = (BTItem) PageGetItem(page, itemid);
1036                 if (ItemPointerGetBlockNumber(&(btitem->bti_itup.t_tid)) != rightsib)
1037                         elog(PANIC, "right sibling is not next child");
1038
1039                 PageIndexTupleDelete(page, nextoffset);
1040         }
1041
1042         /*
1043          * Update siblings' side-links.  Note the target page's side-links
1044          * will continue to point to the siblings.
1045          */
1046         if (BufferIsValid(lbuf))
1047         {
1048                 page = BufferGetPage(lbuf);
1049                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1050                 Assert(opaque->btpo_next == target);
1051                 opaque->btpo_next = rightsib;
1052         }
1053         page = BufferGetPage(rbuf);
1054         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1055         Assert(opaque->btpo_prev == target);
1056         opaque->btpo_prev = leftsib;
1057         rightsib_empty = (P_FIRSTDATAKEY(opaque) > PageGetMaxOffsetNumber(page));
1058
1059         /*
1060          * Mark the page itself deleted.  It can be recycled when all current
1061          * transactions are gone; or immediately if we're doing VACUUM FULL.
1062          */
1063         page = BufferGetPage(buf);
1064         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1065         opaque->btpo_flags |= BTP_DELETED;
1066         opaque->btpo.xact =
1067                 vacuum_full ? FrozenTransactionId : ReadNewTransactionId();
1068
1069         /* And update the metapage, if needed */
1070         if (BufferIsValid(metabuf))
1071         {
1072                 metad->btm_fastroot = rightsib;
1073                 metad->btm_fastlevel = targetlevel;
1074         }
1075
1076         /* XLOG stuff */
1077         if (!rel->rd_istemp)
1078         {
1079                 xl_btree_delete_page xlrec;
1080                 xl_btree_metadata xlmeta;
1081                 uint8           xlinfo;
1082                 XLogRecPtr      recptr;
1083                 XLogRecData rdata[5];
1084                 XLogRecData *nextrdata;
1085
1086                 xlrec.target.node = rel->rd_node;
1087                 ItemPointerSet(&(xlrec.target.tid), parent, poffset);
1088                 xlrec.deadblk = target;
1089                 xlrec.leftblk = leftsib;
1090                 xlrec.rightblk = rightsib;
1091
1092                 rdata[0].buffer = InvalidBuffer;
1093                 rdata[0].data = (char *) &xlrec;
1094                 rdata[0].len = SizeOfBtreeDeletePage;
1095                 rdata[0].next = nextrdata = &(rdata[1]);
1096
1097                 if (BufferIsValid(metabuf))
1098                 {
1099                         xlmeta.root = metad->btm_root;
1100                         xlmeta.level = metad->btm_level;
1101                         xlmeta.fastroot = metad->btm_fastroot;
1102                         xlmeta.fastlevel = metad->btm_fastlevel;
1103
1104                         nextrdata->buffer = InvalidBuffer;
1105                         nextrdata->data = (char *) &xlmeta;
1106                         nextrdata->len = sizeof(xl_btree_metadata);
1107                         nextrdata->next = nextrdata + 1;
1108                         nextrdata++;
1109                         xlinfo = XLOG_BTREE_DELETE_PAGE_META;
1110                 }
1111                 else
1112                         xlinfo = XLOG_BTREE_DELETE_PAGE;
1113
1114                 nextrdata->buffer = pbuf;
1115                 nextrdata->data = NULL;
1116                 nextrdata->len = 0;
1117                 nextrdata->next = nextrdata + 1;
1118                 nextrdata++;
1119
1120                 nextrdata->buffer = rbuf;
1121                 nextrdata->data = NULL;
1122                 nextrdata->len = 0;
1123                 nextrdata->next = NULL;
1124
1125                 if (BufferIsValid(lbuf))
1126                 {
1127                         nextrdata->next = nextrdata + 1;
1128                         nextrdata++;
1129                         nextrdata->buffer = lbuf;
1130                         nextrdata->data = NULL;
1131                         nextrdata->len = 0;
1132                         nextrdata->next = NULL;
1133                 }
1134
1135                 recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
1136
1137                 if (BufferIsValid(metabuf))
1138                 {
1139                         PageSetLSN(metapg, recptr);
1140                         PageSetSUI(metapg, ThisStartUpID);
1141                 }
1142                 page = BufferGetPage(pbuf);
1143                 PageSetLSN(page, recptr);
1144                 PageSetSUI(page, ThisStartUpID);
1145                 page = BufferGetPage(rbuf);
1146                 PageSetLSN(page, recptr);
1147                 PageSetSUI(page, ThisStartUpID);
1148                 page = BufferGetPage(buf);
1149                 PageSetLSN(page, recptr);
1150                 PageSetSUI(page, ThisStartUpID);
1151                 if (BufferIsValid(lbuf))
1152                 {
1153                         page = BufferGetPage(lbuf);
1154                         PageSetLSN(page, recptr);
1155                         PageSetSUI(page, ThisStartUpID);
1156                 }
1157         }
1158
1159         END_CRIT_SECTION();
1160
1161         /* Write and release buffers */
1162         if (BufferIsValid(metabuf))
1163                 _bt_wrtbuf(rel, metabuf);
1164         _bt_wrtbuf(rel, pbuf);
1165         _bt_wrtbuf(rel, rbuf);
1166         _bt_wrtbuf(rel, buf);
1167         if (BufferIsValid(lbuf))
1168                 _bt_wrtbuf(rel, lbuf);
1169
1170         /*
1171          * If parent became half dead, recurse to try to delete it. Otherwise,
1172          * if right sibling is empty and is now the last child of the parent,
1173          * recurse to try to delete it.  (These cases cannot apply at the same
1174          * time, though the second case might itself recurse to the first.)
1175          */
1176         if (parent_half_dead)
1177         {
1178                 buf = _bt_getbuf(rel, parent, BT_READ);
1179                 return _bt_pagedel(rel, buf, vacuum_full) + 1;
1180         }
1181         if (parent_one_child && rightsib_empty)
1182         {
1183                 buf = _bt_getbuf(rel, rightsib, BT_READ);
1184                 return _bt_pagedel(rel, buf, vacuum_full) + 1;
1185         }
1186
1187         return 1;
1188 }