OSDN Git Service

Update copyright for 2009.
[pg-rex/syncrep.git] / src / backend / storage / freespace / freespace.c
1 /*-------------------------------------------------------------------------
2  *
3  * freespace.c
4  *        POSTGRES free space map for quickly finding free space in relations
5  *
6  *
7  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/storage/freespace/freespace.c,v 1.71 2009/01/01 17:23:47 momjian Exp $
12  *
13  *
14  * NOTES:
15  *
16  *  Free Space Map keeps track of the amount of free space on pages, and
17  *  allows quickly searching for a page with enough free space. The FSM is
18  *  stored in a dedicated relation fork of all heap relations, and those
19  *  index access methods that need it (see also indexfsm.c). See README for
20  *  more information.
21  *
22  *-------------------------------------------------------------------------
23  */
24 #include "postgres.h"
25
26 #include "access/htup.h"
27 #include "access/xlogutils.h"
28 #include "storage/bufpage.h"
29 #include "storage/bufmgr.h"
30 #include "storage/freespace.h"
31 #include "storage/fsm_internals.h"
32 #include "storage/lmgr.h"
33 #include "storage/lwlock.h"
34 #include "storage/smgr.h"
35 #include "utils/rel.h"
36 #include "utils/inval.h"
37 #include "miscadmin.h"
38
39 /*
40  * We use just one byte to store the amount of free space on a page, so we
41  * divide the amount of free space a page can have into 256 different
42  * categories. The highest category, 255, represents a page with at least
43  * MaxFSMRequestSize bytes of free space, and the second highest category
44  * represents the range from 254 * FSM_CAT_STEP, inclusive, to
45  * MaxFSMRequestSize, exclusive.
46  *
47  * MaxFSMRequestSize depends on the architecture and BLCKSZ, but assuming
48  * default 8k BLCKSZ, and that MaxFSMRequestSize is 24 bytes, the categories
49  * look like this
50  *
51  *
52  * Range     Category
53  * 0    - 31   0
54  * 32   - 63   1
55  * ...    ...  ...
56  * 8096 - 8127 253
57  * 8128 - 8163 254
58  * 8164 - 8192 255
59  *
60  * The reason that MaxFSMRequestSize is special is that if MaxFSMRequestSize
61  * isn't equal to a range boundary, a page with exactly MaxFSMRequestSize
62  * bytes of free space wouldn't satisfy a request for MaxFSMRequestSize
63  * bytes. If there isn't more than MaxFSMRequestSize bytes of free space on a
64  * completely empty page, that would mean that we could never satisfy a
65  * request of exactly MaxFSMRequestSize bytes.
66  */
67 #define FSM_CATEGORIES  256
68 #define FSM_CAT_STEP    (BLCKSZ / FSM_CATEGORIES)
69 #define MaxFSMRequestSize       MaxHeapTupleSize
70
71 /*
72  * Depth of the on-disk tree. We need to be able to address 2^32-1 blocks,
73  * and 1626 is the smallest number that satisfies X^3 >= 2^32-1. Likewise,
74  * 216 is the smallest number that satisfies X^4 >= 2^32-1. In practice,
75  * this means that 4096 bytes is the smallest BLCKSZ that we can get away
76  * with a 3-level tree, and 512 is the smallest we support.
77  */
78 #define FSM_TREE_DEPTH  ((SlotsPerFSMPage >= 1626) ? 3 : 4)
79
80 #define FSM_ROOT_LEVEL  (FSM_TREE_DEPTH - 1)
81 #define FSM_BOTTOM_LEVEL 0
82
83 /*
84  * The internal FSM routines work on a logical addressing scheme. Each
85  * level of the tree can be thought of as a separately addressable file.
86  */
87 typedef struct
88 {
89         int level;              /* level */
90         int logpageno;  /* page number within the level */
91 } FSMAddress;
92
93 /* Address of the root page. */
94 static const FSMAddress FSM_ROOT_ADDRESS = { FSM_ROOT_LEVEL, 0 };
95
96 /* functions to navigate the tree */
97 static FSMAddress fsm_get_child(FSMAddress parent, uint16 slot);
98 static FSMAddress fsm_get_parent(FSMAddress child, uint16 *slot);
99 static FSMAddress fsm_get_location(BlockNumber heapblk, uint16 *slot);
100 static BlockNumber fsm_get_heap_blk(FSMAddress addr, uint16 slot);
101 static BlockNumber fsm_logical_to_physical(FSMAddress addr);
102
103 static Buffer fsm_readbuf(Relation rel, FSMAddress addr, bool extend);
104 static void fsm_extend(Relation rel, BlockNumber fsm_nblocks);
105
106 /* functions to convert amount of free space to a FSM category */
107 static uint8 fsm_space_avail_to_cat(Size avail);
108 static uint8 fsm_space_needed_to_cat(Size needed);
109 static Size  fsm_space_cat_to_avail(uint8 cat);
110
111 /* workhorse functions for various operations */
112 static int fsm_set_and_search(Relation rel, FSMAddress addr, uint16 slot,
113                                                           uint8 newValue, uint8 minValue);
114 static BlockNumber fsm_search(Relation rel, uint8 min_cat);
115 static uint8 fsm_vacuum_page(Relation rel, FSMAddress addr, bool *eof);
116
117
118 /******** Public API ********/
119
120 /*
121  * GetPageWithFreeSpace - try to find a page in the given relation with
122  *              at least the specified amount of free space.
123  *
124  * If successful, return the block number; if not, return InvalidBlockNumber.
125  *
126  * The caller must be prepared for the possibility that the returned page
127  * will turn out to have too little space available by the time the caller
128  * gets a lock on it.  In that case, the caller should report the actual
129  * amount of free space available on that page and then try again (see
130  * RecordAndGetPageWithFreeSpace).      If InvalidBlockNumber is returned,
131  * extend the relation.
132  */
133 BlockNumber
134 GetPageWithFreeSpace(Relation rel, Size spaceNeeded)
135 {
136         uint8 min_cat = fsm_space_needed_to_cat(spaceNeeded);
137         return fsm_search(rel, min_cat);
138 }
139
140 /*
141  * RecordAndGetPageWithFreeSpace - update info about a page and try again.
142  *
143  * We provide this combo form to save some locking overhead, compared to
144  * separate RecordPageWithFreeSpace + GetPageWithFreeSpace calls. There's
145  * also some effort to return a page close to the old page; if there's a
146  * page with enough free space on the same FSM page where the old one page
147  * is located, it is preferred.
148  */
149 BlockNumber
150 RecordAndGetPageWithFreeSpace(Relation rel, BlockNumber oldPage,
151                                                           Size oldSpaceAvail, Size spaceNeeded)
152 {
153         int                     old_cat = fsm_space_avail_to_cat(oldSpaceAvail);
154         int                     search_cat = fsm_space_needed_to_cat(spaceNeeded);
155         FSMAddress      addr;
156         uint16          slot;
157         int                     search_slot;
158
159         /* Get the location of the FSM byte representing the heap block */
160         addr = fsm_get_location(oldPage, &slot);
161
162         search_slot = fsm_set_and_search(rel, addr, slot, old_cat, search_cat);
163
164         /*
165          * If fsm_set_and_search found a suitable new block, return that.
166          * Otherwise, search as usual.
167          */
168         if (search_slot != -1)
169                 return fsm_get_heap_blk(addr, search_slot);
170         else
171                 return fsm_search(rel, search_cat);
172 }
173
174 /*
175  * RecordPageWithFreeSpace - update info about a page.
176  *
177  * Note that if the new spaceAvail value is higher than the old value stored
178  * in the FSM, the space might not become visible to searchers until the next
179  * FreeSpaceMapVacuum call, which updates the upper level pages.
180  */
181 void
182 RecordPageWithFreeSpace(Relation rel, BlockNumber heapBlk, Size spaceAvail)
183 {
184         int                     new_cat = fsm_space_avail_to_cat(spaceAvail);
185         FSMAddress      addr;
186         uint16          slot;
187
188         /* Get the location of the FSM byte representing the heap block */
189         addr = fsm_get_location(heapBlk, &slot);
190
191         fsm_set_and_search(rel, addr, slot, new_cat, 0);
192 }
193
194 /*
195  * XLogRecordPageWithFreeSpace - like RecordPageWithFreeSpace, for use in
196  *              WAL replay
197  */
198 void
199 XLogRecordPageWithFreeSpace(RelFileNode rnode, BlockNumber heapBlk,
200                                                         Size spaceAvail)
201 {
202         int                     new_cat = fsm_space_avail_to_cat(spaceAvail);
203         FSMAddress      addr;
204         uint16          slot;
205         BlockNumber blkno;
206         Buffer          buf;
207         Page            page;
208
209         /* Get the location of the FSM byte representing the heap block */
210         addr = fsm_get_location(heapBlk, &slot);
211         blkno = fsm_logical_to_physical(addr);
212
213         /* If the page doesn't exist already, extend */
214         buf = XLogReadBufferExtended(rnode, FSM_FORKNUM, blkno, RBM_ZERO_ON_ERROR);
215         page = BufferGetPage(buf);
216         if (PageIsNew(page))
217                 PageInit(page, BLCKSZ, 0);
218
219         if (fsm_set_avail(page, slot, new_cat))
220                 MarkBufferDirty(buf);
221         UnlockReleaseBuffer(buf);
222 }
223
224 /*
225  * GetRecordedFreePage - return the amount of free space on a particular page,
226  *              according to the FSM.
227  */
228 Size
229 GetRecordedFreeSpace(Relation rel, BlockNumber heapBlk)
230 {
231         FSMAddress      addr;
232         uint16          slot;
233         Buffer          buf;
234         uint8           cat;
235
236         /* Get the location of the FSM byte representing the heap block */
237         addr = fsm_get_location(heapBlk, &slot);
238
239         buf = fsm_readbuf(rel, addr, false);
240         if (!BufferIsValid(buf))
241                 return 0;
242         cat = fsm_get_avail(BufferGetPage(buf), slot);
243         ReleaseBuffer(buf);
244
245         return fsm_space_cat_to_avail(cat);
246 }
247
248 /*
249  * FreeSpaceMapTruncateRel - adjust for truncation of a relation.
250  *
251  * The caller must hold AccessExclusiveLock on the relation, to ensure
252  * that other backends receive the relcache invalidation event that this
253  * function sends, before accessing the FSM again.
254  *
255  * nblocks is the new size of the heap.
256  */
257 void
258 FreeSpaceMapTruncateRel(Relation rel, BlockNumber nblocks)
259 {
260         BlockNumber     new_nfsmblocks;
261         FSMAddress      first_removed_address;
262         uint16          first_removed_slot;
263         Buffer          buf;
264
265         RelationOpenSmgr(rel);
266
267         /*
268          * If no FSM has been created yet for this relation, there's nothing to
269          * truncate.
270          */
271         if (!smgrexists(rel->rd_smgr, FSM_FORKNUM))
272                 return;
273
274         /* Get the location in the FSM of the first removed heap block */
275         first_removed_address = fsm_get_location(nblocks, &first_removed_slot);
276
277         /*
278          * Zero out the tail of the last remaining FSM page. If the slot
279          * representing the first removed heap block is at a page boundary, as
280          * the first slot on the FSM page that first_removed_address points to,
281          * we can just truncate that page altogether.
282          */
283         if (first_removed_slot > 0)
284         {
285                 buf = fsm_readbuf(rel, first_removed_address, false);
286                 if (!BufferIsValid(buf))
287                         return; /* nothing to do; the FSM was already smaller */
288                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
289                 fsm_truncate_avail(BufferGetPage(buf), first_removed_slot);
290                 MarkBufferDirty(buf);
291                 UnlockReleaseBuffer(buf);
292
293                 new_nfsmblocks = fsm_logical_to_physical(first_removed_address) + 1;
294         }
295         else
296         {
297                 new_nfsmblocks = fsm_logical_to_physical(first_removed_address);
298                 if (smgrnblocks(rel->rd_smgr, FSM_FORKNUM) <= new_nfsmblocks)
299                         return; /* nothing to do; the FSM was already smaller */
300         }
301
302         /* Truncate the unused FSM pages */
303         smgrtruncate(rel->rd_smgr, FSM_FORKNUM, new_nfsmblocks, rel->rd_istemp);
304
305         /*
306          * Need to invalidate the relcache entry, because rd_fsm_nblocks
307          * seen by other backends is no longer valid.
308          */
309         if (!InRecovery)
310                 CacheInvalidateRelcache(rel);
311
312         rel->rd_fsm_nblocks = new_nfsmblocks;
313 }
314
315 /*
316  * FreeSpaceMapVacuum - scan and fix any inconsistencies in the FSM
317  */
318 void
319 FreeSpaceMapVacuum(Relation rel)
320 {
321         bool dummy;
322
323         /*
324          * Traverse the tree in depth-first order. The tree is stored physically
325          * in depth-first order, so this should be pretty I/O efficient.
326          */
327         fsm_vacuum_page(rel, FSM_ROOT_ADDRESS, &dummy);
328 }
329
330 /******** Internal routines ********/
331
332 /*
333  * Return category corresponding x bytes of free space
334  */
335 static uint8
336 fsm_space_avail_to_cat(Size avail)
337 {
338         int cat;
339
340         Assert(avail < BLCKSZ);
341
342         if (avail >= MaxFSMRequestSize)
343                 return 255;
344
345         cat = avail / FSM_CAT_STEP;
346
347         /*
348          * The highest category, 255, is reserved for MaxFSMRequestSize bytes or
349          * more.
350          */
351         if (cat > 254)
352                 cat = 254;
353
354         return (uint8) cat;
355 }
356
357 /*
358  * Return the lower bound of the range of free space represented by given
359  * category.
360  */
361 static Size
362 fsm_space_cat_to_avail(uint8 cat)
363 {
364         /* The highest category represents exactly MaxFSMRequestSize bytes. */
365         if (cat == 255)
366                 return MaxFSMRequestSize;
367         else
368                 return cat * FSM_CAT_STEP;
369 }
370
371 /*
372  * Which category does a page need to have, to accommodate x bytes of data?
373  * While fsm_size_to_avail_cat() rounds down, this needs to round up.
374  */
375 static uint8
376 fsm_space_needed_to_cat(Size needed)
377 {
378         int cat;
379
380         /* Can't ask for more space than the highest category represents */
381         if (needed > MaxFSMRequestSize)
382                         elog(ERROR, "invalid FSM request size %lu",
383                                  (unsigned long) needed);
384
385         if (needed == 0)
386                 return 1;
387
388         cat = (needed + FSM_CAT_STEP - 1) / FSM_CAT_STEP;
389
390         if (cat > 255)
391                 cat = 255;
392
393         return (uint8) cat;
394 }
395
396 /*
397  * Returns the physical block number an FSM page
398  */
399 static BlockNumber
400 fsm_logical_to_physical(FSMAddress addr)
401 {
402         BlockNumber pages;
403         int leafno;
404         int l;
405
406         /*
407          * Calculate the logical page number of the first leaf page below the
408          * given page.
409          */
410         leafno = addr.logpageno;
411         for (l = 0; l < addr.level; l++)
412                 leafno *= SlotsPerFSMPage;
413
414         /* Count upper level nodes required to address the leaf page */
415         pages = 0;
416         for (l = 0; l < FSM_TREE_DEPTH; l++)
417         {
418                 pages += leafno + 1;
419                 leafno /= SlotsPerFSMPage;
420         }
421
422         /*
423          * If the page we were asked for wasn't at the bottom level, subtract
424          * the additional lower level pages we counted above.
425          */
426         pages -= addr.level;
427
428         /* Turn the page count into 0-based block number */
429         return pages - 1;
430 }
431
432 /*
433  * Return the FSM location corresponding to given heap block.
434  */
435 static FSMAddress
436 fsm_get_location(BlockNumber heapblk, uint16 *slot)
437 {
438         FSMAddress addr;
439
440         addr.level = FSM_BOTTOM_LEVEL;
441         addr.logpageno = heapblk / SlotsPerFSMPage;
442         *slot = heapblk % SlotsPerFSMPage;
443
444         return addr;
445 }
446
447 /*
448  * Return the heap block number corresponding to given location in the FSM.
449  */
450 static BlockNumber
451 fsm_get_heap_blk(FSMAddress addr, uint16 slot)
452 {
453         Assert(addr.level == FSM_BOTTOM_LEVEL);
454         return ((unsigned int) addr.logpageno) * SlotsPerFSMPage + slot;
455 }
456
457 /*
458  * Given a logical address of a child page, get the logical page number of
459  * the parent, and the slot within the parent corresponding to the child.
460  */
461 static FSMAddress
462 fsm_get_parent(FSMAddress child, uint16 *slot)
463 {
464         FSMAddress parent;
465
466         Assert(child.level < FSM_ROOT_LEVEL);
467
468         parent.level = child.level + 1;
469         parent.logpageno = child.logpageno / SlotsPerFSMPage;
470         *slot = child.logpageno % SlotsPerFSMPage;
471
472         return parent;
473 }
474
475 /*
476  * Given a logical address of a parent page, and a slot number get the
477  * logical address of the corresponding child page.
478  */
479 static FSMAddress
480 fsm_get_child(FSMAddress parent, uint16 slot)
481 {
482         FSMAddress child;
483
484         Assert(parent.level > FSM_BOTTOM_LEVEL);
485
486         child.level = parent.level - 1;
487         child.logpageno = parent.logpageno * SlotsPerFSMPage + slot;
488
489         return child;
490 }
491
492 /*
493  * Read a FSM page.
494  *
495  * If the page doesn't exist, InvalidBuffer is returned, or if 'extend' is
496  * true, the FSM file is extended.
497  */
498 static Buffer
499 fsm_readbuf(Relation rel, FSMAddress addr, bool extend)
500 {
501         BlockNumber blkno = fsm_logical_to_physical(addr);
502         Buffer buf;
503
504         RelationOpenSmgr(rel);
505
506         /* If we haven't cached the size of the FSM yet, check it first */
507         if (rel->rd_fsm_nblocks == InvalidBlockNumber)
508         {
509                 if (smgrexists(rel->rd_smgr, FSM_FORKNUM))
510                         rel->rd_fsm_nblocks = smgrnblocks(rel->rd_smgr, FSM_FORKNUM);
511                 else
512                         rel->rd_fsm_nblocks = 0;
513         }
514
515         /* Handle requests beyond EOF */
516         if (blkno >= rel->rd_fsm_nblocks)
517         {
518                 if (extend)
519                         fsm_extend(rel, blkno + 1);
520                 else
521                         return InvalidBuffer;
522         }
523
524         /*
525          * Use ZERO_ON_ERROR mode, and initialize the page if necessary. The FSM
526          * information is not accurate anyway, so it's better to clear corrupt
527          * pages than error out. Since the FSM changes are not WAL-logged, the
528          * so-called torn page problem on crash can lead to pages with corrupt
529          * headers, for example.
530          */
531         buf = ReadBufferExtended(rel, FSM_FORKNUM, blkno, RBM_ZERO_ON_ERROR, NULL);
532         if (PageIsNew(BufferGetPage(buf)))
533                 PageInit(BufferGetPage(buf), BLCKSZ, 0);
534         return buf;
535 }
536
537 /*
538  * Ensure that the FSM fork is at least fsm_nblocks long, extending
539  * it if necessary with empty pages. And by empty, I mean pages filled
540  * with zeros, meaning there's no free space.
541  */
542 static void
543 fsm_extend(Relation rel, BlockNumber fsm_nblocks)
544 {
545         BlockNumber fsm_nblocks_now;
546         Page pg;
547
548         pg = (Page) palloc(BLCKSZ);
549         PageInit(pg, BLCKSZ, 0);
550
551         /*
552          * We use the relation extension lock to lock out other backends
553          * trying to extend the FSM at the same time. It also locks out
554          * extension of the main fork, unnecessarily, but extending the
555          * FSM happens seldom enough that it doesn't seem worthwhile to
556          * have a separate lock tag type for it.
557          *
558          * Note that another backend might have extended or created the
559          * relation before we get the lock.
560          */
561         LockRelationForExtension(rel, ExclusiveLock);
562
563         /* Create the FSM file first if it doesn't exist */
564         if ((rel->rd_fsm_nblocks == 0 || rel->rd_fsm_nblocks == InvalidBlockNumber)
565                 && !smgrexists(rel->rd_smgr, FSM_FORKNUM))
566         {
567                 smgrcreate(rel->rd_smgr, FSM_FORKNUM, false);
568                 fsm_nblocks_now = 0;
569         }
570         else
571                 fsm_nblocks_now = smgrnblocks(rel->rd_smgr, FSM_FORKNUM);
572
573         while (fsm_nblocks_now < fsm_nblocks)
574         {
575                 smgrextend(rel->rd_smgr, FSM_FORKNUM, fsm_nblocks_now,
576                                    (char *) pg, rel->rd_istemp);
577                 fsm_nblocks_now++;
578         }
579
580         UnlockRelationForExtension(rel, ExclusiveLock);
581
582         pfree(pg);
583
584         /* Update the relcache with the up-to-date size */
585         if (!InRecovery)
586                 CacheInvalidateRelcache(rel);
587         rel->rd_fsm_nblocks = fsm_nblocks_now;
588 }
589
590 /*
591  * Set value in given FSM page and slot.
592  *
593  * If minValue > 0, the updated page is also searched for a page with at
594  * least minValue of free space. If one is found, its slot number is
595  * returned, -1 otherwise.
596  */
597 static int
598 fsm_set_and_search(Relation rel, FSMAddress addr, uint16 slot,
599                                    uint8 newValue, uint8 minValue)
600 {
601         Buffer          buf;
602         Page            page;
603         int                     newslot = -1;
604
605         buf = fsm_readbuf(rel, addr, true);
606         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
607
608         page = BufferGetPage(buf);
609
610         if (fsm_set_avail(page, slot, newValue))
611                 MarkBufferDirty(buf);
612
613         if (minValue != 0)
614         {
615                 /* Search while we still hold the lock */
616                 newslot = fsm_search_avail(buf, minValue,
617                                                                    addr.level == FSM_BOTTOM_LEVEL,
618                                                                    true);
619         }
620
621         UnlockReleaseBuffer(buf);
622
623         return newslot;
624 }
625
626 /*
627  * Search the tree for a heap page with at least min_cat of free space
628  */
629 static BlockNumber
630 fsm_search(Relation rel, uint8 min_cat)
631 {
632         int restarts = 0;
633         FSMAddress addr = FSM_ROOT_ADDRESS;
634
635         for (;;)
636         {
637                 int slot;
638                 Buffer buf;
639                 uint8 max_avail = 0;
640
641                 /* Read the FSM page. */
642                 buf = fsm_readbuf(rel, addr, false);
643
644                 /* Search within the page */
645                 if (BufferIsValid(buf))
646                 {
647                         LockBuffer(buf, BUFFER_LOCK_SHARE);
648                         slot = fsm_search_avail(buf, min_cat,
649                                                                         (addr.level == FSM_BOTTOM_LEVEL),
650                                                                         false);
651                         if (slot == -1)
652                                 max_avail = fsm_get_max_avail(BufferGetPage(buf));
653                         UnlockReleaseBuffer(buf);
654                 }
655                 else
656                         slot = -1;
657
658                 if (slot != -1)
659                 {
660                         /*
661                          * Descend the tree, or return the found block if we're at the
662                          * bottom.
663                          */
664                         if (addr.level == FSM_BOTTOM_LEVEL)
665                                 return fsm_get_heap_blk(addr, slot);
666
667                         addr = fsm_get_child(addr, slot);
668                 }
669                 else if (addr.level == FSM_ROOT_LEVEL)
670                 {
671                         /*
672                          * At the root, failure means there's no page with enough free
673                          * space in the FSM. Give up.
674                          */
675                         return InvalidBlockNumber;
676                 }
677                 else
678                 {
679                         uint16 parentslot;
680                         FSMAddress parent;
681
682                         /*
683                          * At lower level, failure can happen if the value in the upper-
684                          * level node didn't reflect the value on the lower page. Update
685                          * the upper node, to avoid falling into the same trap again, and
686                          * start over.
687                          *
688                          * There's a race condition here, if another backend updates this
689                          * page right after we release it, and gets the lock on the parent
690                          * page before us. We'll then update the parent page with the now
691                          * stale information we had. It's OK, because it should happen
692                          * rarely, and will be fixed by the next vacuum.
693                          */
694                         parent = fsm_get_parent(addr, &parentslot);
695                         fsm_set_and_search(rel, parent, parentslot, max_avail, 0);
696
697                         /*
698                          * If the upper pages are badly out of date, we might need to
699                          * loop quite a few times, updating them as we go. Any
700                          * inconsistencies should eventually be corrected and the loop
701                          * should end. Looping indefinitely is nevertheless scary, so
702                          * provide an emergency valve.
703                          */
704                         if (restarts++ > 10000)
705                                 return InvalidBlockNumber;
706
707                         /* Start search all over from the root */
708                         addr = FSM_ROOT_ADDRESS;
709                 }
710         }
711 }
712
713
714 /*
715  * Recursive guts of FreeSpaceMapVacuum
716  */
717 static uint8
718 fsm_vacuum_page(Relation rel, FSMAddress addr, bool *eof_p)
719 {
720         Buffer buf;
721         Page page;
722         uint8 max_avail;
723
724         /* Read the page if it exists, or return EOF */
725         buf = fsm_readbuf(rel, addr, false);
726         if (!BufferIsValid(buf))
727         {
728                 *eof_p = true;
729                 return 0;
730         }
731         else
732                 *eof_p = false;
733
734         page = BufferGetPage(buf);
735
736         /*
737          * Recurse into children, and fix the information stored about them
738          * at this level.
739          */
740         if (addr.level > FSM_BOTTOM_LEVEL)
741         {
742                 int slot;
743                 bool eof = false;
744
745                 for (slot = 0; slot < SlotsPerFSMPage; slot++)
746                 {
747                         int child_avail;
748
749                         /* After we hit end-of-file, just clear the rest of the slots */
750                         if (!eof)
751                                 child_avail = fsm_vacuum_page(rel, fsm_get_child(addr, slot), &eof);
752                         else
753                                 child_avail = 0;
754
755                         /* Update information about the child */
756                         if (fsm_get_avail(page, slot) != child_avail)
757                         {
758                                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
759                                 fsm_set_avail(BufferGetPage(buf), slot, child_avail);
760                                 MarkBufferDirty(buf);
761                                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
762                         }
763                 }
764         }
765
766         max_avail = fsm_get_max_avail(BufferGetPage(buf));
767
768         /*
769          * Reset the next slot pointer. This encourages the use of low-numbered
770          * pages, increasing the chances that a later vacuum can truncate the
771          * relation.
772          */
773         ((FSMPage) PageGetContents(page))->fp_next_slot = 0;
774
775         ReleaseBuffer(buf);
776
777         return max_avail;
778 }