OSDN Git Service

Rework completion of incomplete inserts. Now it writes
authorTeodor Sigaev <teodor@sigaev.ru>
Fri, 19 May 2006 11:10:25 +0000 (11:10 +0000)
committerTeodor Sigaev <teodor@sigaev.ru>
Fri, 19 May 2006 11:10:25 +0000 (11:10 +0000)
WAL log during inserts.

src/backend/access/gist/gistvacuum.c
src/backend/access/gist/gistxlog.c

index 9b32304..a47d81d 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.21 2006/05/17 16:34:59 teodor Exp $
+ *       $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.22 2006/05/19 11:10:25 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -104,19 +104,25 @@ gistDeleteSubtree( GistVacuum *gv, BlockNumber blkno ) {
 
        if (!gv->index->rd_istemp)
        {
-               XLogRecData rdata;
+               XLogRecData rdata[2];
                XLogRecPtr      recptr;
                gistxlogPageDelete      xlrec;
 
                xlrec.node = gv->index->rd_node;
                xlrec.blkno = blkno;
 
-               rdata.buffer = InvalidBuffer;
-               rdata.data = (char *) &xlrec;
-               rdata.len = sizeof(gistxlogPageDelete);
-               rdata.next = NULL;
+               rdata[0].buffer = buffer;
+               rdata[0].buffer_std = true;
+               rdata[0].data = NULL;
+               rdata[0].len = 0;
+               rdata[0].next = &(rdata[1]);
 
-               recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_DELETE, &rdata);
+               rdata[1].buffer = InvalidBuffer;
+               rdata[1].data = (char *) &xlrec;
+               rdata[1].len = sizeof(gistxlogPageDelete);
+               rdata[1].next = NULL;
+
+               recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_DELETE, rdata);
                PageSetLSN(page, recptr);
                PageSetTLI(page, ThisTimeLineID);
        }
index 01dab11..1126727 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *                      $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.17 2006/05/17 16:34:59 teodor Exp $
+ *                      $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.18 2006/05/19 11:10:25 teodor Exp $
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -73,8 +73,18 @@ pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
                                         BlockNumber *blkno, int lenblk,
                                         PageSplitRecord *xlinfo /* to extract blkno info */ )
 {
-       MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx);
-       gistIncompleteInsert *ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert));
+       MemoryContext oldCxt;
+       gistIncompleteInsert *ninsert;
+
+       if ( !ItemPointerIsValid(&key) )
+               /* 
+                * if key is invalid then we should not store the insertion as incomplete,
+                * because it's a vacuum operation.
+                */
+               return;
+
+       oldCxt = MemoryContextSwitchTo(insertCtx);
+       ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert));
 
        ninsert->node = node;
        ninsert->key = key;
@@ -115,6 +125,12 @@ forgetIncompleteInsert(RelFileNode node, ItemPointerData key)
 {
        ListCell   *l;
 
+       if ( !ItemPointerIsValid(&key) )
+               return;
+
+       if (incomplete_inserts==NIL)
+               return;
+
        foreach(l, incomplete_inserts)
        {
                gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
@@ -180,16 +196,13 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
        Page            page;
 
        /* we must fix incomplete_inserts list even if XLR_BKP_BLOCK_1 is set */
-       if (ItemPointerIsValid(&(xldata->key)))
-       {
-               if (incomplete_inserts != NIL)
-                       forgetIncompleteInsert(xldata->node, xldata->key);
+       forgetIncompleteInsert(xldata->node, xldata->key);
 
-               if (!isnewroot && xldata->blkno != GIST_ROOT_BLKNO)
-                       pushIncompleteInsert(xldata->node, lsn, xldata->key,
-                                                                &(xldata->blkno), 1,
-                                                                NULL);
-       }
+       if (!isnewroot && xldata->blkno != GIST_ROOT_BLKNO)
+               /* operation with root always finalizes insertion */
+               pushIncompleteInsert(xldata->node, lsn, xldata->key,
+                                                        &(xldata->blkno), 1,
+                                                        NULL);
 
        /* nothing else to do if page was backed up (and no info to do it with) */
        if (record->xl_info & XLR_BKP_BLOCK_1)
@@ -252,12 +265,15 @@ gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
        Buffer          buffer;
        Page            page;
 
+       /* nothing else to do if page was backed up (and no info to do it with) */
+       if (record->xl_info & XLR_BKP_BLOCK_1)
+               return;
+
        reln = XLogOpenRelation(xldata->node);
        buffer = XLogReadBuffer(reln, xldata->blkno, false);
        if (!BufferIsValid(buffer))
                return;
 
-       GISTInitBuffer( buffer, 0 );
        page = (Page) BufferGetPage(buffer);
        GistPageSetDeleted(page);
 
@@ -333,15 +349,11 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
                UnlockReleaseBuffer(buffer);
        }
 
-       if (ItemPointerIsValid(&(xlrec.data->key)))
-       {
-               if (incomplete_inserts != NIL)
-                       forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
+       forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
 
-               pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
-                                                        NULL, 0,
-                                                        &xlrec);
-       }
+       pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
+                                                NULL, 0,
+                                                &xlrec);
 }
 
 static void
