OSDN Git Service

33e6f341548ad6cd07c72096d16d92a698731f17
[pg-rex/syncrep.git] / src / backend / access / gist / gistvacuum.c
1 /*-------------------------------------------------------------------------
2  *
3  * gistvacuum.c
4  *        vacuuming routines for the postgres GiST index access method.
5  *
6  *
7  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *        src/backend/access/gist/gistvacuum.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/gist_private.h"
19 #include "catalog/storage.h"
20 #include "commands/vacuum.h"
21 #include "miscadmin.h"
22 #include "storage/bufmgr.h"
23 #include "storage/freespace.h"
24 #include "storage/indexfsm.h"
25 #include "storage/lmgr.h"
26 #include "utils/memutils.h"
27
28
29 /*
30  * VACUUM cleanup: update FSM
31  */
32 Datum
33 gistvacuumcleanup(PG_FUNCTION_ARGS)
34 {
35         IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
36         IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
37         Relation        rel = info->index;
38         BlockNumber npages,
39                                 blkno;
40         BlockNumber totFreePages;
41         bool            needLock;
42
43         /* No-op in ANALYZE ONLY mode */
44         if (info->analyze_only)
45                 PG_RETURN_POINTER(stats);
46
47         /* Set up all-zero stats if gistbulkdelete wasn't called */
48         if (stats == NULL)
49         {
50                 stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
51                 /* use heap's tuple count */
52                 stats->num_index_tuples = info->num_heap_tuples;
53                 stats->estimated_count = info->estimated_count;
54
55                 /*
56                  * XXX the above is wrong if index is partial.  Would it be OK to just
57                  * return NULL, or is there work we must do below?
58                  */
59         }
60
61         /*
62          * Need lock unless it's local to this backend.
63          */
64         needLock = !RELATION_IS_LOCAL(rel);
65
66         /* try to find deleted pages */
67         if (needLock)
68                 LockRelationForExtension(rel, ExclusiveLock);
69         npages = RelationGetNumberOfBlocks(rel);
70         if (needLock)
71                 UnlockRelationForExtension(rel, ExclusiveLock);
72
73         totFreePages = 0;
74         for (blkno = GIST_ROOT_BLKNO + 1; blkno < npages; blkno++)
75         {
76                 Buffer          buffer;
77                 Page            page;
78
79                 vacuum_delay_point();
80
81                 buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
82                                                                         info->strategy);
83                 LockBuffer(buffer, GIST_SHARE);
84                 page = (Page) BufferGetPage(buffer);
85
86                 if (PageIsNew(page) || GistPageIsDeleted(page))
87                 {
88                         totFreePages++;
89                         RecordFreeIndexPage(rel, blkno);
90                 }
91                 UnlockReleaseBuffer(buffer);
92         }
93
94         /* Finally, vacuum the FSM */
95         IndexFreeSpaceMapVacuum(info->index);
96
97         /* return statistics */
98         stats->pages_free = totFreePages;
99         if (needLock)
100                 LockRelationForExtension(rel, ExclusiveLock);
101         stats->num_pages = RelationGetNumberOfBlocks(rel);
102         if (needLock)
103                 UnlockRelationForExtension(rel, ExclusiveLock);
104
105         PG_RETURN_POINTER(stats);
106 }
107
108 typedef struct GistBDItem
109 {
110         GistNSN         parentlsn;
111         BlockNumber blkno;
112         struct GistBDItem *next;
113 } GistBDItem;
114
115 static void
116 pushStackIfSplited(Page page, GistBDItem *stack)
117 {
118         GISTPageOpaque opaque = GistPageGetOpaque(page);
119
120         if (stack->blkno != GIST_ROOT_BLKNO && !XLogRecPtrIsInvalid(stack->parentlsn) &&
121                 (GistFollowRight(page) || XLByteLT(stack->parentlsn, opaque->nsn)) &&
122                 opaque->rightlink != InvalidBlockNumber /* sanity check */ )
123         {
124                 /* split page detected, install right link to the stack */
125
126                 GistBDItem *ptr = (GistBDItem *) palloc(sizeof(GistBDItem));
127
128                 ptr->blkno = opaque->rightlink;
129                 ptr->parentlsn = stack->parentlsn;
130                 ptr->next = stack->next;
131                 stack->next = ptr;
132         }
133 }
134
135
136 /*
137  * Bulk deletion of all index entries pointing to a set of heap tuples and
138  * check invalid tuples after crash recovery.
139  * The set of target tuples is specified via a callback routine that tells
140  * whether any given heap tuple (identified by ItemPointer) is being deleted.
141  *
142  * Result: a palloc'd struct containing statistical info for VACUUM displays.
143  */
144 Datum
145 gistbulkdelete(PG_FUNCTION_ARGS)
146 {
147         IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
148         IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
149         IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
150         void       *callback_state = (void *) PG_GETARG_POINTER(3);
151         Relation        rel = info->index;
152         GistBDItem *stack,
153                            *ptr;
154
155         /* first time through? */
156         if (stats == NULL)
157                 stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
158         /* we'll re-count the tuples each time */
159         stats->estimated_count = false;
160         stats->num_index_tuples = 0;
161
162         stack = (GistBDItem *) palloc0(sizeof(GistBDItem));
163         stack->blkno = GIST_ROOT_BLKNO;
164
165         while (stack)
166         {
167                 Buffer          buffer;
168                 Page            page;
169                 OffsetNumber i,
170                                         maxoff;
171                 IndexTuple      idxtuple;
172                 ItemId          iid;
173
174                 buffer = ReadBufferExtended(rel, MAIN_FORKNUM, stack->blkno,
175                                                                         RBM_NORMAL, info->strategy);
176                 LockBuffer(buffer, GIST_SHARE);
177                 gistcheckpage(rel, buffer);
178                 page = (Page) BufferGetPage(buffer);
179
180                 if (GistPageIsLeaf(page))
181                 {
182                         OffsetNumber todelete[MaxOffsetNumber];
183                         int                     ntodelete = 0;
184
185                         LockBuffer(buffer, GIST_UNLOCK);
186                         LockBuffer(buffer, GIST_EXCLUSIVE);
187
188                         page = (Page) BufferGetPage(buffer);
189                         if (stack->blkno == GIST_ROOT_BLKNO && !GistPageIsLeaf(page))
190                         {
191                                 /* only the root can become non-leaf during relock */
192                                 UnlockReleaseBuffer(buffer);
193                                 /* one more check */
194                                 continue;
195                         }
196
197                         /*
198                          * check for split proceeded after look at parent, we should check
199                          * it after relock
200                          */
201                         pushStackIfSplited(page, stack);
202
203                         /*
204                          * Remove deletable tuples from page
205                          */
206
207                         maxoff = PageGetMaxOffsetNumber(page);
208
209                         for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
210                         {
211                                 iid = PageGetItemId(page, i);
212                                 idxtuple = (IndexTuple) PageGetItem(page, iid);
213
214                                 if (callback(&(idxtuple->t_tid), callback_state))
215                                 {
216                                         todelete[ntodelete] = i - ntodelete;
217                                         ntodelete++;
218                                         stats->tuples_removed += 1;
219                                 }
220                                 else
221                                         stats->num_index_tuples += 1;
222                         }
223
224                         if (ntodelete)
225                         {
226                                 START_CRIT_SECTION();
227
228                                 MarkBufferDirty(buffer);
229
230                                 for (i = 0; i < ntodelete; i++)
231                                         PageIndexTupleDelete(page, todelete[i]);
232                                 GistMarkTuplesDeleted(page);
233
234                                 if (RelationNeedsWAL(rel))
235                                 {
236                                         XLogRecPtr      recptr;
237
238                                         recptr = gistXLogUpdate(rel->rd_node, buffer,
239                                                                                         todelete, ntodelete,
240                                                                                         NULL, 0, InvalidBuffer);
241                                         PageSetLSN(page, recptr);
242                                         PageSetTLI(page, ThisTimeLineID);
243                                 }
244                                 else
245                                         PageSetLSN(page, GetXLogRecPtrForTemp());
246
247                                 END_CRIT_SECTION();
248                         }
249
250                 }
251                 else
252                 {
253                         /* check for split proceeded after look at parent */
254                         pushStackIfSplited(page, stack);
255
256                         maxoff = PageGetMaxOffsetNumber(page);
257
258                         for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
259                         {
260                                 iid = PageGetItemId(page, i);
261                                 idxtuple = (IndexTuple) PageGetItem(page, iid);
262
263                                 ptr = (GistBDItem *) palloc(sizeof(GistBDItem));
264                                 ptr->blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
265                                 ptr->parentlsn = PageGetLSN(page);
266                                 ptr->next = stack->next;
267                                 stack->next = ptr;
268
269                                 if (GistTupleIsInvalid(idxtuple))
270                                         ereport(LOG,
271                                                         (errmsg("index \"%s\" contains an inner tuple marked as invalid",
272                                                                         RelationGetRelationName(rel)),
273                                                          errdetail("This is caused by an incomplete page split at crash recovery before upgrading to 9.1."),
274                                                          errhint("Please REINDEX it.")));
275                         }
276                 }
277
278                 UnlockReleaseBuffer(buffer);
279
280                 ptr = stack->next;
281                 pfree(stack);
282                 stack = ptr;
283
284                 vacuum_delay_point();
285         }
286
287         PG_RETURN_POINTER(stats);
288 }