
Make the visibility map crash-safe.
author     Robert Haas <rhaas@postgresql.org>    Wed, 22 Jun 2011 03:04:40 +0000 (23:04 -0400)
committer  Robert Haas <rhaas@postgresql.org>    Wed, 22 Jun 2011 03:04:40 +0000 (23:04 -0400)
This involves two main changes from the previous behavior.  First,
when we set a bit in the visibility map, emit a new WAL record of type
XLOG_HEAP2_VISIBLE.  Replay sets the page-level PD_ALL_VISIBLE bit and
the visibility map bit.  Second, when inserting, updating, or deleting
a tuple, we can no longer get away with clearing the visibility map
bit after releasing the lock on the corresponding heap page, because
an intervening crash might leave the visibility map bit set and the
page-level bit clear.  Making this work requires a bit of interface
refactoring.
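
A condensed sketch of the new ordering, drawn from the heap_delete changes
below (declarations, error paths, tuple locking, and WAL logging omitted):

    block = ItemPointerGetBlockNumber(tid);
    buffer = ReadBuffer(relation, block);
    page = BufferGetPage(buffer);

    /* Pin the visibility map page before taking the buffer lock, since
     * visibilitymap_pin may need to do I/O ... */
    if (PageIsAllVisible(page))
        visibilitymap_pin(relation, block, &vmbuffer);

    LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

    /* ... and recheck once the lock is held, re-pinning if we raced. */
    if (vmbuffer == InvalidBuffer && PageIsAllVisible(page))
    {
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
        visibilitymap_pin(relation, block, &vmbuffer);
        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
    }

    /* Clear both bits while still holding the heap buffer lock, so a crash
     * cannot leave the map bit set with the page-level bit clear. */
    if (PageIsAllVisible(page))
    {
        PageClearAllVisible(page);
        visibilitymap_clear(relation, block, vmbuffer);
    }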

In passing, a few minor but related cleanups: change the test in
visibilitymap_set and visibilitymap_clear to throw an error if the
wrong page (or no page) is pinned, rather than silently doing nothing;
this case should never occur.  Also, remove duplicate definitions of
InvalidXLogRecPtr.
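
The new sanity check, as it appears in the visibilitymap.c hunk below:

    if (!BufferIsValid(buf) || BufferGetBlockNumber(buf) != mapBlock)
        elog(ERROR, "wrong buffer passed to visibilitymap_clear");

visibilitymap_set performs the same test, reporting its own name.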

Patch by me, review by Noah Misch.

12 files changed:
src/backend/access/heap/heapam.c
src/backend/access/heap/hio.c
src/backend/access/heap/visibilitymap.c
src/backend/access/transam/transam.c
src/backend/access/transam/xlog.c
src/backend/commands/vacuumlazy.c
src/include/access/heapam.h
src/include/access/hio.h
src/include/access/htup.h
src/include/access/transam.h
src/include/access/visibilitymap.h
src/include/access/xlog_internal.h

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index b947c11..7bb4a87 100644
@@ -1862,6 +1862,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
        TransactionId xid = GetCurrentTransactionId();
        HeapTuple       heaptup;
        Buffer          buffer;
+       Buffer          vmbuffer = InvalidBuffer;
        bool            all_visible_cleared = false;
 
        if (relation->rd_rel->relhasoids)
@@ -1914,9 +1915,13 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
        else
                heaptup = tup;
 
-       /* Find buffer to insert this tuple into */
+       /*
+        * Find buffer to insert this tuple into.  If the page is all visible,
+        * this will also pin the requisite visibility map page.
+        */
        buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
-                                                                          InvalidBuffer, options, bistate);
+                                                                          InvalidBuffer, options, bistate,
+                                                                          &vmbuffer);
 
        /*
         * We're about to do the actual insert -- check for conflict at the
@@ -1934,6 +1939,9 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
        {
                all_visible_cleared = true;
                PageClearAllVisible(BufferGetPage(buffer));
+               visibilitymap_clear(relation,
+                                                       ItemPointerGetBlockNumber(&(heaptup->t_self)),
+                                                       vmbuffer);
        }
 
        /*
@@ -2010,11 +2018,8 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
        END_CRIT_SECTION();
 
        UnlockReleaseBuffer(buffer);
-
-       /* Clear the bit in the visibility map if necessary */
-       if (all_visible_cleared)
-               visibilitymap_clear(relation,
-                                                       ItemPointerGetBlockNumber(&(heaptup->t_self)));
+       if (vmbuffer != InvalidBuffer)
+               ReleaseBuffer(vmbuffer);
 
        /*
         * If tuple is cachable, mark it for invalidation from the caches in case
@@ -2089,17 +2094,43 @@ heap_delete(Relation relation, ItemPointer tid,
        ItemId          lp;
        HeapTupleData tp;
        Page            page;
+       BlockNumber     block;
        Buffer          buffer;
+       Buffer          vmbuffer = InvalidBuffer;
        bool            have_tuple_lock = false;
        bool            iscombo;
        bool            all_visible_cleared = false;
 
        Assert(ItemPointerIsValid(tid));
 
-       buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
+       block = ItemPointerGetBlockNumber(tid);
+       buffer = ReadBuffer(relation, block);
+       page = BufferGetPage(buffer);
+
+       /*
+        * Before locking the buffer, pin the visibility map page if it appears
+        * to be necessary.  Since we haven't got the lock yet, someone else might
+        * be in the middle of changing this, so we'll need to recheck after
+        * we have the lock.
+        */
+       if (PageIsAllVisible(page))
+               visibilitymap_pin(relation, block, &vmbuffer);
+
        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
-       page = BufferGetPage(buffer);
+       /*
+        * If we didn't pin the visibility map page and the page has become all
+        * visible while we were busy locking the buffer, we'll have to unlock and
+        * re-lock, to avoid holding the buffer lock across an I/O.  That's a bit
+        * unfortunate, but hopefully shouldn't happen often.
+        */
+       if (vmbuffer == InvalidBuffer && PageIsAllVisible(page))
+       {
+               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+               visibilitymap_pin(relation, block, &vmbuffer);
+               LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+       }
+
        lp = PageGetItemId(page, ItemPointerGetOffsetNumber(tid));
        Assert(ItemIdIsNormal(lp));
 
@@ -2222,6 +2253,8 @@ l1:
                UnlockReleaseBuffer(buffer);
                if (have_tuple_lock)
                        UnlockTuple(relation, &(tp.t_self), ExclusiveLock);
+               if (vmbuffer != InvalidBuffer)
+                       ReleaseBuffer(vmbuffer);
                return result;
        }
 
@@ -2249,6 +2282,8 @@ l1:
        {
                all_visible_cleared = true;
                PageClearAllVisible(page);
+               visibilitymap_clear(relation, BufferGetBlockNumber(buffer),
+                                                       vmbuffer);
        }
 
        /* store transaction information of xact deleting the tuple */
@@ -2296,6 +2331,9 @@ l1:
 
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 
+       if (vmbuffer != InvalidBuffer)
+               ReleaseBuffer(vmbuffer);
+
        /*
         * If the tuple has toasted out-of-line attributes, we need to delete
         * those items too.  We have to do this before releasing the buffer
@@ -2317,10 +2355,6 @@ l1:
         */
        CacheInvalidateHeapTuple(relation, &tp);
 
-       /* Clear the bit in the visibility map if necessary */
-       if (all_visible_cleared)
-               visibilitymap_clear(relation, BufferGetBlockNumber(buffer));
-
        /* Now we can release the buffer */
        ReleaseBuffer(buffer);
 
@@ -2419,8 +2453,11 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
        HeapTupleData oldtup;
        HeapTuple       heaptup;
        Page            page;
+       BlockNumber     block;
        Buffer          buffer,
-                               newbuf;
+                               newbuf,
+                               vmbuffer = InvalidBuffer,
+                               vmbuffer_new = InvalidBuffer;
        bool            need_toast,
                                already_marked;
        Size            newtupsize,
@@ -2447,10 +2484,34 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
         */
        hot_attrs = RelationGetIndexAttrBitmap(relation);
 
-       buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(otid));
+       block = ItemPointerGetBlockNumber(otid);
+       buffer = ReadBuffer(relation, block);
+       page = BufferGetPage(buffer);
+
+       /*
+        * Before locking the buffer, pin the visibility map page if it appears
+        * to be necessary.  Since we haven't got the lock yet, someone else might
+        * be in the middle of changing this, so we'll need to recheck after
+        * we have the lock.
+        */
+       if (PageIsAllVisible(page))
+               visibilitymap_pin(relation, block, &vmbuffer);
+
        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
-       page = BufferGetPage(buffer);
+       /*
+        * If we didn't pin the visibility map page and the page has become all
+        * visible while we were busy locking the buffer, we'll have to unlock and
+        * re-lock, to avoid holding the buffer lock across an I/O.  That's a bit
+        * unfortunate, but hopefully shouldn't happen often.
+        */
+       if (vmbuffer == InvalidBuffer && PageIsAllVisible(page))
+       {
+               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+               visibilitymap_pin(relation, block, &vmbuffer);
+               LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+       }
+
        lp = PageGetItemId(page, ItemPointerGetOffsetNumber(otid));
        Assert(ItemIdIsNormal(lp));
 
@@ -2580,6 +2641,8 @@ l2:
                UnlockReleaseBuffer(buffer);
                if (have_tuple_lock)
                        UnlockTuple(relation, &(oldtup.t_self), ExclusiveLock);
+               if (vmbuffer != InvalidBuffer)
+                       ReleaseBuffer(vmbuffer);
                bms_free(hot_attrs);
                return result;
        }
@@ -2700,7 +2763,8 @@ l2:
                {
                        /* Assume there's no chance to put heaptup on same page. */
                        newbuf = RelationGetBufferForTuple(relation, heaptup->t_len,
-                                                                                          buffer, 0, NULL);
+                                                                                          buffer, 0, NULL,
+                                                                                          &vmbuffer_new);
                }
                else
                {
@@ -2717,7 +2781,8 @@ l2:
                                 */
                                LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
                                newbuf = RelationGetBufferForTuple(relation, heaptup->t_len,
-                                                                                                  buffer, 0, NULL);
+                                                                                                  buffer, 0, NULL,
+                                                                                                  &vmbuffer_new);
                        }
                        else
                        {
@@ -2866,14 +2931,20 @@ l2:
 
        /* Clear bits in visibility map */
        if (all_visible_cleared)
-               visibilitymap_clear(relation, BufferGetBlockNumber(buffer));
+               visibilitymap_clear(relation, BufferGetBlockNumber(buffer),
+                                                       vmbuffer);
        if (all_visible_cleared_new)
-               visibilitymap_clear(relation, BufferGetBlockNumber(newbuf));
+               visibilitymap_clear(relation, BufferGetBlockNumber(newbuf),
+                                                       vmbuffer_new);
 
        /* Now we can release the buffer(s) */
        if (newbuf != buffer)
                ReleaseBuffer(newbuf);
        ReleaseBuffer(buffer);
+       if (BufferIsValid(vmbuffer_new))
+               ReleaseBuffer(vmbuffer_new);
+       if (BufferIsValid(vmbuffer))
+               ReleaseBuffer(vmbuffer);
 
        /*
         * If new tuple is cachable, mark it for invalidation from the caches in
@@ -4036,6 +4107,38 @@ log_heap_freeze(Relation reln, Buffer buffer,
 }
 
 /*
+ * Perform XLogInsert for a heap-visible operation.     'block' is the block
+ * being marked all-visible, and vm_buffer is the buffer containing the
+ * corresponding visibility map block.  Both should have already been modified
+ * and dirtied.
+ */
+XLogRecPtr
+log_heap_visible(RelFileNode rnode, BlockNumber block, Buffer vm_buffer)
+{
+       xl_heap_visible xlrec;
+       XLogRecPtr      recptr;
+       XLogRecData rdata[2];
+
+       xlrec.node = rnode;
+       xlrec.block = block;
+
+       rdata[0].data = (char *) &xlrec;
+       rdata[0].len = SizeOfHeapVisible;
+       rdata[0].buffer = InvalidBuffer;
+       rdata[0].next = &(rdata[1]);
+
+       rdata[1].data = NULL;
+       rdata[1].len = 0;
+       rdata[1].buffer = vm_buffer;
+       rdata[1].buffer_std = false;
+       rdata[1].next = NULL;
+
+       recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_VISIBLE, rdata);
+
+       return recptr;
+}
+
+/*
  * Perform XLogInsert for a heap-update operation.     Caller must already
  * have modified the buffer(s) and marked them dirty.
  */
@@ -4323,6 +4426,92 @@ heap_xlog_freeze(XLogRecPtr lsn, XLogRecord *record)
        UnlockReleaseBuffer(buffer);
 }
 
+/*
+ * Replay XLOG_HEAP2_VISIBLE record.
+ *
+ * The critical integrity requirement here is that we must never end up with
+ * a situation where the visibility map bit is set, and the page-level
+ * PD_ALL_VISIBLE bit is clear.  If that were to occur, then a subsequent
+ * page modification would fail to clear the visibility map bit.
+ */
+static void
+heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
+{
+       xl_heap_visible *xlrec = (xl_heap_visible *) XLogRecGetData(record);
+       Buffer          buffer;
+       Page            page;
+
+       /*
+        * Read the heap page, if it still exists.  If the heap file has been
+        * dropped or truncated later in recovery, this might fail.  In that case,
+        * there's no point in doing anything further, since the visibility map
+        * will have to be cleared out at the same time.
+        */
+       buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block,
+                                                                       RBM_NORMAL);
+       if (!BufferIsValid(buffer))
+               return;
+       page = (Page) BufferGetPage(buffer);
+
+       LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+       /*
+        * We don't bump the LSN of the heap page when setting the visibility
+        * map bit, because that would generate an unworkable volume of
+        * full-page writes.  This exposes us to torn page hazards, but since
+        * we're not inspecting the existing page contents in any way, we
+        * don't care.
+        *
+        * However, all operations that clear the visibility map bit *do* bump
+        * the LSN, and those operations will only be replayed if the XLOG LSN
+        * follows the page LSN.  Thus, if the page LSN has advanced past our
+        * XLOG record's LSN, we mustn't mark the page all-visible, because
+        * the subsequent update won't be replayed to clear the flag.
+        */
+       if (!XLByteLE(lsn, PageGetLSN(page)))
+       {
+               PageSetAllVisible(page);
+               MarkBufferDirty(buffer);
+       }
+
+       /* Done with heap page. */
+       UnlockReleaseBuffer(buffer);
+
+       /*
+        * Even if we skipped the heap page update due to the LSN interlock, it's
+        * still safe to update the visibility map.  Any WAL record that clears
+        * the visibility map bit does so before checking the page LSN, so any
+        * bits that need to be cleared will still be cleared.
+        */
+       if (record->xl_info & XLR_BKP_BLOCK_1)
+               RestoreBkpBlocks(lsn, record, false);
+       else
+       {
+               Relation        reln;
+               Buffer          vmbuffer = InvalidBuffer;
+
+               reln = CreateFakeRelcacheEntry(xlrec->node);
+               visibilitymap_pin(reln, xlrec->block, &vmbuffer);
+
+               /*
+                * Don't set the bit if replay has already passed this point.
+                *
+                * It might be safe to do this unconditionally; if replay has passed
+                * this point, we'll replay at least as far this time as we did before,
+                * and if this bit needs to be cleared, the record responsible for
+                * doing so should be replayed again and will clear it.  For right now,
+                * out of an abundance of conservatism, we use the same test here
+                * that we did for the heap page; if this results in a dropped bit, no
+                * real harm is done, and the next VACUUM will fix it.
+                */
+               if (!XLByteLE(lsn, PageGetLSN(BufferGetPage(vmbuffer))))
+                       visibilitymap_set(reln, xlrec->block, lsn, vmbuffer);
+
+               ReleaseBuffer(vmbuffer);
+               FreeFakeRelcacheEntry(reln);
+       }
+}
+
 static void
 heap_xlog_newpage(XLogRecPtr lsn, XLogRecord *record)
 {
@@ -4377,8 +4566,11 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
        if (xlrec->all_visible_cleared)
        {
                Relation        reln = CreateFakeRelcacheEntry(xlrec->target.node);
+               Buffer          vmbuffer = InvalidBuffer;
 
-               visibilitymap_clear(reln, blkno);
+               visibilitymap_pin(reln, blkno, &vmbuffer);
+               visibilitymap_clear(reln, blkno, vmbuffer);
+               ReleaseBuffer(vmbuffer);
                FreeFakeRelcacheEntry(reln);
        }
 
@@ -4455,8 +4647,11 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
        if (xlrec->all_visible_cleared)
        {
                Relation        reln = CreateFakeRelcacheEntry(xlrec->target.node);
+               Buffer          vmbuffer = InvalidBuffer;
 
-               visibilitymap_clear(reln, blkno);
+               visibilitymap_pin(reln, blkno, &vmbuffer);
+               visibilitymap_clear(reln, blkno, vmbuffer);
+               ReleaseBuffer(vmbuffer);
                FreeFakeRelcacheEntry(reln);
        }
 
@@ -4567,9 +4762,12 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
        if (xlrec->all_visible_cleared)
        {
                Relation        reln = CreateFakeRelcacheEntry(xlrec->target.node);
+               BlockNumber     block = ItemPointerGetBlockNumber(&xlrec->target.tid);
+               Buffer          vmbuffer = InvalidBuffer;
 
-               visibilitymap_clear(reln,
-                                                       ItemPointerGetBlockNumber(&xlrec->target.tid));
+               visibilitymap_pin(reln, block, &vmbuffer);
+               visibilitymap_clear(reln, block, vmbuffer);
+               ReleaseBuffer(vmbuffer);
                FreeFakeRelcacheEntry(reln);
        }
 
@@ -4648,8 +4846,12 @@ newt:;
        if (xlrec->new_all_visible_cleared)
        {
                Relation        reln = CreateFakeRelcacheEntry(xlrec->target.node);
+               BlockNumber     block = ItemPointerGetBlockNumber(&xlrec->newtid);
+               Buffer          vmbuffer = InvalidBuffer;
 
-               visibilitymap_clear(reln, ItemPointerGetBlockNumber(&xlrec->newtid));
+               visibilitymap_pin(reln, block, &vmbuffer);
+               visibilitymap_clear(reln, block, vmbuffer);
+               ReleaseBuffer(vmbuffer);
                FreeFakeRelcacheEntry(reln);
        }
 
@@ -4915,6 +5117,9 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record)
                case XLOG_HEAP2_CLEANUP_INFO:
                        heap_xlog_cleanup_info(lsn, record);
                        break;
+               case XLOG_HEAP2_VISIBLE:
+                       heap_xlog_visible(lsn, record);
+                       break;
                default:
                        elog(PANIC, "heap2_redo: unknown op code %u", info);
        }
@@ -5044,6 +5249,14 @@ heap2_desc(StringInfo buf, uint8 xl_info, char *rec)
                appendStringInfo(buf, "cleanup info: remxid %u",
                                                 xlrec->latestRemovedXid);
        }
+       else if (info == XLOG_HEAP2_VISIBLE)
+       {
+               xl_heap_visible *xlrec = (xl_heap_visible *) rec;
+
+               appendStringInfo(buf, "visible: rel %u/%u/%u; blk %u",
+                                                xlrec->node.spcNode, xlrec->node.dbNode,
+                                                xlrec->node.relNode, xlrec->block);
+       }
        else
                appendStringInfo(buf, "UNKNOWN");
 }
diff --git a/src/backend/access/heap/hio.c b/src/backend/access/heap/hio.c
index 72a69e5..aee2a20 100644
@@ -17,6 +17,7 @@
 
 #include "access/heapam.h"
 #include "access/hio.h"
+#include "access/visibilitymap.h"
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
 #include "storage/lmgr.h"
@@ -150,7 +151,8 @@ ReadBufferBI(Relation relation, BlockNumber targetBlock,
 Buffer
 RelationGetBufferForTuple(Relation relation, Size len,
                                                  Buffer otherBuffer, int options,
-                                                 struct BulkInsertStateData * bistate)
+                                                 struct BulkInsertStateData * bistate,
+                                                 Buffer *vmbuffer)
 {
        bool            use_fsm = !(options & HEAP_INSERT_SKIP_FSM);
        Buffer          buffer = InvalidBuffer;
@@ -237,23 +239,37 @@ RelationGetBufferForTuple(Relation relation, Size len,
                 * Read and exclusive-lock the target block, as well as the other
                 * block if one was given, taking suitable care with lock ordering and
                 * the possibility they are the same block.
+                *
+                * If the page-level all-visible flag is set, caller will need to clear
+                * both that and the corresponding visibility map bit.  However, by the
+                * time we return, we'll have x-locked the buffer, and we don't want to
+                * do any I/O while in that state.  So we check the bit here before
+                * taking the lock, and pin the page if it appears necessary.
+                * Checking without the lock creates a risk of getting the wrong
+                * answer, so we'll have to recheck after acquiring the lock.
                 */
                if (otherBuffer == InvalidBuffer)
                {
                        /* easy case */
                        buffer = ReadBufferBI(relation, targetBlock, bistate);
+                       if (PageIsAllVisible(BufferGetPage(buffer)))
+                               visibilitymap_pin(relation, targetBlock, vmbuffer);
                        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
                }
                else if (otherBlock == targetBlock)
                {
                        /* also easy case */
                        buffer = otherBuffer;
+                       if (PageIsAllVisible(BufferGetPage(buffer)))
+                               visibilitymap_pin(relation, targetBlock, vmbuffer);
                        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
                }
                else if (otherBlock < targetBlock)
                {
                        /* lock other buffer first */
                        buffer = ReadBuffer(relation, targetBlock);
+                       if (PageIsAllVisible(BufferGetPage(buffer)))
+                               visibilitymap_pin(relation, targetBlock, vmbuffer);
                        LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
                        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
                }
@@ -261,11 +277,41 @@ RelationGetBufferForTuple(Relation relation, Size len,
                {
                        /* lock target buffer first */
                        buffer = ReadBuffer(relation, targetBlock);
+                       if (PageIsAllVisible(BufferGetPage(buffer)))
+                               visibilitymap_pin(relation, targetBlock, vmbuffer);
                        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
                        LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
                }
 
                /*
+                * If the page is all visible but we don't have the right visibility
+                * map page pinned, then give up our locks, go get the pin, and
+                * re-lock.  This is pretty painful, but hopefully shouldn't happen
+                * often.  Note that there's a small possibility that we didn't pin
+                * the page above but still have the correct page pinned anyway, either
+                * because we've already made a previous pass through this loop, or
+                * because caller passed us the right page anyway.
+                *
+                * Note also that it's possible that by the time we get the pin and
+                * retake the buffer locks, the visibility map bit will have been
+                * cleared by some other backend anyway.  In that case, we'll have done
+                * a bit of extra work for no gain, but there's no real harm done.
+                */
+               if (PageIsAllVisible(BufferGetPage(buffer))
+                       && !visibilitymap_pin_ok(targetBlock, *vmbuffer))
+               {
+                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+                       if (otherBlock != targetBlock)
+                               LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
+                       visibilitymap_pin(relation, targetBlock, vmbuffer);
+                       if (otherBuffer != InvalidBuffer && otherBlock < targetBlock)
+                               LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
+                       LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+                       if (otherBuffer != InvalidBuffer && otherBlock > targetBlock)
+                               LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
+               }
+
+               /*
                 * Now we can check to see if there's enough free space here. If so,
                 * we're done.
                 */
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c
index 58bab7d..a193520 100644
  *       src/backend/access/heap/visibilitymap.c
  *
  * INTERFACE ROUTINES
- *             visibilitymap_clear - clear a bit in the visibility map
- *             visibilitymap_pin       - pin a map page for setting a bit
- *             visibilitymap_set       - set a bit in a previously pinned page
- *             visibilitymap_test      - test if a bit is set
+ *             visibilitymap_clear  - clear a bit in the visibility map
+ *             visibilitymap_pin        - pin a map page for setting a bit
+ *             visibilitymap_pin_ok - check whether correct map page is already pinned
+ *             visibilitymap_set        - set a bit in a previously pinned page
+ *             visibilitymap_test       - test if a bit is set
  *
  * NOTES
  *
  * It would be nice to use the visibility map to skip visibility checks in
  * index scans.
  *
- * Currently, the visibility map is not 100% correct all the time.
- * During updates, the bit in the visibility map is cleared after releasing
- * the lock on the heap page. During the window between releasing the lock
- * and clearing the bit in the visibility map, the bit in the visibility map
- * is set, but the new insertion or deletion is not yet visible to other
- * backends.
- *
- * That might actually be OK for the index scans, though. The newly inserted
- * tuple wouldn't have an index pointer yet, so all tuples reachable from an
- * index would still be visible to all other backends, and deletions wouldn't
- * be visible to other backends yet.  (But HOT breaks that argument, no?)
- *
- * There's another hole in the way the PD_ALL_VISIBLE flag is set. When
- * vacuum observes that all tuples are visible to all, it sets the flag on
- * the heap page, and also sets the bit in the visibility map. If we then
- * crash, and only the visibility map page was flushed to disk, we'll have
- * a bit set in the visibility map, but the corresponding flag on the heap
- * page is not set. If the heap page is then updated, the updater won't
- * know to clear the bit in the visibility map.  (Isn't that prevented by
- * the LSN interlock?)
- *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
+#include "access/heapam.h"
 #include "access/visibilitymap.h"
+#include "miscadmin.h"
 #include "storage/bufmgr.h"
 #include "storage/bufpage.h"
 #include "storage/lmgr.h"
@@ -127,38 +109,37 @@ static void vm_extend(Relation rel, BlockNumber nvmblocks);
 /*
  *     visibilitymap_clear - clear a bit in visibility map
  *
- * Clear a bit in the visibility map, marking that not all tuples are
- * visible to all transactions anymore.
+ * You must pass a buffer containing the correct map page to this function.
+ * Call visibilitymap_pin first to pin the right one. This function doesn't do
+ * any I/O.
  */
 void
-visibilitymap_clear(Relation rel, BlockNumber heapBlk)
+visibilitymap_clear(Relation rel, BlockNumber heapBlk, Buffer buf)
 {
        BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
        int                     mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
        int                     mapBit = HEAPBLK_TO_MAPBIT(heapBlk);
        uint8           mask = 1 << mapBit;
-       Buffer          mapBuffer;
        char       *map;
 
 #ifdef TRACE_VISIBILITYMAP
        elog(DEBUG1, "vm_clear %s %d", RelationGetRelationName(rel), heapBlk);
 #endif
 
-       mapBuffer = vm_readbuf(rel, mapBlock, false);
-       if (!BufferIsValid(mapBuffer))
-               return;                                 /* nothing to do */
+       if (!BufferIsValid(buf) || BufferGetBlockNumber(buf) != mapBlock)
+               elog(ERROR, "wrong buffer passed to visibilitymap_clear");
 
-       LockBuffer(mapBuffer, BUFFER_LOCK_EXCLUSIVE);
-       map = PageGetContents(BufferGetPage(mapBuffer));
+       LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+       map = PageGetContents(BufferGetPage(buf));
 
        if (map[mapByte] & mask)
        {
                map[mapByte] &= ~mask;
 
-               MarkBufferDirty(mapBuffer);
+               MarkBufferDirty(buf);
        }
 
-       UnlockReleaseBuffer(mapBuffer);
+       LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 }
 
 /*
@@ -194,19 +175,36 @@ visibilitymap_pin(Relation rel, BlockNumber heapBlk, Buffer *buf)
 }
 
 /*
+ *     visibilitymap_pin_ok - do we already have the correct page pinned?
+ *
+ * On entry, buf should be InvalidBuffer or a valid buffer returned by
+ * an earlier call to visibilitymap_pin or visibilitymap_test on the same
+ * relation.  The return value indicates whether the buffer covers the
+ * given heapBlk.
+ */
+bool
+visibilitymap_pin_ok(BlockNumber heapBlk, Buffer buf)
+{
+       BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
+
+       return BufferIsValid(buf) && BufferGetBlockNumber(buf) == mapBlock;
+}
+
+/*
  *     visibilitymap_set - set a bit on a previously pinned page
  *
- * recptr is the LSN of the heap page. The LSN of the visibility map page is
- * advanced to that, to make sure that the visibility map doesn't get flushed
- * to disk before the update to the heap page that made all tuples visible.
+ * recptr is the LSN of the XLOG record we're replaying, if we're in recovery,
+ * or InvalidXLogRecPtr in normal running.  The page LSN is advanced to the
+ * one provided; in normal running, we generate a new XLOG record and set the
+ * page LSN to that value.
  *
- * This is an opportunistic function. It does nothing, unless *buf
- * contains the bit for heapBlk. Call visibilitymap_pin first to pin
- * the right map page. This function doesn't do any I/O.
+ * You must pass a buffer containing the correct map page to this function.
+ * Call visibilitymap_pin first to pin the right one. This function doesn't do
+ * any I/O.
  */
 void
 visibilitymap_set(Relation rel, BlockNumber heapBlk, XLogRecPtr recptr,
-                                 Buffer *buf)
+                                 Buffer buf)
 {
        BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
        uint32          mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
@@ -218,25 +216,35 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, XLogRecPtr recptr,
        elog(DEBUG1, "vm_set %s %d", RelationGetRelationName(rel), heapBlk);
 #endif
 
+       Assert(InRecovery || XLogRecPtrIsInvalid(recptr));
+
        /* Check that we have the right page pinned */
-       if (!BufferIsValid(*buf) || BufferGetBlockNumber(*buf) != mapBlock)
-               return;
+       if (!BufferIsValid(buf) || BufferGetBlockNumber(buf) != mapBlock)
+               elog(ERROR, "wrong buffer passed to visibilitymap_set");
 
-       page = BufferGetPage(*buf);
+       page = BufferGetPage(buf);
        map = PageGetContents(page);
-       LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);
+       LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 
        if (!(map[mapByte] & (1 << mapBit)))
        {
+               START_CRIT_SECTION();
+
                map[mapByte] |= (1 << mapBit);
+               MarkBufferDirty(buf);
 
-               if (XLByteLT(PageGetLSN(page), recptr))
+               if (RelationNeedsWAL(rel))
+               {
+                       if (XLogRecPtrIsInvalid(recptr))
+                               recptr = log_heap_visible(rel->rd_node, heapBlk, buf);
                        PageSetLSN(page, recptr);
-               PageSetTLI(page, ThisTimeLineID);
-               MarkBufferDirty(*buf);
+                       PageSetTLI(page, ThisTimeLineID);
+               }
+
+               END_CRIT_SECTION();
        }
 
-       LockBuffer(*buf, BUFFER_LOCK_UNLOCK);
+       LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 }
 
 /*
diff --git a/src/backend/access/transam/transam.c b/src/backend/access/transam/transam.c
index bc02f15..472dd4c 100644
@@ -24,6 +24,8 @@
 #include "access/transam.h"
 #include "utils/snapmgr.h"
 
+/* Handy constant for an invalid xlog recptr */
+const XLogRecPtr InvalidXLogRecPtr = {0, 0};
 
 /*
  * Single-item cache for results of TransactionLogFetch.  It's worth having
@@ -35,9 +37,6 @@ static TransactionId cachedFetchXid = InvalidTransactionId;
 static XidStatus cachedFetchXidStatus;
 static XLogRecPtr cachedCommitLSN;
 
-/* Handy constant for an invalid xlog recptr */
-static const XLogRecPtr InvalidXLogRecPtr = {0, 0};
-
 /* Local functions */
 static XidStatus TransactionLogFetch(TransactionId transactionId);
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index aa0b029..4952d22 100644
@@ -5462,7 +5462,6 @@ exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
 {
        char            recoveryPath[MAXPGPATH];
        char            xlogpath[MAXPGPATH];
-       XLogRecPtr      InvalidXLogRecPtr = {0, 0};
 
        /*
         * We are no longer in archive recovery state.
@@ -8069,8 +8068,6 @@ CreateRestartPoint(int flags)
        if (XLogRecPtrIsInvalid(lastCheckPointRecPtr) ||
                XLByteLE(lastCheckPoint.redo, ControlFile->checkPointCopy.redo))
        {
-               XLogRecPtr      InvalidXLogRecPtr = {0, 0};
-
                ereport(DEBUG2,
                                (errmsg("skipping restartpoint, already performed at %X/%X",
                                  lastCheckPoint.redo.xlogid, lastCheckPoint.redo.xrecoff)));
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index ccc586f..c5bf32e 100644
@@ -513,7 +513,8 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
                                visibilitymap_pin(onerel, blkno, &vmbuffer);
                                LockBuffer(buf, BUFFER_LOCK_SHARE);
                                if (PageIsAllVisible(page))
-                                       visibilitymap_set(onerel, blkno, PageGetLSN(page), &vmbuffer);
+                                       visibilitymap_set(onerel, blkno, InvalidXLogRecPtr,
+                                                                         vmbuffer);
                                LockBuffer(buf, BUFFER_LOCK_UNLOCK);
                        }
 
@@ -765,7 +766,8 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
                         * updating the visibility map, but since this case shouldn't
                         * happen anyway, don't worry about that.
                         */
-                       visibilitymap_clear(onerel, blkno);
+                       visibilitymap_pin(onerel, blkno, &vmbuffer);
+                       visibilitymap_clear(onerel, blkno, vmbuffer);
                }
 
                LockBuffer(buf, BUFFER_LOCK_UNLOCK);
@@ -776,7 +778,7 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
                        visibilitymap_pin(onerel, blkno, &vmbuffer);
                        LockBuffer(buf, BUFFER_LOCK_SHARE);
                        if (PageIsAllVisible(page))
-                               visibilitymap_set(onerel, blkno, PageGetLSN(page), &vmbuffer);
+                               visibilitymap_set(onerel, blkno, InvalidXLogRecPtr, vmbuffer);
                        LockBuffer(buf, BUFFER_LOCK_UNLOCK);
                }
 
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 4dbc393..fc65761 100644
@@ -136,6 +136,8 @@ extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer,
 extern XLogRecPtr log_heap_freeze(Relation reln, Buffer buffer,
                                TransactionId cutoff_xid,
                                OffsetNumber *offsets, int offcnt);
+extern XLogRecPtr log_heap_visible(RelFileNode rnode, BlockNumber block,
+                                Buffer vm_buffer);
 extern XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum,
                        BlockNumber blk, Page page);
 
diff --git a/src/include/access/hio.h b/src/include/access/hio.h
index 6b661a3..7ae8797 100644
@@ -38,6 +38,7 @@ extern void RelationPutHeapTuple(Relation relation, Buffer buffer,
                                         HeapTuple tuple);
 extern Buffer RelationGetBufferForTuple(Relation relation, Size len,
                                                  Buffer otherBuffer, int options,
-                                                 struct BulkInsertStateData * bistate);
+                                                 struct BulkInsertStateData * bistate,
+                                                 Buffer *vmbuffer);
 
 #endif   /* HIO_H */
diff --git a/src/include/access/htup.h b/src/include/access/htup.h
index c147707..ba5d9b2 100644
@@ -606,6 +606,7 @@ typedef HeapTupleData *HeapTuple;
 #define XLOG_HEAP2_CLEAN               0x10
 /* 0x20 is free, was XLOG_HEAP2_CLEAN_MOVE */
 #define XLOG_HEAP2_CLEANUP_INFO 0x30
+#define XLOG_HEAP2_VISIBLE             0x40
 
 /*
  * All what we need to find changed tuple
@@ -750,6 +751,15 @@ typedef struct xl_heap_freeze
 
 #define SizeOfHeapFreeze (offsetof(xl_heap_freeze, cutoff_xid) + sizeof(TransactionId))
 
+/* This is what we need to know about setting a visibility map bit */
+typedef struct xl_heap_visible
+{
+       RelFileNode node;
+       BlockNumber block;
+} xl_heap_visible;
+
+#define SizeOfHeapVisible (offsetof(xl_heap_visible, block) + sizeof(BlockNumber))
+
 extern void HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
                                                                           TransactionId *latestRemovedXid);
 
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index c5e6ab0..c038fd9 100644
@@ -135,6 +135,9 @@ extern bool TransactionStartedDuringRecovery(void);
 /* in transam/varsup.c */
 extern PGDLLIMPORT VariableCache ShmemVariableCache;
 
+/* in transam/transam.c */
+extern const XLogRecPtr InvalidXLogRecPtr;
+
 
 /*
  * prototypes for functions in transam/transam.c
diff --git a/src/include/access/visibilitymap.h b/src/include/access/visibilitymap.h
index 689060b..7d62c12 100644
 #include "storage/buf.h"
 #include "utils/relcache.h"
 
-extern void visibilitymap_clear(Relation rel, BlockNumber heapBlk);
+extern void visibilitymap_clear(Relation rel, BlockNumber heapBlk,
+                                       Buffer vmbuf);
 extern void visibilitymap_pin(Relation rel, BlockNumber heapBlk,
                                  Buffer *vmbuf);
+extern bool visibilitymap_pin_ok(BlockNumber heapBlk, Buffer vmbuf);
 extern void visibilitymap_set(Relation rel, BlockNumber heapBlk,
-                                 XLogRecPtr recptr, Buffer *vmbuf);
+                                 XLogRecPtr recptr, Buffer vmbuf);
 extern bool visibilitymap_test(Relation rel, BlockNumber heapBlk, Buffer *vmbuf);
 extern void visibilitymap_truncate(Relation rel, BlockNumber heapblk);
 
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 7e39630..34316ff 100644
@@ -71,7 +71,7 @@ typedef struct XLogContRecord
 /*
  * Each page of XLOG file has a header like this:
  */
-#define XLOG_PAGE_MAGIC 0xD066 /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD067 /* can be used as WAL version indicator */
 
 typedef struct XLogPageHeaderData
 {