OSDN Git Service

Generalize concept of temporary relations to "relation persistence".
[pg-rex/syncrep.git] / src / backend / access / gin / ginvacuum.c
1 /*-------------------------------------------------------------------------
2  *
3  * ginvacuum.c
4  *        delete & vacuum routines for the postgres GIN
5  *
6  *
7  * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                      src/backend/access/gin/ginvacuum.c
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/gin.h"
19 #include "catalog/storage.h"
20 #include "commands/vacuum.h"
21 #include "miscadmin.h"
22 #include "postmaster/autovacuum.h"
23 #include "storage/bufmgr.h"
24 #include "storage/indexfsm.h"
25 #include "storage/lmgr.h"
26
27 typedef struct
28 {
29         Relation        index;
30         IndexBulkDeleteResult *result;
31         IndexBulkDeleteCallback callback;
32         void       *callback_state;
33         GinState        ginstate;
34         BufferAccessStrategy strategy;
35 } GinVacuumState;
36
37
38 /*
39  * Cleans array of ItemPointer (removes dead pointers)
40  * Results are always stored in *cleaned, which will be allocated
41  * if it's needed. In case of *cleaned!=NULL caller is responsible to
42  * have allocated enough space. *cleaned and items may point to the same
43  * memory address.
44  */
45
46 static uint32
47 ginVacuumPostingList(GinVacuumState *gvs, ItemPointerData *items, uint32 nitem, ItemPointerData **cleaned)
48 {
49         uint32          i,
50                                 j = 0;
51
52         /*
53          * just scan over ItemPointer array
54          */
55
56         for (i = 0; i < nitem; i++)
57         {
58                 if (gvs->callback(items + i, gvs->callback_state))
59                 {
60                         gvs->result->tuples_removed += 1;
61                         if (!*cleaned)
62                         {
63                                 *cleaned = (ItemPointerData *) palloc(sizeof(ItemPointerData) * nitem);
64                                 if (i != 0)
65                                         memcpy(*cleaned, items, sizeof(ItemPointerData) * i);
66                         }
67                 }
68                 else
69                 {
70                         gvs->result->num_index_tuples += 1;
71                         if (i != j)
72                                 (*cleaned)[j] = items[i];
73                         j++;
74                 }
75         }
76
77         return j;
78 }
79
80 /*
81  * fills WAL record for vacuum leaf page
82  */
83 static void
84 xlogVacuumPage(Relation index, Buffer buffer)
85 {
86         Page            page = BufferGetPage(buffer);
87         XLogRecPtr      recptr;
88         XLogRecData rdata[3];
89         ginxlogVacuumPage data;
90         char       *backup;
91         char            itups[BLCKSZ];
92         uint32          len = 0;
93
94         Assert(GinPageIsLeaf(page));
95
96         if (!RelationNeedsWAL(index))
97                 return;
98
99         data.node = index->rd_node;
100         data.blkno = BufferGetBlockNumber(buffer);
101
102         if (GinPageIsData(page))
103         {
104                 backup = GinDataPageGetData(page);
105                 data.nitem = GinPageGetOpaque(page)->maxoff;
106                 if (data.nitem)
107                         len = MAXALIGN(sizeof(ItemPointerData) * data.nitem);
108         }
109         else
110         {
111                 char       *ptr;
112                 OffsetNumber i;
113
114                 ptr = backup = itups;
115                 for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++)
116                 {
117                         IndexTuple      itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
118
119                         memcpy(ptr, itup, IndexTupleSize(itup));
120                         ptr += MAXALIGN(IndexTupleSize(itup));
121                 }
122
123                 data.nitem = PageGetMaxOffsetNumber(page);
124                 len = ptr - backup;
125         }
126
127         rdata[0].buffer = buffer;
128         rdata[0].buffer_std = (GinPageIsData(page)) ? FALSE : TRUE;
129         rdata[0].len = 0;
130         rdata[0].data = NULL;
131         rdata[0].next = rdata + 1;
132
133         rdata[1].buffer = InvalidBuffer;
134         rdata[1].len = sizeof(ginxlogVacuumPage);
135         rdata[1].data = (char *) &data;
136
137         if (len == 0)
138         {
139                 rdata[1].next = NULL;
140         }
141         else
142         {
143                 rdata[1].next = rdata + 2;
144
145                 rdata[2].buffer = InvalidBuffer;
146                 rdata[2].len = len;
147                 rdata[2].data = backup;
148                 rdata[2].next = NULL;
149         }
150
151         recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_VACUUM_PAGE, rdata);
152         PageSetLSN(page, recptr);
153         PageSetTLI(page, ThisTimeLineID);
154 }
155
156 static bool
157 ginVacuumPostingTreeLeaves(GinVacuumState *gvs, BlockNumber blkno, bool isRoot, Buffer *rootBuffer)
158 {
159         Buffer          buffer;
160         Page            page;
161         bool            hasVoidPage = FALSE;
162
163         buffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, blkno,
164                                                                 RBM_NORMAL, gvs->strategy);
165         page = BufferGetPage(buffer);
166
167         /*
168          * We should be sure that we don't concurrent with inserts, insert process
169          * never release root page until end (but it can unlock it and lock
170          * again). New scan can't start but previously started ones work
171          * concurrently.
172          */
173
174         if (isRoot)
175                 LockBufferForCleanup(buffer);
176         else
177                 LockBuffer(buffer, GIN_EXCLUSIVE);
178
179         Assert(GinPageIsData(page));
180
181         if (GinPageIsLeaf(page))
182         {
183                 OffsetNumber newMaxOff,
184                                         oldMaxOff = GinPageGetOpaque(page)->maxoff;
185                 ItemPointerData *cleaned = NULL;
186
187                 newMaxOff = ginVacuumPostingList(gvs,
188                                 (ItemPointer) GinDataPageGetData(page), oldMaxOff, &cleaned);
189
190                 /* saves changes about deleted tuple ... */
191                 if (oldMaxOff != newMaxOff)
192                 {
193
194                         START_CRIT_SECTION();
195
196                         if (newMaxOff > 0)
197                                 memcpy(GinDataPageGetData(page), cleaned, sizeof(ItemPointerData) * newMaxOff);
198                         pfree(cleaned);
199                         GinPageGetOpaque(page)->maxoff = newMaxOff;
200
201                         MarkBufferDirty(buffer);
202                         xlogVacuumPage(gvs->index, buffer);
203
204                         END_CRIT_SECTION();
205
206                         /* if root is a leaf page, we don't desire further processing */
207                         if (!isRoot && GinPageGetOpaque(page)->maxoff < FirstOffsetNumber)
208                                 hasVoidPage = TRUE;
209                 }
210         }
211         else
212         {
213                 OffsetNumber i;
214                 bool            isChildHasVoid = FALSE;
215
216                 for (i = FirstOffsetNumber; i <= GinPageGetOpaque(page)->maxoff; i++)
217                 {
218                         PostingItem *pitem = (PostingItem *) GinDataPageGetItem(page, i);
219
220                         if (ginVacuumPostingTreeLeaves(gvs, PostingItemGetBlockNumber(pitem), FALSE, NULL))
221                                 isChildHasVoid = TRUE;
222                 }
223
224                 if (isChildHasVoid)
225                         hasVoidPage = TRUE;
226         }
227
228         /*
229          * if we have root and theres void pages in tree, then we don't release
230          * lock to go further processing and guarantee that tree is unused
231          */
232         if (!(isRoot && hasVoidPage))
233         {
234                 UnlockReleaseBuffer(buffer);
235         }
236         else
237         {
238                 Assert(rootBuffer);
239                 *rootBuffer = buffer;
240         }
241
242         return hasVoidPage;
243 }
244
245 static void
246 ginDeletePage(GinVacuumState *gvs, BlockNumber deleteBlkno, BlockNumber leftBlkno,
247                           BlockNumber parentBlkno, OffsetNumber myoff, bool isParentRoot)
248 {
249         Buffer          dBuffer;
250         Buffer          lBuffer;
251         Buffer          pBuffer;
252         Page            page,
253                                 parentPage;
254
255         dBuffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, deleteBlkno,
256                                                                  RBM_NORMAL, gvs->strategy);
257
258         if (leftBlkno != InvalidBlockNumber)
259                 lBuffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, leftBlkno,
260                                                                          RBM_NORMAL, gvs->strategy);
261         else
262                 lBuffer = InvalidBuffer;
263
264         pBuffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, parentBlkno,
265                                                                  RBM_NORMAL, gvs->strategy);
266
267         LockBuffer(dBuffer, GIN_EXCLUSIVE);
268         if (!isParentRoot)                      /* parent is already locked by
269                                                                  * LockBufferForCleanup() */
270                 LockBuffer(pBuffer, GIN_EXCLUSIVE);
271         if (leftBlkno != InvalidBlockNumber)
272                 LockBuffer(lBuffer, GIN_EXCLUSIVE);
273
274         START_CRIT_SECTION();
275
276         if (leftBlkno != InvalidBlockNumber)
277         {
278                 BlockNumber rightlink;
279
280                 page = BufferGetPage(dBuffer);
281                 rightlink = GinPageGetOpaque(page)->rightlink;
282
283                 page = BufferGetPage(lBuffer);
284                 GinPageGetOpaque(page)->rightlink = rightlink;
285         }
286
287         parentPage = BufferGetPage(pBuffer);
288 #ifdef USE_ASSERT_CHECKING
289         do
290         {
291                 PostingItem *tod = (PostingItem *) GinDataPageGetItem(parentPage, myoff);
292
293                 Assert(PostingItemGetBlockNumber(tod) == deleteBlkno);
294         } while (0);
295 #endif
296         GinPageDeletePostingItem(parentPage, myoff);
297
298         page = BufferGetPage(dBuffer);
299
300         /*
301          * we shouldn't change rightlink field to save workability of running
302          * search scan
303          */
304         GinPageGetOpaque(page)->flags = GIN_DELETED;
305
306         MarkBufferDirty(pBuffer);
307         if (leftBlkno != InvalidBlockNumber)
308                 MarkBufferDirty(lBuffer);
309         MarkBufferDirty(dBuffer);
310
311         if (RelationNeedsWAL(gvs->index))
312         {
313                 XLogRecPtr      recptr;
314                 XLogRecData rdata[4];
315                 ginxlogDeletePage data;
316                 int                     n;
317
318                 data.node = gvs->index->rd_node;
319                 data.blkno = deleteBlkno;
320                 data.parentBlkno = parentBlkno;
321                 data.parentOffset = myoff;
322                 data.leftBlkno = leftBlkno;
323                 data.rightLink = GinPageGetOpaque(page)->rightlink;
324
325                 rdata[0].buffer = dBuffer;
326                 rdata[0].buffer_std = FALSE;
327                 rdata[0].data = NULL;
328                 rdata[0].len = 0;
329                 rdata[0].next = rdata + 1;
330
331                 rdata[1].buffer = pBuffer;
332                 rdata[1].buffer_std = FALSE;
333                 rdata[1].data = NULL;
334                 rdata[1].len = 0;
335                 rdata[1].next = rdata + 2;
336
337                 if (leftBlkno != InvalidBlockNumber)
338                 {
339                         rdata[2].buffer = lBuffer;
340                         rdata[2].buffer_std = FALSE;
341                         rdata[2].data = NULL;
342                         rdata[2].len = 0;
343                         rdata[2].next = rdata + 3;
344                         n = 3;
345                 }
346                 else
347                         n = 2;
348
349                 rdata[n].buffer = InvalidBuffer;
350                 rdata[n].buffer_std = FALSE;
351                 rdata[n].len = sizeof(ginxlogDeletePage);
352                 rdata[n].data = (char *) &data;
353                 rdata[n].next = NULL;
354
355                 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_PAGE, rdata);
356                 PageSetLSN(page, recptr);
357                 PageSetTLI(page, ThisTimeLineID);
358                 PageSetLSN(parentPage, recptr);
359                 PageSetTLI(parentPage, ThisTimeLineID);
360                 if (leftBlkno != InvalidBlockNumber)
361                 {
362                         page = BufferGetPage(lBuffer);
363                         PageSetLSN(page, recptr);
364                         PageSetTLI(page, ThisTimeLineID);
365                 }
366         }
367
368         if (!isParentRoot)
369                 LockBuffer(pBuffer, GIN_UNLOCK);
370         ReleaseBuffer(pBuffer);
371
372         if (leftBlkno != InvalidBlockNumber)
373                 UnlockReleaseBuffer(lBuffer);
374
375         UnlockReleaseBuffer(dBuffer);
376
377         END_CRIT_SECTION();
378
379         gvs->result->pages_deleted++;
380 }
381
382 typedef struct DataPageDeleteStack
383 {
384         struct DataPageDeleteStack *child;
385         struct DataPageDeleteStack *parent;
386
387         BlockNumber blkno;                      /* current block number */
388         BlockNumber leftBlkno;          /* rightest non-deleted page on left */
389         bool            isRoot;
390 } DataPageDeleteStack;
391
392 /*
393  * scans posting tree and deletes empty pages
394  */
395 static bool
396 ginScanToDelete(GinVacuumState *gvs, BlockNumber blkno, bool isRoot, DataPageDeleteStack *parent, OffsetNumber myoff)
397 {
398         DataPageDeleteStack *me;
399         Buffer          buffer;
400         Page            page;
401         bool            meDelete = FALSE;
402
403         if (isRoot)
404         {
405                 me = parent;
406         }
407         else
408         {
409                 if (!parent->child)
410                 {
411                         me = (DataPageDeleteStack *) palloc0(sizeof(DataPageDeleteStack));
412                         me->parent = parent;
413                         parent->child = me;
414                         me->leftBlkno = InvalidBlockNumber;
415                 }
416                 else
417                         me = parent->child;
418         }
419
420         buffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, blkno,
421                                                                 RBM_NORMAL, gvs->strategy);
422         page = BufferGetPage(buffer);
423
424         Assert(GinPageIsData(page));
425
426         if (!GinPageIsLeaf(page))
427         {
428                 OffsetNumber i;
429
430                 me->blkno = blkno;
431                 for (i = FirstOffsetNumber; i <= GinPageGetOpaque(page)->maxoff; i++)
432                 {
433                         PostingItem *pitem = (PostingItem *) GinDataPageGetItem(page, i);
434
435                         if (ginScanToDelete(gvs, PostingItemGetBlockNumber(pitem), FALSE, me, i))
436                                 i--;
437                 }
438         }
439
440         if (GinPageGetOpaque(page)->maxoff < FirstOffsetNumber)
441         {
442                 if (!(me->leftBlkno == InvalidBlockNumber && GinPageRightMost(page)))
443                 {
444                         /* we never delete right most branch */
445                         Assert(!isRoot);
446                         if (GinPageGetOpaque(page)->maxoff < FirstOffsetNumber)
447                         {
448                                 ginDeletePage(gvs, blkno, me->leftBlkno, me->parent->blkno, myoff, me->parent->isRoot);
449                                 meDelete = TRUE;
450                         }
451                 }
452         }
453
454         ReleaseBuffer(buffer);
455
456         if (!meDelete)
457                 me->leftBlkno = blkno;
458
459         return meDelete;
460 }
461
462 static void
463 ginVacuumPostingTree(GinVacuumState *gvs, BlockNumber rootBlkno)
464 {
465         Buffer          rootBuffer = InvalidBuffer;
466         DataPageDeleteStack root,
467                            *ptr,
468                            *tmp;
469
470         if (ginVacuumPostingTreeLeaves(gvs, rootBlkno, TRUE, &rootBuffer) == FALSE)
471         {
472                 Assert(rootBuffer == InvalidBuffer);
473                 return;
474         }
475
476         memset(&root, 0, sizeof(DataPageDeleteStack));
477         root.leftBlkno = InvalidBlockNumber;
478         root.isRoot = TRUE;
479
480         vacuum_delay_point();
481
482         ginScanToDelete(gvs, rootBlkno, TRUE, &root, InvalidOffsetNumber);
483
484         ptr = root.child;
485         while (ptr)
486         {
487                 tmp = ptr->child;
488                 pfree(ptr);
489                 ptr = tmp;
490         }
491
492         UnlockReleaseBuffer(rootBuffer);
493 }
494
495 /*
496  * returns modified page or NULL if page isn't modified.
497  * Function works with original page until first change is occurred,
498  * then page is copied into temporary one.
499  */
500 static Page
501 ginVacuumEntryPage(GinVacuumState *gvs, Buffer buffer, BlockNumber *roots, uint32 *nroot)
502 {
503         Page            origpage = BufferGetPage(buffer),
504                                 tmppage;
505         OffsetNumber i,
506                                 maxoff = PageGetMaxOffsetNumber(origpage);
507
508         tmppage = origpage;
509
510         *nroot = 0;
511
512         for (i = FirstOffsetNumber; i <= maxoff; i++)
513         {
514                 IndexTuple      itup = (IndexTuple) PageGetItem(tmppage, PageGetItemId(tmppage, i));
515
516                 if (GinIsPostingTree(itup))
517                 {
518                         /*
519                          * store posting tree's roots for further processing, we can't
520                          * vacuum it just now due to risk of deadlocks with scans/inserts
521                          */
522                         roots[*nroot] = GinItemPointerGetBlockNumber(&itup->t_tid);
523                         (*nroot)++;
524                 }
525                 else if (GinGetNPosting(itup) > 0)
526                 {
527                         /*
528                          * if we already create temporary page, we will make changes in
529                          * place
530                          */
531                         ItemPointerData *cleaned = (tmppage == origpage) ? NULL : GinGetPosting(itup);
532                         uint32          newN = ginVacuumPostingList(gvs, GinGetPosting(itup), GinGetNPosting(itup), &cleaned);
533
534                         if (GinGetNPosting(itup) != newN)
535                         {
536                                 Datum           value;
537                                 OffsetNumber attnum;
538
539                                 /*
540                                  * Some ItemPointers was deleted, so we should remake our
541                                  * tuple
542                                  */
543
544                                 if (tmppage == origpage)
545                                 {
546                                         /*
547                                          * On first difference we create temporary page in memory
548                                          * and copies content in to it.
549                                          */
550                                         tmppage = PageGetTempPageCopy(origpage);
551
552                                         if (newN > 0)
553                                         {
554                                                 Size            pos = ((char *) GinGetPosting(itup)) - ((char *) origpage);
555
556                                                 memcpy(tmppage + pos, cleaned, sizeof(ItemPointerData) * newN);
557                                         }
558
559                                         pfree(cleaned);
560
561                                         /* set itup pointer to new page */
562                                         itup = (IndexTuple) PageGetItem(tmppage, PageGetItemId(tmppage, i));
563                                 }
564
565                                 value = gin_index_getattr(&gvs->ginstate, itup);
566                                 attnum = gintuple_get_attrnum(&gvs->ginstate, itup);
567                                 itup = GinFormTuple(gvs->index, &gvs->ginstate, attnum, value,
568                                                                         GinGetPosting(itup), newN, true);
569                                 PageIndexTupleDelete(tmppage, i);
570
571                                 if (PageAddItem(tmppage, (Item) itup, IndexTupleSize(itup), i, false, false) != i)
572                                         elog(ERROR, "failed to add item to index page in \"%s\"",
573                                                  RelationGetRelationName(gvs->index));
574
575                                 pfree(itup);
576                         }
577                 }
578         }
579
580         return (tmppage == origpage) ? NULL : tmppage;
581 }
582
583 Datum
584 ginbulkdelete(PG_FUNCTION_ARGS)
585 {
586         IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
587         IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
588         IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
589         void       *callback_state = (void *) PG_GETARG_POINTER(3);
590         Relation        index = info->index;
591         BlockNumber blkno = GIN_ROOT_BLKNO;
592         GinVacuumState gvs;
593         Buffer          buffer;
594         BlockNumber rootOfPostingTree[BLCKSZ / (sizeof(IndexTupleData) + sizeof(ItemId))];
595         uint32          nRoot;
596
597         gvs.index = index;
598         gvs.callback = callback;
599         gvs.callback_state = callback_state;
600         gvs.strategy = info->strategy;
601         initGinState(&gvs.ginstate, index);
602
603         /* first time through? */
604         if (stats == NULL)
605         {
606                 /* Yes, so initialize stats to zeroes */
607                 stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
608                 /* and cleanup any pending inserts */
609                 ginInsertCleanup(index, &gvs.ginstate, true, stats);
610         }
611
612         /* we'll re-count the tuples each time */
613         stats->num_index_tuples = 0;
614         gvs.result = stats;
615
616         buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
617                                                                 RBM_NORMAL, info->strategy);
618
619         /* find leaf page */
620         for (;;)
621         {
622                 Page            page = BufferGetPage(buffer);
623                 IndexTuple      itup;
624
625                 LockBuffer(buffer, GIN_SHARE);
626
627                 Assert(!GinPageIsData(page));
628
629                 if (GinPageIsLeaf(page))
630                 {
631                         LockBuffer(buffer, GIN_UNLOCK);
632                         LockBuffer(buffer, GIN_EXCLUSIVE);
633
634                         if (blkno == GIN_ROOT_BLKNO && !GinPageIsLeaf(page))
635                         {
636                                 LockBuffer(buffer, GIN_UNLOCK);
637                                 continue;               /* check it one more */
638                         }
639                         break;
640                 }
641
642                 Assert(PageGetMaxOffsetNumber(page) >= FirstOffsetNumber);
643
644                 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, FirstOffsetNumber));
645                 blkno = GinItemPointerGetBlockNumber(&(itup)->t_tid);
646                 Assert(blkno != InvalidBlockNumber);
647
648                 UnlockReleaseBuffer(buffer);
649                 buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
650                                                                         RBM_NORMAL, info->strategy);
651         }
652
653         /* right now we found leftmost page in entry's BTree */
654
655         for (;;)
656         {
657                 Page            page = BufferGetPage(buffer);
658                 Page            resPage;
659                 uint32          i;
660
661                 Assert(!GinPageIsData(page));
662
663                 resPage = ginVacuumEntryPage(&gvs, buffer, rootOfPostingTree, &nRoot);
664
665                 blkno = GinPageGetOpaque(page)->rightlink;
666
667                 if (resPage)
668                 {
669                         START_CRIT_SECTION();
670                         PageRestoreTempPage(resPage, page);
671                         MarkBufferDirty(buffer);
672                         xlogVacuumPage(gvs.index, buffer);
673                         UnlockReleaseBuffer(buffer);
674                         END_CRIT_SECTION();
675                 }
676                 else
677                 {
678                         UnlockReleaseBuffer(buffer);
679                 }
680
681                 vacuum_delay_point();
682
683                 for (i = 0; i < nRoot; i++)
684                 {
685                         ginVacuumPostingTree(&gvs, rootOfPostingTree[i]);
686                         vacuum_delay_point();
687                 }
688
689                 if (blkno == InvalidBlockNumber)                /* rightmost page */
690                         break;
691
692                 buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
693                                                                         RBM_NORMAL, info->strategy);
694                 LockBuffer(buffer, GIN_EXCLUSIVE);
695         }
696
697         PG_RETURN_POINTER(gvs.result);
698 }
699
700 Datum
701 ginvacuumcleanup(PG_FUNCTION_ARGS)
702 {
703         IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
704         IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
705         Relation        index = info->index;
706         bool            needLock;
707         BlockNumber npages,
708                                 blkno;
709         BlockNumber totFreePages;
710         GinState        ginstate;
711         GinStatsData idxStat;
712
713         /*
714          * In an autovacuum analyze, we want to clean up pending insertions.
715          * Otherwise, an ANALYZE-only call is a no-op.
716          */
717         if (info->analyze_only)
718         {
719                 if (IsAutoVacuumWorkerProcess())
720                 {
721                         initGinState(&ginstate, index);
722                         ginInsertCleanup(index, &ginstate, true, stats);
723                 }
724                 PG_RETURN_POINTER(stats);
725         }
726
727         /*
728          * Set up all-zero stats and cleanup pending inserts if ginbulkdelete
729          * wasn't called
730          */
731         if (stats == NULL)
732         {
733                 stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
734                 initGinState(&ginstate, index);
735                 ginInsertCleanup(index, &ginstate, true, stats);
736         }
737
738         memset(&idxStat, 0, sizeof(idxStat));
739
740         /*
741          * XXX we always report the heap tuple count as the number of index
742          * entries.  This is bogus if the index is partial, but it's real hard to
743          * tell how many distinct heap entries are referenced by a GIN index.
744          */
745         stats->num_index_tuples = info->num_heap_tuples;
746         stats->estimated_count = info->estimated_count;
747
748         /*
749          * Need lock unless it's local to this backend.
750          */
751         needLock = !RELATION_IS_LOCAL(index);
752
753         if (needLock)
754                 LockRelationForExtension(index, ExclusiveLock);
755         npages = RelationGetNumberOfBlocks(index);
756         if (needLock)
757                 UnlockRelationForExtension(index, ExclusiveLock);
758
759         totFreePages = 0;
760
761         for (blkno = GIN_ROOT_BLKNO; blkno < npages; blkno++)
762         {
763                 Buffer          buffer;
764                 Page            page;
765
766                 vacuum_delay_point();
767
768                 buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
769                                                                         RBM_NORMAL, info->strategy);
770                 LockBuffer(buffer, GIN_SHARE);
771                 page = (Page) BufferGetPage(buffer);
772
773                 if (GinPageIsDeleted(page))
774                 {
775                         Assert(blkno != GIN_ROOT_BLKNO);
776                         RecordFreeIndexPage(index, blkno);
777                         totFreePages++;
778                 }
779                 else if (GinPageIsData(page))
780                 {
781                         idxStat.nDataPages++;
782                 }
783                 else if (!GinPageIsList(page))
784                 {
785                         idxStat.nEntryPages++;
786
787                         if ( GinPageIsLeaf(page) )
788                                 idxStat.nEntries += PageGetMaxOffsetNumber(page);
789                 }
790
791                 UnlockReleaseBuffer(buffer);
792         }
793
794         /* Update the metapage with accurate page and entry counts */
795         idxStat.nTotalPages = npages;
796         ginUpdateStats(info->index, &idxStat);
797
798         /* Finally, vacuum the FSM */
799         IndexFreeSpaceMapVacuum(info->index);
800
801         stats->pages_free = totFreePages;
802
803         if (needLock)
804                 LockRelationForExtension(index, ExclusiveLock);
805         stats->num_pages = RelationGetNumberOfBlocks(index);
806         if (needLock)
807                 UnlockRelationForExtension(index, ExclusiveLock);
808
809         PG_RETURN_POINTER(stats);
810 }