OSDN Git Service

Try again to make the visibility map crash safe.
authorRobert Haas <rhaas@postgresql.org>
Mon, 27 Jun 2011 17:55:55 +0000 (13:55 -0400)
committerRobert Haas <rhaas@postgresql.org>
Mon, 27 Jun 2011 17:55:55 +0000 (13:55 -0400)
My previous attempt was quite a bit less than half-baked with respect to
heap_update().

src/backend/access/heap/heapam.c
src/backend/access/heap/hio.c
src/include/access/hio.h

index a48e057..ce82d04 100644 (file)
@@ -1941,7 +1941,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
         */
        buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
                                                                           InvalidBuffer, options, bistate,
-                                                                          &vmbuffer);
+                                                                          &vmbuffer, NULL);
 
        /*
         * We're about to do the actual insert -- check for conflict at the
@@ -2519,19 +2519,6 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 
        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
-       /*
-        * If we didn't pin the visibility map page and the page has become all
-        * visible while we were busy locking the buffer, we'll have to unlock and
-        * re-lock, to avoid holding the buffer lock across an I/O.  That's a bit
-        * unfortunate, but hopefully shouldn't happen often.
-        */
-       if (vmbuffer == InvalidBuffer && PageIsAllVisible(page))
-       {
-               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-               visibilitymap_pin(relation, block, &vmbuffer);
-               LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-       }
-
        lp = PageGetItemId(page, ItemPointerGetOffsetNumber(otid));
        Assert(ItemIdIsNormal(lp));
 
@@ -2668,6 +2655,20 @@ l2:
        }
 
        /*
+        * If we didn't pin the visibility map page and the page has become all
+        * visible while we were busy locking the buffer, or during some subsequent
+        * window during which we had it unlocked, we'll have to unlock and
+        * re-lock, to avoid holding the buffer lock across an I/O.  That's a bit
+        * unfortunate, but hopefully shouldn't happen often.
+        */
+       if (vmbuffer == InvalidBuffer && PageIsAllVisible(page))
+       {
+               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+               visibilitymap_pin(relation, block, &vmbuffer);
+               LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+       }
+
+       /*
         * We're about to do the actual update -- check for conflict first, to
         * avoid possibly having to roll back work we've just done.
         */
