OSDN Git Service

Fix two distinct errors in creation of GIN_INSERT_LISTPAGE xlog records.
authorTom Lane <tgl@sss.pgh.pa.us>
Tue, 15 Sep 2009 20:31:30 +0000 (20:31 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Tue, 15 Sep 2009 20:31:30 +0000 (20:31 +0000)
In practice these mistakes were always masked when full_page_writes was on,
because XLogInsert would always choose to log the full page, and then
ginRedoInsertListPage wouldn't try to do anything.  But with full_page_writes
off a WAL replay failure was certain.

The GIN_INSERT_LISTPAGE record type could probably be eliminated entirely
in favor of using XLOG_HEAP_NEWPAGE, but I refrained from doing that now
since it would have required a significantly more invasive patch.

In passing do a little bit of code cleanup, including making the accounting
for free space on GIN list pages more precise.  (This wasn't a bug as the
errors were always in the conservative direction.)

Per report from Simon.  Back-patch to 8.4 which contains the identical code.

src/backend/access/gin/ginfast.c

index 20887ba..9d2351a 100644 (file)
@@ -11,7 +11,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *                     $PostgreSQL: pgsql/src/backend/access/gin/ginfast.c,v 1.3 2009/06/11 14:48:53 momjian Exp $
+ *                     $PostgreSQL: pgsql/src/backend/access/gin/ginfast.c,v 1.4 2009/09/15 20:31:30 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -41,13 +41,15 @@ typedef struct DatumArray
 
 /*
  * Build a pending-list page from the given array of tuples, and write it out.
+ *
+ * Returns amount of free space left on the page.
  */
 static int32
 writeListPage(Relation index, Buffer buffer,
                          IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
 {
        Page            page = BufferGetPage(buffer);
-       int                     i,
+       int32           i,
                                freesize,
                                size = 0;
        OffsetNumber l,
@@ -100,8 +102,6 @@ writeListPage(Relation index, Buffer buffer,
                GinPageGetOpaque(page)->maxoff = 0;
        }
 
-       freesize = PageGetFreeSpace(page);
-
        MarkBufferDirty(buffer);
 
        if (!index->rd_istemp)
@@ -110,26 +110,30 @@ writeListPage(Relation index, Buffer buffer,
                ginxlogInsertListPage data;
                XLogRecPtr      recptr;
 
-               rdata[0].buffer = buffer;
-               rdata[0].buffer_std = true;
+               data.node = index->rd_node;
+               data.blkno = BufferGetBlockNumber(buffer);
+               data.rightlink = rightlink;
+               data.ntuples = ntuples;
+
+               rdata[0].buffer = InvalidBuffer;
                rdata[0].data = (char *) &data;
                rdata[0].len = sizeof(ginxlogInsertListPage);
                rdata[0].next = rdata + 1;
 
-               rdata[1].buffer = InvalidBuffer;
+               rdata[1].buffer = buffer;
+               rdata[1].buffer_std = true;
                rdata[1].data = workspace;
                rdata[1].len = size;
                rdata[1].next = NULL;
 
-               data.blkno = BufferGetBlockNumber(buffer);
-               data.rightlink = rightlink;
-               data.ntuples = ntuples;
-
                recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE, rdata);
                PageSetLSN(page, recptr);
                PageSetTLI(page, ThisTimeLineID);
        }
 
+       /* get free space before releasing buffer */
+       freesize = PageGetExactFreeSpace(page);
+
        UnlockReleaseBuffer(buffer);
 
        END_CRIT_SECTION();
@@ -165,7 +169,8 @@ makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
                        {
                                res->nPendingPages++;
                                writeListPage(index, prevBuffer,
-                                                         tuples + startTuple, i - startTuple,
+                                                         tuples + startTuple,
+                                                         i - startTuple,
                                                          BufferGetBlockNumber(curBuffer));
                        }
                        else
@@ -180,7 +185,7 @@ makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
 
                tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);
 
-               if (size + tupsize >= GinListPageSize)
+               if (size + tupsize > GinListPageSize)
                {
                        /* won't fit, force a new page and reprocess */
                        i--;
@@ -197,7 +202,8 @@ makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
         */
        res->tail = BufferGetBlockNumber(curBuffer);
        res->tailFreeSize = writeListPage(index, curBuffer,
-                                                                  tuples + startTuple, ntuples - startTuple,
+                                                                         tuples + startTuple,
+                                                                         ntuples - startTuple,
                                                                          InvalidBlockNumber);
        res->nPendingPages++;
        /* that was only one heap tuple */
@@ -237,7 +243,7 @@ ginHeapTupleFastInsert(Relation index, GinState *ginstate,
        metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
        metapage = BufferGetPage(metabuffer);
 
-       if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GIN_PAGE_FREESIZE)
+       if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
        {
                /*
                 * Total size is greater than one page => make sublist
@@ -265,13 +271,12 @@ ginHeapTupleFastInsert(Relation index, GinState *ginstate,
 
        if (separateList)
        {
-               GinMetaPageData sublist;
-
                /*
                 * We should make sublist separately and append it to the tail
                 */
-               memset(&sublist, 0, sizeof(GinMetaPageData));
+               GinMetaPageData sublist;
 
+               memset(&sublist, 0, sizeof(GinMetaPageData));
                makeSublist(index, collector->tuples, collector->ntuples, &sublist);
 
                /*
@@ -283,45 +288,44 @@ ginHeapTupleFastInsert(Relation index, GinState *ginstate,
                if (metadata->head == InvalidBlockNumber)
                {
                        /*
-                        * Sublist becomes main list
+                        * Main list is empty, so just copy sublist into main list
                         */
                        START_CRIT_SECTION();
+
                        memcpy(metadata, &sublist, sizeof(GinMetaPageData));
-                       memcpy(&data.metadata, &sublist, sizeof(GinMetaPageData));
                }
                else
                {
                        /*
-                        * merge lists
+                        * Merge lists
                         */
-
                        data.prevTail = metadata->tail;
+                       data.newRightlink = sublist.head;
+
                        buffer = ReadBuffer(index, metadata->tail);
                        LockBuffer(buffer, GIN_EXCLUSIVE);
                        page = BufferGetPage(buffer);
+
                        Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
 
                        START_CRIT_SECTION();
 
                        GinPageGetOpaque(page)->rightlink = sublist.head;
+
+                       MarkBufferDirty(buffer);
+
                        metadata->tail = sublist.tail;
                        metadata->tailFreeSize = sublist.tailFreeSize;
 
                        metadata->nPendingPages += sublist.nPendingPages;
                        metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;
-
-                       memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
-                       data.newRightlink = sublist.head;
-
-                       MarkBufferDirty(buffer);
                }
        }
        else
        {
                /*
-                * Insert into tail page, metapage is already locked
+                * Insert into tail page.  Metapage is already locked
                 */
-
                OffsetNumber l,
                                        off;
                int                     i,
@@ -331,6 +335,7 @@ ginHeapTupleFastInsert(Relation index, GinState *ginstate,
                buffer = ReadBuffer(index, metadata->tail);
                LockBuffer(buffer, GIN_EXCLUSIVE);
                page = BufferGetPage(buffer);
+
                off = (PageIsEmpty(page)) ? FirstOffsetNumber :
                        OffsetNumberNext(PageGetMaxOffsetNumber(page));
 
@@ -368,20 +373,24 @@ ginHeapTupleFastInsert(Relation index, GinState *ginstate,
                        off++;
                }
 
-               metadata->tailFreeSize -= collector->sumsize + collector->ntuples * sizeof(ItemIdData);
-               memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
+               Assert((ptr - rdata[1].data) <= collector->sumsize);
+
+               metadata->tailFreeSize = PageGetExactFreeSpace(page);
+
                MarkBufferDirty(buffer);
        }
 
        /*
-        * Make real write
+        * Write metabuffer, make xlog entry
         */
-
        MarkBufferDirty(metabuffer);
+
        if (!index->rd_istemp)
        {
                XLogRecPtr      recptr;
 
+               memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
+
                recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE, rdata);
                PageSetLSN(metapage, recptr);
                PageSetTLI(metapage, ThisTimeLineID);
@@ -552,7 +561,6 @@ shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
                        metadata->nPendingPages = 0;
                        metadata->nPendingHeapTuples = 0;
                }
-               memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
 
                MarkBufferDirty(metabuffer);
 
@@ -567,6 +575,8 @@ shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
                {
                        XLogRecPtr      recptr;
 
+                       memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
+
                        recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE, rdata);
                        PageSetLSN(metapage, recptr);
                        PageSetTLI(metapage, ThisTimeLineID);