@@ -536,7 +548,43 @@ gistxlogFindPath(Relation index, gistIncompleteInsert *insert)
                        insert->path[i++] = ptr->blkno;
        }
        else
-               elog(LOG, "lost parent for block %u", insert->origblkno);
+               elog(ERROR, "lost parent for block %u", insert->origblkno);
+}
+
+static SplitedPageLayout*
+gistMakePageLayout(Buffer *buffers, int nbuffers) {
+       SplitedPageLayout       *res=NULL, *resptr;
+
+       while( nbuffers-- > 0 ) {
+               Page page = BufferGetPage( buffers[ nbuffers ] );
+               IndexTuple      idxtup;
+               OffsetNumber    i;
+               char *ptr;
+
+               resptr = (SplitedPageLayout*)palloc0( sizeof(SplitedPageLayout) );
+
+               resptr->block.blkno = BufferGetBlockNumber( buffers[ nbuffers ] );
+               resptr->block.num = PageGetMaxOffsetNumber( page );
+
+               for(i=FirstOffsetNumber; i<= PageGetMaxOffsetNumber( page ); i++) {
+                       idxtup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
+                       resptr->lenlist += IndexTupleSize(idxtup);
+               }
+
+               resptr->list = (IndexTupleData*)palloc( resptr->lenlist );
+               ptr = (char*)(resptr->list);
+
+               for(i=FirstOffsetNumber; i<= PageGetMaxOffsetNumber( page ); i++) {
+                       idxtup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
+                       memcpy( ptr, idxtup, IndexTupleSize(idxtup) );
+                       ptr += IndexTupleSize(idxtup);
+               }
+
+               resptr->next = res;
+               res = resptr;
+       }
+
+       return res;
 }
 
 /*
@@ -548,11 +596,11 @@ gistxlogFindPath(Relation index, gistIncompleteInsert *insert)
  * Note that we assume the index is now in a valid state, except for the
  * unfinished insertion.  In particular it's safe to invoke gistFindPath();
  * there shouldn't be any garbage pages for it to run into.
- *
- * Although stored LSN in gistIncompleteInsert is a LSN of child page,
- * we can compare it with LSN of parent, because parent is always locked
- * while we change child page (look at gistmakedeal). So if parent's LSN is
- * less than stored lsn then changes in parent aren't done yet.
+ * 
+ * To complete insert we can't use basic insertion algorithm because
+ * during insertion we can't call user-defined support functions of opclass.
+ * So, we insert 'invalid' tuples without real key and do it by separate algorithm.
+ * 'invalid' tuple should be updated by vacuum full.
  */
 static void
 gistContinueInsert(gistIncompleteInsert *insert)
@@ -574,39 +622,27 @@ gistContinueInsert(gistIncompleteInsert *insert)
        for (i = 0; i < insert->lenblk; i++)
                itup[i] = gist_form_invalid_tuple(insert->blkno[i]);
 
