OSDN Git Service

Fix "failed to re-find parent key" btree VACUUM failure by revising page
[pg-rex/syncrep.git] / src / backend / access / nbtree / nbtxlog.c
1 /*-------------------------------------------------------------------------
2  *
3  * nbtxlog.c
4  *        WAL replay logic for btrees.
5  *
6  *
7  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.39 2006/11/01 19:43:17 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/nbtree.h"
18 #include "access/transam.h"
19
20 /*
21  * We must keep track of expected insertions due to page splits, and apply
22  * them manually if they are not seen in the WAL log during replay.  This
23  * makes it safe for page insertion to be a multiple-WAL-action process.
24  *
25  * Similarly, deletion of an only child page and deletion of its parent page
26  * form multiple WAL log entries, and we have to be prepared to follow through
27  * with the deletion if the log ends between.
28  *
29  * The data structure is a simple linked list --- this should be good enough,
30  * since we don't expect a page split or multi deletion to remain incomplete
31  * for long.  In any case we need to respect the order of operations.
32  */
33 typedef struct bt_incomplete_action
34 {
35         RelFileNode node;                       /* the index */
36         bool            is_split;               /* T = pending split, F = pending delete */
37         /* these fields are for a split: */
38         bool            is_root;                /* we split the root */
39         BlockNumber leftblk;            /* left half of split */
40         BlockNumber rightblk;           /* right half of split */
41         /* these fields are for a delete: */
42         BlockNumber delblk;                     /* parent block to be deleted */
43 } bt_incomplete_action;
44
45 static List *incomplete_actions;
46
47
48 static void
49 log_incomplete_split(RelFileNode node, BlockNumber leftblk,
50                                          BlockNumber rightblk, bool is_root)
51 {
52         bt_incomplete_action *action = palloc(sizeof(bt_incomplete_action));
53
54         action->node = node;
55         action->is_split = true;
56         action->is_root = is_root;
57         action->leftblk = leftblk;
58         action->rightblk = rightblk;
59         incomplete_actions = lappend(incomplete_actions, action);
60 }
61
62 static void
63 forget_matching_split(RelFileNode node, BlockNumber downlink, bool is_root)
64 {
65         ListCell   *l;
66
67         foreach(l, incomplete_actions)
68         {
69                 bt_incomplete_action *action = (bt_incomplete_action *) lfirst(l);
70
71                 if (RelFileNodeEquals(node, action->node) &&
72                         action->is_split &&
73                         downlink == action->rightblk)
74                 {
75                         if (is_root != action->is_root)
76                                 elog(LOG, "forget_matching_split: fishy is_root data (expected %d, got %d)",
77                                          action->is_root, is_root);
78                         incomplete_actions = list_delete_ptr(incomplete_actions, action);
79                         pfree(action);
80                         break;                          /* need not look further */
81                 }
82         }
83 }
84
85 static void
86 log_incomplete_deletion(RelFileNode node, BlockNumber delblk)
87 {
88         bt_incomplete_action *action = palloc(sizeof(bt_incomplete_action));
89
90         action->node = node;
91         action->is_split = false;
92         action->delblk = delblk;
93         incomplete_actions = lappend(incomplete_actions, action);
94 }
95
96 static void
97 forget_matching_deletion(RelFileNode node, BlockNumber delblk)
98 {
99         ListCell   *l;
100
101         foreach(l, incomplete_actions)
102         {
103                 bt_incomplete_action *action = (bt_incomplete_action *) lfirst(l);
104
105                 if (RelFileNodeEquals(node, action->node) &&
106                         !action->is_split &&
107                         delblk == action->delblk)
108                 {
109                         incomplete_actions = list_delete_ptr(incomplete_actions, action);
110                         pfree(action);
111                         break;                          /* need not look further */
112                 }
113         }
114 }
115
116 /*
117  * _bt_restore_page -- re-enter all the index tuples on a page
118  *
119  * The page is freshly init'd, and *from (length len) is a copy of what
120  * had been its upper part (pd_upper to pd_special).  We assume that the
121  * tuples had been added to the page in item-number order, and therefore
122  * the one with highest item number appears first (lowest on the page).
123  *
124  * NOTE: the way this routine is coded, the rebuilt page will have the items
125  * in correct itemno sequence, but physically the opposite order from the
126  * original, because we insert them in the opposite of itemno order.  This
127  * does not matter in any current btree code, but it's something to keep an
128  * eye on.      Is it worth changing just on general principles?
129  */
130 static void
131 _bt_restore_page(Page page, char *from, int len)
132 {
133         IndexTupleData itupdata;
134         Size            itemsz;
135         char       *end = from + len;
136
137         for (; from < end;)
138         {
139                 /* Need to copy tuple header due to alignment considerations */
140                 memcpy(&itupdata, from, sizeof(IndexTupleData));
141                 itemsz = IndexTupleDSize(itupdata);
142                 itemsz = MAXALIGN(itemsz);
143                 if (PageAddItem(page, (Item) from, itemsz,
144                                                 FirstOffsetNumber, LP_USED) == InvalidOffsetNumber)
145                         elog(PANIC, "_bt_restore_page: can't add item to page");
146                 from += itemsz;
147         }
148 }
149
150 static void
151 _bt_restore_meta(Relation reln, XLogRecPtr lsn,
152                                  BlockNumber root, uint32 level,
153                                  BlockNumber fastroot, uint32 fastlevel)
154 {
155         Buffer          metabuf;
156         Page            metapg;
157         BTMetaPageData *md;
158         BTPageOpaque pageop;
159
160         metabuf = XLogReadBuffer(reln, BTREE_METAPAGE, true);
161         Assert(BufferIsValid(metabuf));
162         metapg = BufferGetPage(metabuf);
163
164         _bt_pageinit(metapg, BufferGetPageSize(metabuf));
165
166         md = BTPageGetMeta(metapg);
167         md->btm_magic = BTREE_MAGIC;
168         md->btm_version = BTREE_VERSION;
169         md->btm_root = root;
170         md->btm_level = level;
171         md->btm_fastroot = fastroot;
172         md->btm_fastlevel = fastlevel;
173
174         pageop = (BTPageOpaque) PageGetSpecialPointer(metapg);
175         pageop->btpo_flags = BTP_META;
176
177         /*
178          * Set pd_lower just past the end of the metadata.      This is not essential
179          * but it makes the page look compressible to xlog.c.
180          */
181         ((PageHeader) metapg)->pd_lower =
182                 ((char *) md + sizeof(BTMetaPageData)) - (char *) metapg;
183
184         PageSetLSN(metapg, lsn);
185         PageSetTLI(metapg, ThisTimeLineID);
186         MarkBufferDirty(metabuf);
187         UnlockReleaseBuffer(metabuf);
188 }
189
190 static void
191 btree_xlog_insert(bool isleaf, bool ismeta,
192                                   XLogRecPtr lsn, XLogRecord *record)
193 {
194         xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
195         Relation        reln;
196         Buffer          buffer;
197         Page            page;
198         char       *datapos;
199         int                     datalen;
200         xl_btree_metadata md;
201         BlockNumber downlink = 0;
202
203         datapos = (char *) xlrec + SizeOfBtreeInsert;
204         datalen = record->xl_len - SizeOfBtreeInsert;
205         if (!isleaf)
206         {
207                 memcpy(&downlink, datapos, sizeof(BlockNumber));
208                 datapos += sizeof(BlockNumber);
209                 datalen -= sizeof(BlockNumber);
210         }
211         if (ismeta)
212         {
213                 memcpy(&md, datapos, sizeof(xl_btree_metadata));
214                 datapos += sizeof(xl_btree_metadata);
215                 datalen -= sizeof(xl_btree_metadata);
216         }
217
218         if ((record->xl_info & XLR_BKP_BLOCK_1) && !ismeta && isleaf)
219                 return;                                 /* nothing to do */
220
221         reln = XLogOpenRelation(xlrec->target.node);
222
223         if (!(record->xl_info & XLR_BKP_BLOCK_1))
224         {
225                 buffer = XLogReadBuffer(reln,
226                                                          ItemPointerGetBlockNumber(&(xlrec->target.tid)),
227                                                                 false);
228                 if (BufferIsValid(buffer))
229                 {
230                         page = (Page) BufferGetPage(buffer);
231
232                         if (XLByteLE(lsn, PageGetLSN(page)))
233                         {
234                                 UnlockReleaseBuffer(buffer);
235                         }
236                         else
237                         {
238                                 if (PageAddItem(page, (Item) datapos, datalen,
239                                                         ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
240                                                                 LP_USED) == InvalidOffsetNumber)
241                                         elog(PANIC, "btree_insert_redo: failed to add item");
242
243                                 PageSetLSN(page, lsn);
244                                 PageSetTLI(page, ThisTimeLineID);
245                                 MarkBufferDirty(buffer);
246                                 UnlockReleaseBuffer(buffer);
247                         }
248                 }
249         }
250
251         if (ismeta)
252                 _bt_restore_meta(reln, lsn,
253                                                  md.root, md.level,
254                                                  md.fastroot, md.fastlevel);
255
256         /* Forget any split this insertion completes */
257         if (!isleaf)
258                 forget_matching_split(xlrec->target.node, downlink, false);
259 }
260
261 static void
262 btree_xlog_split(bool onleft, bool isroot,
263                                  XLogRecPtr lsn, XLogRecord *record)
264 {
265         xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
266         Relation        reln;
267         BlockNumber targetblk;
268         OffsetNumber targetoff;
269         BlockNumber leftsib;
270         BlockNumber rightsib;
271         BlockNumber downlink = 0;
272         Buffer          buffer;
273         Page            page;
274         BTPageOpaque pageop;
275
276         reln = XLogOpenRelation(xlrec->target.node);
277         targetblk = ItemPointerGetBlockNumber(&(xlrec->target.tid));
278         targetoff = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
279         leftsib = (onleft) ? targetblk : xlrec->otherblk;
280         rightsib = (onleft) ? xlrec->otherblk : targetblk;
281
282         /* Left (original) sibling */
283         buffer = XLogReadBuffer(reln, leftsib, true);
284         Assert(BufferIsValid(buffer));
285         page = (Page) BufferGetPage(buffer);
286
287         _bt_pageinit(page, BufferGetPageSize(buffer));
288         pageop = (BTPageOpaque) PageGetSpecialPointer(page);
289
290         pageop->btpo_prev = xlrec->leftblk;
291         pageop->btpo_next = rightsib;
292         pageop->btpo.level = xlrec->level;
293         pageop->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0;
294         pageop->btpo_cycleid = 0;
295
296         _bt_restore_page(page,
297                                          (char *) xlrec + SizeOfBtreeSplit,
298                                          xlrec->leftlen);
299
300         if (onleft && xlrec->level > 0)
301         {
302                 IndexTuple      itup;
303
304                 /* extract downlink in the target tuple */
305                 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, targetoff));
306                 downlink = ItemPointerGetBlockNumber(&(itup->t_tid));
307                 Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
308         }
309
310         PageSetLSN(page, lsn);
311         PageSetTLI(page, ThisTimeLineID);
312         MarkBufferDirty(buffer);
313         UnlockReleaseBuffer(buffer);
314
315         /* Right (new) sibling */
316         buffer = XLogReadBuffer(reln, rightsib, true);
317         Assert(BufferIsValid(buffer));
318         page = (Page) BufferGetPage(buffer);
319
320         _bt_pageinit(page, BufferGetPageSize(buffer));
321         pageop = (BTPageOpaque) PageGetSpecialPointer(page);
322
323         pageop->btpo_prev = leftsib;
324         pageop->btpo_next = xlrec->rightblk;
325         pageop->btpo.level = xlrec->level;
326         pageop->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0;
327         pageop->btpo_cycleid = 0;
328
329         _bt_restore_page(page,
330                                          (char *) xlrec + SizeOfBtreeSplit + xlrec->leftlen,
331                                          record->xl_len - SizeOfBtreeSplit - xlrec->leftlen);
332
333         if (!onleft && xlrec->level > 0)
334         {
335                 IndexTuple      itup;
336
337                 /* extract downlink in the target tuple */
338                 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, targetoff));
339                 downlink = ItemPointerGetBlockNumber(&(itup->t_tid));
340                 Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
341         }
342
343         PageSetLSN(page, lsn);
344         PageSetTLI(page, ThisTimeLineID);
345         MarkBufferDirty(buffer);
346         UnlockReleaseBuffer(buffer);
347
348         /* Fix left-link of right (next) page */
349         if (!(record->xl_info & XLR_BKP_BLOCK_1))
350         {
351                 if (xlrec->rightblk != P_NONE)
352                 {
353                         buffer = XLogReadBuffer(reln, xlrec->rightblk, false);
354                         if (BufferIsValid(buffer))
355                         {
356                                 page = (Page) BufferGetPage(buffer);
357
358                                 if (XLByteLE(lsn, PageGetLSN(page)))
359                                 {
360                                         UnlockReleaseBuffer(buffer);
361                                 }
362                                 else
363                                 {
364                                         pageop = (BTPageOpaque) PageGetSpecialPointer(page);
365                                         pageop->btpo_prev = rightsib;
366
367                                         PageSetLSN(page, lsn);
368                                         PageSetTLI(page, ThisTimeLineID);
369                                         MarkBufferDirty(buffer);
370                                         UnlockReleaseBuffer(buffer);
371                                 }
372                         }
373                 }
374         }
375
376         /* Forget any split this insertion completes */
377         if (xlrec->level > 0)
378                 forget_matching_split(xlrec->target.node, downlink, false);
379
380         /* The job ain't done till the parent link is inserted... */
381         log_incomplete_split(xlrec->target.node,
382                                                  leftsib, rightsib, isroot);
383 }
384
385 static void
386 btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
387 {
388         xl_btree_delete *xlrec;
389         Relation        reln;
390         Buffer          buffer;
391         Page            page;
392         BTPageOpaque opaque;
393
394         if (record->xl_info & XLR_BKP_BLOCK_1)
395                 return;
396
397         xlrec = (xl_btree_delete *) XLogRecGetData(record);
398         reln = XLogOpenRelation(xlrec->node);
399         buffer = XLogReadBuffer(reln, xlrec->block, false);
400         if (!BufferIsValid(buffer))
401                 return;
402         page = (Page) BufferGetPage(buffer);
403
404         if (XLByteLE(lsn, PageGetLSN(page)))
405         {
406                 UnlockReleaseBuffer(buffer);
407                 return;
408         }
409
410         if (record->xl_len > SizeOfBtreeDelete)
411         {
412                 OffsetNumber *unused;
413                 OffsetNumber *unend;
414
415                 unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeDelete);
416                 unend = (OffsetNumber *) ((char *) xlrec + record->xl_len);
417
418                 PageIndexMultiDelete(page, unused, unend - unused);
419         }
420
421         /*
422          * Mark the page as not containing any LP_DELETE items --- see comments in
423          * _bt_delitems().
424          */
425         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
426         opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
427
428         PageSetLSN(page, lsn);
429         PageSetTLI(page, ThisTimeLineID);
430         MarkBufferDirty(buffer);
431         UnlockReleaseBuffer(buffer);
432 }
433
434 static void
435 btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
436 {
437         xl_btree_delete_page *xlrec = (xl_btree_delete_page *) XLogRecGetData(record);
438         Relation        reln;
439         BlockNumber parent;
440         BlockNumber target;
441         BlockNumber leftsib;
442         BlockNumber rightsib;
443         Buffer          buffer;
444         Page            page;
445         BTPageOpaque pageop;
446
447         reln = XLogOpenRelation(xlrec->target.node);
448         parent = ItemPointerGetBlockNumber(&(xlrec->target.tid));
449         target = xlrec->deadblk;
450         leftsib = xlrec->leftblk;
451         rightsib = xlrec->rightblk;
452
453         /* parent page */
454         if (!(record->xl_info & XLR_BKP_BLOCK_1))
455         {
456                 buffer = XLogReadBuffer(reln, parent, false);
457                 if (BufferIsValid(buffer))
458                 {
459                         page = (Page) BufferGetPage(buffer);
460                         pageop = (BTPageOpaque) PageGetSpecialPointer(page);
461                         if (XLByteLE(lsn, PageGetLSN(page)))
462                         {
463                                 UnlockReleaseBuffer(buffer);
464                         }
465                         else
466                         {
467                                 OffsetNumber poffset;
468
469                                 poffset = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
470                                 if (poffset >= PageGetMaxOffsetNumber(page))
471                                 {
472                                         Assert(info == XLOG_BTREE_DELETE_PAGE_HALF);
473                                         Assert(poffset == P_FIRSTDATAKEY(pageop));
474                                         PageIndexTupleDelete(page, poffset);
475                                         pageop->btpo_flags |= BTP_HALF_DEAD;
476                                 }
477                                 else
478                                 {
479                                         ItemId          itemid;
480                                         IndexTuple      itup;
481                                         OffsetNumber nextoffset;
482
483                                         Assert(info != XLOG_BTREE_DELETE_PAGE_HALF);
484                                         itemid = PageGetItemId(page, poffset);
485                                         itup = (IndexTuple) PageGetItem(page, itemid);
486                                         ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY);
487                                         nextoffset = OffsetNumberNext(poffset);
488                                         PageIndexTupleDelete(page, nextoffset);
489                                 }
490
491                                 PageSetLSN(page, lsn);
492                                 PageSetTLI(page, ThisTimeLineID);
493                                 MarkBufferDirty(buffer);
494                                 UnlockReleaseBuffer(buffer);
495                         }
496                 }
497         }
498
499         /* Fix left-link of right sibling */
500         if (!(record->xl_info & XLR_BKP_BLOCK_2))
501         {
502                 buffer = XLogReadBuffer(reln, rightsib, false);
503                 if (BufferIsValid(buffer))
504                 {
505                         page = (Page) BufferGetPage(buffer);
506                         if (XLByteLE(lsn, PageGetLSN(page)))
507                         {
508                                 UnlockReleaseBuffer(buffer);
509                         }
510                         else
511                         {
512                                 pageop = (BTPageOpaque) PageGetSpecialPointer(page);
513                                 pageop->btpo_prev = leftsib;
514
515                                 PageSetLSN(page, lsn);
516                                 PageSetTLI(page, ThisTimeLineID);
517                                 MarkBufferDirty(buffer);
518                                 UnlockReleaseBuffer(buffer);
519                         }
520                 }
521         }
522
523         /* Fix right-link of left sibling, if any */
524         if (!(record->xl_info & XLR_BKP_BLOCK_3))
525         {
526                 if (leftsib != P_NONE)
527                 {
528                         buffer = XLogReadBuffer(reln, leftsib, false);
529                         if (BufferIsValid(buffer))
530                         {
531                                 page = (Page) BufferGetPage(buffer);
532                                 if (XLByteLE(lsn, PageGetLSN(page)))
533                                 {
534                                         UnlockReleaseBuffer(buffer);
535                                 }
536                                 else
537                                 {
538                                         pageop = (BTPageOpaque) PageGetSpecialPointer(page);
539                                         pageop->btpo_next = rightsib;
540
541                                         PageSetLSN(page, lsn);
542                                         PageSetTLI(page, ThisTimeLineID);
543                                         MarkBufferDirty(buffer);
544                                         UnlockReleaseBuffer(buffer);
545                                 }
546                         }
547                 }
548         }
549
550         /* Rewrite target page as empty deleted page */
551         buffer = XLogReadBuffer(reln, target, true);
552         Assert(BufferIsValid(buffer));
553         page = (Page) BufferGetPage(buffer);
554
555         _bt_pageinit(page, BufferGetPageSize(buffer));
556         pageop = (BTPageOpaque) PageGetSpecialPointer(page);
557
558         pageop->btpo_prev = leftsib;
559         pageop->btpo_next = rightsib;
560         pageop->btpo.xact = FrozenTransactionId;
561         pageop->btpo_flags = BTP_DELETED;
562         pageop->btpo_cycleid = 0;
563
564         PageSetLSN(page, lsn);
565         PageSetTLI(page, ThisTimeLineID);
566         MarkBufferDirty(buffer);
567         UnlockReleaseBuffer(buffer);
568
569         /* Update metapage if needed */
570         if (info == XLOG_BTREE_DELETE_PAGE_META)
571         {
572                 xl_btree_metadata md;
573
574                 memcpy(&md, (char *) xlrec + SizeOfBtreeDeletePage,
575                            sizeof(xl_btree_metadata));
576                 _bt_restore_meta(reln, lsn,
577                                                  md.root, md.level,
578                                                  md.fastroot, md.fastlevel);
579         }
580
581         /* Forget any completed deletion */
582         forget_matching_deletion(xlrec->target.node, target);
583
584         /* If parent became half-dead, remember it for deletion */
585         if (info == XLOG_BTREE_DELETE_PAGE_HALF)
586                 log_incomplete_deletion(xlrec->target.node, parent);
587 }
588
589 static void
590 btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
591 {
592         xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
593         Relation        reln;
594         Buffer          buffer;
595         Page            page;
596         BTPageOpaque pageop;
597         BlockNumber downlink = 0;
598
599         reln = XLogOpenRelation(xlrec->node);
600         buffer = XLogReadBuffer(reln, xlrec->rootblk, true);
601         Assert(BufferIsValid(buffer));
602         page = (Page) BufferGetPage(buffer);
603
604         _bt_pageinit(page, BufferGetPageSize(buffer));
605         pageop = (BTPageOpaque) PageGetSpecialPointer(page);
606
607         pageop->btpo_flags = BTP_ROOT;
608         pageop->btpo_prev = pageop->btpo_next = P_NONE;
609         pageop->btpo.level = xlrec->level;
610         if (xlrec->level == 0)
611                 pageop->btpo_flags |= BTP_LEAF;
612         pageop->btpo_cycleid = 0;
613
614         if (record->xl_len > SizeOfBtreeNewroot)
615         {
616                 IndexTuple      itup;
617
618                 _bt_restore_page(page,
619                                                  (char *) xlrec + SizeOfBtreeNewroot,
620                                                  record->xl_len - SizeOfBtreeNewroot);
621                 /* extract downlink to the right-hand split page */
622                 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY));
623                 downlink = ItemPointerGetBlockNumber(&(itup->t_tid));
624                 Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
625         }
626
627         PageSetLSN(page, lsn);
628         PageSetTLI(page, ThisTimeLineID);
629         MarkBufferDirty(buffer);
630         UnlockReleaseBuffer(buffer);
631
632         _bt_restore_meta(reln, lsn,
633                                          xlrec->rootblk, xlrec->level,
634                                          xlrec->rootblk, xlrec->level);
635
636         /* Check to see if this satisfies any incomplete insertions */
637         if (record->xl_len > SizeOfBtreeNewroot)
638                 forget_matching_split(xlrec->node, downlink, true);
639 }
640
641
642 void
643 btree_redo(XLogRecPtr lsn, XLogRecord *record)
644 {
645         uint8           info = record->xl_info & ~XLR_INFO_MASK;
646
647         switch (info)
648         {
649                 case XLOG_BTREE_INSERT_LEAF:
650                         btree_xlog_insert(true, false, lsn, record);
651                         break;
652                 case XLOG_BTREE_INSERT_UPPER:
653                         btree_xlog_insert(false, false, lsn, record);
654                         break;
655                 case XLOG_BTREE_INSERT_META:
656                         btree_xlog_insert(false, true, lsn, record);
657                         break;
658                 case XLOG_BTREE_SPLIT_L:
659                         btree_xlog_split(true, false, lsn, record);
660                         break;
661                 case XLOG_BTREE_SPLIT_R:
662                         btree_xlog_split(false, false, lsn, record);
663                         break;
664                 case XLOG_BTREE_SPLIT_L_ROOT:
665                         btree_xlog_split(true, true, lsn, record);
666                         break;
667                 case XLOG_BTREE_SPLIT_R_ROOT:
668                         btree_xlog_split(false, true, lsn, record);
669                         break;
670                 case XLOG_BTREE_DELETE:
671                         btree_xlog_delete(lsn, record);
672                         break;
673                 case XLOG_BTREE_DELETE_PAGE:
674                 case XLOG_BTREE_DELETE_PAGE_META:
675                 case XLOG_BTREE_DELETE_PAGE_HALF:
676                         btree_xlog_delete_page(info, lsn, record);
677                         break;
678                 case XLOG_BTREE_NEWROOT:
679                         btree_xlog_newroot(lsn, record);
680                         break;
681                 default:
682                         elog(PANIC, "btree_redo: unknown op code %u", info);
683         }
684 }
685
686 static void
687 out_target(StringInfo buf, xl_btreetid *target)
688 {
689         appendStringInfo(buf, "rel %u/%u/%u; tid %u/%u",
690                          target->node.spcNode, target->node.dbNode, target->node.relNode,
691                                          ItemPointerGetBlockNumber(&(target->tid)),
692                                          ItemPointerGetOffsetNumber(&(target->tid)));
693 }
694
695 void
696 btree_desc(StringInfo buf, uint8 xl_info, char *rec)
697 {
698         uint8           info = xl_info & ~XLR_INFO_MASK;
699
700         switch (info)
701         {
702                 case XLOG_BTREE_INSERT_LEAF:
703                         {
704                                 xl_btree_insert *xlrec = (xl_btree_insert *) rec;
705
706                                 appendStringInfo(buf, "insert: ");
707                                 out_target(buf, &(xlrec->target));
708                                 break;
709                         }
710                 case XLOG_BTREE_INSERT_UPPER:
711                         {
712                                 xl_btree_insert *xlrec = (xl_btree_insert *) rec;
713
714                                 appendStringInfo(buf, "insert_upper: ");
715                                 out_target(buf, &(xlrec->target));
716                                 break;
717                         }
718                 case XLOG_BTREE_INSERT_META:
719                         {
720                                 xl_btree_insert *xlrec = (xl_btree_insert *) rec;
721
722                                 appendStringInfo(buf, "insert_meta: ");
723                                 out_target(buf, &(xlrec->target));
724                                 break;
725                         }
726                 case XLOG_BTREE_SPLIT_L:
727                         {
728                                 xl_btree_split *xlrec = (xl_btree_split *) rec;
729
730                                 appendStringInfo(buf, "split_l: ");
731                                 out_target(buf, &(xlrec->target));
732                                 appendStringInfo(buf, "; oth %u; rgh %u",
733                                                                  xlrec->otherblk, xlrec->rightblk);
734                                 break;
735                         }
736                 case XLOG_BTREE_SPLIT_R:
737                         {
738                                 xl_btree_split *xlrec = (xl_btree_split *) rec;
739
740                                 appendStringInfo(buf, "split_r: ");
741                                 out_target(buf, &(xlrec->target));
742                                 appendStringInfo(buf, "; oth %u; rgh %u",
743                                                                  xlrec->otherblk, xlrec->rightblk);
744                                 break;
745                         }
746                 case XLOG_BTREE_SPLIT_L_ROOT:
747                         {
748                                 xl_btree_split *xlrec = (xl_btree_split *) rec;
749
750                                 appendStringInfo(buf, "split_l_root: ");
751                                 out_target(buf, &(xlrec->target));
752                                 appendStringInfo(buf, "; oth %u; rgh %u",
753                                                                  xlrec->otherblk, xlrec->rightblk);
754                                 break;
755                         }
756                 case XLOG_BTREE_SPLIT_R_ROOT:
757                         {
758                                 xl_btree_split *xlrec = (xl_btree_split *) rec;
759
760                                 appendStringInfo(buf, "split_r_root: ");
761                                 out_target(buf, &(xlrec->target));
762                                 appendStringInfo(buf, "; oth %u; rgh %u",
763                                                                  xlrec->otherblk, xlrec->rightblk);
764                                 break;
765                         }
766                 case XLOG_BTREE_DELETE:
767                         {
768                                 xl_btree_delete *xlrec = (xl_btree_delete *) rec;
769
770                                 appendStringInfo(buf, "delete: rel %u/%u/%u; blk %u",
771                                                                  xlrec->node.spcNode, xlrec->node.dbNode,
772                                                                  xlrec->node.relNode, xlrec->block);
773                                 break;
774                         }
775                 case XLOG_BTREE_DELETE_PAGE:
776                 case XLOG_BTREE_DELETE_PAGE_META:
777                 case XLOG_BTREE_DELETE_PAGE_HALF:
778                         {
779                                 xl_btree_delete_page *xlrec = (xl_btree_delete_page *) rec;
780
781                                 appendStringInfo(buf, "delete_page: ");
782                                 out_target(buf, &(xlrec->target));
783                                 appendStringInfo(buf, "; dead %u; left %u; right %u",
784                                                         xlrec->deadblk, xlrec->leftblk, xlrec->rightblk);
785                                 break;
786                         }
787                 case XLOG_BTREE_NEWROOT:
788                         {
789                                 xl_btree_newroot *xlrec = (xl_btree_newroot *) rec;
790
791                                 appendStringInfo(buf, "newroot: rel %u/%u/%u; root %u lev %u",
792                                                                  xlrec->node.spcNode, xlrec->node.dbNode,
793                                                                  xlrec->node.relNode,
794                                                                  xlrec->rootblk, xlrec->level);
795                                 break;
796                         }
797                 default:
798                         appendStringInfo(buf, "UNKNOWN");
799                         break;
800         }
801 }
802
803 void
804 btree_xlog_startup(void)
805 {
806         incomplete_actions = NIL;
807 }
808
809 void
810 btree_xlog_cleanup(void)
811 {
812         ListCell   *l;
813
814         foreach(l, incomplete_actions)
815         {
816                 bt_incomplete_action *action = (bt_incomplete_action *) lfirst(l);
817                 Relation        reln;
818
819                 reln = XLogOpenRelation(action->node);
820                 if (action->is_split)
821                 {
822                         /* finish an incomplete split */
823                         Buffer          lbuf,
824                                                 rbuf;
825                         Page            lpage,
826                                                 rpage;
827                         BTPageOpaque lpageop,
828                                                 rpageop;
829                         bool            is_only;
830
831                         lbuf = XLogReadBuffer(reln, action->leftblk, false);
832                         /* failure is impossible because we wrote this page earlier */
833                         if (!BufferIsValid(lbuf))
834                                 elog(PANIC, "btree_xlog_cleanup: left block unfound");
835                         lpage = (Page) BufferGetPage(lbuf);
836                         lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
837                         rbuf = XLogReadBuffer(reln, action->rightblk, false);
838                         /* failure is impossible because we wrote this page earlier */
839                         if (!BufferIsValid(rbuf))
840                                 elog(PANIC, "btree_xlog_cleanup: right block unfound");
841                         rpage = (Page) BufferGetPage(rbuf);
842                         rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
843
844                         /* if the pages are all of their level, it's a only-page split */
845                         is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop);
846
847                         _bt_insert_parent(reln, lbuf, rbuf, NULL,
848                                                           action->is_root, is_only);
849                 }
850                 else
851                 {
852                         /* finish an incomplete deletion (of a half-dead page) */
853                         Buffer          buf;
854
855                         buf = XLogReadBuffer(reln, action->delblk, false);
856                         if (BufferIsValid(buf))
857                                 if (_bt_pagedel(reln, buf, NULL, true) == 0)
858                                         elog(PANIC, "btree_xlog_cleanup: _bt_pagdel failed");
859                 }
860         }
861         incomplete_actions = NIL;
862 }
863
864 bool
865 btree_safe_restartpoint(void)
866 {
867         if (incomplete_actions)
868                 return false;
869         return true;
870 }