OSDN Git Service

Respect Hot Standby controls while recycling btree index pages.
[pg-rex/syncrep.git] / src / backend / access / nbtree / nbtpage.c
index 85f352d..65354e6 100644 (file)
@@ -4,12 +4,12 @@
  *       BTree-specific page management code for the Postgres btree access
  *       method.
  *
- * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/access/nbtree/nbtpage.c,v 1.114 2009/12/19 01:32:33 sriggs Exp $
+ *       src/backend/access/nbtree/nbtpage.c
  *
  *     NOTES
  *        Postgres btree pages look like ordinary relation pages.      The opaque
@@ -29,6 +29,7 @@
 #include "storage/freespace.h"
 #include "storage/indexfsm.h"
 #include "storage/lmgr.h"
+#include "storage/predicate.h"
 #include "utils/inval.h"
 #include "utils/snapmgr.h"
 
@@ -224,7 +225,7 @@ _bt_getroot(Relation rel, int access)
                MarkBufferDirty(metabuf);
 
                /* XLOG stuff */
-               if (!rel->rd_istemp)
+               if (RelationNeedsWAL(rel))
                {
                        xl_btree_newroot xlrec;
                        XLogRecPtr      recptr;
@@ -447,6 +448,47 @@ _bt_checkpage(Relation rel, Buffer buf)
 }
 
 /*
+ * Log the reuse of a page from the FSM.
+ */
+static void
+_bt_log_reuse_page(Relation rel, BlockNumber blkno, TransactionId latestRemovedXid)
+{
+       if (!RelationNeedsWAL(rel))
+               return;
+
+       /* No ereport(ERROR) until changes are logged */
+       START_CRIT_SECTION();
+
+       /*
+        * We don't do MarkBufferDirty here because we're about initialise the
+        * page, and nobody else can see it yet.
+        */
+
+       /* XLOG stuff */
+       {
+               XLogRecData rdata[1];
+               xl_btree_reuse_page xlrec_reuse;
+
+               xlrec_reuse.node = rel->rd_node;
+               xlrec_reuse.block = blkno;
+               xlrec_reuse.latestRemovedXid = latestRemovedXid;
+               rdata[0].data = (char *) &xlrec_reuse;
+               rdata[0].len = SizeOfBtreeReusePage;
+               rdata[0].buffer = InvalidBuffer;
+               rdata[0].next = NULL;
+
+               XLogInsert(RM_BTREE_ID, XLOG_BTREE_REUSE_PAGE, rdata);
+
+               /*
+                * We don't do PageSetLSN or PageSetTLI here because we're about
+                * initialise the page, so no need.
+                */
+       }
+
+       END_CRIT_SECTION();
+}
+
+/*
  *     _bt_getbuf() -- Get a buffer by block number for read or write.
  *
  *             blkno == P_NEW means to get an unallocated index page.  The page
@@ -511,6 +553,18 @@ _bt_getbuf(Relation rel, BlockNumber blkno, int access)
                                page = BufferGetPage(buf);
                                if (_bt_page_recyclable(page))
                                {
+                                       /*
+                                        * If we are generating WAL for Hot Standby then create a
+                                        * WAL record that will allow us to conflict with queries
+                                        * running on standby.
+                                        */
+                                       if (XLogStandbyInfoActive())
+                                       {
+                                               BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+
+                                               _bt_log_reuse_page(rel, blkno, opaque->btpo.xact);
+                                       }
+
                                        /* Okay to use page.  Re-initialize and return it */
                                        _bt_pageinit(page, BufferGetPageSize(buf));
                                        return buf;
@@ -623,6 +677,7 @@ bool
 _bt_page_recyclable(Page page)
 {
        BTPageOpaque opaque;
+       TransactionId cutoff;
 
        /*
         * It's possible to find an all-zeroes page in an index --- for example, a
@@ -635,11 +690,18 @@ _bt_page_recyclable(Page page)
 
        /*
         * Otherwise, recycle if deleted and too old to have any processes
-        * interested in it.
+        * interested in it.  If we are generating records for Hot Standby
+        * defer page recycling until RecentGlobalXmin to respect user
+        * controls specified by vacuum_defer_cleanup_age or hot_standby_feedback.
         */
+       if (XLogStandbyInfoActive())
+               cutoff = RecentGlobalXmin;
+       else
+               cutoff = RecentXmin;
+
        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
        if (P_ISDELETED(opaque) &&
-               TransactionIdPrecedesOrEquals(opaque->btpo.xact, RecentXmin))
+               TransactionIdPrecedesOrEquals(opaque->btpo.xact, cutoff))
                return true;
        return false;
 }
