OSDN Git Service

Reduce impact of btree page reuse on Hot Standby by fixing off-by-1 error.
[pg-rex/syncrep.git] / src / backend / access / nbtree / nbtpage.c
1 /*-------------------------------------------------------------------------
2  *
3  * nbtpage.c
4  *        BTree-specific page management code for the Postgres btree access
5  *        method.
6  *
7  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  *
11  * IDENTIFICATION
12  *        src/backend/access/nbtree/nbtpage.c
13  *
14  *      NOTES
15  *         Postgres btree pages look like ordinary relation pages.      The opaque
16  *         data at high addresses includes pointers to left and right siblings
17  *         and flag data describing page state.  The first page in a btree, page
18  *         zero, is special -- it stores meta-information describing the tree.
19  *         Pages one and higher store the actual tree data.
20  *
21  *-------------------------------------------------------------------------
22  */
23 #include "postgres.h"
24
25 #include "access/nbtree.h"
26 #include "access/transam.h"
27 #include "miscadmin.h"
28 #include "storage/bufmgr.h"
29 #include "storage/freespace.h"
30 #include "storage/indexfsm.h"
31 #include "storage/lmgr.h"
32 #include "storage/predicate.h"
33 #include "utils/inval.h"
34 #include "utils/snapmgr.h"
35
36
37 /*
38  *      _bt_initmetapage() -- Fill a page buffer with a correct metapage image
39  */
40 void
41 _bt_initmetapage(Page page, BlockNumber rootbknum, uint32 level)
42 {
43         BTMetaPageData *metad;
44         BTPageOpaque metaopaque;
45
46         _bt_pageinit(page, BLCKSZ);
47
48         metad = BTPageGetMeta(page);
49         metad->btm_magic = BTREE_MAGIC;
50         metad->btm_version = BTREE_VERSION;
51         metad->btm_root = rootbknum;
52         metad->btm_level = level;
53         metad->btm_fastroot = rootbknum;
54         metad->btm_fastlevel = level;
55
56         metaopaque = (BTPageOpaque) PageGetSpecialPointer(page);
57         metaopaque->btpo_flags = BTP_META;
58
59         /*
60          * Set pd_lower just past the end of the metadata.      This is not essential
61          * but it makes the page look compressible to xlog.c.
62          */
63         ((PageHeader) page)->pd_lower =
64                 ((char *) metad + sizeof(BTMetaPageData)) - (char *) page;
65 }
66
67 /*
68  *      _bt_getroot() -- Get the root page of the btree.
69  *
70  *              Since the root page can move around the btree file, we have to read
71  *              its location from the metadata page, and then read the root page
72  *              itself.  If no root page exists yet, we have to create one.  The
73  *              standard class of race conditions exists here; I think I covered
74  *              them all in the Hopi Indian rain dance of lock requests below.
75  *
76  *              The access type parameter (BT_READ or BT_WRITE) controls whether
77  *              a new root page will be created or not.  If access = BT_READ,
78  *              and no root page exists, we just return InvalidBuffer.  For
79  *              BT_WRITE, we try to create the root page if it doesn't exist.
80  *              NOTE that the returned root page will have only a read lock set
81  *              on it even if access = BT_WRITE!
82  *
83  *              The returned page is not necessarily the true root --- it could be
84  *              a "fast root" (a page that is alone in its level due to deletions).
85  *              Also, if the root page is split while we are "in flight" to it,
86  *              what we will return is the old root, which is now just the leftmost
87  *              page on a probably-not-very-wide level.  For most purposes this is
88  *              as good as or better than the true root, so we do not bother to
89  *              insist on finding the true root.  We do, however, guarantee to
90  *              return a live (not deleted or half-dead) page.
91  *
92  *              On successful return, the root page is pinned and read-locked.
93  *              The metadata page is not locked or pinned on exit.
94  */
95 Buffer
96 _bt_getroot(Relation rel, int access)
97 {
98         Buffer          metabuf;
99         Page            metapg;
100         BTPageOpaque metaopaque;
101         Buffer          rootbuf;
102         Page            rootpage;
103         BTPageOpaque rootopaque;
104         BlockNumber rootblkno;
105         uint32          rootlevel;
106         BTMetaPageData *metad;
107
108         /*
109          * Try to use previously-cached metapage data to find the root.  This
110          * normally saves one buffer access per index search, which is a very
111          * helpful savings in bufmgr traffic and hence contention.
112          */
113         if (rel->rd_amcache != NULL)
114         {
115                 metad = (BTMetaPageData *) rel->rd_amcache;
116                 /* We shouldn't have cached it if any of these fail */
117                 Assert(metad->btm_magic == BTREE_MAGIC);
118                 Assert(metad->btm_version == BTREE_VERSION);
119                 Assert(metad->btm_root != P_NONE);
120
121                 rootblkno = metad->btm_fastroot;
122                 Assert(rootblkno != P_NONE);
123                 rootlevel = metad->btm_fastlevel;
124
125                 rootbuf = _bt_getbuf(rel, rootblkno, BT_READ);
126                 rootpage = BufferGetPage(rootbuf);
127                 rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
128
129                 /*
130                  * Since the cache might be stale, we check the page more carefully
131                  * here than normal.  We *must* check that it's not deleted. If it's
132                  * not alone on its level, then we reject too --- this may be overly
133                  * paranoid but better safe than sorry.  Note we don't check P_ISROOT,
134                  * because that's not set in a "fast root".
135                  */
136                 if (!P_IGNORE(rootopaque) &&
137                         rootopaque->btpo.level == rootlevel &&
138                         P_LEFTMOST(rootopaque) &&
139                         P_RIGHTMOST(rootopaque))
140                 {
141                         /* OK, accept cached page as the root */
142                         return rootbuf;
143                 }
144                 _bt_relbuf(rel, rootbuf);
145                 /* Cache is stale, throw it away */
146                 if (rel->rd_amcache)
147                         pfree(rel->rd_amcache);
148                 rel->rd_amcache = NULL;
149         }
150
151         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ);
152         metapg = BufferGetPage(metabuf);
153         metaopaque = (BTPageOpaque) PageGetSpecialPointer(metapg);
154         metad = BTPageGetMeta(metapg);
155
156         /* sanity-check the metapage */
157         if (!(metaopaque->btpo_flags & BTP_META) ||
158                 metad->btm_magic != BTREE_MAGIC)
159                 ereport(ERROR,
160                                 (errcode(ERRCODE_INDEX_CORRUPTED),
161                                  errmsg("index \"%s\" is not a btree",
162                                                 RelationGetRelationName(rel))));
163
164         if (metad->btm_version != BTREE_VERSION)
165                 ereport(ERROR,
166                                 (errcode(ERRCODE_INDEX_CORRUPTED),
167                                  errmsg("version mismatch in index \"%s\": file version %d, code version %d",
168                                                 RelationGetRelationName(rel),
169                                                 metad->btm_version, BTREE_VERSION)));
170
171         /* if no root page initialized yet, do it */
172         if (metad->btm_root == P_NONE)
173         {
174                 /* If access = BT_READ, caller doesn't want us to create root yet */
175                 if (access == BT_READ)
176                 {
177                         _bt_relbuf(rel, metabuf);
178                         return InvalidBuffer;
179                 }
180
181                 /* trade in our read lock for a write lock */
182                 LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
183                 LockBuffer(metabuf, BT_WRITE);
184
185                 /*
186                  * Race condition:      if someone else initialized the metadata between
187                  * the time we released the read lock and acquired the write lock, we
188                  * must avoid doing it again.
189                  */
190                 if (metad->btm_root != P_NONE)
191                 {
192                         /*
193                          * Metadata initialized by someone else.  In order to guarantee no
194                          * deadlocks, we have to release the metadata page and start all
195                          * over again.  (Is that really true? But it's hardly worth trying
196                          * to optimize this case.)
197                          */
198                         _bt_relbuf(rel, metabuf);
199                         return _bt_getroot(rel, access);
200                 }
201
202                 /*
203                  * Get, initialize, write, and leave a lock of the appropriate type on
204                  * the new root page.  Since this is the first page in the tree, it's
205                  * a leaf as well as the root.
206                  */
207                 rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
208                 rootblkno = BufferGetBlockNumber(rootbuf);
209                 rootpage = BufferGetPage(rootbuf);
210                 rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
211                 rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
212                 rootopaque->btpo_flags = (BTP_LEAF | BTP_ROOT);
213                 rootopaque->btpo.level = 0;
214                 rootopaque->btpo_cycleid = 0;
215
216                 /* NO ELOG(ERROR) till meta is updated */
217                 START_CRIT_SECTION();
218
219                 metad->btm_root = rootblkno;
220                 metad->btm_level = 0;
221                 metad->btm_fastroot = rootblkno;
222                 metad->btm_fastlevel = 0;
223
224                 MarkBufferDirty(rootbuf);
225                 MarkBufferDirty(metabuf);
226
227                 /* XLOG stuff */
228                 if (RelationNeedsWAL(rel))
229                 {
230                         xl_btree_newroot xlrec;
231                         XLogRecPtr      recptr;
232                         XLogRecData rdata;
233
234                         xlrec.node = rel->rd_node;
235                         xlrec.rootblk = rootblkno;
236                         xlrec.level = 0;
237
238                         rdata.data = (char *) &xlrec;
239                         rdata.len = SizeOfBtreeNewroot;
240                         rdata.buffer = InvalidBuffer;
241                         rdata.next = NULL;
242
243                         recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, &rdata);
244
245                         PageSetLSN(rootpage, recptr);
246                         PageSetTLI(rootpage, ThisTimeLineID);
247                         PageSetLSN(metapg, recptr);
248                         PageSetTLI(metapg, ThisTimeLineID);
249                 }
250
251                 END_CRIT_SECTION();
252
253                 /*
254                  * Send out relcache inval for metapage change (probably unnecessary
255                  * here, but let's be safe).
256                  */
257                 CacheInvalidateRelcache(rel);
258
259                 /*
260                  * swap root write lock for read lock.  There is no danger of anyone
261                  * else accessing the new root page while it's unlocked, since no one
262                  * else knows where it is yet.
263                  */
264                 LockBuffer(rootbuf, BUFFER_LOCK_UNLOCK);
265                 LockBuffer(rootbuf, BT_READ);
266
267                 /* okay, metadata is correct, release lock on it */
268                 _bt_relbuf(rel, metabuf);
269         }
270         else
271         {
272                 rootblkno = metad->btm_fastroot;
273                 Assert(rootblkno != P_NONE);
274                 rootlevel = metad->btm_fastlevel;
275
276                 /*
277                  * Cache the metapage data for next time
278                  */
279                 rel->rd_amcache = MemoryContextAlloc(rel->rd_indexcxt,
280                                                                                          sizeof(BTMetaPageData));
281                 memcpy(rel->rd_amcache, metad, sizeof(BTMetaPageData));
282
283                 /*
284                  * We are done with the metapage; arrange to release it via first
285                  * _bt_relandgetbuf call
286                  */
287                 rootbuf = metabuf;
288
289                 for (;;)
290                 {
291                         rootbuf = _bt_relandgetbuf(rel, rootbuf, rootblkno, BT_READ);
292                         rootpage = BufferGetPage(rootbuf);
293                         rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
294
295                         if (!P_IGNORE(rootopaque))
296                                 break;
297
298                         /* it's dead, Jim.  step right one page */
299                         if (P_RIGHTMOST(rootopaque))
300                                 elog(ERROR, "no live root page found in index \"%s\"",
301                                          RelationGetRelationName(rel));
302                         rootblkno = rootopaque->btpo_next;
303                 }
304
305                 /* Note: can't check btpo.level on deleted pages */
306                 if (rootopaque->btpo.level != rootlevel)
307                         elog(ERROR, "root page %u of index \"%s\" has level %u, expected %u",
308                                  rootblkno, RelationGetRelationName(rel),
309                                  rootopaque->btpo.level, rootlevel);
310         }
311
312         /*
313          * By here, we have a pin and read lock on the root page, and no lock set
314          * on the metadata page.  Return the root page's buffer.
315          */
316         return rootbuf;
317 }
318
319 /*
320  *      _bt_gettrueroot() -- Get the true root page of the btree.
321  *
322  *              This is the same as the BT_READ case of _bt_getroot(), except
323  *              we follow the true-root link not the fast-root link.
324  *
325  * By the time we acquire lock on the root page, it might have been split and
326  * not be the true root anymore.  This is okay for the present uses of this
327  * routine; we only really need to be able to move up at least one tree level
328  * from whatever non-root page we were at.      If we ever do need to lock the
329  * one true root page, we could loop here, re-reading the metapage on each
330  * failure.  (Note that it wouldn't do to hold the lock on the metapage while
331  * moving to the root --- that'd deadlock against any concurrent root split.)
332  */
333 Buffer
334 _bt_gettrueroot(Relation rel)
335 {
336         Buffer          metabuf;
337         Page            metapg;
338         BTPageOpaque metaopaque;
339         Buffer          rootbuf;
340         Page            rootpage;
341         BTPageOpaque rootopaque;
342         BlockNumber rootblkno;
343         uint32          rootlevel;
344         BTMetaPageData *metad;
345
346         /*
347          * We don't try to use cached metapage data here, since (a) this path is
348          * not performance-critical, and (b) if we are here it suggests our cache
349          * is out-of-date anyway.  In light of point (b), it's probably safest to
350          * actively flush any cached metapage info.
351          */
352         if (rel->rd_amcache)
353                 pfree(rel->rd_amcache);
354         rel->rd_amcache = NULL;
355
356         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ);
357         metapg = BufferGetPage(metabuf);
358         metaopaque = (BTPageOpaque) PageGetSpecialPointer(metapg);
359         metad = BTPageGetMeta(metapg);
360
361         if (!(metaopaque->btpo_flags & BTP_META) ||
362                 metad->btm_magic != BTREE_MAGIC)
363                 ereport(ERROR,
364                                 (errcode(ERRCODE_INDEX_CORRUPTED),
365                                  errmsg("index \"%s\" is not a btree",
366                                                 RelationGetRelationName(rel))));
367
368         if (metad->btm_version != BTREE_VERSION)
369                 ereport(ERROR,
370                                 (errcode(ERRCODE_INDEX_CORRUPTED),
371                                  errmsg("version mismatch in index \"%s\": file version %d, code version %d",
372                                                 RelationGetRelationName(rel),
373                                                 metad->btm_version, BTREE_VERSION)));
374
375         /* if no root page initialized yet, fail */
376         if (metad->btm_root == P_NONE)
377         {
378                 _bt_relbuf(rel, metabuf);
379                 return InvalidBuffer;
380         }
381
382         rootblkno = metad->btm_root;
383         rootlevel = metad->btm_level;
384
385         /*
386          * We are done with the metapage; arrange to release it via first
387          * _bt_relandgetbuf call
388          */
389         rootbuf = metabuf;
390
391         for (;;)
392         {
393                 rootbuf = _bt_relandgetbuf(rel, rootbuf, rootblkno, BT_READ);
394                 rootpage = BufferGetPage(rootbuf);
395                 rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
396
397                 if (!P_IGNORE(rootopaque))
398                         break;
399
400                 /* it's dead, Jim.  step right one page */
401                 if (P_RIGHTMOST(rootopaque))
402                         elog(ERROR, "no live root page found in index \"%s\"",
403                                  RelationGetRelationName(rel));
404                 rootblkno = rootopaque->btpo_next;
405         }
406
407         /* Note: can't check btpo.level on deleted pages */
408         if (rootopaque->btpo.level != rootlevel)
409                 elog(ERROR, "root page %u of index \"%s\" has level %u, expected %u",
410                          rootblkno, RelationGetRelationName(rel),
411                          rootopaque->btpo.level, rootlevel);
412
413         return rootbuf;
414 }
415
416 /*
417  *      _bt_checkpage() -- Verify that a freshly-read page looks sane.
418  */
419 void
420 _bt_checkpage(Relation rel, Buffer buf)
421 {
422         Page            page = BufferGetPage(buf);
423
424         /*
425          * ReadBuffer verifies that every newly-read page passes
426          * PageHeaderIsValid, which means it either contains a reasonably sane
427          * page header or is all-zero.  We have to defend against the all-zero
428          * case, however.
429          */
430         if (PageIsNew(page))
431                 ereport(ERROR,
432                                 (errcode(ERRCODE_INDEX_CORRUPTED),
433                          errmsg("index \"%s\" contains unexpected zero page at block %u",
434                                         RelationGetRelationName(rel),
435                                         BufferGetBlockNumber(buf)),
436                                  errhint("Please REINDEX it.")));
437
438         /*
439          * Additionally check that the special area looks sane.
440          */
441         if (PageGetSpecialSize(page) != MAXALIGN(sizeof(BTPageOpaqueData)))
442                 ereport(ERROR,
443                                 (errcode(ERRCODE_INDEX_CORRUPTED),
444                                  errmsg("index \"%s\" contains corrupted page at block %u",
445                                                 RelationGetRelationName(rel),
446                                                 BufferGetBlockNumber(buf)),
447                                  errhint("Please REINDEX it.")));
448 }
449
450 /*
451  * Log the reuse of a page from the FSM.
452  */
453 static void
454 _bt_log_reuse_page(Relation rel, BlockNumber blkno, TransactionId latestRemovedXid)
455 {
456         if (!RelationNeedsWAL(rel))
457                 return;
458
459         /* No ereport(ERROR) until changes are logged */
460         START_CRIT_SECTION();
461
462         /*
463          * We don't do MarkBufferDirty here because we're about initialise the
464          * page, and nobody else can see it yet.
465          */
466
467         /* XLOG stuff */
468         {
469                 XLogRecData rdata[1];
470                 xl_btree_reuse_page xlrec_reuse;
471
472                 xlrec_reuse.node = rel->rd_node;
473                 xlrec_reuse.block = blkno;
474                 xlrec_reuse.latestRemovedXid = latestRemovedXid;
475                 rdata[0].data = (char *) &xlrec_reuse;
476                 rdata[0].len = SizeOfBtreeReusePage;
477                 rdata[0].buffer = InvalidBuffer;
478                 rdata[0].next = NULL;
479
480                 XLogInsert(RM_BTREE_ID, XLOG_BTREE_REUSE_PAGE, rdata);
481
482                 /*
483                  * We don't do PageSetLSN or PageSetTLI here because we're about
484                  * initialise the page, so no need.
485                  */
486         }
487
488         END_CRIT_SECTION();
489 }
490
491 /*
492  *      _bt_getbuf() -- Get a buffer by block number for read or write.
493  *
494  *              blkno == P_NEW means to get an unallocated index page.  The page
495  *              will be initialized before returning it.
496  *
497  *              When this routine returns, the appropriate lock is set on the
498  *              requested buffer and its reference count has been incremented
499  *              (ie, the buffer is "locked and pinned").  Also, we apply
500  *              _bt_checkpage to sanity-check the page (except in P_NEW case).
501  */
502 Buffer
503 _bt_getbuf(Relation rel, BlockNumber blkno, int access)
504 {
505         Buffer          buf;
506
507         if (blkno != P_NEW)
508         {
509                 /* Read an existing block of the relation */
510                 buf = ReadBuffer(rel, blkno);
511                 LockBuffer(buf, access);
512                 _bt_checkpage(rel, buf);
513         }
514         else
515         {
516                 bool            needLock;
517                 Page            page;
518
519                 Assert(access == BT_WRITE);
520
521                 /*
522                  * First see if the FSM knows of any free pages.
523                  *
524                  * We can't trust the FSM's report unreservedly; we have to check that
525                  * the page is still free.      (For example, an already-free page could
526                  * have been re-used between the time the last VACUUM scanned it and
527                  * the time the VACUUM made its FSM updates.)
528                  *
529                  * In fact, it's worse than that: we can't even assume that it's safe
530                  * to take a lock on the reported page.  If somebody else has a lock
531                  * on it, or even worse our own caller does, we could deadlock.  (The
532                  * own-caller scenario is actually not improbable. Consider an index
533                  * on a serial or timestamp column.  Nearly all splits will be at the
534                  * rightmost page, so it's entirely likely that _bt_split will call us
535                  * while holding a lock on the page most recently acquired from FSM. A
536                  * VACUUM running concurrently with the previous split could well have
537                  * placed that page back in FSM.)
538                  *
539                  * To get around that, we ask for only a conditional lock on the
540                  * reported page.  If we fail, then someone else is using the page,
541                  * and we may reasonably assume it's not free.  (If we happen to be
542                  * wrong, the worst consequence is the page will be lost to use till
543                  * the next VACUUM, which is no big problem.)
544                  */
545                 for (;;)
546                 {
547                         blkno = GetFreeIndexPage(rel);
548                         if (blkno == InvalidBlockNumber)
549                                 break;
550                         buf = ReadBuffer(rel, blkno);
551                         if (ConditionalLockBuffer(buf))
552                         {
553                                 page = BufferGetPage(buf);
554                                 if (_bt_page_recyclable(page))
555                                 {
556                                         /*
557                                          * If we are generating WAL for Hot Standby then create a
558                                          * WAL record that will allow us to conflict with queries
559                                          * running on standby.
560                                          */
561                                         if (XLogStandbyInfoActive())
562                                         {
563                                                 TransactionId latestRemovedXid;
564
565                                                 BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
566
567                                                 /*
568                                                  * opaque->btpo.xact is the threshold value not the
569                                                  * value to measure conflicts against. We must retreat
570                                                  * by one from it to get the correct conflict xid.
571                                                  */
572                                                 latestRemovedXid = opaque->btpo.xact;
573                                                 TransactionIdRetreat(latestRemovedXid);
574
575                                                 _bt_log_reuse_page(rel, blkno, latestRemovedXid);
576                                         }
577
578                                         /* Okay to use page.  Re-initialize and return it */
579                                         _bt_pageinit(page, BufferGetPageSize(buf));
580                                         return buf;
581                                 }
582                                 elog(DEBUG2, "FSM returned nonrecyclable page");
583                                 _bt_relbuf(rel, buf);
584                         }
585                         else
586                         {
587                                 elog(DEBUG2, "FSM returned nonlockable page");
588                                 /* couldn't get lock, so just drop pin */
589                                 ReleaseBuffer(buf);
590                         }
591                 }
592
593                 /*
594                  * Extend the relation by one page.
595                  *
596                  * We have to use a lock to ensure no one else is extending the rel at
597                  * the same time, else we will both try to initialize the same new
598                  * page.  We can skip locking for new or temp relations, however,
599                  * since no one else could be accessing them.
600                  */
601                 needLock = !RELATION_IS_LOCAL(rel);
602
603                 if (needLock)
604                         LockRelationForExtension(rel, ExclusiveLock);
605
606                 buf = ReadBuffer(rel, P_NEW);
607
608                 /* Acquire buffer lock on new page */
609                 LockBuffer(buf, BT_WRITE);
610
611                 /*
612                  * Release the file-extension lock; it's now OK for someone else to
613                  * extend the relation some more.  Note that we cannot release this
614                  * lock before we have buffer lock on the new page, or we risk a race
615                  * condition against btvacuumscan --- see comments therein.
616                  */
617                 if (needLock)
618                         UnlockRelationForExtension(rel, ExclusiveLock);
619
620                 /* Initialize the new page before returning it */
621                 page = BufferGetPage(buf);
622                 Assert(PageIsNew(page));
623                 _bt_pageinit(page, BufferGetPageSize(buf));
624         }
625
626         /* ref count and lock type are correct */
627         return buf;
628 }
629
630 /*
631  *      _bt_relandgetbuf() -- release a locked buffer and get another one.
632  *
633  * This is equivalent to _bt_relbuf followed by _bt_getbuf, with the
634  * exception that blkno may not be P_NEW.  Also, if obuf is InvalidBuffer
635  * then it reduces to just _bt_getbuf; allowing this case simplifies some
636  * callers.
637  *
638  * The original motivation for using this was to avoid two entries to the
639  * bufmgr when one would do.  However, now it's mainly just a notational
640  * convenience.  The only case where it saves work over _bt_relbuf/_bt_getbuf
641  * is when the target page is the same one already in the buffer.
642  */
643 Buffer
644 _bt_relandgetbuf(Relation rel, Buffer obuf, BlockNumber blkno, int access)
645 {
646         Buffer          buf;
647
648         Assert(blkno != P_NEW);
649         if (BufferIsValid(obuf))
650                 LockBuffer(obuf, BUFFER_LOCK_UNLOCK);
651         buf = ReleaseAndReadBuffer(obuf, rel, blkno);
652         LockBuffer(buf, access);
653         _bt_checkpage(rel, buf);
654         return buf;
655 }
656
657 /*
658  *      _bt_relbuf() -- release a locked buffer.
659  *
660  * Lock and pin (refcount) are both dropped.
661  */
662 void
663 _bt_relbuf(Relation rel, Buffer buf)
664 {
665         UnlockReleaseBuffer(buf);
666 }
667
668 /*
669  *      _bt_pageinit() -- Initialize a new page.
670  *
671  * On return, the page header is initialized; data space is empty;
672  * special space is zeroed out.
673  */
674 void
675 _bt_pageinit(Page page, Size size)
676 {
677         PageInit(page, size, sizeof(BTPageOpaqueData));
678 }
679
680 /*
681  *      _bt_page_recyclable() -- Is an existing page recyclable?
682  *
683  * This exists to make sure _bt_getbuf and btvacuumscan have the same
684  * policy about whether a page is safe to re-use.
685  */
686 bool
687 _bt_page_recyclable(Page page)
688 {
689         BTPageOpaque opaque;
690         TransactionId cutoff;
691
692         /*
693          * It's possible to find an all-zeroes page in an index --- for example, a
694          * backend might successfully extend the relation one page and then crash
695          * before it is able to make a WAL entry for adding the page. If we find a
696          * zeroed page then reclaim it.
697          */
698         if (PageIsNew(page))
699                 return true;
700
701         /*
702          * Otherwise, recycle if deleted and too old to have any processes
703          * interested in it.  If we are generating records for Hot Standby
704          * defer page recycling until RecentGlobalXmin to respect user
705          * controls specified by vacuum_defer_cleanup_age or hot_standby_feedback.
706          */
707         if (XLogStandbyInfoActive())
708                 cutoff = RecentGlobalXmin;
709         else
710                 cutoff = RecentXmin;
711
712         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
713         if (P_ISDELETED(opaque) &&
714                 TransactionIdPrecedesOrEquals(opaque->btpo.xact, cutoff))
715                 return true;
716         return false;
717 }
718
719 /*
720  * Delete item(s) from a btree page.
721  *
722  * This must only be used for deleting leaf items.      Deleting an item on a
723  * non-leaf page has to be done as part of an atomic action that includes
724  * deleting the page it points to.
725  *
726  * This routine assumes that the caller has pinned and locked the buffer.
727  * Also, the given itemnos *must* appear in increasing order in the array.
728  *
729  * We record VACUUMs and b-tree deletes differently in WAL. InHotStandby
730  * we need to be able to pin all of the blocks in the btree in physical
731  * order when replaying the effects of a VACUUM, just as we do for the
732  * original VACUUM itself. lastBlockVacuumed allows us to tell whether an
733  * intermediate range of blocks has had no changes at all by VACUUM,
734  * and so must be scanned anyway during replay. We always write a WAL record
735  * for the last block in the index, whether or not it contained any items
736  * to be removed. This allows us to scan right up to end of index to
737  * ensure correct locking.
738  */
739 void
740 _bt_delitems_vacuum(Relation rel, Buffer buf,
741                         OffsetNumber *itemnos, int nitems, BlockNumber lastBlockVacuumed)
742 {
743         Page            page = BufferGetPage(buf);
744         BTPageOpaque opaque;
745
746         /* No ereport(ERROR) until changes are logged */
747         START_CRIT_SECTION();
748
749         /* Fix the page */
750         if (nitems > 0)
751                 PageIndexMultiDelete(page, itemnos, nitems);
752
753         /*
754          * We can clear the vacuum cycle ID since this page has certainly been
755          * processed by the current vacuum scan.
756          */
757         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
758         opaque->btpo_cycleid = 0;
759
760         /*
761          * Mark the page as not containing any LP_DEAD items.  This is not
762          * certainly true (there might be some that have recently been marked, but
763          * weren't included in our target-item list), but it will almost always be
764          * true and it doesn't seem worth an additional page scan to check it.
765          * Remember that BTP_HAS_GARBAGE is only a hint anyway.
766          */
767         opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
768
769         MarkBufferDirty(buf);
770
771         /* XLOG stuff */
772         if (RelationNeedsWAL(rel))
773         {
774                 XLogRecPtr      recptr;
775                 XLogRecData rdata[2];
776
777                 xl_btree_vacuum xlrec_vacuum;
778
779                 xlrec_vacuum.node = rel->rd_node;
780                 xlrec_vacuum.block = BufferGetBlockNumber(buf);
781
782                 xlrec_vacuum.lastBlockVacuumed = lastBlockVacuumed;
783                 rdata[0].data = (char *) &xlrec_vacuum;
784                 rdata[0].len = SizeOfBtreeVacuum;
785                 rdata[0].buffer = InvalidBuffer;
786                 rdata[0].next = &(rdata[1]);
787
788                 /*
789                  * The target-offsets array is not in the buffer, but pretend that it
790                  * is.  When XLogInsert stores the whole buffer, the offsets array
791                  * need not be stored too.
792                  */
793                 if (nitems > 0)
794                 {
795                         rdata[1].data = (char *) itemnos;
796                         rdata[1].len = nitems * sizeof(OffsetNumber);
797                 }
798                 else
799                 {
800                         rdata[1].data = NULL;
801                         rdata[1].len = 0;
802                 }
803                 rdata[1].buffer = buf;
804                 rdata[1].buffer_std = true;
805                 rdata[1].next = NULL;
806
807                 recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_VACUUM, rdata);
808
809                 PageSetLSN(page, recptr);
810                 PageSetTLI(page, ThisTimeLineID);
811         }
812
813         END_CRIT_SECTION();
814 }
815
816 void
817 _bt_delitems_delete(Relation rel, Buffer buf,
818                                         OffsetNumber *itemnos, int nitems, Relation heapRel)
819 {
820         Page            page = BufferGetPage(buf);
821         BTPageOpaque opaque;
822
823         Assert(nitems > 0);
824
825         /* No ereport(ERROR) until changes are logged */
826         START_CRIT_SECTION();
827
828         /* Fix the page */
829         PageIndexMultiDelete(page, itemnos, nitems);
830
831         /*
832          * We can clear the vacuum cycle ID since this page has certainly been
833          * processed by the current vacuum scan.
834          */
835         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
836         opaque->btpo_cycleid = 0;
837
838         /*
839          * Mark the page as not containing any LP_DEAD items.  This is not
840          * certainly true (there might be some that have recently been marked, but
841          * weren't included in our target-item list), but it will almost always be
842          * true and it doesn't seem worth an additional page scan to check it.
843          * Remember that BTP_HAS_GARBAGE is only a hint anyway.
844          */
845         opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
846
847         MarkBufferDirty(buf);
848
849         /* XLOG stuff */
850         if (RelationNeedsWAL(rel))
851         {
852                 XLogRecPtr      recptr;
853                 XLogRecData rdata[3];
854
855                 xl_btree_delete xlrec_delete;
856
857                 xlrec_delete.node = rel->rd_node;
858                 xlrec_delete.hnode = heapRel->rd_node;
859                 xlrec_delete.block = BufferGetBlockNumber(buf);
860                 xlrec_delete.nitems = nitems;
861
862                 rdata[0].data = (char *) &xlrec_delete;
863                 rdata[0].len = SizeOfBtreeDelete;
864                 rdata[0].buffer = InvalidBuffer;
865                 rdata[0].next = &(rdata[1]);
866
867                 /*
868                  * We need the target-offsets array whether or not we store the to
869                  * allow us to find the latestRemovedXid on a standby server.
870                  */
871                 rdata[1].data = (char *) itemnos;
872                 rdata[1].len = nitems * sizeof(OffsetNumber);
873                 rdata[1].buffer = InvalidBuffer;
874                 rdata[1].next = &(rdata[2]);
875
876                 rdata[2].data = NULL;
877                 rdata[2].len = 0;
878                 rdata[2].buffer = buf;
879                 rdata[2].buffer_std = true;
880                 rdata[2].next = NULL;
881
882                 recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_DELETE, rdata);
883
884                 PageSetLSN(page, recptr);
885                 PageSetTLI(page, ThisTimeLineID);
886         }
887
888         END_CRIT_SECTION();
889 }
890
891 /*
892  * Subroutine to pre-check whether a page deletion is safe, that is, its
893  * parent page would be left in a valid or deletable state.
894  *
895  * "target" is the page we wish to delete, and "stack" is a search stack
896  * leading to it (approximately).  Note that we will update the stack
897  * entry(s) to reflect current downlink positions --- this is harmless and
898  * indeed saves later search effort in _bt_pagedel.
899  *
900  * Note: it's OK to release page locks after checking, because a safe
901  * deletion can't become unsafe due to concurrent activity.  A non-rightmost
902  * page cannot become rightmost unless there's a concurrent page deletion,
903  * but only VACUUM does page deletion and we only allow one VACUUM on an index
904  * at a time.  An only child could acquire a sibling (of the same parent) only
905  * by being split ... but that would make it a non-rightmost child so the
906  * deletion is still safe.
907  */
908 static bool
909 _bt_parent_deletion_safe(Relation rel, BlockNumber target, BTStack stack)
910 {
911         BlockNumber parent;
912         OffsetNumber poffset,
913                                 maxoff;
914         Buffer          pbuf;
915         Page            page;
916         BTPageOpaque opaque;
917
918         /*
919          * In recovery mode, assume the deletion being replayed is valid.  We
920          * can't always check it because we won't have a full search stack, and we
921          * should complain if there's a problem, anyway.
922          */
923         if (InRecovery)
924                 return true;
925
926         /* Locate the parent's downlink (updating the stack entry if needed) */
927         ItemPointerSet(&(stack->bts_btentry.t_tid), target, P_HIKEY);
928         pbuf = _bt_getstackbuf(rel, stack, BT_READ);
929         if (pbuf == InvalidBuffer)
930                 elog(ERROR, "failed to re-find parent key in index \"%s\" for deletion target page %u",
931                          RelationGetRelationName(rel), target);
932         parent = stack->bts_blkno;
933         poffset = stack->bts_offset;
934
935         page = BufferGetPage(pbuf);
936         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
937         maxoff = PageGetMaxOffsetNumber(page);
938
939         /*
940          * If the target is the rightmost child of its parent, then we can't
941          * delete, unless it's also the only child.
942          */
943         if (poffset >= maxoff)
944         {
945                 /* It's rightmost child... */
946                 if (poffset == P_FIRSTDATAKEY(opaque))
947                 {
948                         /*
949                          * It's only child, so safe if parent would itself be removable.
950                          * We have to check the parent itself, and then recurse to test
951                          * the conditions at the parent's parent.
952                          */
953                         if (P_RIGHTMOST(opaque) || P_ISROOT(opaque))
954                         {
955                                 _bt_relbuf(rel, pbuf);
956                                 return false;
957                         }
958
959                         _bt_relbuf(rel, pbuf);
960                         return _bt_parent_deletion_safe(rel, parent, stack->bts_parent);
961                 }
962                 else
963                 {
964                         /* Unsafe to delete */
965                         _bt_relbuf(rel, pbuf);
966                         return false;
967                 }
968         }
969         else
970         {
971                 /* Not rightmost child, so safe to delete */
972                 _bt_relbuf(rel, pbuf);
973                 return true;
974         }
975 }
976
977 /*
978  * _bt_pagedel() -- Delete a page from the b-tree, if legal to do so.
979  *
980  * This action unlinks the page from the b-tree structure, removing all
981  * pointers leading to it --- but not touching its own left and right links.
982  * The page cannot be physically reclaimed right away, since other processes
983  * may currently be trying to follow links leading to the page; they have to
984  * be allowed to use its right-link to recover.  See nbtree/README.
985  *
986  * On entry, the target buffer must be pinned and locked (either read or write
987  * lock is OK).  This lock and pin will be dropped before exiting.
988  *
989  * The "stack" argument can be a search stack leading (approximately) to the
990  * target page, or NULL --- outside callers typically pass NULL since they
991  * have not done such a search, but internal recursion cases pass the stack
992  * to avoid duplicated search effort.
993  *
994  * Returns the number of pages successfully deleted (zero if page cannot
995  * be deleted now; could be more than one if parent pages were deleted too).
996  *
997  * NOTE: this leaks memory.  Rather than trying to clean up everything
998  * carefully, it's better to run it in a temp context that can be reset
999  * frequently.
1000  */
1001 int
1002 _bt_pagedel(Relation rel, Buffer buf, BTStack stack)
1003 {
1004         int                     result;
1005         BlockNumber target,
1006                                 leftsib,
1007                                 rightsib,
1008                                 parent;
1009         OffsetNumber poffset,
1010                                 maxoff;
1011         uint32          targetlevel,
1012                                 ilevel;
1013         ItemId          itemid;
1014         IndexTuple      targetkey,
1015                                 itup;
1016         ScanKey         itup_scankey;
1017         Buffer          lbuf,
1018                                 rbuf,
1019                                 pbuf;
1020         bool            parent_half_dead;
1021         bool            parent_one_child;
1022         bool            rightsib_empty;
1023         Buffer          metabuf = InvalidBuffer;
1024         Page            metapg = NULL;
1025         BTMetaPageData *metad = NULL;
1026         Page            page;
1027         BTPageOpaque opaque;
1028
1029         /*
1030          * We can never delete rightmost pages nor root pages.  While at it, check
1031          * that page is not already deleted and is empty.
1032          */
1033         page = BufferGetPage(buf);
1034         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1035         if (P_RIGHTMOST(opaque) || P_ISROOT(opaque) || P_ISDELETED(opaque) ||
1036                 P_FIRSTDATAKEY(opaque) <= PageGetMaxOffsetNumber(page))
1037         {
1038                 /* Should never fail to delete a half-dead page */
1039                 Assert(!P_ISHALFDEAD(opaque));
1040
1041                 _bt_relbuf(rel, buf);
1042                 return 0;
1043         }
1044
1045         /*
1046          * Save info about page, including a copy of its high key (it must have
1047          * one, being non-rightmost).
1048          */
1049         target = BufferGetBlockNumber(buf);
1050         targetlevel = opaque->btpo.level;
1051         leftsib = opaque->btpo_prev;
1052         itemid = PageGetItemId(page, P_HIKEY);
1053         targetkey = CopyIndexTuple((IndexTuple) PageGetItem(page, itemid));
1054
1055         /*
1056          * To avoid deadlocks, we'd better drop the target page lock before going
1057          * further.
1058          */
1059         _bt_relbuf(rel, buf);
1060
1061         /*
1062          * We need an approximate pointer to the page's parent page.  We use the
1063          * standard search mechanism to search for the page's high key; this will
1064          * give us a link to either the current parent or someplace to its left
1065          * (if there are multiple equal high keys).  In recursion cases, the
1066          * caller already generated a search stack and we can just re-use that
1067          * work.
1068          */
1069         if (stack == NULL)
1070         {
1071                 if (!InRecovery)
1072                 {
1073                         /* we need an insertion scan key to do our search, so build one */
1074                         itup_scankey = _bt_mkscankey(rel, targetkey);
1075                         /* find the leftmost leaf page containing this key */
1076                         stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey, false,
1077                                                            &lbuf, BT_READ);
1078                         /* don't need a pin on that either */
1079                         _bt_relbuf(rel, lbuf);
1080
1081                         /*
1082                          * If we are trying to delete an interior page, _bt_search did
1083                          * more than we needed.  Locate the stack item pointing to our
1084                          * parent level.
1085                          */
1086                         ilevel = 0;
1087                         for (;;)
1088                         {
1089                                 if (stack == NULL)
1090                                         elog(ERROR, "not enough stack items");
1091                                 if (ilevel == targetlevel)
1092                                         break;
1093                                 stack = stack->bts_parent;
1094                                 ilevel++;
1095                         }
1096                 }
1097                 else
1098                 {
1099                         /*
1100                          * During WAL recovery, we can't use _bt_search (for one reason,
1101                          * it might invoke user-defined comparison functions that expect
1102                          * facilities not available in recovery mode).  Instead, just set
1103                          * up a dummy stack pointing to the left end of the parent tree
1104                          * level, from which _bt_getstackbuf will walk right to the parent
1105                          * page.  Painful, but we don't care too much about performance in
1106                          * this scenario.
1107                          */
1108                         pbuf = _bt_get_endpoint(rel, targetlevel + 1, false);
1109                         stack = (BTStack) palloc(sizeof(BTStackData));
1110                         stack->bts_blkno = BufferGetBlockNumber(pbuf);
1111                         stack->bts_offset = InvalidOffsetNumber;
1112                         /* bts_btentry will be initialized below */
1113                         stack->bts_parent = NULL;
1114                         _bt_relbuf(rel, pbuf);
1115                 }
1116         }
1117
1118         /*
1119          * We cannot delete a page that is the rightmost child of its immediate
1120          * parent, unless it is the only child --- in which case the parent has to
1121          * be deleted too, and the same condition applies recursively to it. We
1122          * have to check this condition all the way up before trying to delete. We
1123          * don't need to re-test when deleting a non-leaf page, though.
1124          */
1125         if (targetlevel == 0 &&
1126                 !_bt_parent_deletion_safe(rel, target, stack))
1127                 return 0;
1128
1129         /*
1130          * We have to lock the pages we need to modify in the standard order:
1131          * moving right, then up.  Else we will deadlock against other writers.
1132          *
1133          * So, we need to find and write-lock the current left sibling of the
1134          * target page.  The sibling that was current a moment ago could have
1135          * split, so we may have to move right.  This search could fail if either
1136          * the sibling or the target page was deleted by someone else meanwhile;
1137          * if so, give up.      (Right now, that should never happen, since page
1138          * deletion is only done in VACUUM and there shouldn't be multiple VACUUMs
1139          * concurrently on the same table.)
1140          */
1141         if (leftsib != P_NONE)
1142         {
1143                 lbuf = _bt_getbuf(rel, leftsib, BT_WRITE);
1144                 page = BufferGetPage(lbuf);
1145                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1146                 while (P_ISDELETED(opaque) || opaque->btpo_next != target)
1147                 {
1148                         /* step right one page */
1149                         leftsib = opaque->btpo_next;
1150                         _bt_relbuf(rel, lbuf);
1151                         if (leftsib == P_NONE)
1152                         {
1153                                 elog(LOG, "no left sibling (concurrent deletion?) in \"%s\"",
1154                                          RelationGetRelationName(rel));
1155                                 return 0;
1156                         }
1157                         lbuf = _bt_getbuf(rel, leftsib, BT_WRITE);
1158                         page = BufferGetPage(lbuf);
1159                         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1160                 }
1161         }
1162         else
1163                 lbuf = InvalidBuffer;
1164
1165         /*
1166          * Next write-lock the target page itself.      It should be okay to take just
1167          * a write lock not a superexclusive lock, since no scans would stop on an
1168          * empty page.
1169          */
1170         buf = _bt_getbuf(rel, target, BT_WRITE);
1171         page = BufferGetPage(buf);
1172         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1173
1174         /*
1175          * Check page is still empty etc, else abandon deletion.  The empty check
1176          * is necessary since someone else might have inserted into it while we
1177          * didn't have it locked; the others are just for paranoia's sake.
1178          */
1179         if (P_RIGHTMOST(opaque) || P_ISROOT(opaque) || P_ISDELETED(opaque) ||
1180                 P_FIRSTDATAKEY(opaque) <= PageGetMaxOffsetNumber(page))
1181         {
1182                 _bt_relbuf(rel, buf);
1183                 if (BufferIsValid(lbuf))
1184                         _bt_relbuf(rel, lbuf);
1185                 return 0;
1186         }
1187         if (opaque->btpo_prev != leftsib)
1188                 elog(ERROR, "left link changed unexpectedly in block %u of index \"%s\"",
1189                          target, RelationGetRelationName(rel));
1190
1191         /*
1192          * And next write-lock the (current) right sibling.
1193          */
1194         rightsib = opaque->btpo_next;
1195         rbuf = _bt_getbuf(rel, rightsib, BT_WRITE);
1196         page = BufferGetPage(rbuf);
1197         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1198         if (opaque->btpo_prev != target)
1199                 elog(ERROR, "right sibling's left-link doesn't match: "
1200                          "block %u links to %u instead of expected %u in index \"%s\"",
1201                          rightsib, opaque->btpo_prev, target,
1202                          RelationGetRelationName(rel));
1203
1204         /*
1205          * Any insert which would have gone on the target block will now go to the
1206          * right sibling block.
1207          */
1208         PredicateLockPageCombine(rel, target, rightsib);
1209
1210         /*
1211          * Next find and write-lock the current parent of the target page. This is
1212          * essentially the same as the corresponding step of splitting.
1213          */
1214         ItemPointerSet(&(stack->bts_btentry.t_tid), target, P_HIKEY);
1215         pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
1216         if (pbuf == InvalidBuffer)
1217                 elog(ERROR, "failed to re-find parent key in index \"%s\" for deletion target page %u",
1218                          RelationGetRelationName(rel), target);
1219         parent = stack->bts_blkno;
1220         poffset = stack->bts_offset;
1221
1222         /*
1223          * If the target is the rightmost child of its parent, then we can't
1224          * delete, unless it's also the only child --- in which case the parent
1225          * changes to half-dead status.  The "can't delete" case should have been
1226          * detected by _bt_parent_deletion_safe, so complain if we see it now.
1227          */
1228         page = BufferGetPage(pbuf);
1229         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1230         maxoff = PageGetMaxOffsetNumber(page);
1231         parent_half_dead = false;
1232         parent_one_child = false;
1233         if (poffset >= maxoff)
1234         {
1235                 if (poffset == P_FIRSTDATAKEY(opaque))
1236                         parent_half_dead = true;
1237                 else
1238                         elog(ERROR, "failed to delete rightmost child %u of block %u in index \"%s\"",
1239                                  target, parent, RelationGetRelationName(rel));
1240         }
1241         else
1242         {
1243                 /* Will there be exactly one child left in this parent? */
1244                 if (OffsetNumberNext(P_FIRSTDATAKEY(opaque)) == maxoff)
1245                         parent_one_child = true;
1246         }
1247
1248         /*
1249          * If we are deleting the next-to-last page on the target's level, then
1250          * the rightsib is a candidate to become the new fast root. (In theory, it
1251          * might be possible to push the fast root even further down, but the odds
1252          * of doing so are slim, and the locking considerations daunting.)
1253          *
1254          * We don't support handling this in the case where the parent is becoming
1255          * half-dead, even though it theoretically could occur.
1256          *
1257          * We can safely acquire a lock on the metapage here --- see comments for
1258          * _bt_newroot().
1259          */
1260         if (leftsib == P_NONE && !parent_half_dead)
1261         {
1262                 page = BufferGetPage(rbuf);
1263                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1264                 Assert(opaque->btpo.level == targetlevel);
1265                 if (P_RIGHTMOST(opaque))
1266                 {
1267                         /* rightsib will be the only one left on the level */
1268                         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
1269                         metapg = BufferGetPage(metabuf);
1270                         metad = BTPageGetMeta(metapg);
1271
1272                         /*
1273                          * The expected case here is btm_fastlevel == targetlevel+1; if
1274                          * the fastlevel is <= targetlevel, something is wrong, and we
1275                          * choose to overwrite it to fix it.
1276                          */
1277                         if (metad->btm_fastlevel > targetlevel + 1)
1278                         {
1279                                 /* no update wanted */
1280                                 _bt_relbuf(rel, metabuf);
1281                                 metabuf = InvalidBuffer;
1282                         }
1283                 }
1284         }
1285
1286         /*
1287          * Check that the parent-page index items we're about to delete/overwrite
1288          * contain what we expect.      This can fail if the index has become corrupt
1289          * for some reason.  We want to throw any error before entering the
1290          * critical section --- otherwise it'd be a PANIC.
1291          *
1292          * The test on the target item is just an Assert because _bt_getstackbuf
1293          * should have guaranteed it has the expected contents.  The test on the
1294          * next-child downlink is known to sometimes fail in the field, though.
1295          */
1296         page = BufferGetPage(pbuf);
1297         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1298
1299 #ifdef USE_ASSERT_CHECKING
1300         itemid = PageGetItemId(page, poffset);
1301         itup = (IndexTuple) PageGetItem(page, itemid);
1302         Assert(ItemPointerGetBlockNumber(&(itup->t_tid)) == target);
1303 #endif
1304
1305         if (!parent_half_dead)
1306         {
1307                 OffsetNumber nextoffset;
1308
1309                 nextoffset = OffsetNumberNext(poffset);
1310                 itemid = PageGetItemId(page, nextoffset);
1311                 itup = (IndexTuple) PageGetItem(page, itemid);
1312                 if (ItemPointerGetBlockNumber(&(itup->t_tid)) != rightsib)
1313                         elog(ERROR, "right sibling %u of block %u is not next child %u of block %u in index \"%s\"",
1314                                  rightsib, target, ItemPointerGetBlockNumber(&(itup->t_tid)),
1315                                  parent, RelationGetRelationName(rel));
1316         }
1317
1318         /*
1319          * Here we begin doing the deletion.
1320          */
1321
1322         /* No ereport(ERROR) until changes are logged */
1323         START_CRIT_SECTION();
1324
1325         /*
1326          * Update parent.  The normal case is a tad tricky because we want to
1327          * delete the target's downlink and the *following* key.  Easiest way is
1328          * to copy the right sibling's downlink over the target downlink, and then
1329          * delete the following item.
1330          */
1331         if (parent_half_dead)
1332         {
1333                 PageIndexTupleDelete(page, poffset);
1334                 opaque->btpo_flags |= BTP_HALF_DEAD;
1335         }
1336         else
1337         {
1338                 OffsetNumber nextoffset;
1339
1340                 itemid = PageGetItemId(page, poffset);
1341                 itup = (IndexTuple) PageGetItem(page, itemid);
1342                 ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY);
1343
1344                 nextoffset = OffsetNumberNext(poffset);
1345                 PageIndexTupleDelete(page, nextoffset);
1346         }
1347
1348         /*
1349          * Update siblings' side-links.  Note the target page's side-links will
1350          * continue to point to the siblings.  Asserts here are just rechecking
1351          * things we already verified above.
1352          */
1353         if (BufferIsValid(lbuf))
1354         {
1355                 page = BufferGetPage(lbuf);
1356                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1357                 Assert(opaque->btpo_next == target);
1358                 opaque->btpo_next = rightsib;
1359         }
1360         page = BufferGetPage(rbuf);
1361         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1362         Assert(opaque->btpo_prev == target);
1363         opaque->btpo_prev = leftsib;
1364         rightsib_empty = (P_FIRSTDATAKEY(opaque) > PageGetMaxOffsetNumber(page));
1365
1366         /*
1367          * Mark the page itself deleted.  It can be recycled when all current
1368          * transactions are gone.
1369          */
1370         page = BufferGetPage(buf);
1371         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1372         opaque->btpo_flags &= ~BTP_HALF_DEAD;
1373         opaque->btpo_flags |= BTP_DELETED;
1374         opaque->btpo.xact = ReadNewTransactionId();
1375
1376         /* And update the metapage, if needed */
1377         if (BufferIsValid(metabuf))
1378         {
1379                 metad->btm_fastroot = rightsib;
1380                 metad->btm_fastlevel = targetlevel;
1381                 MarkBufferDirty(metabuf);
1382         }
1383
1384         /* Must mark buffers dirty before XLogInsert */
1385         MarkBufferDirty(pbuf);
1386         MarkBufferDirty(rbuf);
1387         MarkBufferDirty(buf);
1388         if (BufferIsValid(lbuf))
1389                 MarkBufferDirty(lbuf);
1390
1391         /* XLOG stuff */
1392         if (RelationNeedsWAL(rel))
1393         {
1394                 xl_btree_delete_page xlrec;
1395                 xl_btree_metadata xlmeta;
1396                 uint8           xlinfo;
1397                 XLogRecPtr      recptr;
1398                 XLogRecData rdata[5];
1399                 XLogRecData *nextrdata;
1400
1401                 xlrec.target.node = rel->rd_node;
1402                 ItemPointerSet(&(xlrec.target.tid), parent, poffset);
1403                 xlrec.deadblk = target;
1404                 xlrec.leftblk = leftsib;
1405                 xlrec.rightblk = rightsib;
1406                 xlrec.btpo_xact = opaque->btpo.xact;
1407
1408                 rdata[0].data = (char *) &xlrec;
1409                 rdata[0].len = SizeOfBtreeDeletePage;
1410                 rdata[0].buffer = InvalidBuffer;
1411                 rdata[0].next = nextrdata = &(rdata[1]);
1412
1413                 if (BufferIsValid(metabuf))
1414                 {
1415                         xlmeta.root = metad->btm_root;
1416                         xlmeta.level = metad->btm_level;
1417                         xlmeta.fastroot = metad->btm_fastroot;
1418                         xlmeta.fastlevel = metad->btm_fastlevel;
1419
1420                         nextrdata->data = (char *) &xlmeta;
1421                         nextrdata->len = sizeof(xl_btree_metadata);
1422                         nextrdata->buffer = InvalidBuffer;
1423                         nextrdata->next = nextrdata + 1;
1424                         nextrdata++;
1425                         xlinfo = XLOG_BTREE_DELETE_PAGE_META;
1426                 }
1427                 else if (parent_half_dead)
1428                         xlinfo = XLOG_BTREE_DELETE_PAGE_HALF;
1429                 else
1430                         xlinfo = XLOG_BTREE_DELETE_PAGE;
1431
1432                 nextrdata->data = NULL;
1433                 nextrdata->len = 0;
1434                 nextrdata->next = nextrdata + 1;
1435                 nextrdata->buffer = pbuf;
1436                 nextrdata->buffer_std = true;
1437                 nextrdata++;
1438
1439                 nextrdata->data = NULL;
1440                 nextrdata->len = 0;
1441                 nextrdata->buffer = rbuf;
1442                 nextrdata->buffer_std = true;
1443                 nextrdata->next = NULL;
1444
1445                 if (BufferIsValid(lbuf))
1446                 {
1447                         nextrdata->next = nextrdata + 1;
1448                         nextrdata++;
1449                         nextrdata->data = NULL;
1450                         nextrdata->len = 0;
1451                         nextrdata->buffer = lbuf;
1452                         nextrdata->buffer_std = true;
1453                         nextrdata->next = NULL;
1454                 }
1455
1456                 recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
1457
1458                 if (BufferIsValid(metabuf))
1459                 {
1460                         PageSetLSN(metapg, recptr);
1461                         PageSetTLI(metapg, ThisTimeLineID);
1462                 }
1463                 page = BufferGetPage(pbuf);
1464                 PageSetLSN(page, recptr);
1465                 PageSetTLI(page, ThisTimeLineID);
1466                 page = BufferGetPage(rbuf);
1467                 PageSetLSN(page, recptr);
1468                 PageSetTLI(page, ThisTimeLineID);
1469                 page = BufferGetPage(buf);
1470                 PageSetLSN(page, recptr);
1471                 PageSetTLI(page, ThisTimeLineID);
1472                 if (BufferIsValid(lbuf))
1473                 {
1474                         page = BufferGetPage(lbuf);
1475                         PageSetLSN(page, recptr);
1476                         PageSetTLI(page, ThisTimeLineID);
1477                 }
1478         }
1479
1480         END_CRIT_SECTION();
1481
1482         /* release metapage; send out relcache inval if metapage changed */
1483         if (BufferIsValid(metabuf))
1484         {
1485                 CacheInvalidateRelcache(rel);
1486                 _bt_relbuf(rel, metabuf);
1487         }
1488         /* can always release leftsib immediately */
1489         if (BufferIsValid(lbuf))
1490                 _bt_relbuf(rel, lbuf);
1491
1492         /*
1493          * If parent became half dead, recurse to delete it. Otherwise, if right
1494          * sibling is empty and is now the last child of the parent, recurse to
1495          * try to delete it.  (These cases cannot apply at the same time, though
1496          * the second case might itself recurse to the first.)
1497          *
1498          * When recursing to parent, we hold the lock on the target page until
1499          * done.  This delays any insertions into the keyspace that was just
1500          * effectively reassigned to the parent's right sibling.  If we allowed
1501          * that, and there were enough such insertions before we finish deleting
1502          * the parent, page splits within that keyspace could lead to inserting
1503          * out-of-order keys into the grandparent level.  It is thought that that
1504          * wouldn't have any serious consequences, but it still seems like a
1505          * pretty bad idea.
1506          */
1507         if (parent_half_dead)
1508         {
1509                 /* recursive call will release pbuf */
1510                 _bt_relbuf(rel, rbuf);
1511                 result = _bt_pagedel(rel, pbuf, stack->bts_parent) + 1;
1512                 _bt_relbuf(rel, buf);
1513         }
1514         else if (parent_one_child && rightsib_empty)
1515         {
1516                 _bt_relbuf(rel, pbuf);
1517                 _bt_relbuf(rel, buf);
1518                 /* recursive call will release rbuf */
1519                 result = _bt_pagedel(rel, rbuf, stack) + 1;
1520         }
1521         else
1522         {
1523                 _bt_relbuf(rel, pbuf);
1524                 _bt_relbuf(rel, buf);
1525                 _bt_relbuf(rel, rbuf);
1526                 result = 1;
1527         }
1528
1529         return result;
1530 }