1 /*-------------------------------------------------------------------------
4 * delete & vacuum routines for the postgres GIN
7 * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
11 * src/backend/access/gin/ginvacuum.c
12 *-------------------------------------------------------------------------
17 #include "access/genam.h"
18 #include "access/gin.h"
19 #include "catalog/storage.h"
20 #include "commands/vacuum.h"
21 #include "miscadmin.h"
22 #include "postmaster/autovacuum.h"
23 #include "storage/bufmgr.h"
24 #include "storage/indexfsm.h"
25 #include "storage/lmgr.h"
30 IndexBulkDeleteResult *result;
31 IndexBulkDeleteCallback callback;
34 BufferAccessStrategy strategy;
39 * Cleans array of ItemPointer (removes dead pointers)
40 * Results are always stored in *cleaned, which will be allocated
41 * if it's needed. In case of *cleaned!=NULL caller is responsible to
42 * have allocated enough space. *cleaned and items may point to the same
47 ginVacuumPostingList(GinVacuumState *gvs, ItemPointerData *items, uint32 nitem, ItemPointerData **cleaned)
53 * just scan over ItemPointer array
56 for (i = 0; i < nitem; i++)
58 if (gvs->callback(items + i, gvs->callback_state))
60 gvs->result->tuples_removed += 1;
63 *cleaned = (ItemPointerData *) palloc(sizeof(ItemPointerData) * nitem);
65 memcpy(*cleaned, items, sizeof(ItemPointerData) * i);
70 gvs->result->num_index_tuples += 1;
72 (*cleaned)[j] = items[i];
81 * fills WAL record for vacuum leaf page
84 xlogVacuumPage(Relation index, Buffer buffer)
86 Page page = BufferGetPage(buffer);
89 ginxlogVacuumPage data;
94 Assert(GinPageIsLeaf(page));
96 if (!RelationNeedsWAL(index))
99 data.node = index->rd_node;
100 data.blkno = BufferGetBlockNumber(buffer);
102 if (GinPageIsData(page))
104 backup = GinDataPageGetData(page);
105 data.nitem = GinPageGetOpaque(page)->maxoff;
107 len = MAXALIGN(sizeof(ItemPointerData) * data.nitem);
114 ptr = backup = itups;
115 for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++)
117 IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
119 memcpy(ptr, itup, IndexTupleSize(itup));
120 ptr += MAXALIGN(IndexTupleSize(itup));
123 data.nitem = PageGetMaxOffsetNumber(page);
127 rdata[0].buffer = buffer;
128 rdata[0].buffer_std = (GinPageIsData(page)) ? FALSE : TRUE;
130 rdata[0].data = NULL;
131 rdata[0].next = rdata + 1;
133 rdata[1].buffer = InvalidBuffer;
134 rdata[1].len = sizeof(ginxlogVacuumPage);
135 rdata[1].data = (char *) &data;
139 rdata[1].next = NULL;
143 rdata[1].next = rdata + 2;
145 rdata[2].buffer = InvalidBuffer;
147 rdata[2].data = backup;
148 rdata[2].next = NULL;
151 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_VACUUM_PAGE, rdata);
152 PageSetLSN(page, recptr);
153 PageSetTLI(page, ThisTimeLineID);
157 ginVacuumPostingTreeLeaves(GinVacuumState *gvs, BlockNumber blkno, bool isRoot, Buffer *rootBuffer)
161 bool hasVoidPage = FALSE;
163 buffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, blkno,
164 RBM_NORMAL, gvs->strategy);
165 page = BufferGetPage(buffer);
168 * We should be sure that we don't concurrent with inserts, insert process
169 * never release root page until end (but it can unlock it and lock
170 * again). New scan can't start but previously started ones work
175 LockBufferForCleanup(buffer);
177 LockBuffer(buffer, GIN_EXCLUSIVE);
179 Assert(GinPageIsData(page));
181 if (GinPageIsLeaf(page))
183 OffsetNumber newMaxOff,
184 oldMaxOff = GinPageGetOpaque(page)->maxoff;
185 ItemPointerData *cleaned = NULL;
187 newMaxOff = ginVacuumPostingList(gvs,
188 (ItemPointer) GinDataPageGetData(page), oldMaxOff, &cleaned);
190 /* saves changes about deleted tuple ... */
191 if (oldMaxOff != newMaxOff)
194 START_CRIT_SECTION();
197 memcpy(GinDataPageGetData(page), cleaned, sizeof(ItemPointerData) * newMaxOff);
199 GinPageGetOpaque(page)->maxoff = newMaxOff;
201 MarkBufferDirty(buffer);
202 xlogVacuumPage(gvs->index, buffer);
206 /* if root is a leaf page, we don't desire further processing */
207 if (!isRoot && GinPageGetOpaque(page)->maxoff < FirstOffsetNumber)
214 bool isChildHasVoid = FALSE;
216 for (i = FirstOffsetNumber; i <= GinPageGetOpaque(page)->maxoff; i++)
218 PostingItem *pitem = (PostingItem *) GinDataPageGetItem(page, i);
220 if (ginVacuumPostingTreeLeaves(gvs, PostingItemGetBlockNumber(pitem), FALSE, NULL))
221 isChildHasVoid = TRUE;
229 * if we have root and theres void pages in tree, then we don't release
230 * lock to go further processing and guarantee that tree is unused
232 if (!(isRoot && hasVoidPage))
234 UnlockReleaseBuffer(buffer);
239 *rootBuffer = buffer;
246 ginDeletePage(GinVacuumState *gvs, BlockNumber deleteBlkno, BlockNumber leftBlkno,
247 BlockNumber parentBlkno, OffsetNumber myoff, bool isParentRoot)
255 dBuffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, deleteBlkno,
256 RBM_NORMAL, gvs->strategy);
258 if (leftBlkno != InvalidBlockNumber)
259 lBuffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, leftBlkno,
260 RBM_NORMAL, gvs->strategy);
262 lBuffer = InvalidBuffer;
264 pBuffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, parentBlkno,
265 RBM_NORMAL, gvs->strategy);
267 LockBuffer(dBuffer, GIN_EXCLUSIVE);
268 if (!isParentRoot) /* parent is already locked by
269 * LockBufferForCleanup() */
270 LockBuffer(pBuffer, GIN_EXCLUSIVE);
271 if (leftBlkno != InvalidBlockNumber)
272 LockBuffer(lBuffer, GIN_EXCLUSIVE);
274 START_CRIT_SECTION();
276 if (leftBlkno != InvalidBlockNumber)
278 BlockNumber rightlink;
280 page = BufferGetPage(dBuffer);
281 rightlink = GinPageGetOpaque(page)->rightlink;
283 page = BufferGetPage(lBuffer);
284 GinPageGetOpaque(page)->rightlink = rightlink;
287 parentPage = BufferGetPage(pBuffer);
288 #ifdef USE_ASSERT_CHECKING
291 PostingItem *tod = (PostingItem *) GinDataPageGetItem(parentPage, myoff);
293 Assert(PostingItemGetBlockNumber(tod) == deleteBlkno);
296 GinPageDeletePostingItem(parentPage, myoff);
298 page = BufferGetPage(dBuffer);
301 * we shouldn't change rightlink field to save workability of running
304 GinPageGetOpaque(page)->flags = GIN_DELETED;
306 MarkBufferDirty(pBuffer);
307 if (leftBlkno != InvalidBlockNumber)
308 MarkBufferDirty(lBuffer);
309 MarkBufferDirty(dBuffer);
311 if (RelationNeedsWAL(gvs->index))
314 XLogRecData rdata[4];
315 ginxlogDeletePage data;
318 data.node = gvs->index->rd_node;
319 data.blkno = deleteBlkno;
320 data.parentBlkno = parentBlkno;
321 data.parentOffset = myoff;
322 data.leftBlkno = leftBlkno;
323 data.rightLink = GinPageGetOpaque(page)->rightlink;
325 rdata[0].buffer = dBuffer;
326 rdata[0].buffer_std = FALSE;
327 rdata[0].data = NULL;
329 rdata[0].next = rdata + 1;
331 rdata[1].buffer = pBuffer;
332 rdata[1].buffer_std = FALSE;
333 rdata[1].data = NULL;
335 rdata[1].next = rdata + 2;
337 if (leftBlkno != InvalidBlockNumber)
339 rdata[2].buffer = lBuffer;
340 rdata[2].buffer_std = FALSE;
341 rdata[2].data = NULL;
343 rdata[2].next = rdata + 3;
349 rdata[n].buffer = InvalidBuffer;
350 rdata[n].buffer_std = FALSE;
351 rdata[n].len = sizeof(ginxlogDeletePage);
352 rdata[n].data = (char *) &data;
353 rdata[n].next = NULL;
355 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_PAGE, rdata);
356 PageSetLSN(page, recptr);
357 PageSetTLI(page, ThisTimeLineID);
358 PageSetLSN(parentPage, recptr);
359 PageSetTLI(parentPage, ThisTimeLineID);
360 if (leftBlkno != InvalidBlockNumber)
362 page = BufferGetPage(lBuffer);
363 PageSetLSN(page, recptr);
364 PageSetTLI(page, ThisTimeLineID);
369 LockBuffer(pBuffer, GIN_UNLOCK);
370 ReleaseBuffer(pBuffer);
372 if (leftBlkno != InvalidBlockNumber)
373 UnlockReleaseBuffer(lBuffer);
375 UnlockReleaseBuffer(dBuffer);
379 gvs->result->pages_deleted++;
382 typedef struct DataPageDeleteStack
384 struct DataPageDeleteStack *child;
385 struct DataPageDeleteStack *parent;
387 BlockNumber blkno; /* current block number */
388 BlockNumber leftBlkno; /* rightest non-deleted page on left */
390 } DataPageDeleteStack;
393 * scans posting tree and deletes empty pages
396 ginScanToDelete(GinVacuumState *gvs, BlockNumber blkno, bool isRoot, DataPageDeleteStack *parent, OffsetNumber myoff)
398 DataPageDeleteStack *me;
401 bool meDelete = FALSE;
411 me = (DataPageDeleteStack *) palloc0(sizeof(DataPageDeleteStack));
414 me->leftBlkno = InvalidBlockNumber;
420 buffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, blkno,
421 RBM_NORMAL, gvs->strategy);
422 page = BufferGetPage(buffer);
424 Assert(GinPageIsData(page));
426 if (!GinPageIsLeaf(page))
431 for (i = FirstOffsetNumber; i <= GinPageGetOpaque(page)->maxoff; i++)
433 PostingItem *pitem = (PostingItem *) GinDataPageGetItem(page, i);
435 if (ginScanToDelete(gvs, PostingItemGetBlockNumber(pitem), FALSE, me, i))
440 if (GinPageGetOpaque(page)->maxoff < FirstOffsetNumber)
442 if (!(me->leftBlkno == InvalidBlockNumber && GinPageRightMost(page)))
444 /* we never delete right most branch */
446 if (GinPageGetOpaque(page)->maxoff < FirstOffsetNumber)
448 ginDeletePage(gvs, blkno, me->leftBlkno, me->parent->blkno, myoff, me->parent->isRoot);
454 ReleaseBuffer(buffer);
457 me->leftBlkno = blkno;
463 ginVacuumPostingTree(GinVacuumState *gvs, BlockNumber rootBlkno)
465 Buffer rootBuffer = InvalidBuffer;
466 DataPageDeleteStack root,
470 if (ginVacuumPostingTreeLeaves(gvs, rootBlkno, TRUE, &rootBuffer) == FALSE)
472 Assert(rootBuffer == InvalidBuffer);
476 memset(&root, 0, sizeof(DataPageDeleteStack));
477 root.leftBlkno = InvalidBlockNumber;
480 vacuum_delay_point();
482 ginScanToDelete(gvs, rootBlkno, TRUE, &root, InvalidOffsetNumber);
492 UnlockReleaseBuffer(rootBuffer);
496 * returns modified page or NULL if page isn't modified.
497 * Function works with original page until first change is occurred,
498 * then page is copied into temporary one.
501 ginVacuumEntryPage(GinVacuumState *gvs, Buffer buffer, BlockNumber *roots, uint32 *nroot)
503 Page origpage = BufferGetPage(buffer),
506 maxoff = PageGetMaxOffsetNumber(origpage);
512 for (i = FirstOffsetNumber; i <= maxoff; i++)
514 IndexTuple itup = (IndexTuple) PageGetItem(tmppage, PageGetItemId(tmppage, i));
516 if (GinIsPostingTree(itup))
519 * store posting tree's roots for further processing, we can't
520 * vacuum it just now due to risk of deadlocks with scans/inserts
522 roots[*nroot] = GinItemPointerGetBlockNumber(&itup->t_tid);
525 else if (GinGetNPosting(itup) > 0)
528 * if we already create temporary page, we will make changes in
531 ItemPointerData *cleaned = (tmppage == origpage) ? NULL : GinGetPosting(itup);
532 uint32 newN = ginVacuumPostingList(gvs, GinGetPosting(itup), GinGetNPosting(itup), &cleaned);
534 if (GinGetNPosting(itup) != newN)
540 * Some ItemPointers was deleted, so we should remake our
544 if (tmppage == origpage)
547 * On first difference we create temporary page in memory
548 * and copies content in to it.
550 tmppage = PageGetTempPageCopy(origpage);
554 Size pos = ((char *) GinGetPosting(itup)) - ((char *) origpage);
556 memcpy(tmppage + pos, cleaned, sizeof(ItemPointerData) * newN);
561 /* set itup pointer to new page */
562 itup = (IndexTuple) PageGetItem(tmppage, PageGetItemId(tmppage, i));
565 value = gin_index_getattr(&gvs->ginstate, itup);
566 attnum = gintuple_get_attrnum(&gvs->ginstate, itup);
567 itup = GinFormTuple(gvs->index, &gvs->ginstate, attnum, value,
568 GinGetPosting(itup), newN, true);
569 PageIndexTupleDelete(tmppage, i);
571 if (PageAddItem(tmppage, (Item) itup, IndexTupleSize(itup), i, false, false) != i)
572 elog(ERROR, "failed to add item to index page in \"%s\"",
573 RelationGetRelationName(gvs->index));
580 return (tmppage == origpage) ? NULL : tmppage;
584 ginbulkdelete(PG_FUNCTION_ARGS)
586 IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
587 IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
588 IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
589 void *callback_state = (void *) PG_GETARG_POINTER(3);
590 Relation index = info->index;
591 BlockNumber blkno = GIN_ROOT_BLKNO;
594 BlockNumber rootOfPostingTree[BLCKSZ / (sizeof(IndexTupleData) + sizeof(ItemId))];
598 gvs.callback = callback;
599 gvs.callback_state = callback_state;
600 gvs.strategy = info->strategy;
601 initGinState(&gvs.ginstate, index);
603 /* first time through? */
606 /* Yes, so initialize stats to zeroes */
607 stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
608 /* and cleanup any pending inserts */
609 ginInsertCleanup(index, &gvs.ginstate, true, stats);
612 /* we'll re-count the tuples each time */
613 stats->num_index_tuples = 0;
616 buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
617 RBM_NORMAL, info->strategy);
622 Page page = BufferGetPage(buffer);
625 LockBuffer(buffer, GIN_SHARE);
627 Assert(!GinPageIsData(page));
629 if (GinPageIsLeaf(page))
631 LockBuffer(buffer, GIN_UNLOCK);
632 LockBuffer(buffer, GIN_EXCLUSIVE);
634 if (blkno == GIN_ROOT_BLKNO && !GinPageIsLeaf(page))
636 LockBuffer(buffer, GIN_UNLOCK);
637 continue; /* check it one more */
642 Assert(PageGetMaxOffsetNumber(page) >= FirstOffsetNumber);
644 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, FirstOffsetNumber));
645 blkno = GinItemPointerGetBlockNumber(&(itup)->t_tid);
646 Assert(blkno != InvalidBlockNumber);
648 UnlockReleaseBuffer(buffer);
649 buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
650 RBM_NORMAL, info->strategy);
653 /* right now we found leftmost page in entry's BTree */
657 Page page = BufferGetPage(buffer);
661 Assert(!GinPageIsData(page));
663 resPage = ginVacuumEntryPage(&gvs, buffer, rootOfPostingTree, &nRoot);
665 blkno = GinPageGetOpaque(page)->rightlink;
669 START_CRIT_SECTION();
670 PageRestoreTempPage(resPage, page);
671 MarkBufferDirty(buffer);
672 xlogVacuumPage(gvs.index, buffer);
673 UnlockReleaseBuffer(buffer);
678 UnlockReleaseBuffer(buffer);
681 vacuum_delay_point();
683 for (i = 0; i < nRoot; i++)
685 ginVacuumPostingTree(&gvs, rootOfPostingTree[i]);
686 vacuum_delay_point();
689 if (blkno == InvalidBlockNumber) /* rightmost page */
692 buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
693 RBM_NORMAL, info->strategy);
694 LockBuffer(buffer, GIN_EXCLUSIVE);
697 PG_RETURN_POINTER(gvs.result);
701 ginvacuumcleanup(PG_FUNCTION_ARGS)
703 IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
704 IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
705 Relation index = info->index;
709 BlockNumber totFreePages;
711 GinStatsData idxStat;
714 * In an autovacuum analyze, we want to clean up pending insertions.
715 * Otherwise, an ANALYZE-only call is a no-op.
717 if (info->analyze_only)
719 if (IsAutoVacuumWorkerProcess())
721 initGinState(&ginstate, index);
722 ginInsertCleanup(index, &ginstate, true, stats);
724 PG_RETURN_POINTER(stats);
728 * Set up all-zero stats and cleanup pending inserts if ginbulkdelete
733 stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
734 initGinState(&ginstate, index);
735 ginInsertCleanup(index, &ginstate, true, stats);
738 memset(&idxStat, 0, sizeof(idxStat));
741 * XXX we always report the heap tuple count as the number of index
742 * entries. This is bogus if the index is partial, but it's real hard to
743 * tell how many distinct heap entries are referenced by a GIN index.
745 stats->num_index_tuples = info->num_heap_tuples;
746 stats->estimated_count = info->estimated_count;
749 * Need lock unless it's local to this backend.
751 needLock = !RELATION_IS_LOCAL(index);
754 LockRelationForExtension(index, ExclusiveLock);
755 npages = RelationGetNumberOfBlocks(index);
757 UnlockRelationForExtension(index, ExclusiveLock);
761 for (blkno = GIN_ROOT_BLKNO; blkno < npages; blkno++)
766 vacuum_delay_point();
768 buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
769 RBM_NORMAL, info->strategy);
770 LockBuffer(buffer, GIN_SHARE);
771 page = (Page) BufferGetPage(buffer);
773 if (GinPageIsDeleted(page))
775 Assert(blkno != GIN_ROOT_BLKNO);
776 RecordFreeIndexPage(index, blkno);
779 else if (GinPageIsData(page))
781 idxStat.nDataPages++;
783 else if (!GinPageIsList(page))
785 idxStat.nEntryPages++;
787 if ( GinPageIsLeaf(page) )
788 idxStat.nEntries += PageGetMaxOffsetNumber(page);
791 UnlockReleaseBuffer(buffer);
794 /* Update the metapage with accurate page and entry counts */
795 idxStat.nTotalPages = npages;
796 ginUpdateStats(info->index, &idxStat);
798 /* Finally, vacuum the FSM */
799 IndexFreeSpaceMapVacuum(info->index);
801 stats->pages_free = totFreePages;
804 LockRelationForExtension(index, ExclusiveLock);
805 stats->num_pages = RelationGetNumberOfBlocks(index);
807 UnlockRelationForExtension(index, ExclusiveLock);
809 PG_RETURN_POINTER(stats);