@@ -665,15 +727,12 @@ _bt_page_recyclable(Page page)
  * ensure correct locking.
  */
 void
-_bt_delitems(Relation rel, Buffer buf,
-                        OffsetNumber *itemnos, int nitems, bool isVacuum,
-                        BlockNumber lastBlockVacuumed)
+_bt_delitems_vacuum(Relation rel, Buffer buf,
+                       OffsetNumber *itemnos, int nitems, BlockNumber lastBlockVacuumed)
 {
        Page            page = BufferGetPage(buf);
        BTPageOpaque opaque;
 
-       Assert(isVacuum || lastBlockVacuumed == 0);
-
        /* No ereport(ERROR) until changes are logged */
        START_CRIT_SECTION();
 
@@ -700,38 +759,19 @@ _bt_delitems(Relation rel, Buffer buf,
        MarkBufferDirty(buf);
 
        /* XLOG stuff */
-       if (!rel->rd_istemp)
+       if (RelationNeedsWAL(rel))
        {
                XLogRecPtr      recptr;
                XLogRecData rdata[2];
 
-               if (isVacuum)
-               {
-                       xl_btree_vacuum xlrec_vacuum;
-                       xlrec_vacuum.node = rel->rd_node;
-                       xlrec_vacuum.block = BufferGetBlockNumber(buf);
-
-                       xlrec_vacuum.lastBlockVacuumed = lastBlockVacuumed;
-                       rdata[0].data = (char *) &xlrec_vacuum;
-                       rdata[0].len = SizeOfBtreeVacuum;
-               }
-               else
-               {
-                       xl_btree_delete xlrec_delete;
-                       xlrec_delete.node = rel->rd_node;
-                       xlrec_delete.block = BufferGetBlockNumber(buf);
+               xl_btree_vacuum xlrec_vacuum;
 
-                       /*
-                        * XXX: We would like to set an accurate latestRemovedXid, but
-                        * there is no easy way of obtaining a useful value. So we punt
-                        * and store InvalidTransactionId, which forces the standby to
-                        * wait for/cancel all currently running transactions.
-                        */
-                       xlrec_delete.latestRemovedXid = InvalidTransactionId;
-                       rdata[0].data = (char *) &xlrec_delete;
-                       rdata[0].len = SizeOfBtreeDelete;
-               }
+               xlrec_vacuum.node = rel->rd_node;
+               xlrec_vacuum.block = BufferGetBlockNumber(buf);
 
+               xlrec_vacuum.lastBlockVacuumed = lastBlockVacuumed;
+               rdata[0].data = (char *) &xlrec_vacuum;
+               rdata[0].len = SizeOfBtreeVacuum;
                rdata[0].buffer = InvalidBuffer;
                rdata[0].next = &(rdata[1]);
 
@@ -754,10 +794,82 @@ _bt_delitems(Relation rel, Buffer buf,
                rdata[1].buffer_std = true;
                rdata[1].next = NULL;
 
-               if (isVacuum)
-                       recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_VACUUM, rdata);
-               else
-                       recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_DELETE, rdata);
+               recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_VACUUM, rdata);
+
+               PageSetLSN(page, recptr);
+               PageSetTLI(page, ThisTimeLineID);
+       }
+
+       END_CRIT_SECTION();
+}
+
+void
+_bt_delitems_delete(Relation rel, Buffer buf,
+                                       OffsetNumber *itemnos, int nitems, Relation heapRel)
+{
+       Page            page = BufferGetPage(buf);
+       BTPageOpaque opaque;
+
+       Assert(nitems > 0);
+
+       /* No ereport(ERROR) until changes are logged */
+       START_CRIT_SECTION();
+
+       /* Fix the page */
+       PageIndexMultiDelete(page, itemnos, nitems);
+
+       /*
+        * We can clear the vacuum cycle ID since this page has certainly been
+        * processed by the current vacuum scan.
+        */
+       opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+       opaque->btpo_cycleid = 0;
+
+       /*
+        * Mark the page as not containing any LP_DEAD items.  This is not
+        * certainly true (there might be some that have recently been marked, but
+        * weren't included in our target-item list), but it will almost always be
+        * true and it doesn't seem worth an additional page scan to check it.
+        * Remember that BTP_HAS_GARBAGE is only a hint anyway.
+        */
+       opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
+
+       MarkBufferDirty(buf);
+
+       /* XLOG stuff */
+       if (RelationNeedsWAL(rel))
+       {
+               XLogRecPtr      recptr;
+               XLogRecData rdata[3];
+
+               xl_btree_delete xlrec_delete;
+
+               xlrec_delete.node = rel->rd_node;
+               xlrec_delete.hnode = heapRel->rd_node;
+               xlrec_delete.block = BufferGetBlockNumber(buf);
+               xlrec_delete.nitems = nitems;
+
+               rdata[0].data = (char *) &xlrec_delete;
+               rdata[0].len = SizeOfBtreeDelete;
+               rdata[0].buffer = InvalidBuffer;
+               rdata[0].next = &(rdata[1]);
+
+               /*
+                * We need the target-offsets array whether or not we store the to
+                * allow us to find the latestRemovedXid on a standby server.
+                */
+               rdata[1].data = (char *) itemnos;
+               rdata[1].len = nitems * sizeof(OffsetNumber);
+               rdata[1].buffer = InvalidBuffer;
+               rdata[1].next = &(rdata[2]);
+
+               rdata[2].data = NULL;
+               rdata[2].len = 0;
+               rdata[2].buffer = buf;
+               rdata[2].buffer_std = true;
+               rdata[2].next = NULL;
+
+               recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_DELETE, rdata);
 
                PageSetLSN(page, recptr);
                PageSetTLI(page, ThisTimeLineID);
@@ -877,7 +989,7 @@ _bt_parent_deletion_safe(Relation rel, BlockNumber target, BTStack stack)
  * frequently.
  */
 int
-_bt_pagedel(Relation rel, Buffer buf, BTStack stack, bool vacuum_full)
+_bt_pagedel(Relation rel, Buffer buf, BTStack stack)
 {
        int                     result;
        BlockNumber target,
@@ -1071,6 +1183,19 @@ _bt_pagedel(Relation rel, Buffer buf, BTStack stack, bool vacuum_full)
         */
        rightsib = opaque->btpo_next;
        rbuf = _bt_getbuf(rel, rightsib, BT_WRITE);
+       page = BufferGetPage(rbuf);
+       opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+       if (opaque->btpo_prev != target)
+               elog(ERROR, "right sibling's left-link doesn't match: "
+                        "block %u links to %u instead of expected %u in index \"%s\"",
+                        rightsib, opaque->btpo_prev, target,
+                        RelationGetRelationName(rel));
+
+       /*
+        * Any insert which would have gone on the target block will now go to the
+        * right sibling block.
+        */
+       PredicateLockPageCombine(rel, target, rightsib);
 
        /*
         * Next find and write-lock the current parent of the target page. This is
@@ -1149,6 +1274,38 @@ _bt_pagedel(Relation rel, Buffer buf, BTStack stack, bool vacuum_full)
        }
 
        /*
+        * Check that the parent-page index items we're about to delete/overwrite
+        * contain what we expect.      This can fail if the index has become corrupt
+        * for some reason.  We want to throw any error before entering the
+        * critical section --- otherwise it'd be a PANIC.
+        *
+        * The test on the target item is just an Assert because _bt_getstackbuf
+        * should have guaranteed it has the expected contents.  The test on the
+        * next-child downlink is known to sometimes fail in the field, though.
+        */
+       page = BufferGetPage(pbuf);
+       opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+
+#ifdef USE_ASSERT_CHECKING
+       itemid = PageGetItemId(page, poffset);
+       itup = (IndexTuple) PageGetItem(page, itemid);
+       Assert(ItemPointerGetBlockNumber(&(itup->t_tid)) == target);
+#endif
+
+       if (!parent_half_dead)
+       {
+               OffsetNumber nextoffset;
+
+               nextoffset = OffsetNumberNext(poffset);
+               itemid = PageGetItemId(page, nextoffset);
+               itup = (IndexTuple) PageGetItem(page, itemid);
+               if (ItemPointerGetBlockNumber(&(itup->t_tid)) != rightsib)
+                       elog(ERROR, "right sibling %u of block %u is not next child %u of block %u in index \"%s\"",
+                                rightsib, target, ItemPointerGetBlockNumber(&(itup->t_tid)),
+                                parent, RelationGetRelationName(rel));
+       }
+
+       /*
         * Here we begin doing the deletion.
         */
 
@@ -1161,8 +1318,6 @@ _bt_pagedel(Relation rel, Buffer buf, BTStack stack, bool vacuum_full)
         * to copy the right sibling's downlink over the target downlink, and then
         * delete the following item.
         */
-       page = BufferGetPage(pbuf);
-       opaque = (BTPageOpaque) PageGetSpecialPointer(page);
        if (parent_half_dead)
        {
                PageIndexTupleDelete(page, poffset);
@@ -1174,23 +1329,16 @@ _bt_pagedel(Relation rel, Buffer buf, BTStack stack, bool vacuum_full)
 
                itemid = PageGetItemId(page, poffset);
                itup = (IndexTuple) PageGetItem(page, itemid);
-               Assert(ItemPointerGetBlockNumber(&(itup->t_tid)) == target);
                ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY);
 
                nextoffset = OffsetNumberNext(poffset);
-               /* This part is just for double-checking */
-               itemid = PageGetItemId(page, nextoffset);
-               itup = (IndexTuple) PageGetItem(page, itemid);
-               if (ItemPointerGetBlockNumber(&(itup->t_tid)) != rightsib)
-                       elog(PANIC, "right sibling %u of block %u is not next child of %u in index \"%s\"",
-                                rightsib, target, BufferGetBlockNumber(pbuf),
-                                RelationGetRelationName(rel));
                PageIndexTupleDelete(page, nextoffset);
        }
 
        /*
         * Update siblings' side-links.  Note the target page's side-links will
-        * continue to point to the siblings.
+        * continue to point to the siblings.  Asserts here are just rechecking
+        * things we already verified above.
         */
        if (BufferIsValid(lbuf))
        {
@@ -1207,14 +1355,13 @@ _bt_pagedel(Relation rel, Buffer buf, BTStack stack, bool vacuum_full)
 
        /*
         * Mark the page itself deleted.  It can be recycled when all current
-        * transactions are gone; or immediately if we're doing VACUUM FULL.
+        * transactions are gone.
         */
        page = BufferGetPage(buf);
        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
        opaque->btpo_flags &= ~BTP_HALF_DEAD;
        opaque->btpo_flags |= BTP_DELETED;
-       opaque->btpo.xact =
-               vacuum_full ? FrozenTransactionId : ReadNewTransactionId();
+       opaque->btpo.xact = ReadNewTransactionId();
 
        /* And update the metapage, if needed */
        if (BufferIsValid(metabuf))
@@ -1232,7 +1379,7 @@ _bt_pagedel(Relation rel, Buffer buf, BTStack stack, bool vacuum_full)
                MarkBufferDirty(lbuf);
 
        /* XLOG stuff */
-       if (!rel->rd_istemp)
+       if (RelationNeedsWAL(rel))
        {
                xl_btree_delete_page xlrec;
                xl_btree_metadata xlmeta;
@@ -1246,6 +1393,7 @@ _bt_pagedel(Relation rel, Buffer buf, BTStack stack, bool vacuum_full)
                xlrec.deadblk = target;
                xlrec.leftblk = leftsib;
                xlrec.rightblk = rightsib;
+               xlrec.btpo_xact = opaque->btpo.xact;
 
                rdata[0].data = (char *) &xlrec;
                rdata[0].len = SizeOfBtreeDeletePage;
@@ -1350,7 +1498,7 @@ _bt_pagedel(Relation rel, Buffer buf, BTStack stack, bool vacuum_full)
        {
                /* recursive call will release pbuf */
                _bt_relbuf(rel, rbuf);
-               result = _bt_pagedel(rel, pbuf, stack->bts_parent, vacuum_full) + 1;
+               result = _bt_pagedel(rel, pbuf, stack->bts_parent) + 1;
                _bt_relbuf(rel, buf);
        }
        else if (parent_one_child && rightsib_empty)
@@ -1358,7 +1506,7 @@ _bt_pagedel(Relation rel, Buffer buf, BTStack stack, bool vacuum_full)
                _bt_relbuf(rel, pbuf);
                _bt_relbuf(rel, buf);
                /* recursive call will release rbuf */
-               result = _bt_pagedel(rel, rbuf, stack, vacuum_full) + 1;
+               result = _bt_pagedel(rel, rbuf, stack) + 1;
        }
        else
        {