OSDN Git Service

Standard pgindent run for 8.1.
[pg-rex/syncrep.git] / src / backend / access / nbtree / nbtpage.c
1 /*-------------------------------------------------------------------------
2  *
3  * nbtpage.c
4  *        BTree-specific page management code for the Postgres btree access
5  *        method.
6  *
7  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  *
11  * IDENTIFICATION
12  *        $PostgreSQL: pgsql/src/backend/access/nbtree/nbtpage.c,v 1.88 2005/10/15 02:49:09 momjian Exp $
13  *
14  *      NOTES
15  *         Postgres btree pages look like ordinary relation pages.      The opaque
16  *         data at high addresses includes pointers to left and right siblings
17  *         and flag data describing page state.  The first page in a btree, page
18  *         zero, is special -- it stores meta-information describing the tree.
19  *         Pages one and higher store the actual tree data.
20  *
21  *-------------------------------------------------------------------------
22  */
23 #include "postgres.h"
24
25 #include "access/nbtree.h"
26 #include "miscadmin.h"
27 #include "storage/freespace.h"
28 #include "storage/lmgr.h"
29
30
31 /*
32  *      _bt_metapinit() -- Initialize the metadata page of a new btree.
33  *
34  * Note: this is actually not used for standard btree index building;
35  * nbtsort.c prefers not to make the metadata page valid until completion
36  * of build.
37  *
38  * Note: there's no real need for any locking here.  Since the transaction
39  * creating the index hasn't committed yet, no one else can even see the index
40  * much less be trying to use it.  (In a REINDEX-in-place scenario, that's
41  * not true, but we assume the caller holds sufficient locks on the index.)
42  */
43 void
44 _bt_metapinit(Relation rel)
45 {
46         Buffer          buf;
47         Page            pg;
48         BTMetaPageData *metad;
49
50         if (RelationGetNumberOfBlocks(rel) != 0)
51                 elog(ERROR, "cannot initialize non-empty btree index \"%s\"",
52                          RelationGetRelationName(rel));
53
54         buf = ReadBuffer(rel, P_NEW);
55         Assert(BufferGetBlockNumber(buf) == BTREE_METAPAGE);
56         pg = BufferGetPage(buf);
57
58         _bt_initmetapage(pg, P_NONE, 0);
59         metad = BTPageGetMeta(pg);
60
61         /* NO ELOG(ERROR) from here till newmeta op is logged */
62         START_CRIT_SECTION();
63
64         /* XLOG stuff */
65         if (!rel->rd_istemp)
66         {
67                 xl_btree_newmeta xlrec;
68                 XLogRecPtr      recptr;
69                 XLogRecData rdata[1];
70
71                 xlrec.node = rel->rd_node;
72                 xlrec.meta.root = metad->btm_root;
73                 xlrec.meta.level = metad->btm_level;
74                 xlrec.meta.fastroot = metad->btm_fastroot;
75                 xlrec.meta.fastlevel = metad->btm_fastlevel;
76
77                 rdata[0].data = (char *) &xlrec;
78                 rdata[0].len = SizeOfBtreeNewmeta;
79                 rdata[0].buffer = InvalidBuffer;
80                 rdata[0].next = NULL;
81
82                 recptr = XLogInsert(RM_BTREE_ID,
83                                                         XLOG_BTREE_NEWMETA,
84                                                         rdata);
85
86                 PageSetLSN(pg, recptr);
87                 PageSetTLI(pg, ThisTimeLineID);
88         }
89
90         END_CRIT_SECTION();
91
92         WriteBuffer(buf);
93 }
94
95 /*
96  *      _bt_initmetapage() -- Fill a page buffer with a correct metapage image
97  */
98 void
99 _bt_initmetapage(Page page, BlockNumber rootbknum, uint32 level)
100 {
101         BTMetaPageData *metad;
102         BTPageOpaque metaopaque;
103
104         _bt_pageinit(page, BLCKSZ);
105
106         metad = BTPageGetMeta(page);
107         metad->btm_magic = BTREE_MAGIC;
108         metad->btm_version = BTREE_VERSION;
109         metad->btm_root = rootbknum;
110         metad->btm_level = level;
111         metad->btm_fastroot = rootbknum;
112         metad->btm_fastlevel = level;
113
114         metaopaque = (BTPageOpaque) PageGetSpecialPointer(page);
115         metaopaque->btpo_flags = BTP_META;
116
117         /*
118          * Set pd_lower just past the end of the metadata.      This is not essential
119          * but it makes the page look compressible to xlog.c.
120          */
121         ((PageHeader) page)->pd_lower =
122                 ((char *) metad + sizeof(BTMetaPageData)) - (char *) page;
123 }
124
125 /*
126  *      _bt_getroot() -- Get the root page of the btree.
127  *
128  *              Since the root page can move around the btree file, we have to read
129  *              its location from the metadata page, and then read the root page
130  *              itself.  If no root page exists yet, we have to create one.  The
131  *              standard class of race conditions exists here; I think I covered
132  *              them all in the Hopi Indian rain dance of lock requests below.
133  *
134  *              The access type parameter (BT_READ or BT_WRITE) controls whether
135  *              a new root page will be created or not.  If access = BT_READ,
136  *              and no root page exists, we just return InvalidBuffer.  For
137  *              BT_WRITE, we try to create the root page if it doesn't exist.
138  *              NOTE that the returned root page will have only a read lock set
139  *              on it even if access = BT_WRITE!
140  *
141  *              The returned page is not necessarily the true root --- it could be
142  *              a "fast root" (a page that is alone in its level due to deletions).
143  *              Also, if the root page is split while we are "in flight" to it,
144  *              what we will return is the old root, which is now just the leftmost
145  *              page on a probably-not-very-wide level.  For most purposes this is
146  *              as good as or better than the true root, so we do not bother to
147  *              insist on finding the true root.  We do, however, guarantee to
148  *              return a live (not deleted or half-dead) page.
149  *
150  *              On successful return, the root page is pinned and read-locked.
151  *              The metadata page is not locked or pinned on exit.
152  */
153 Buffer
154 _bt_getroot(Relation rel, int access)
155 {
156         Buffer          metabuf;
157         Page            metapg;
158         BTPageOpaque metaopaque;
159         Buffer          rootbuf;
160         Page            rootpage;
161         BTPageOpaque rootopaque;
162         BlockNumber rootblkno;
163         uint32          rootlevel;
164         BTMetaPageData *metad;
165
166         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ);
167         metapg = BufferGetPage(metabuf);
168         metaopaque = (BTPageOpaque) PageGetSpecialPointer(metapg);
169         metad = BTPageGetMeta(metapg);
170
171         /* sanity-check the metapage */
172         if (!(metaopaque->btpo_flags & BTP_META) ||
173                 metad->btm_magic != BTREE_MAGIC)
174                 ereport(ERROR,
175                                 (errcode(ERRCODE_INDEX_CORRUPTED),
176                                  errmsg("index \"%s\" is not a btree",
177                                                 RelationGetRelationName(rel))));
178
179         if (metad->btm_version != BTREE_VERSION)
180                 ereport(ERROR,
181                                 (errcode(ERRCODE_INDEX_CORRUPTED),
182                                  errmsg("version mismatch in index \"%s\": file version %d, code version %d",
183                                                 RelationGetRelationName(rel),
184                                                 metad->btm_version, BTREE_VERSION)));
185
186         /* if no root page initialized yet, do it */
187         if (metad->btm_root == P_NONE)
188         {
189                 /* If access = BT_READ, caller doesn't want us to create root yet */
190                 if (access == BT_READ)
191                 {
192                         _bt_relbuf(rel, metabuf);
193                         return InvalidBuffer;
194                 }
195
196                 /* trade in our read lock for a write lock */
197                 LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
198                 LockBuffer(metabuf, BT_WRITE);
199
200                 /*
201                  * Race condition:      if someone else initialized the metadata between
202                  * the time we released the read lock and acquired the write lock, we
203                  * must avoid doing it again.
204                  */
205                 if (metad->btm_root != P_NONE)
206                 {
207                         /*
208                          * Metadata initialized by someone else.  In order to guarantee no
209                          * deadlocks, we have to release the metadata page and start all
210                          * over again.  (Is that really true? But it's hardly worth trying
211                          * to optimize this case.)
212                          */
213                         _bt_relbuf(rel, metabuf);
214                         return _bt_getroot(rel, access);
215                 }
216
217                 /*
218                  * Get, initialize, write, and leave a lock of the appropriate type on
219                  * the new root page.  Since this is the first page in the tree, it's
220                  * a leaf as well as the root.
221                  */
222                 rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
223                 rootblkno = BufferGetBlockNumber(rootbuf);
224                 rootpage = BufferGetPage(rootbuf);
225
226                 _bt_pageinit(rootpage, BufferGetPageSize(rootbuf));
227                 rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
228                 rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
229                 rootopaque->btpo_flags = (BTP_LEAF | BTP_ROOT);
230                 rootopaque->btpo.level = 0;
231
232                 /* NO ELOG(ERROR) till meta is updated */
233                 START_CRIT_SECTION();
234
235                 metad->btm_root = rootblkno;
236                 metad->btm_level = 0;
237                 metad->btm_fastroot = rootblkno;
238                 metad->btm_fastlevel = 0;
239
240                 /* XLOG stuff */
241                 if (!rel->rd_istemp)
242                 {
243                         xl_btree_newroot xlrec;
244                         XLogRecPtr      recptr;
245                         XLogRecData rdata;
246
247                         xlrec.node = rel->rd_node;
248                         xlrec.rootblk = rootblkno;
249                         xlrec.level = 0;
250
251                         rdata.data = (char *) &xlrec;
252                         rdata.len = SizeOfBtreeNewroot;
253                         rdata.buffer = InvalidBuffer;
254                         rdata.next = NULL;
255
256                         recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, &rdata);
257
258                         PageSetLSN(rootpage, recptr);
259                         PageSetTLI(rootpage, ThisTimeLineID);
260                         PageSetLSN(metapg, recptr);
261                         PageSetTLI(metapg, ThisTimeLineID);
262                 }
263
264                 END_CRIT_SECTION();
265
266                 _bt_wrtnorelbuf(rel, rootbuf);
267
268                 /*
269                  * swap root write lock for read lock.  There is no danger of anyone
270                  * else accessing the new root page while it's unlocked, since no one
271                  * else knows where it is yet.
272                  */
273                 LockBuffer(rootbuf, BUFFER_LOCK_UNLOCK);
274                 LockBuffer(rootbuf, BT_READ);
275
276                 /* okay, metadata is correct, write and release it */
277                 _bt_wrtbuf(rel, metabuf);
278         }
279         else
280         {
281                 rootblkno = metad->btm_fastroot;
282                 Assert(rootblkno != P_NONE);
283                 rootlevel = metad->btm_fastlevel;
284
285                 /*
286                  * We are done with the metapage; arrange to release it via first
287                  * _bt_relandgetbuf call
288                  */
289                 rootbuf = metabuf;
290
291                 for (;;)
292                 {
293                         rootbuf = _bt_relandgetbuf(rel, rootbuf, rootblkno, BT_READ);
294                         rootpage = BufferGetPage(rootbuf);
295                         rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
296
297                         if (!P_IGNORE(rootopaque))
298                                 break;
299
300                         /* it's dead, Jim.  step right one page */
301                         if (P_RIGHTMOST(rootopaque))
302                                 elog(ERROR, "no live root page found in \"%s\"",
303                                          RelationGetRelationName(rel));
304                         rootblkno = rootopaque->btpo_next;
305                 }
306
307                 /* Note: can't check btpo.level on deleted pages */
308                 if (rootopaque->btpo.level != rootlevel)
309                         elog(ERROR, "root page %u of \"%s\" has level %u, expected %u",
310                                  rootblkno, RelationGetRelationName(rel),
311                                  rootopaque->btpo.level, rootlevel);
312         }
313
314         /*
315          * By here, we have a pin and read lock on the root page, and no lock set
316          * on the metadata page.  Return the root page's buffer.
317          */
318         return rootbuf;
319 }
320
321 /*
322  *      _bt_gettrueroot() -- Get the true root page of the btree.
323  *
324  *              This is the same as the BT_READ case of _bt_getroot(), except
325  *              we follow the true-root link not the fast-root link.
326  *
327  * By the time we acquire lock on the root page, it might have been split and
328  * not be the true root anymore.  This is okay for the present uses of this
329  * routine; we only really need to be able to move up at least one tree level
330  * from whatever non-root page we were at.      If we ever do need to lock the
331  * one true root page, we could loop here, re-reading the metapage on each
332  * failure.  (Note that it wouldn't do to hold the lock on the metapage while
333  * moving to the root --- that'd deadlock against any concurrent root split.)
334  */
335 Buffer
336 _bt_gettrueroot(Relation rel)
337 {
338         Buffer          metabuf;
339         Page            metapg;
340         BTPageOpaque metaopaque;
341         Buffer          rootbuf;
342         Page            rootpage;
343         BTPageOpaque rootopaque;
344         BlockNumber rootblkno;
345         uint32          rootlevel;
346         BTMetaPageData *metad;
347
348         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ);
349         metapg = BufferGetPage(metabuf);
350         metaopaque = (BTPageOpaque) PageGetSpecialPointer(metapg);
351         metad = BTPageGetMeta(metapg);
352
353         if (!(metaopaque->btpo_flags & BTP_META) ||
354                 metad->btm_magic != BTREE_MAGIC)
355                 ereport(ERROR,
356                                 (errcode(ERRCODE_INDEX_CORRUPTED),
357                                  errmsg("index \"%s\" is not a btree",
358                                                 RelationGetRelationName(rel))));
359
360         if (metad->btm_version != BTREE_VERSION)
361                 ereport(ERROR,
362                                 (errcode(ERRCODE_INDEX_CORRUPTED),
363                                  errmsg("version mismatch in index \"%s\": file version %d, code version %d",
364                                                 RelationGetRelationName(rel),
365                                                 metad->btm_version, BTREE_VERSION)));
366
367         /* if no root page initialized yet, fail */
368         if (metad->btm_root == P_NONE)
369         {
370                 _bt_relbuf(rel, metabuf);
371                 return InvalidBuffer;
372         }
373
374         rootblkno = metad->btm_root;
375         rootlevel = metad->btm_level;
376
377         /*
378          * We are done with the metapage; arrange to release it via first
379          * _bt_relandgetbuf call
380          */
381         rootbuf = metabuf;
382
383         for (;;)
384         {
385                 rootbuf = _bt_relandgetbuf(rel, rootbuf, rootblkno, BT_READ);
386                 rootpage = BufferGetPage(rootbuf);
387                 rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
388
389                 if (!P_IGNORE(rootopaque))
390                         break;
391
392                 /* it's dead, Jim.  step right one page */
393                 if (P_RIGHTMOST(rootopaque))
394                         elog(ERROR, "no live root page found in \"%s\"",
395                                  RelationGetRelationName(rel));
396                 rootblkno = rootopaque->btpo_next;
397         }
398
399         /* Note: can't check btpo.level on deleted pages */
400         if (rootopaque->btpo.level != rootlevel)
401                 elog(ERROR, "root page %u of \"%s\" has level %u, expected %u",
402                          rootblkno, RelationGetRelationName(rel),
403                          rootopaque->btpo.level, rootlevel);
404
405         return rootbuf;
406 }
407
408 /*
409  *      _bt_getbuf() -- Get a buffer by block number for read or write.
410  *
411  *              blkno == P_NEW means to get an unallocated index page.
412  *
413  *              When this routine returns, the appropriate lock is set on the
414  *              requested buffer and its reference count has been incremented
415  *              (ie, the buffer is "locked and pinned").
416  */
417 Buffer
418 _bt_getbuf(Relation rel, BlockNumber blkno, int access)
419 {
420         Buffer          buf;
421
422         if (blkno != P_NEW)
423         {
424                 /* Read an existing block of the relation */
425                 buf = ReadBuffer(rel, blkno);
426                 LockBuffer(buf, access);
427         }
428         else
429         {
430                 bool            needLock;
431                 Page            page;
432
433                 Assert(access == BT_WRITE);
434
435                 /*
436                  * First see if the FSM knows of any free pages.
437                  *
438                  * We can't trust the FSM's report unreservedly; we have to check that
439                  * the page is still free.      (For example, an already-free page could
440                  * have been re-used between the time the last VACUUM scanned it and
441                  * the time the VACUUM made its FSM updates.)
442                  *
443                  * In fact, it's worse than that: we can't even assume that it's safe to
444                  * take a lock on the reported page.  If somebody else has a lock on
445                  * it, or even worse our own caller does, we could deadlock.  (The
446                  * own-caller scenario is actually not improbable. Consider an index
447                  * on a serial or timestamp column.  Nearly all splits will be at the
448                  * rightmost page, so it's entirely likely that _bt_split will call us
449                  * while holding a lock on the page most recently acquired from FSM.
450                  * A VACUUM running concurrently with the previous split could well
451                  * have placed that page back in FSM.)
452                  *
453                  * To get around that, we ask for only a conditional lock on the reported
454                  * page.  If we fail, then someone else is using the page, and we may
455                  * reasonably assume it's not free.  (If we happen to be wrong, the
456                  * worst consequence is the page will be lost to use till the next
457                  * VACUUM, which is no big problem.)
458                  */
459                 for (;;)
460                 {
461                         blkno = GetFreeIndexPage(&rel->rd_node);
462                         if (blkno == InvalidBlockNumber)
463                                 break;
464                         buf = ReadBuffer(rel, blkno);
465                         if (ConditionalLockBuffer(buf))
466                         {
467                                 page = BufferGetPage(buf);
468                                 if (_bt_page_recyclable(page))
469                                 {
470                                         /* Okay to use page.  Re-initialize and return it */
471                                         _bt_pageinit(page, BufferGetPageSize(buf));
472                                         return buf;
473                                 }
474                                 elog(DEBUG2, "FSM returned nonrecyclable page");
475                                 _bt_relbuf(rel, buf);
476                         }
477                         else
478                         {
479                                 elog(DEBUG2, "FSM returned nonlockable page");
480                                 /* couldn't get lock, so just drop pin */
481                                 ReleaseBuffer(buf);
482                         }
483                 }
484
485                 /*
486                  * Extend the relation by one page.
487                  *
488                  * We have to use a lock to ensure no one else is extending the rel at
489                  * the same time, else we will both try to initialize the same new
490                  * page.  We can skip locking for new or temp relations, however,
491                  * since no one else could be accessing them.
492                  */
493                 needLock = !RELATION_IS_LOCAL(rel);
494
495                 if (needLock)
496                         LockRelationForExtension(rel, ExclusiveLock);
497
498                 buf = ReadBuffer(rel, P_NEW);
499
500                 /* Acquire buffer lock on new page */
501                 LockBuffer(buf, BT_WRITE);
502
503                 /*
504                  * Release the file-extension lock; it's now OK for someone else to
505                  * extend the relation some more.  Note that we cannot release this
506                  * lock before we have buffer lock on the new page, or we risk a race
507                  * condition against btvacuumcleanup --- see comments therein.
508                  */
509                 if (needLock)
510                         UnlockRelationForExtension(rel, ExclusiveLock);
511
512                 /* Initialize the new page before returning it */
513                 page = BufferGetPage(buf);
514                 Assert(PageIsNew((PageHeader) page));
515                 _bt_pageinit(page, BufferGetPageSize(buf));
516         }
517
518         /* ref count and lock type are correct */
519         return buf;
520 }
521
522 /*
523  *      _bt_relandgetbuf() -- release a locked buffer and get another one.
524  *
525  * This is equivalent to _bt_relbuf followed by _bt_getbuf, with the
526  * exception that blkno may not be P_NEW.  Also, if obuf is InvalidBuffer
527  * then it reduces to just _bt_getbuf; allowing this case simplifies some
528  * callers. The motivation for using this is to avoid two entries to the
529  * bufmgr when one will do.
530  */
531 Buffer
532 _bt_relandgetbuf(Relation rel, Buffer obuf, BlockNumber blkno, int access)
533 {
534         Buffer          buf;
535
536         Assert(blkno != P_NEW);
537         if (BufferIsValid(obuf))
538                 LockBuffer(obuf, BUFFER_LOCK_UNLOCK);
539         buf = ReleaseAndReadBuffer(obuf, rel, blkno);
540         LockBuffer(buf, access);
541         return buf;
542 }
543
544 /*
545  *      _bt_relbuf() -- release a locked buffer.
546  *
547  * Lock and pin (refcount) are both dropped.  Note that either read or
548  * write lock can be dropped this way, but if we modified the buffer,
549  * this is NOT the right way to release a write lock.
550  */
551 void
552 _bt_relbuf(Relation rel, Buffer buf)
553 {
554         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
555         ReleaseBuffer(buf);
556 }
557
558 /*
559  *      _bt_wrtbuf() -- write a btree page to disk.
560  *
561  *              This routine releases the lock held on the buffer and our refcount
562  *              for it.  It is an error to call _bt_wrtbuf() without a write lock
563  *              and a pin on the buffer.
564  *
565  * NOTE: actually, the buffer manager just marks the shared buffer page
566  * dirty here; the real I/O happens later.      This is okay since we are not
567  * relying on write ordering anyway.  The WAL mechanism is responsible for
568  * guaranteeing correctness after a crash.
569  */
570 void
571 _bt_wrtbuf(Relation rel, Buffer buf)
572 {
573         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
574         WriteBuffer(buf);
575 }
576
577 /*
578  *      _bt_wrtnorelbuf() -- write a btree page to disk, but do not release
579  *                                               our reference or lock.
580  *
581  *              It is an error to call _bt_wrtnorelbuf() without a write lock
582  *              and a pin on the buffer.
583  *
584  * See above NOTE.
585  */
586 void
587 _bt_wrtnorelbuf(Relation rel, Buffer buf)
588 {
589         WriteNoReleaseBuffer(buf);
590 }
591
592 /*
593  *      _bt_pageinit() -- Initialize a new page.
594  *
595  * On return, the page header is initialized; data space is empty;
596  * special space is zeroed out.
597  */
598 void
599 _bt_pageinit(Page page, Size size)
600 {
601         PageInit(page, size, sizeof(BTPageOpaqueData));
602 }
603
604 /*
605  *      _bt_page_recyclable() -- Is an existing page recyclable?
606  *
607  * This exists to make sure _bt_getbuf and btvacuumcleanup have the same
608  * policy about whether a page is safe to re-use.
609  */
610 bool
611 _bt_page_recyclable(Page page)
612 {
613         BTPageOpaque opaque;
614
615         /*
616          * It's possible to find an all-zeroes page in an index --- for example, a
617          * backend might successfully extend the relation one page and then crash
618          * before it is able to make a WAL entry for adding the page. If we find a
619          * zeroed page then reclaim it.
620          */
621         if (PageIsNew(page))
622                 return true;
623
624         /*
625          * Otherwise, recycle if deleted and too old to have any processes
626          * interested in it.
627          */
628         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
629         if (P_ISDELETED(opaque) &&
630                 TransactionIdPrecedesOrEquals(opaque->btpo.xact, RecentXmin))
631                 return true;
632         return false;
633 }
634
635 /*
636  * Delete item(s) from a btree page.
637  *
638  * This must only be used for deleting leaf items.      Deleting an item on a
639  * non-leaf page has to be done as part of an atomic action that includes
640  * deleting the page it points to.
641  *
642  * This routine assumes that the caller has pinned and locked the buffer,
643  * and will write the buffer afterwards.  Also, the given itemnos *must*
644  * appear in increasing order in the array.
645  */
646 void
647 _bt_delitems(Relation rel, Buffer buf,
648                          OffsetNumber *itemnos, int nitems)
649 {
650         Page            page = BufferGetPage(buf);
651
652         /* No ereport(ERROR) until changes are logged */
653         START_CRIT_SECTION();
654
655         /* Fix the page */
656         PageIndexMultiDelete(page, itemnos, nitems);
657
658         /* XLOG stuff */
659         if (!rel->rd_istemp)
660         {
661                 xl_btree_delete xlrec;
662                 XLogRecPtr      recptr;
663                 XLogRecData rdata[2];
664
665                 xlrec.node = rel->rd_node;
666                 xlrec.block = BufferGetBlockNumber(buf);
667
668                 rdata[0].data = (char *) &xlrec;
669                 rdata[0].len = SizeOfBtreeDelete;
670                 rdata[0].buffer = InvalidBuffer;
671                 rdata[0].next = &(rdata[1]);
672
673                 /*
674                  * The target-offsets array is not in the buffer, but pretend that it
675                  * is.  When XLogInsert stores the whole buffer, the offsets array
676                  * need not be stored too.
677                  */
678                 if (nitems > 0)
679                 {
680                         rdata[1].data = (char *) itemnos;
681                         rdata[1].len = nitems * sizeof(OffsetNumber);
682                 }
683                 else
684                 {
685                         rdata[1].data = NULL;
686                         rdata[1].len = 0;
687                 }
688                 rdata[1].buffer = buf;
689                 rdata[1].buffer_std = true;
690                 rdata[1].next = NULL;
691
692                 recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_DELETE, rdata);
693
694                 PageSetLSN(page, recptr);
695                 PageSetTLI(page, ThisTimeLineID);
696         }
697
698         END_CRIT_SECTION();
699 }
700
701 /*
702  * _bt_pagedel() -- Delete a page from the b-tree.
703  *
704  * This action unlinks the page from the b-tree structure, removing all
705  * pointers leading to it --- but not touching its own left and right links.
706  * The page cannot be physically reclaimed right away, since other processes
707  * may currently be trying to follow links leading to the page; they have to
708  * be allowed to use its right-link to recover.  See nbtree/README.
709  *
710  * On entry, the target buffer must be pinned and read-locked.  This lock and
711  * pin will be dropped before exiting.
712  *
713  * Returns the number of pages successfully deleted (zero on failure; could
714  * be more than one if parent blocks were deleted).
715  *
716  * NOTE: this leaks memory.  Rather than trying to clean up everything
717  * carefully, it's better to run it in a temp context that can be reset
718  * frequently.
719  */
720 int
721 _bt_pagedel(Relation rel, Buffer buf, bool vacuum_full)
722 {
723         BlockNumber target,
724                                 leftsib,
725                                 rightsib,
726                                 parent;
727         OffsetNumber poffset,
728                                 maxoff;
729         uint32          targetlevel,
730                                 ilevel;
731         ItemId          itemid;
732         BTItem          targetkey,
733                                 btitem;
734         ScanKey         itup_scankey;
735         BTStack         stack;
736         Buffer          lbuf,
737                                 rbuf,
738                                 pbuf;
739         bool            parent_half_dead;
740         bool            parent_one_child;
741         bool            rightsib_empty;
742         Buffer          metabuf = InvalidBuffer;
743         Page            metapg = NULL;
744         BTMetaPageData *metad = NULL;
745         Page            page;
746         BTPageOpaque opaque;
747
748         /*
749          * We can never delete rightmost pages nor root pages.  While at it, check
750          * that page is not already deleted and is empty.
751          */
752         page = BufferGetPage(buf);
753         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
754         if (P_RIGHTMOST(opaque) || P_ISROOT(opaque) || P_ISDELETED(opaque) ||
755                 P_FIRSTDATAKEY(opaque) <= PageGetMaxOffsetNumber(page))
756         {
757                 _bt_relbuf(rel, buf);
758                 return 0;
759         }
760
761         /*
762          * Save info about page, including a copy of its high key (it must have
763          * one, being non-rightmost).
764          */
765         target = BufferGetBlockNumber(buf);
766         targetlevel = opaque->btpo.level;
767         leftsib = opaque->btpo_prev;
768         itemid = PageGetItemId(page, P_HIKEY);
769         targetkey = CopyBTItem((BTItem) PageGetItem(page, itemid));
770
771         /*
772          * We need to get an approximate pointer to the page's parent page. Use
773          * the standard search mechanism to search for the page's high key; this
774          * will give us a link to either the current parent or someplace to its
775          * left (if there are multiple equal high keys).  To avoid deadlocks, we'd
776          * better drop the target page lock first.
777          */
778         _bt_relbuf(rel, buf);
779         /* we need a scan key to do our search, so build one */
780         itup_scankey = _bt_mkscankey(rel, &(targetkey->bti_itup));
781         /* find the leftmost leaf page containing this key */
782         stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey, false,
783                                            &lbuf, BT_READ);
784         /* don't need a pin on that either */
785         _bt_relbuf(rel, lbuf);
786
787         /*
788          * If we are trying to delete an interior page, _bt_search did more than
789          * we needed.  Locate the stack item pointing to our parent level.
790          */
791         ilevel = 0;
792         for (;;)
793         {
794                 if (stack == NULL)
795                         elog(ERROR, "not enough stack items");
796                 if (ilevel == targetlevel)
797                         break;
798                 stack = stack->bts_parent;
799                 ilevel++;
800         }
801
802         /*
803          * We have to lock the pages we need to modify in the standard order:
804          * moving right, then up.  Else we will deadlock against other writers.
805          *
806          * So, we need to find and write-lock the current left sibling of the target
807          * page.  The sibling that was current a moment ago could have split, so
808          * we may have to move right.  This search could fail if either the
809          * sibling or the target page was deleted by someone else meanwhile; if
810          * so, give up.  (Right now, that should never happen, since page deletion
811          * is only done in VACUUM and there shouldn't be multiple VACUUMs
812          * concurrently on the same table.)
813          */
814         if (leftsib != P_NONE)
815         {
816                 lbuf = _bt_getbuf(rel, leftsib, BT_WRITE);
817                 page = BufferGetPage(lbuf);
818                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
819                 while (P_ISDELETED(opaque) || opaque->btpo_next != target)
820                 {
821                         /* step right one page */
822                         leftsib = opaque->btpo_next;
823                         _bt_relbuf(rel, lbuf);
824                         if (leftsib == P_NONE)
825                         {
826                                 elog(LOG, "no left sibling (concurrent deletion?) in \"%s\"",
827                                          RelationGetRelationName(rel));
828                                 return 0;
829                         }
830                         lbuf = _bt_getbuf(rel, leftsib, BT_WRITE);
831                         page = BufferGetPage(lbuf);
832                         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
833                 }
834         }
835         else
836                 lbuf = InvalidBuffer;
837
838         /*
839          * Next write-lock the target page itself.      It should be okay to take just
840          * a write lock not a superexclusive lock, since no scans would stop on an
841          * empty page.
842          */
843         buf = _bt_getbuf(rel, target, BT_WRITE);
844         page = BufferGetPage(buf);
845         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
846
847         /*
848          * Check page is still empty etc, else abandon deletion.  The empty check
849          * is necessary since someone else might have inserted into it while we
850          * didn't have it locked; the others are just for paranoia's sake.
851          */
852         if (P_RIGHTMOST(opaque) || P_ISROOT(opaque) || P_ISDELETED(opaque) ||
853                 P_FIRSTDATAKEY(opaque) <= PageGetMaxOffsetNumber(page))
854         {
855                 _bt_relbuf(rel, buf);
856                 if (BufferIsValid(lbuf))
857                         _bt_relbuf(rel, lbuf);
858                 return 0;
859         }
860         if (opaque->btpo_prev != leftsib)
861                 elog(ERROR, "left link changed unexpectedly in block %u of \"%s\"",
862                          target, RelationGetRelationName(rel));
863
864         /*
865          * And next write-lock the (current) right sibling.
866          */
867         rightsib = opaque->btpo_next;
868         rbuf = _bt_getbuf(rel, rightsib, BT_WRITE);
869
870         /*
871          * Next find and write-lock the current parent of the target page. This is
872          * essentially the same as the corresponding step of splitting.
873          */
874         ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid),
875                                    target, P_HIKEY);
876         pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
877         if (pbuf == InvalidBuffer)
878                 elog(ERROR, "failed to re-find parent key in \"%s\"",
879                          RelationGetRelationName(rel));
880         parent = stack->bts_blkno;
881         poffset = stack->bts_offset;
882
883         /*
884          * If the target is the rightmost child of its parent, then we can't
885          * delete, unless it's also the only child --- in which case the parent
886          * changes to half-dead status.
887          */
888         page = BufferGetPage(pbuf);
889         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
890         maxoff = PageGetMaxOffsetNumber(page);
891         parent_half_dead = false;
892         parent_one_child = false;
893         if (poffset >= maxoff)
894         {
895                 if (poffset == P_FIRSTDATAKEY(opaque))
896                         parent_half_dead = true;
897                 else
898                 {
899                         _bt_relbuf(rel, pbuf);
900                         _bt_relbuf(rel, rbuf);
901                         _bt_relbuf(rel, buf);
902                         if (BufferIsValid(lbuf))
903                                 _bt_relbuf(rel, lbuf);
904                         return 0;
905                 }
906         }
907         else
908         {
909                 /* Will there be exactly one child left in this parent? */
910                 if (OffsetNumberNext(P_FIRSTDATAKEY(opaque)) == maxoff)
911                         parent_one_child = true;
912         }
913
914         /*
915          * If we are deleting the next-to-last page on the target's level, then
916          * the rightsib is a candidate to become the new fast root. (In theory, it
917          * might be possible to push the fast root even further down, but the odds
918          * of doing so are slim, and the locking considerations daunting.)
919          *
920          * We can safely acquire a lock on the metapage here --- see comments for
921          * _bt_newroot().
922          */
923         if (leftsib == P_NONE)
924         {
925                 page = BufferGetPage(rbuf);
926                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
927                 Assert(opaque->btpo.level == targetlevel);
928                 if (P_RIGHTMOST(opaque))
929                 {
930                         /* rightsib will be the only one left on the level */
931                         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
932                         metapg = BufferGetPage(metabuf);
933                         metad = BTPageGetMeta(metapg);
934
935                         /*
936                          * The expected case here is btm_fastlevel == targetlevel+1; if
937                          * the fastlevel is <= targetlevel, something is wrong, and we
938                          * choose to overwrite it to fix it.
939                          */
940                         if (metad->btm_fastlevel > targetlevel + 1)
941                         {
942                                 /* no update wanted */
943                                 _bt_relbuf(rel, metabuf);
944                                 metabuf = InvalidBuffer;
945                         }
946                 }
947         }
948
949         /*
950          * Here we begin doing the deletion.
951          */
952
953         /* No ereport(ERROR) until changes are logged */
954         START_CRIT_SECTION();
955
956         /*
957          * Update parent.  The normal case is a tad tricky because we want to
958          * delete the target's downlink and the *following* key.  Easiest way is
959          * to copy the right sibling's downlink over the target downlink, and then
960          * delete the following item.
961          */
962         page = BufferGetPage(pbuf);
963         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
964         if (parent_half_dead)
965         {
966                 PageIndexTupleDelete(page, poffset);
967                 opaque->btpo_flags |= BTP_HALF_DEAD;
968         }
969         else
970         {
971                 OffsetNumber nextoffset;
972
973                 itemid = PageGetItemId(page, poffset);
974                 btitem = (BTItem) PageGetItem(page, itemid);
975                 Assert(ItemPointerGetBlockNumber(&(btitem->bti_itup.t_tid)) == target);
976                 ItemPointerSet(&(btitem->bti_itup.t_tid), rightsib, P_HIKEY);
977
978                 nextoffset = OffsetNumberNext(poffset);
979                 /* This part is just for double-checking */
980                 itemid = PageGetItemId(page, nextoffset);
981                 btitem = (BTItem) PageGetItem(page, itemid);
982                 if (ItemPointerGetBlockNumber(&(btitem->bti_itup.t_tid)) != rightsib)
983                         elog(PANIC, "right sibling is not next child in \"%s\"",
984                                  RelationGetRelationName(rel));
985                 PageIndexTupleDelete(page, nextoffset);
986         }
987
988         /*
989          * Update siblings' side-links.  Note the target page's side-links will
990          * continue to point to the siblings.
991          */
992         if (BufferIsValid(lbuf))
993         {
994                 page = BufferGetPage(lbuf);
995                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
996                 Assert(opaque->btpo_next == target);
997                 opaque->btpo_next = rightsib;
998         }
999         page = BufferGetPage(rbuf);
1000         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1001         Assert(opaque->btpo_prev == target);
1002         opaque->btpo_prev = leftsib;
1003         rightsib_empty = (P_FIRSTDATAKEY(opaque) > PageGetMaxOffsetNumber(page));
1004
1005         /*
1006          * Mark the page itself deleted.  It can be recycled when all current
1007          * transactions are gone; or immediately if we're doing VACUUM FULL.
1008          */
1009         page = BufferGetPage(buf);
1010         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1011         opaque->btpo_flags |= BTP_DELETED;
1012         opaque->btpo.xact =
1013                 vacuum_full ? FrozenTransactionId : ReadNewTransactionId();
1014
1015         /* And update the metapage, if needed */
1016         if (BufferIsValid(metabuf))
1017         {
1018                 metad->btm_fastroot = rightsib;
1019                 metad->btm_fastlevel = targetlevel;
1020         }
1021
1022         /* XLOG stuff */
1023         if (!rel->rd_istemp)
1024         {
1025                 xl_btree_delete_page xlrec;
1026                 xl_btree_metadata xlmeta;
1027                 uint8           xlinfo;
1028                 XLogRecPtr      recptr;
1029                 XLogRecData rdata[5];
1030                 XLogRecData *nextrdata;
1031
1032                 xlrec.target.node = rel->rd_node;
1033                 ItemPointerSet(&(xlrec.target.tid), parent, poffset);
1034                 xlrec.deadblk = target;
1035                 xlrec.leftblk = leftsib;
1036                 xlrec.rightblk = rightsib;
1037
1038                 rdata[0].data = (char *) &xlrec;
1039                 rdata[0].len = SizeOfBtreeDeletePage;
1040                 rdata[0].buffer = InvalidBuffer;
1041                 rdata[0].next = nextrdata = &(rdata[1]);
1042
1043                 if (BufferIsValid(metabuf))
1044                 {
1045                         xlmeta.root = metad->btm_root;
1046                         xlmeta.level = metad->btm_level;
1047                         xlmeta.fastroot = metad->btm_fastroot;
1048                         xlmeta.fastlevel = metad->btm_fastlevel;
1049
1050                         nextrdata->data = (char *) &xlmeta;
1051                         nextrdata->len = sizeof(xl_btree_metadata);
1052                         nextrdata->buffer = InvalidBuffer;
1053                         nextrdata->next = nextrdata + 1;
1054                         nextrdata++;
1055                         xlinfo = XLOG_BTREE_DELETE_PAGE_META;
1056                 }
1057                 else
1058                         xlinfo = XLOG_BTREE_DELETE_PAGE;
1059
1060                 nextrdata->data = NULL;
1061                 nextrdata->len = 0;
1062                 nextrdata->next = nextrdata + 1;
1063                 nextrdata->buffer = pbuf;
1064                 nextrdata->buffer_std = true;
1065                 nextrdata++;
1066
1067                 nextrdata->data = NULL;
1068                 nextrdata->len = 0;
1069                 nextrdata->buffer = rbuf;
1070                 nextrdata->buffer_std = true;
1071                 nextrdata->next = NULL;
1072
1073                 if (BufferIsValid(lbuf))
1074                 {
1075                         nextrdata->next = nextrdata + 1;
1076                         nextrdata++;
1077                         nextrdata->data = NULL;
1078                         nextrdata->len = 0;
1079                         nextrdata->buffer = lbuf;
1080                         nextrdata->buffer_std = true;
1081                         nextrdata->next = NULL;
1082                 }
1083
1084                 recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
1085
1086                 if (BufferIsValid(metabuf))
1087                 {
1088                         PageSetLSN(metapg, recptr);
1089                         PageSetTLI(metapg, ThisTimeLineID);
1090                 }
1091                 page = BufferGetPage(pbuf);
1092                 PageSetLSN(page, recptr);
1093                 PageSetTLI(page, ThisTimeLineID);
1094                 page = BufferGetPage(rbuf);
1095                 PageSetLSN(page, recptr);
1096                 PageSetTLI(page, ThisTimeLineID);
1097                 page = BufferGetPage(buf);
1098                 PageSetLSN(page, recptr);
1099                 PageSetTLI(page, ThisTimeLineID);
1100                 if (BufferIsValid(lbuf))
1101                 {
1102                         page = BufferGetPage(lbuf);
1103                         PageSetLSN(page, recptr);
1104                         PageSetTLI(page, ThisTimeLineID);
1105                 }
1106         }
1107
1108         END_CRIT_SECTION();
1109
1110         /* Write and release buffers */
1111         if (BufferIsValid(metabuf))
1112                 _bt_wrtbuf(rel, metabuf);
1113         _bt_wrtbuf(rel, pbuf);
1114         _bt_wrtbuf(rel, rbuf);
1115         _bt_wrtbuf(rel, buf);
1116         if (BufferIsValid(lbuf))
1117                 _bt_wrtbuf(rel, lbuf);
1118
1119         /*
1120          * If parent became half dead, recurse to try to delete it. Otherwise, if
1121          * right sibling is empty and is now the last child of the parent, recurse
1122          * to try to delete it.  (These cases cannot apply at the same time,
1123          * though the second case might itself recurse to the first.)
1124          */
1125         if (parent_half_dead)
1126         {
1127                 buf = _bt_getbuf(rel, parent, BT_READ);
1128                 return _bt_pagedel(rel, buf, vacuum_full) + 1;
1129         }
1130         if (parent_one_child && rightsib_empty)
1131         {
1132                 buf = _bt_getbuf(rel, rightsib, BT_READ);
1133                 return _bt_pagedel(rel, buf, vacuum_full) + 1;
1134         }
1135
1136         return 1;
1137 }