@@ -2784,7 +2785,7 @@ l2:
                        /* Assume there's no chance to put heaptup on same page. */
                        newbuf = RelationGetBufferForTuple(relation, heaptup->t_len,
                                                                                           buffer, 0, NULL,
-                                                                                          &vmbuffer_new);
+                                                                                          &vmbuffer_new, &vmbuffer);
                }
                else
                {
@@ -2802,7 +2803,7 @@ l2:
                                LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
                                newbuf = RelationGetBufferForTuple(relation, heaptup->t_len,
                                                                                                   buffer, 0, NULL,
-                                                                                                  &vmbuffer_new);
+                                                                                                  &vmbuffer_new, &vmbuffer);
                        }
                        else
                        {
@@ -2908,11 +2909,15 @@ l2:
        {
                all_visible_cleared = true;
                PageClearAllVisible(BufferGetPage(buffer));
+               visibilitymap_clear(relation, BufferGetBlockNumber(buffer),
+                                                       vmbuffer);
        }
        if (newbuf != buffer && PageIsAllVisible(BufferGetPage(newbuf)))
        {
                all_visible_cleared_new = true;
                PageClearAllVisible(BufferGetPage(newbuf));
+               visibilitymap_clear(relation, BufferGetBlockNumber(newbuf),
+                                                       vmbuffer_new);
        }
 
        if (newbuf != buffer)
@@ -2949,14 +2954,6 @@ l2:
         */
        CacheInvalidateHeapTuple(relation, &oldtup);
 
-       /* Clear bits in visibility map */
-       if (all_visible_cleared)
-               visibilitymap_clear(relation, BufferGetBlockNumber(buffer),
-                                                       vmbuffer);
-       if (all_visible_cleared_new)
-               visibilitymap_clear(relation, BufferGetBlockNumber(newbuf),
-                                                       vmbuffer_new);
-
        /* Now we can release the buffer(s) */
        if (newbuf != buffer)
                ReleaseBuffer(newbuf);
index aee2a20..b4ea84a 100644 (file)
@@ -97,6 +97,64 @@ ReadBufferBI(Relation relation, BlockNumber targetBlock,
 }
 
 /*
+ * For each heap page which is all-visible, acquire a pin on the appropriate
+ * visibility map page, if we haven't already got one.
+ *
+ * buffer2 may be InvalidBuffer, if only one buffer is involved.  buffer1
+ * must not be InvalidBuffer.  If both buffers are specified, buffer1 must
+ * be less than buffer2.
+ */
+static void
+GetVisibilityMapPins(Relation relation, Buffer buffer1, Buffer buffer2,
+                                        BlockNumber block1, BlockNumber block2,
+                                        Buffer *vmbuffer1, Buffer *vmbuffer2)
+{
+       bool    need_to_pin_buffer1;
+       bool    need_to_pin_buffer2;
+
+       Assert(BufferIsValid(buffer1));
+       Assert(buffer2 == InvalidBuffer || buffer1 <= buffer2);
+
+       while (1)
+       {
+               /* Figure out which pins we need but don't have. */
+               need_to_pin_buffer1 = PageIsAllVisible(BufferGetPage(buffer1))
+                       && !visibilitymap_pin_ok(block1, *vmbuffer1);
+               need_to_pin_buffer2 = buffer2 != InvalidBuffer
+                       && PageIsAllVisible(BufferGetPage(buffer2))
+                       && !visibilitymap_pin_ok(block2, *vmbuffer2);
+               if (!need_to_pin_buffer1 && !need_to_pin_buffer2)
+                       return;
+
+               /* We must unlock both buffers before doing any I/O. */
+               LockBuffer(buffer1, BUFFER_LOCK_UNLOCK);
+               if (buffer2 != InvalidBuffer && buffer2 != buffer1)
+                       LockBuffer(buffer2, BUFFER_LOCK_UNLOCK);
+
+               /* Get pins. */
+               if (need_to_pin_buffer1)
+                       visibilitymap_pin(relation, block1, vmbuffer1);
+               if (need_to_pin_buffer2)
+                       visibilitymap_pin(relation, block2, vmbuffer2);
+
+               /* Relock buffers. */
+               LockBuffer(buffer1, BUFFER_LOCK_EXCLUSIVE);
+               if (buffer2 != InvalidBuffer && buffer2 != buffer1)
+                       LockBuffer(buffer2, BUFFER_LOCK_EXCLUSIVE);
+
+               /*
+                * If there are two buffers involved and we pinned just one of them,
+                * it's possible that the second one became all-visible while we were
+                * busy pinning the first one.  If it looks like that's a possible
+                * scenario, we'll need to make a second pass through this loop.
+                */
+               if (buffer2 == InvalidBuffer || buffer1 == buffer2
+                       || (need_to_pin_buffer1 && need_to_pin_buffer2))
+                       break;
+       }
+}
+
+/*
  * RelationGetBufferForTuple
  *
  *     Returns pinned and exclusive-locked buffer of a page in given relation
@@ -152,7 +210,7 @@ Buffer
 RelationGetBufferForTuple(Relation relation, Size len,
                                                  Buffer otherBuffer, int options,
                                                  struct BulkInsertStateData * bistate,
-                                                 Buffer *vmbuffer)
+                                                 Buffer *vmbuffer, Buffer *vmbuffer_other)
 {
        bool            use_fsm = !(options & HEAP_INSERT_SKIP_FSM);
        Buffer          buffer = InvalidBuffer;
@@ -284,11 +342,17 @@ RelationGetBufferForTuple(Relation relation, Size len,
                }
 
                /*
-                * If the page is all visible but we don't have the right visibility
-                * map page pinned, then give up our locks, go get the pin, and
-                * re-lock.  This is pretty painful, but hopefully shouldn't happen
-                * often.  Note that there's a small possibility that we didn't pin
-                * the page above but still have the correct page pinned anyway, either
+                * We now have the target page (and the other buffer, if any) pinned
+                * and locked.  However, since our initial PageIsAllVisible checks
+                * were performed before acquiring the lock, the results might now
+                * be out of date, either for the selected victim buffer, or for the
+                * other buffer passed by the caller.  In that case, we'll need to give
+                * up our locks, go get the pin(s) we failed to get earlier, and
+                * re-lock.  That's pretty painful, but hopefully shouldn't happen
+                * often.
+                *
+                * Note that there's a small possibility that we didn't pin the
+                * page above but still have the correct page pinned anyway, either
                 * because we've already made a previous pass through this loop, or
                 * because caller passed us the right page anyway.
                 *
@@ -297,19 +361,14 @@ RelationGetBufferForTuple(Relation relation, Size len,
                 * cleared by some other backend anyway.  In that case, we'll have done
                 * a bit of extra work for no gain, but there's no real harm done.
                 */
-               if (PageIsAllVisible(BufferGetPage(buffer))
-                       && !visibilitymap_pin_ok(targetBlock, *vmbuffer))
-               {
-                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-                       if (otherBlock != targetBlock)
-                               LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
-                       visibilitymap_pin(relation, targetBlock, vmbuffer);
-                       if (otherBuffer != InvalidBuffer && otherBlock < targetBlock)
-                               LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
-                       LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-                       if (otherBuffer != InvalidBuffer && otherBlock > targetBlock)
-                               LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
-               }
+               if (otherBuffer == InvalidBuffer || buffer <= otherBuffer)
+                       GetVisibilityMapPins(relation, buffer, otherBuffer,
+                                                                targetBlock, otherBlock, vmbuffer,
+                                                                vmbuffer_other);
+               else
+                       GetVisibilityMapPins(relation, otherBuffer, buffer,
+                                                                otherBlock, targetBlock, vmbuffer_other,
+                                                                vmbuffer);
 
                /*
                 * Now we can check to see if there's enough free space here. If so,
index 7ae8797..6eac97e 100644 (file)
@@ -39,6 +39,6 @@ extern void RelationPutHeapTuple(Relation relation, Buffer buffer,
 extern Buffer RelationGetBufferForTuple(Relation relation, Size len,
                                                  Buffer otherBuffer, int options,
                                                  struct BulkInsertStateData * bistate,
-                                                 Buffer *vmbuffer);
+                                                 Buffer *vmbuffer, Buffer *vmbuffer_other);
 
 #endif   /* HIO_H */