+       /*
+        * any insertion of itup[] should emit the LOG message at the end of this routine
+        */
+
        if (insert->origblkno == GIST_ROOT_BLKNO)
        {
                /*
                 * it was split root, so we should only make new root. it can't be
-                * simple insert into root, look at call pushIncompleteInsert in
-                * gistRedoPageSplitRecord
+                * simple insert into root, we should replace all content of root.
                 */
                Buffer          buffer = XLogReadBuffer(index, GIST_ROOT_BLKNO, true);
-               Page            page;
-
-               Assert(BufferIsValid(buffer));
-               page = BufferGetPage(buffer);
 
-               GISTInitBuffer(buffer, 0);
-               gistfillbuffer(index, page, itup, lenitup, FirstOffsetNumber);
-
-               PageSetLSN(page, insert->lsn);
-               PageSetTLI(page, ThisTimeLineID);
-
-               MarkBufferDirty(buffer);
+               gistnewroot(index, buffer, itup, lenitup, NULL);
                UnlockReleaseBuffer(buffer);
-
-               /*
-                * XXX fall out to avoid making LOG message at bottom of routine.
-                * I think the logic for when to emit that message is all wrong...
-                */
-               return;
        }
        else
        {
                Buffer     *buffers;
                Page       *pages;
                int                     numbuffer;
+               OffsetNumber    *todelete;
 
                /* construct path */
                gistxlogFindPath(index, insert);
@@ -615,49 +651,60 @@ gistContinueInsert(gistIncompleteInsert *insert)
 
                buffers = (Buffer *) palloc(sizeof(Buffer) * (insert->lenblk + 2 /* guarantee root split */ ));
                pages = (Page *) palloc(sizeof(Page) * (insert->lenblk + 2 /* guarantee root split */ ));
+               todelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (insert->lenblk + 2 /* guarantee root split */ ));
 
                for (i = 0; i < insert->pathlen; i++)
                {
                        int                     j,
                                                k,
-                                               pituplen = 0,
-                                               childfound = 0;
+                                               pituplen = 0;
+                       XLogRecData             *rdata;
+                       XLogRecPtr              recptr;
+                       Buffer  tempbuffer = InvalidBuffer;
+                       int     ntodelete = 0;
 
                        numbuffer = 1;
-                       buffers[numbuffer - 1] = ReadBuffer(index, insert->path[i]);
-                       LockBuffer(buffers[numbuffer - 1], GIST_EXCLUSIVE);
-                       pages[numbuffer - 1] = BufferGetPage(buffers[numbuffer - 1]);
+                       buffers[0] = ReadBuffer(index, insert->path[i]);
+                       LockBuffer(buffers[0], GIST_EXCLUSIVE);
+                       /*
+                        * we check buffer, because we restored page earlier
+                        */
+                       gistcheckpage(index, buffers[0]);
 
-                       if (XLByteLE(insert->lsn, PageGetLSN(pages[numbuffer - 1])))
-                       {
-                               UnlockReleaseBuffer(buffers[numbuffer - 1]);
-                               return;
-                       }
+                       pages[0] = BufferGetPage(buffers[0]);
+                       Assert( !GistPageIsLeaf(pages[0]) );
 
-                       pituplen = PageGetMaxOffsetNumber(pages[numbuffer - 1]);
+                       pituplen = PageGetMaxOffsetNumber(pages[0]);
 
-                       /* remove old IndexTuples */
-                       for (j = 0; j < pituplen && childfound < lenitup; j++)
+                       /* find old IndexTuples to remove */
+                       for (j = 0; j < pituplen && ntodelete < lenitup; j++)
                        {
                                BlockNumber blkno;
-                               ItemId          iid = PageGetItemId(pages[numbuffer - 1], j + FirstOffsetNumber);
-                               IndexTuple      idxtup = (IndexTuple) PageGetItem(pages[numbuffer - 1], iid);
+                               ItemId          iid = PageGetItemId(pages[0], j + FirstOffsetNumber);
+                               IndexTuple      idxtup = (IndexTuple) PageGetItem(pages[0], iid);
 
                                blkno = ItemPointerGetBlockNumber(&(idxtup->t_tid));
 
                                for (k = 0; k < lenitup; k++)
                                        if (ItemPointerGetBlockNumber(&(itup[k]->t_tid)) == blkno)
                                        {
-                                               PageIndexTupleDelete(pages[numbuffer - 1], j + FirstOffsetNumber);
-                                               j--;
-                                               pituplen--;
-                                               childfound++;
+                                               todelete[ntodelete] = j + FirstOffsetNumber - ntodelete;
+                                               ntodelete++;
                                                break;
                                        }
                        }
 
-                       if (gistnospace(pages[numbuffer - 1], itup, lenitup, InvalidOffsetNumber))
+                       if ( ntodelete == 0 ) 
+                               elog(PANIC,"gistContinueInsert: can't find pointer to page(s)");
+
+                       /*
+                        * we check space subtracting only the first tuple to be deleted;
+                        * hopefully that will be enough space.
+                        */
+
+                       if (gistnospace(pages[0], itup, lenitup, *todelete))
                        {
+
                                /* no space left on page, so we must split */
                                buffers[numbuffer] = ReadBuffer(index, P_NEW);
                                LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
@@ -668,62 +715,86 @@ gistContinueInsert(gistIncompleteInsert *insert)
 
                                if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO)
                                {
-                                       IndexTuple *parentitup;
+                                       Buffer tmp;
 
                                        /*
-                                        * we split root, just copy tuples from old root to new
-                                        * page
+                                        * we split root, just copy content from root to new page
                                         */
-                                       parentitup = gistextractpage(pages[numbuffer - 1],
-                                                                                                  &pituplen);
 
                                        /* sanity check */
                                        if (i + 1 != insert->pathlen)
                                                elog(PANIC, "unexpected pathlen in index \"%s\"",
                                                         RelationGetRelationName(index));
 
-                                       /* fill new page */
-                                       buffers[numbuffer] = ReadBuffer(index, P_NEW);
-                                       LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
-                                       GISTInitBuffer(buffers[numbuffer], 0);
-                                       pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
-                                       gistfillbuffer(index, pages[numbuffer], parentitup, pituplen, FirstOffsetNumber);
-                                       numbuffer++;
-
-                                       /* fill root page */
-                                       GISTInitBuffer(buffers[0], 0);
-                                       for (j = 1; j < numbuffer; j++)
-                                       {
-                                               IndexTuple      tuple = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));
-
-                                               if (PageAddItem(pages[0],
-                                                                               (Item) tuple,
-                                                                               IndexTupleSize(tuple),
-                                                                               (OffsetNumber) j,
-                                                                               LP_USED) == InvalidOffsetNumber)
-                                                       elog(PANIC, "failed to add item to index page in \"%s\"",
-                                                                RelationGetRelationName(index));
-                                       }
+                                       /* fill new page, root will be changed later */
+                                       tempbuffer = ReadBuffer(index, P_NEW);
+                                       LockBuffer(tempbuffer, GIST_EXCLUSIVE);
+                                       memcpy( BufferGetPage(tempbuffer), pages[0], BufferGetPageSize(tempbuffer) );
+
+                                       /* swap buffers[0] (was root) and temp buffer */
+                                       tmp = buffers[0];
+                                       buffers[0] = tempbuffer;
+                                       tempbuffer = tmp; /* now in tempbuffer GIST_ROOT_BLKNO, it is still unchanged */
+
+                                       pages[0] = BufferGetPage(buffers[0]);
                                }
