OSDN Git Service

Merge branch 'pgrex90-base' into pgrex90
[pg-rex/syncrep.git] / src / backend / access / nbtree / nbtxlog.c
1 /*-------------------------------------------------------------------------
2  *
3  * nbtxlog.c
4  *        WAL replay logic for btrees.
5  *
6  *
7  * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.69 2010/07/06 19:18:55 momjian Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/nbtree.h"
18 #include "access/transam.h"
19 #include "access/xact.h"
20 #include "storage/bufmgr.h"
21 #include "storage/procarray.h"
22 #include "storage/standby.h"
23 #include "miscadmin.h"
24
25 /*
26  * We must keep track of expected insertions due to page splits, and apply
27  * them manually if they are not seen in the WAL log during replay.  This
28  * makes it safe for page insertion to be a multiple-WAL-action process.
29  *
30  * Similarly, deletion of an only child page and deletion of its parent page
31  * form multiple WAL log entries, and we have to be prepared to follow through
32  * with the deletion if the log ends between.
33  *
34  * The data structure is a simple linked list --- this should be good enough,
35  * since we don't expect a page split or multi deletion to remain incomplete
36  * for long.  In any case we need to respect the order of operations.
37  */
38 typedef struct bt_incomplete_action
39 {
40         RelFileNode node;                       /* the index */
41         bool            is_split;               /* T = pending split, F = pending delete */
42         /* these fields are for a split: */
43         bool            is_root;                /* we split the root */
44         BlockNumber leftblk;            /* left half of split */
45         BlockNumber rightblk;           /* right half of split */
46         /* these fields are for a delete: */
47         BlockNumber delblk;                     /* parent block to be deleted */
48 } bt_incomplete_action;
49
50 static List *incomplete_actions;
51
52
53 static void
54 log_incomplete_split(RelFileNode node, BlockNumber leftblk,
55                                          BlockNumber rightblk, bool is_root)
56 {
57         bt_incomplete_action *action = palloc(sizeof(bt_incomplete_action));
58
59         action->node = node;
60         action->is_split = true;
61         action->is_root = is_root;
62         action->leftblk = leftblk;
63         action->rightblk = rightblk;
64         incomplete_actions = lappend(incomplete_actions, action);
65 }
66
67 static void
68 forget_matching_split(RelFileNode node, BlockNumber downlink, bool is_root)
69 {
70         ListCell   *l;
71
72         foreach(l, incomplete_actions)
73         {
74                 bt_incomplete_action *action = (bt_incomplete_action *) lfirst(l);
75
76                 if (RelFileNodeEquals(node, action->node) &&
77                         action->is_split &&
78                         downlink == action->rightblk)
79                 {
80                         if (is_root != action->is_root)
81                                 elog(LOG, "forget_matching_split: fishy is_root data (expected %d, got %d)",
82                                          action->is_root, is_root);
83                         incomplete_actions = list_delete_ptr(incomplete_actions, action);
84                         pfree(action);
85                         break;                          /* need not look further */
86                 }
87         }
88 }
89
90 static void
91 log_incomplete_deletion(RelFileNode node, BlockNumber delblk)
92 {
93         bt_incomplete_action *action = palloc(sizeof(bt_incomplete_action));
94
95         action->node = node;
96         action->is_split = false;
97         action->delblk = delblk;
98         incomplete_actions = lappend(incomplete_actions, action);
99 }
100
101 static void
102 forget_matching_deletion(RelFileNode node, BlockNumber delblk)
103 {
104         ListCell   *l;
105
106         foreach(l, incomplete_actions)
107         {
108                 bt_incomplete_action *action = (bt_incomplete_action *) lfirst(l);
109
110                 if (RelFileNodeEquals(node, action->node) &&
111                         !action->is_split &&
112                         delblk == action->delblk)
113                 {
114                         incomplete_actions = list_delete_ptr(incomplete_actions, action);
115                         pfree(action);
116                         break;                          /* need not look further */
117                 }
118         }
119 }
120
121 /*
122  * _bt_restore_page -- re-enter all the index tuples on a page
123  *
124  * The page is freshly init'd, and *from (length len) is a copy of what
125  * had been its upper part (pd_upper to pd_special).  We assume that the
126  * tuples had been added to the page in item-number order, and therefore
127  * the one with highest item number appears first (lowest on the page).
128  *
129  * NOTE: the way this routine is coded, the rebuilt page will have the items
130  * in correct itemno sequence, but physically the opposite order from the
131  * original, because we insert them in the opposite of itemno order.  This
132  * does not matter in any current btree code, but it's something to keep an
133  * eye on.      Is it worth changing just on general principles?  See also the
134  * notes in btree_xlog_split().
135  */
136 static void
137 _bt_restore_page(Page page, char *from, int len)
138 {
139         IndexTupleData itupdata;
140         Size            itemsz;
141         char       *end = from + len;
142
143         for (; from < end;)
144         {
145                 /* Need to copy tuple header due to alignment considerations */
146                 memcpy(&itupdata, from, sizeof(IndexTupleData));
147                 itemsz = IndexTupleDSize(itupdata);
148                 itemsz = MAXALIGN(itemsz);
149                 if (PageAddItem(page, (Item) from, itemsz, FirstOffsetNumber,
150                                                 false, false) == InvalidOffsetNumber)
151                         elog(PANIC, "_bt_restore_page: cannot add item to page");
152                 from += itemsz;
153         }
154 }
155
156 static void
157 _bt_restore_meta(RelFileNode rnode, XLogRecPtr lsn,
158                                  BlockNumber root, uint32 level,
159                                  BlockNumber fastroot, uint32 fastlevel)
160 {
161         Buffer          metabuf;
162         Page            metapg;
163         BTMetaPageData *md;
164         BTPageOpaque pageop;
165
166         metabuf = XLogReadBuffer(rnode, BTREE_METAPAGE, true);
167         Assert(BufferIsValid(metabuf));
168         metapg = BufferGetPage(metabuf);
169
170         _bt_pageinit(metapg, BufferGetPageSize(metabuf));
171
172         md = BTPageGetMeta(metapg);
173         md->btm_magic = BTREE_MAGIC;
174         md->btm_version = BTREE_VERSION;
175         md->btm_root = root;
176         md->btm_level = level;
177         md->btm_fastroot = fastroot;
178         md->btm_fastlevel = fastlevel;
179
180         pageop = (BTPageOpaque) PageGetSpecialPointer(metapg);
181         pageop->btpo_flags = BTP_META;
182
183         /*
184          * Set pd_lower just past the end of the metadata.      This is not essential
185          * but it makes the page look compressible to xlog.c.
186          */
187         ((PageHeader) metapg)->pd_lower =
188                 ((char *) md + sizeof(BTMetaPageData)) - (char *) metapg;
189
190         PageSetLSN(metapg, lsn);
191         PageSetTLI(metapg, ThisTimeLineID);
192         MarkBufferDirty(metabuf);
193         UnlockReleaseBuffer(metabuf);
194 }
195
196 static void
197 btree_xlog_insert(bool isleaf, bool ismeta,
198                                   XLogRecPtr lsn, XLogRecord *record)
199 {
200         xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
201         Buffer          buffer;
202         Page            page;
203         char       *datapos;
204         int                     datalen;
205         xl_btree_metadata md;
206         BlockNumber downlink = 0;
207
208         datapos = (char *) xlrec + SizeOfBtreeInsert;
209         datalen = record->xl_len - SizeOfBtreeInsert;
210         if (!isleaf)
211         {
212                 memcpy(&downlink, datapos, sizeof(BlockNumber));
213                 datapos += sizeof(BlockNumber);
214                 datalen -= sizeof(BlockNumber);
215         }
216         if (ismeta)
217         {
218                 memcpy(&md, datapos, sizeof(xl_btree_metadata));
219                 datapos += sizeof(xl_btree_metadata);
220                 datalen -= sizeof(xl_btree_metadata);
221         }
222
223         if ((record->xl_info & XLR_BKP_BLOCK_1) && !ismeta && isleaf)
224                 return;                                 /* nothing to do */
225
226         if (!(record->xl_info & XLR_BKP_BLOCK_1))
227         {
228                 buffer = XLogReadBuffer(xlrec->target.node,
229                                                          ItemPointerGetBlockNumber(&(xlrec->target.tid)),
230                                                                 false);
231                 if (BufferIsValid(buffer))
232                 {
233                         page = (Page) BufferGetPage(buffer);
234
235                         if (XLByteLE(lsn, PageGetLSN(page)))
236                         {
237                                 UnlockReleaseBuffer(buffer);
238                         }
239                         else
240                         {
241                                 if (PageAddItem(page, (Item) datapos, datalen,
242                                                         ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
243                                                                 false, false) == InvalidOffsetNumber)
244                                         elog(PANIC, "btree_insert_redo: failed to add item");
245
246                                 PageSetLSN(page, lsn);
247                                 PageSetTLI(page, ThisTimeLineID);
248                                 MarkBufferDirty(buffer);
249                                 UnlockReleaseBuffer(buffer);
250                         }
251                 }
252         }
253
254         if (ismeta)
255                 _bt_restore_meta(xlrec->target.node, lsn,
256                                                  md.root, md.level,
257                                                  md.fastroot, md.fastlevel);
258
259         /* Forget any split this insertion completes */
260         if (!isleaf)
261                 forget_matching_split(xlrec->target.node, downlink, false);
262 }
263
264 static void
265 btree_xlog_split(bool onleft, bool isroot,
266                                  XLogRecPtr lsn, XLogRecord *record)
267 {
268         xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
269         Buffer          rbuf;
270         Page            rpage;
271         BTPageOpaque ropaque;
272         char       *datapos;
273         int                     datalen;
274         OffsetNumber newitemoff = 0;
275         Item            newitem = NULL;
276         Size            newitemsz = 0;
277         Item            left_hikey = NULL;
278         Size            left_hikeysz = 0;
279
280         datapos = (char *) xlrec + SizeOfBtreeSplit;
281         datalen = record->xl_len - SizeOfBtreeSplit;
282
283         /* Forget any split this insertion completes */
284         if (xlrec->level > 0)
285         {
286                 /* we assume SizeOfBtreeSplit is at least 16-bit aligned */
287                 BlockNumber downlink = BlockIdGetBlockNumber((BlockId) datapos);
288
289                 datapos += sizeof(BlockIdData);
290                 datalen -= sizeof(BlockIdData);
291
292                 forget_matching_split(xlrec->node, downlink, false);
293
294                 /* Extract left hikey and its size (still assuming 16-bit alignment) */
295                 if (!(record->xl_info & XLR_BKP_BLOCK_1))
296                 {
297                         /* We assume 16-bit alignment is enough for IndexTupleSize */
298                         left_hikey = (Item) datapos;
299                         left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
300
301                         datapos += left_hikeysz;
302                         datalen -= left_hikeysz;
303                 }
304         }
305
306         /* Extract newitem and newitemoff, if present */
307         if (onleft)
308         {
309                 /* Extract the offset (still assuming 16-bit alignment) */
310                 memcpy(&newitemoff, datapos, sizeof(OffsetNumber));
311                 datapos += sizeof(OffsetNumber);
312                 datalen -= sizeof(OffsetNumber);
313         }
314
315         if (onleft && !(record->xl_info & XLR_BKP_BLOCK_1))
316         {
317                 /*
318                  * We assume that 16-bit alignment is enough to apply IndexTupleSize
319                  * (since it's fetching from a uint16 field) and also enough for
320                  * PageAddItem to insert the tuple.
321                  */
322                 newitem = (Item) datapos;
323                 newitemsz = MAXALIGN(IndexTupleSize(newitem));
324                 datapos += newitemsz;
325                 datalen -= newitemsz;
326         }
327
328         /* Reconstruct right (new) sibling from scratch */
329         rbuf = XLogReadBuffer(xlrec->node, xlrec->rightsib, true);
330         Assert(BufferIsValid(rbuf));
331         rpage = (Page) BufferGetPage(rbuf);
332
333         _bt_pageinit(rpage, BufferGetPageSize(rbuf));
334         ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage);
335
336         ropaque->btpo_prev = xlrec->leftsib;
337         ropaque->btpo_next = xlrec->rnext;
338         ropaque->btpo.level = xlrec->level;
339         ropaque->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0;
340         ropaque->btpo_cycleid = 0;
341
342         _bt_restore_page(rpage, datapos, datalen);
343
344         /*
345          * On leaf level, the high key of the left page is equal to the first key
346          * on the right page.
347          */
348         if (xlrec->level == 0)
349         {
350                 ItemId          hiItemId = PageGetItemId(rpage, P_FIRSTDATAKEY(ropaque));
351
352                 left_hikey = PageGetItem(rpage, hiItemId);
353                 left_hikeysz = ItemIdGetLength(hiItemId);
354         }
355
356         PageSetLSN(rpage, lsn);
357         PageSetTLI(rpage, ThisTimeLineID);
358         MarkBufferDirty(rbuf);
359
360         /* don't release the buffer yet; we touch right page's first item below */
361
362         /*
363          * Reconstruct left (original) sibling if needed.  Note that this code
364          * ensures that the items remaining on the left page are in the correct
365          * item number order, but it does not reproduce the physical order they
366          * would have had.      Is this worth changing?  See also _bt_restore_page().
367          */
368         if (!(record->xl_info & XLR_BKP_BLOCK_1))
369         {
370                 Buffer          lbuf = XLogReadBuffer(xlrec->node, xlrec->leftsib, false);
371
372                 if (BufferIsValid(lbuf))
373                 {
374                         Page            lpage = (Page) BufferGetPage(lbuf);
375                         BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
376
377                         if (!XLByteLE(lsn, PageGetLSN(lpage)))
378                         {
379                                 OffsetNumber off;
380                                 OffsetNumber maxoff = PageGetMaxOffsetNumber(lpage);
381                                 OffsetNumber deletable[MaxOffsetNumber];
382                                 int                     ndeletable = 0;
383
384                                 /*
385                                  * Remove the items from the left page that were copied to the
386                                  * right page.  Also remove the old high key, if any. (We must
387                                  * remove everything before trying to insert any items, else
388                                  * we risk not having enough space.)
389                                  */
390                                 if (!P_RIGHTMOST(lopaque))
391                                 {
392                                         deletable[ndeletable++] = P_HIKEY;
393
394                                         /*
395                                          * newitemoff is given to us relative to the original
396                                          * page's item numbering, so adjust it for this deletion.
397                                          */
398                                         newitemoff--;
399                                 }
400                                 for (off = xlrec->firstright; off <= maxoff; off++)
401                                         deletable[ndeletable++] = off;
402                                 if (ndeletable > 0)
403                                         PageIndexMultiDelete(lpage, deletable, ndeletable);
404
405                                 /*
406                                  * Add the new item if it was inserted on left page.
407                                  */
408                                 if (onleft)
409                                 {
410                                         if (PageAddItem(lpage, newitem, newitemsz, newitemoff,
411                                                                         false, false) == InvalidOffsetNumber)
412                                                 elog(PANIC, "failed to add new item to left page after split");
413                                 }
414
415                                 /* Set high key */
416                                 if (PageAddItem(lpage, left_hikey, left_hikeysz,
417                                                                 P_HIKEY, false, false) == InvalidOffsetNumber)
418                                         elog(PANIC, "failed to add high key to left page after split");
419
420                                 /* Fix opaque fields */
421                                 lopaque->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0;
422                                 lopaque->btpo_next = xlrec->rightsib;
423                                 lopaque->btpo_cycleid = 0;
424
425                                 PageSetLSN(lpage, lsn);
426                                 PageSetTLI(lpage, ThisTimeLineID);
427                                 MarkBufferDirty(lbuf);
428                         }
429
430                         UnlockReleaseBuffer(lbuf);
431                 }
432         }
433
434         /* We no longer need the right buffer */
435         UnlockReleaseBuffer(rbuf);
436
437         /* Fix left-link of the page to the right of the new right sibling */
438         if (xlrec->rnext != P_NONE && !(record->xl_info & XLR_BKP_BLOCK_2))
439         {
440                 Buffer          buffer = XLogReadBuffer(xlrec->node, xlrec->rnext, false);
441
442                 if (BufferIsValid(buffer))
443                 {
444                         Page            page = (Page) BufferGetPage(buffer);
445
446                         if (!XLByteLE(lsn, PageGetLSN(page)))
447                         {
448                                 BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
449
450                                 pageop->btpo_prev = xlrec->rightsib;
451
452                                 PageSetLSN(page, lsn);
453                                 PageSetTLI(page, ThisTimeLineID);
454                                 MarkBufferDirty(buffer);
455                         }
456                         UnlockReleaseBuffer(buffer);
457                 }
458         }
459
460         /* The job ain't done till the parent link is inserted... */
461         log_incomplete_split(xlrec->node,
462                                                  xlrec->leftsib, xlrec->rightsib, isroot);
463 }
464
465 static void
466 btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record)
467 {
468         xl_btree_vacuum *xlrec;
469         Buffer          buffer;
470         Page            page;
471         BTPageOpaque opaque;
472
473         xlrec = (xl_btree_vacuum *) XLogRecGetData(record);
474
475         /*
476          * If queries might be active then we need to ensure every block is
477          * unpinned between the lastBlockVacuumed and the current block, if there
478          * are any. This ensures that every block in the index is touched during
479          * VACUUM as required to ensure scans work correctly.
480          */
481         if (standbyState == STANDBY_SNAPSHOT_READY &&
482                 (xlrec->lastBlockVacuumed + 1) != xlrec->block)
483         {
484                 BlockNumber blkno = xlrec->lastBlockVacuumed + 1;
485
486                 for (; blkno < xlrec->block; blkno++)
487                 {
488                         /*
489                          * XXX we don't actually need to read the block, we just need to
490                          * confirm it is unpinned. If we had a special call into the
491                          * buffer manager we could optimise this so that if the block is
492                          * not in shared_buffers we confirm it as unpinned.
493                          *
494                          * Another simple optimization would be to check if there's any
495                          * backends running; if not, we could just skip this.
496                          */
497                         buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, blkno, RBM_NORMAL);
498                         if (BufferIsValid(buffer))
499                         {
500                                 LockBufferForCleanup(buffer);
501                                 UnlockReleaseBuffer(buffer);
502                         }
503                 }
504         }
505
506         /*
507          * If the block was restored from a full page image, nothing more to do.
508          * The RestoreBkpBlocks() call already pinned and took cleanup lock on it.
509          * XXX: Perhaps we should call RestoreBkpBlocks() *after* the loop above,
510          * to make the disk access more sequential.
511          */
512         if (record->xl_info & XLR_BKP_BLOCK_1)
513                 return;
514
515         /*
516          * Like in btvacuumpage(), we need to take a cleanup lock on every leaf
517          * page. See nbtree/README for details.
518          */
519         buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block, RBM_NORMAL);
520         if (!BufferIsValid(buffer))
521                 return;
522         LockBufferForCleanup(buffer);
523         page = (Page) BufferGetPage(buffer);
524
525         if (XLByteLE(lsn, PageGetLSN(page)))
526         {
527                 UnlockReleaseBuffer(buffer);
528                 return;
529         }
530
531         if (record->xl_len > SizeOfBtreeVacuum)
532         {
533                 OffsetNumber *unused;
534                 OffsetNumber *unend;
535
536                 unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeVacuum);
537                 unend = (OffsetNumber *) ((char *) xlrec + record->xl_len);
538
539                 if ((unend - unused) > 0)
540                         PageIndexMultiDelete(page, unused, unend - unused);
541         }
542
543         /*
544          * Mark the page as not containing any LP_DEAD items --- see comments in
545          * _bt_delitems().
546          */
547         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
548         opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
549
550         PageSetLSN(page, lsn);
551         PageSetTLI(page, ThisTimeLineID);
552         MarkBufferDirty(buffer);
553         UnlockReleaseBuffer(buffer);
554 }
555
556 /*
557  * Get the latestRemovedXid from the heap pages pointed at by the index
558  * tuples being deleted. This puts the work for calculating latestRemovedXid
559  * into the recovery path rather than the primary path.
560  *
561  * It's possible that this generates a fair amount of I/O, since an index
562  * block may have hundreds of tuples being deleted. Repeat accesses to the
563  * same heap blocks are common, though are not yet optimised.
564  *
565  * XXX optimise later with something like XLogPrefetchBuffer()
566  */
567 static TransactionId
568 btree_xlog_delete_get_latestRemovedXid(XLogRecord *record)
569 {
570         xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
571         OffsetNumber *unused;
572         Buffer          ibuffer,
573                                 hbuffer;
574         Page            ipage,
575                                 hpage;
576         ItemId          iitemid,
577                                 hitemid;
578         IndexTuple      itup;
579         HeapTupleHeader htuphdr;
580         BlockNumber hblkno;
581         OffsetNumber hoffnum;
582         TransactionId latestRemovedXid = InvalidTransactionId;
583         int                     i;
584
585         /*
586          * If there's nothing running on the standby we don't need to derive a
587          * full latestRemovedXid value, so use a fast path out of here. That
588          * returns InvalidTransactionId, and so will conflict with users, but
589          * since we just worked out that's zero people, its OK.
590          */
591         if (CountDBBackends(InvalidOid) == 0)
592                 return latestRemovedXid;
593
594         /*
595          * Get index page
596          */
597         ibuffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
598         if (!BufferIsValid(ibuffer))
599                 return InvalidTransactionId;
600         ipage = (Page) BufferGetPage(ibuffer);
601
602         /*
603          * Loop through the deleted index items to obtain the TransactionId from
604          * the heap items they point to.
605          */
606         unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeDelete);
607
608         for (i = 0; i < xlrec->nitems; i++)
609         {
610                 /*
611                  * Identify the index tuple about to be deleted
612                  */
613                 iitemid = PageGetItemId(ipage, unused[i]);
614                 itup = (IndexTuple) PageGetItem(ipage, iitemid);
615
616                 /*
617                  * Locate the heap page that the index tuple points at
618                  */
619                 hblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
620                 hbuffer = XLogReadBuffer(xlrec->hnode, hblkno, false);
621                 if (!BufferIsValid(hbuffer))
622                 {
623                         UnlockReleaseBuffer(ibuffer);
624                         return InvalidTransactionId;
625                 }
626                 hpage = (Page) BufferGetPage(hbuffer);
627
628                 /*
629                  * Look up the heap tuple header that the index tuple points at by
630                  * using the heap node supplied with the xlrec. We can't use
631                  * heap_fetch, since it uses ReadBuffer rather than XLogReadBuffer.
632                  * Note that we are not looking at tuple data here, just headers.
633                  */
634                 hoffnum = ItemPointerGetOffsetNumber(&(itup->t_tid));
635                 hitemid = PageGetItemId(hpage, hoffnum);
636
637                 /*
638                  * Follow any redirections until we find something useful.
639                  */
640                 while (ItemIdIsRedirected(hitemid))
641                 {
642                         hoffnum = ItemIdGetRedirect(hitemid);
643                         hitemid = PageGetItemId(hpage, hoffnum);
644                         CHECK_FOR_INTERRUPTS();
645                 }
646
647                 /*
648                  * If the heap item has storage, then read the header and use that to
649                  * set latestRemovedXid.
650                  *
651                  * Some LP_DEAD items may not be accessible, so we ignore them.
652                  */
653                 if (ItemIdHasStorage(hitemid))
654                 {
655                         htuphdr = (HeapTupleHeader) PageGetItem(hpage, hitemid);
656
657                         HeapTupleHeaderAdvanceLatestRemovedXid(htuphdr, &latestRemovedXid);
658                 }
659                 else if (ItemIdIsDead(hitemid))
660                 {
661                         /*
662                          * Conjecture: if hitemid is dead then it had xids before the xids
663                          * marked on LP_NORMAL items. So we just ignore this item and move
664                          * onto the next, for the purposes of calculating
665                          * latestRemovedxids.
666                          */
667                 }
668                 else
669                         Assert(!ItemIdIsUsed(hitemid));
670
671                 UnlockReleaseBuffer(hbuffer);
672         }
673
674         UnlockReleaseBuffer(ibuffer);
675
676         /*
677          * Note that if all heap tuples were LP_DEAD then we will be returning
678          * InvalidTransactionId here. That can happen if we are re-replaying this
679          * record type, though that will be before the consistency point and will
680          * not cause problems. It should happen very rarely after the consistency
681          * point, though note that we can't tell the difference between this and
682          * the fast path exit above. May need to change that in future.
683          */
684         return latestRemovedXid;
685 }
686
687 static void
688 btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
689 {
690         xl_btree_delete *xlrec;
691         Buffer          buffer;
692         Page            page;
693         BTPageOpaque opaque;
694
695         if (record->xl_info & XLR_BKP_BLOCK_1)
696                 return;
697
698         xlrec = (xl_btree_delete *) XLogRecGetData(record);
699
700         /*
701          * We don't need to take a cleanup lock to apply these changes. See
702          * nbtree/README for details.
703          */
704         buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
705         if (!BufferIsValid(buffer))
706                 return;
707         page = (Page) BufferGetPage(buffer);
708
709         if (XLByteLE(lsn, PageGetLSN(page)))
710         {
711                 UnlockReleaseBuffer(buffer);
712                 return;
713         }
714
715         if (record->xl_len > SizeOfBtreeDelete)
716         {
717                 OffsetNumber *unused;
718
719                 unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeDelete);
720
721                 PageIndexMultiDelete(page, unused, xlrec->nitems);
722         }
723
724         /*
725          * Mark the page as not containing any LP_DEAD items --- see comments in
726          * _bt_delitems().
727          */
728         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
729         opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
730
731         PageSetLSN(page, lsn);
732         PageSetTLI(page, ThisTimeLineID);
733         MarkBufferDirty(buffer);
734         UnlockReleaseBuffer(buffer);
735 }
736
737 static void
738 btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
739 {
740         xl_btree_delete_page *xlrec = (xl_btree_delete_page *) XLogRecGetData(record);
741         BlockNumber parent;
742         BlockNumber target;
743         BlockNumber leftsib;
744         BlockNumber rightsib;
745         Buffer          buffer;
746         Page            page;
747         BTPageOpaque pageop;
748
749         parent = ItemPointerGetBlockNumber(&(xlrec->target.tid));
750         target = xlrec->deadblk;
751         leftsib = xlrec->leftblk;
752         rightsib = xlrec->rightblk;
753
754         /* parent page */
755         if (!(record->xl_info & XLR_BKP_BLOCK_1))
756         {
757                 buffer = XLogReadBuffer(xlrec->target.node, parent, false);
758                 if (BufferIsValid(buffer))
759                 {
760                         page = (Page) BufferGetPage(buffer);
761                         pageop = (BTPageOpaque) PageGetSpecialPointer(page);
762                         if (XLByteLE(lsn, PageGetLSN(page)))
763                         {
764                                 UnlockReleaseBuffer(buffer);
765                         }
766                         else
767                         {
768                                 OffsetNumber poffset;
769
770                                 poffset = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
771                                 if (poffset >= PageGetMaxOffsetNumber(page))
772                                 {
773                                         Assert(info == XLOG_BTREE_DELETE_PAGE_HALF);
774                                         Assert(poffset == P_FIRSTDATAKEY(pageop));
775                                         PageIndexTupleDelete(page, poffset);
776                                         pageop->btpo_flags |= BTP_HALF_DEAD;
777                                 }
778                                 else
779                                 {
780                                         ItemId          itemid;
781                                         IndexTuple      itup;
782                                         OffsetNumber nextoffset;
783
784                                         Assert(info != XLOG_BTREE_DELETE_PAGE_HALF);
785                                         itemid = PageGetItemId(page, poffset);
786                                         itup = (IndexTuple) PageGetItem(page, itemid);
787                                         ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY);
788                                         nextoffset = OffsetNumberNext(poffset);
789                                         PageIndexTupleDelete(page, nextoffset);
790                                 }
791
792                                 PageSetLSN(page, lsn);
793                                 PageSetTLI(page, ThisTimeLineID);
794                                 MarkBufferDirty(buffer);
795                                 UnlockReleaseBuffer(buffer);
796                         }
797                 }
798         }
799
800         /* Fix left-link of right sibling */
801         if (!(record->xl_info & XLR_BKP_BLOCK_2))
802         {
803                 buffer = XLogReadBuffer(xlrec->target.node, rightsib, false);
804                 if (BufferIsValid(buffer))
805                 {
806                         page = (Page) BufferGetPage(buffer);
807                         if (XLByteLE(lsn, PageGetLSN(page)))
808                         {
809                                 UnlockReleaseBuffer(buffer);
810                         }
811                         else
812                         {
813                                 pageop = (BTPageOpaque) PageGetSpecialPointer(page);
814                                 pageop->btpo_prev = leftsib;
815
816                                 PageSetLSN(page, lsn);
817                                 PageSetTLI(page, ThisTimeLineID);
818                                 MarkBufferDirty(buffer);
819                                 UnlockReleaseBuffer(buffer);
820                         }
821                 }
822         }
823
824         /* Fix right-link of left sibling, if any */
825         if (!(record->xl_info & XLR_BKP_BLOCK_3))
826         {
827                 if (leftsib != P_NONE)
828                 {
829                         buffer = XLogReadBuffer(xlrec->target.node, leftsib, false);
830                         if (BufferIsValid(buffer))
831                         {
832                                 page = (Page) BufferGetPage(buffer);
833                                 if (XLByteLE(lsn, PageGetLSN(page)))
834                                 {
835                                         UnlockReleaseBuffer(buffer);
836                                 }
837                                 else
838                                 {
839                                         pageop = (BTPageOpaque) PageGetSpecialPointer(page);
840                                         pageop->btpo_next = rightsib;
841
842                                         PageSetLSN(page, lsn);
843                                         PageSetTLI(page, ThisTimeLineID);
844                                         MarkBufferDirty(buffer);
845                                         UnlockReleaseBuffer(buffer);
846                                 }
847                         }
848                 }
849         }
850
851         /* Rewrite target page as empty deleted page */
852         buffer = XLogReadBuffer(xlrec->target.node, target, true);
853         Assert(BufferIsValid(buffer));
854         page = (Page) BufferGetPage(buffer);
855
856         _bt_pageinit(page, BufferGetPageSize(buffer));
857         pageop = (BTPageOpaque) PageGetSpecialPointer(page);
858
859         pageop->btpo_prev = leftsib;
860         pageop->btpo_next = rightsib;
861         pageop->btpo.xact = xlrec->btpo_xact;
862         pageop->btpo_flags = BTP_DELETED;
863         pageop->btpo_cycleid = 0;
864
865         PageSetLSN(page, lsn);
866         PageSetTLI(page, ThisTimeLineID);
867         MarkBufferDirty(buffer);
868         UnlockReleaseBuffer(buffer);
869
870         /* Update metapage if needed */
871         if (info == XLOG_BTREE_DELETE_PAGE_META)
872         {
873                 xl_btree_metadata md;
874
875                 memcpy(&md, (char *) xlrec + SizeOfBtreeDeletePage,
876                            sizeof(xl_btree_metadata));
877                 _bt_restore_meta(xlrec->target.node, lsn,
878                                                  md.root, md.level,
879                                                  md.fastroot, md.fastlevel);
880         }
881
882         /* Forget any completed deletion */
883         forget_matching_deletion(xlrec->target.node, target);
884
885         /* If parent became half-dead, remember it for deletion */
886         if (info == XLOG_BTREE_DELETE_PAGE_HALF)
887                 log_incomplete_deletion(xlrec->target.node, parent);
888 }
889
890 static void
891 btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
892 {
893         xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
894         Buffer          buffer;
895         Page            page;
896         BTPageOpaque pageop;
897         BlockNumber downlink = 0;
898
899         buffer = XLogReadBuffer(xlrec->node, xlrec->rootblk, true);
900         Assert(BufferIsValid(buffer));
901         page = (Page) BufferGetPage(buffer);
902
903         _bt_pageinit(page, BufferGetPageSize(buffer));
904         pageop = (BTPageOpaque) PageGetSpecialPointer(page);
905
906         pageop->btpo_flags = BTP_ROOT;
907         pageop->btpo_prev = pageop->btpo_next = P_NONE;
908         pageop->btpo.level = xlrec->level;
909         if (xlrec->level == 0)
910                 pageop->btpo_flags |= BTP_LEAF;
911         pageop->btpo_cycleid = 0;
912
913         if (record->xl_len > SizeOfBtreeNewroot)
914         {
915                 IndexTuple      itup;
916
917                 _bt_restore_page(page,
918                                                  (char *) xlrec + SizeOfBtreeNewroot,
919                                                  record->xl_len - SizeOfBtreeNewroot);
920                 /* extract downlink to the right-hand split page */
921                 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY));
922                 downlink = ItemPointerGetBlockNumber(&(itup->t_tid));
923                 Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
924         }
925
926         PageSetLSN(page, lsn);
927         PageSetTLI(page, ThisTimeLineID);
928         MarkBufferDirty(buffer);
929         UnlockReleaseBuffer(buffer);
930
931         _bt_restore_meta(xlrec->node, lsn,
932                                          xlrec->rootblk, xlrec->level,
933                                          xlrec->rootblk, xlrec->level);
934
935         /* Check to see if this satisfies any incomplete insertions */
936         if (record->xl_len > SizeOfBtreeNewroot)
937                 forget_matching_split(xlrec->node, downlink, true);
938 }
939
940
941 void
942 btree_redo(XLogRecPtr lsn, XLogRecord *record)
943 {
944         uint8           info = record->xl_info & ~XLR_INFO_MASK;
945
946         if (InHotStandby)
947         {
948                 switch (info)
949                 {
950                         case XLOG_BTREE_DELETE:
951
952                                 /*
953                                  * Btree delete records can conflict with standby queries. You
954                                  * might think that vacuum records would conflict as well, but
955                                  * we've handled that already. XLOG_HEAP2_CLEANUP_INFO records
956                                  * provide the highest xid cleaned by the vacuum of the heap
957                                  * and so we can resolve any conflicts just once when that
958                                  * arrives. After that any we know that no conflicts exist
959                                  * from individual btree vacuum records on that index.
960                                  */
961                                 {
962                                         TransactionId latestRemovedXid = btree_xlog_delete_get_latestRemovedXid(record);
963                                         xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
964
965                                         ResolveRecoveryConflictWithSnapshot(latestRemovedXid, xlrec->node);
966                                 }
967                                 break;
968
969                         case XLOG_BTREE_REUSE_PAGE:
970
971                                 /*
972                                  * Btree reuse page records exist to provide a conflict point
973                                  * when we reuse pages in the index via the FSM. That's all it
974                                  * does though.
975                                  */
976                                 {
977                                         xl_btree_reuse_page *xlrec = (xl_btree_reuse_page *) XLogRecGetData(record);
978
979                                         ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node);
980                                 }
981                                 return;
982
983                         default:
984                                 break;
985                 }
986         }
987
988         /*
989          * Vacuum needs to pin and take cleanup lock on every leaf page, a regular
990          * exclusive lock is enough for all other purposes.
991          */
992         RestoreBkpBlocks(lsn, record, (info == XLOG_BTREE_VACUUM));
993
994         switch (info)
995         {
996                 case XLOG_BTREE_INSERT_LEAF:
997                         btree_xlog_insert(true, false, lsn, record);
998                         break;
999                 case XLOG_BTREE_INSERT_UPPER:
1000                         btree_xlog_insert(false, false, lsn, record);
1001                         break;
1002                 case XLOG_BTREE_INSERT_META:
1003                         btree_xlog_insert(false, true, lsn, record);
1004                         break;
1005                 case XLOG_BTREE_SPLIT_L:
1006                         btree_xlog_split(true, false, lsn, record);
1007                         break;
1008                 case XLOG_BTREE_SPLIT_R:
1009                         btree_xlog_split(false, false, lsn, record);
1010                         break;
1011                 case XLOG_BTREE_SPLIT_L_ROOT:
1012                         btree_xlog_split(true, true, lsn, record);
1013                         break;
1014                 case XLOG_BTREE_SPLIT_R_ROOT:
1015                         btree_xlog_split(false, true, lsn, record);
1016                         break;
1017                 case XLOG_BTREE_VACUUM:
1018                         btree_xlog_vacuum(lsn, record);
1019                         break;
1020                 case XLOG_BTREE_DELETE:
1021                         btree_xlog_delete(lsn, record);
1022                         break;
1023                 case XLOG_BTREE_DELETE_PAGE:
1024                 case XLOG_BTREE_DELETE_PAGE_META:
1025                 case XLOG_BTREE_DELETE_PAGE_HALF:
1026                         btree_xlog_delete_page(info, lsn, record);
1027                         break;
1028                 case XLOG_BTREE_NEWROOT:
1029                         btree_xlog_newroot(lsn, record);
1030                         break;
1031                 case XLOG_BTREE_REUSE_PAGE:
1032                         /* Handled above before restoring bkp block */
1033                         break;
1034                 default:
1035                         elog(PANIC, "btree_redo: unknown op code %u", info);
1036         }
1037 }
1038
1039 static void
1040 out_target(StringInfo buf, xl_btreetid *target)
1041 {
1042         appendStringInfo(buf, "rel %u/%u/%u; tid %u/%u",
1043                          target->node.spcNode, target->node.dbNode, target->node.relNode,
1044                                          ItemPointerGetBlockNumber(&(target->tid)),
1045                                          ItemPointerGetOffsetNumber(&(target->tid)));
1046 }
1047
1048 void
1049 btree_desc(StringInfo buf, uint8 xl_info, char *rec)
1050 {
1051         uint8           info = xl_info & ~XLR_INFO_MASK;
1052
1053         switch (info)
1054         {
1055                 case XLOG_BTREE_INSERT_LEAF:
1056                         {
1057                                 xl_btree_insert *xlrec = (xl_btree_insert *) rec;
1058
1059                                 appendStringInfo(buf, "insert: ");
1060                                 out_target(buf, &(xlrec->target));
1061                                 break;
1062                         }
1063                 case XLOG_BTREE_INSERT_UPPER:
1064                         {
1065                                 xl_btree_insert *xlrec = (xl_btree_insert *) rec;
1066
1067                                 appendStringInfo(buf, "insert_upper: ");
1068                                 out_target(buf, &(xlrec->target));
1069                                 break;
1070                         }
1071                 case XLOG_BTREE_INSERT_META:
1072                         {
1073                                 xl_btree_insert *xlrec = (xl_btree_insert *) rec;
1074
1075                                 appendStringInfo(buf, "insert_meta: ");
1076                                 out_target(buf, &(xlrec->target));
1077                                 break;
1078                         }
1079                 case XLOG_BTREE_SPLIT_L:
1080                         {
1081                                 xl_btree_split *xlrec = (xl_btree_split *) rec;
1082
1083                                 appendStringInfo(buf, "split_l: rel %u/%u/%u ",
1084                                                                  xlrec->node.spcNode, xlrec->node.dbNode,
1085                                                                  xlrec->node.relNode);
1086                                 appendStringInfo(buf, "left %u, right %u, next %u, level %u, firstright %d",
1087                                                            xlrec->leftsib, xlrec->rightsib, xlrec->rnext,
1088                                                                  xlrec->level, xlrec->firstright);
1089                                 break;
1090                         }
1091                 case XLOG_BTREE_SPLIT_R:
1092                         {
1093                                 xl_btree_split *xlrec = (xl_btree_split *) rec;
1094
1095                                 appendStringInfo(buf, "split_r: rel %u/%u/%u ",
1096                                                                  xlrec->node.spcNode, xlrec->node.dbNode,
1097                                                                  xlrec->node.relNode);
1098                                 appendStringInfo(buf, "left %u, right %u, next %u, level %u, firstright %d",
1099                                                            xlrec->leftsib, xlrec->rightsib, xlrec->rnext,
1100                                                                  xlrec->level, xlrec->firstright);
1101                                 break;
1102                         }
1103                 case XLOG_BTREE_SPLIT_L_ROOT:
1104                         {
1105                                 xl_btree_split *xlrec = (xl_btree_split *) rec;
1106
1107                                 appendStringInfo(buf, "split_l_root: rel %u/%u/%u ",
1108                                                                  xlrec->node.spcNode, xlrec->node.dbNode,
1109                                                                  xlrec->node.relNode);
1110                                 appendStringInfo(buf, "left %u, right %u, next %u, level %u, firstright %d",
1111                                                            xlrec->leftsib, xlrec->rightsib, xlrec->rnext,
1112                                                                  xlrec->level, xlrec->firstright);
1113                                 break;
1114                         }
1115                 case XLOG_BTREE_SPLIT_R_ROOT:
1116                         {
1117                                 xl_btree_split *xlrec = (xl_btree_split *) rec;
1118
1119                                 appendStringInfo(buf, "split_r_root: rel %u/%u/%u ",
1120                                                                  xlrec->node.spcNode, xlrec->node.dbNode,
1121                                                                  xlrec->node.relNode);
1122                                 appendStringInfo(buf, "left %u, right %u, next %u, level %u, firstright %d",
1123                                                            xlrec->leftsib, xlrec->rightsib, xlrec->rnext,
1124                                                                  xlrec->level, xlrec->firstright);
1125                                 break;
1126                         }
1127                 case XLOG_BTREE_VACUUM:
1128                         {
1129                                 xl_btree_vacuum *xlrec = (xl_btree_vacuum *) rec;
1130
1131                                 appendStringInfo(buf, "vacuum: rel %u/%u/%u; blk %u, lastBlockVacuumed %u",
1132                                                                  xlrec->node.spcNode, xlrec->node.dbNode,
1133                                                                  xlrec->node.relNode, xlrec->block,
1134                                                                  xlrec->lastBlockVacuumed);
1135                                 break;
1136                         }
1137                 case XLOG_BTREE_DELETE:
1138                         {
1139                                 xl_btree_delete *xlrec = (xl_btree_delete *) rec;
1140
1141                                 appendStringInfo(buf, "delete: index %u/%u/%u; iblk %u, heap %u/%u/%u;",
1142                                 xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode,
1143                                                                  xlrec->block,
1144                                                                  xlrec->hnode.spcNode, xlrec->hnode.dbNode, xlrec->hnode.relNode);
1145                                 break;
1146                         }
1147                 case XLOG_BTREE_DELETE_PAGE:
1148                 case XLOG_BTREE_DELETE_PAGE_META:
1149                 case XLOG_BTREE_DELETE_PAGE_HALF:
1150                         {
1151                                 xl_btree_delete_page *xlrec = (xl_btree_delete_page *) rec;
1152
1153                                 appendStringInfo(buf, "delete_page: ");
1154                                 out_target(buf, &(xlrec->target));
1155                                 appendStringInfo(buf, "; dead %u; left %u; right %u",
1156                                                         xlrec->deadblk, xlrec->leftblk, xlrec->rightblk);
1157                                 break;
1158                         }
1159                 case XLOG_BTREE_NEWROOT:
1160                         {
1161                                 xl_btree_newroot *xlrec = (xl_btree_newroot *) rec;
1162
1163                                 appendStringInfo(buf, "newroot: rel %u/%u/%u; root %u lev %u",
1164                                                                  xlrec->node.spcNode, xlrec->node.dbNode,
1165                                                                  xlrec->node.relNode,
1166                                                                  xlrec->rootblk, xlrec->level);
1167                                 break;
1168                         }
1169                 case XLOG_BTREE_REUSE_PAGE:
1170                         {
1171                                 xl_btree_reuse_page *xlrec = (xl_btree_reuse_page *) rec;
1172
1173                                 appendStringInfo(buf, "reuse_page: rel %u/%u/%u; latestRemovedXid %u",
1174                                                                  xlrec->node.spcNode, xlrec->node.dbNode,
1175                                                            xlrec->node.relNode, xlrec->latestRemovedXid);
1176                                 break;
1177                         }
1178                 default:
1179                         appendStringInfo(buf, "UNKNOWN");
1180                         break;
1181         }
1182 }
1183
1184 void
1185 btree_xlog_startup(void)
1186 {
1187         incomplete_actions = NIL;
1188 }
1189
1190 void
1191 btree_xlog_cleanup(void)
1192 {
1193         ListCell   *l;
1194
1195         foreach(l, incomplete_actions)
1196         {
1197                 bt_incomplete_action *action = (bt_incomplete_action *) lfirst(l);
1198
1199                 if (action->is_split)
1200                 {
1201                         /* finish an incomplete split */
1202                         Buffer          lbuf,
1203                                                 rbuf;
1204                         Page            lpage,
1205                                                 rpage;
1206                         BTPageOpaque lpageop,
1207                                                 rpageop;
1208                         bool            is_only;
1209                         Relation        reln;
1210
1211                         lbuf = XLogReadBuffer(action->node, action->leftblk, false);
1212                         /* failure is impossible because we wrote this page earlier */
1213                         if (!BufferIsValid(lbuf))
1214                                 elog(PANIC, "btree_xlog_cleanup: left block unfound");
1215                         lpage = (Page) BufferGetPage(lbuf);
1216                         lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
1217                         rbuf = XLogReadBuffer(action->node, action->rightblk, false);
1218                         /* failure is impossible because we wrote this page earlier */
1219                         if (!BufferIsValid(rbuf))
1220                                 elog(PANIC, "btree_xlog_cleanup: right block unfound");
1221                         rpage = (Page) BufferGetPage(rbuf);
1222                         rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
1223
1224                         /* if the pages are all of their level, it's a only-page split */
1225                         is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop);
1226
1227                         reln = CreateFakeRelcacheEntry(action->node);
1228                         _bt_insert_parent(reln, lbuf, rbuf, NULL,
1229                                                           action->is_root, is_only);
1230                         FreeFakeRelcacheEntry(reln);
1231                 }
1232                 else
1233                 {
1234                         /* finish an incomplete deletion (of a half-dead page) */
1235                         Buffer          buf;
1236
1237                         buf = XLogReadBuffer(action->node, action->delblk, false);
1238                         if (BufferIsValid(buf))
1239                         {
1240                                 Relation        reln;
1241
1242                                 reln = CreateFakeRelcacheEntry(action->node);
1243                                 if (_bt_pagedel(reln, buf, NULL) == 0)
1244                                         elog(PANIC, "btree_xlog_cleanup: _bt_pagedel failed");
1245                                 FreeFakeRelcacheEntry(reln);
1246                         }
1247                 }
1248         }
1249         incomplete_actions = NIL;
1250 }
1251
1252 bool
1253 btree_safe_restartpoint(void)
1254 {
1255         if (incomplete_actions)
1256                 return false;
1257         return true;
1258 }