+
+                               START_CRIT_SECTION();
+
+                               for(j=0;j<ntodelete;j++)
+                                       PageIndexTupleDelete(pages[0], todelete[j]);
+
+                               rdata = formSplitRdata(index->rd_node, insert->path[i],
+                                                                               false, &(insert->key), 
+                                                                               gistMakePageLayout( buffers, numbuffer ) );
+
+                       } else {
+                               START_CRIT_SECTION();
+
+                               for(j=0;j<ntodelete;j++)
+                                       PageIndexTupleDelete(pages[0], todelete[j]);
+                               gistfillbuffer(index, pages[0], itup, lenitup, InvalidOffsetNumber);
+
+                               rdata = formUpdateRdata(index->rd_node, buffers[0], 
+                                                       todelete, ntodelete,
+                                                       itup, lenitup, &(insert->key)); 
                        }
-                       else
-                               gistfillbuffer(index, pages[numbuffer - 1], itup, lenitup, InvalidOffsetNumber);
 
-                       lenitup = numbuffer;
+                       /* 
+                        * use insert->key as mark for completion of insert (form*Rdata() above)
+                        * for following possible replays
+                        */
+
+                       /* write pages with XLOG LSN */
+                       recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
                        for (j = 0; j < numbuffer; j++)
                        {
-                               itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));
-                               PageSetLSN(pages[j], insert->lsn);
+                               PageSetLSN(pages[j], recptr);
                                PageSetTLI(pages[j], ThisTimeLineID);
                                GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber;
                                MarkBufferDirty(buffers[j]);
+                       }
+
+                       END_CRIT_SECTION();
+
+                       lenitup = numbuffer;
+                       for (j = 0; j < numbuffer; j++) {
+                               itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));
                                UnlockReleaseBuffer(buffers[j]);
                        }
+
+                       if ( tempbuffer != InvalidBuffer ) {
+                               /*
+                                * it was a root split, so fill it by new values
+                                */
+                               gistnewroot(index, tempbuffer, itup, lenitup, &(insert->key));
+                               UnlockReleaseBuffer(tempbuffer);
+                       }
                }
        }
 
        ereport(LOG,
-       (errmsg("index %u/%u/%u needs VACUUM or REINDEX to finish crash recovery",
+       (errmsg("index %u/%u/%u needs VACUUM FULL or REINDEX to finish crash recovery",
                        insert->node.spcNode, insert->node.dbNode, insert->node.relNode),
         errdetail("Incomplete insertion detected during crash replay.")));
 }
@@ -747,6 +818,7 @@ gist_xlog_cleanup(void)
        MemoryContext oldCxt;
 
        oldCxt = MemoryContextSwitchTo(opCtx);
+
        foreach(l, incomplete_inserts)
        {
                gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);