OSDN Git Service

Performance improvement for MultiRecordFreeSpace on large relations ---
[pg-rex/syncrep.git] / src / backend / storage / freespace / freespace.c
1 /*-------------------------------------------------------------------------
2  *
3  * freespace.c
4  *        POSTGRES free space map for quickly finding free space in relations
5  *
6  *
7  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/storage/freespace/freespace.c,v 1.14 2002/09/20 19:56:01 tgl Exp $
12  *
13  *
14  * NOTES:
15  *
16  * The only really interesting aspect of this code is the heuristics for
17  * deciding how much information we can afford to keep about each relation,
18  * given that we have a limited amount of workspace in shared memory.
19  * These currently work as follows:
20  *
21  * The number of distinct relations tracked is limited by a configuration
22  * variable (MaxFSMRelations).  When this would be exceeded, we discard the
23  * least frequently used relation.      A previously-unknown relation is always
24  * entered into the map with useCount 1 on first reference, even if this
25  * causes an existing entry with higher useCount to be discarded.  This may
26  * cause a little bit of thrashing among the bottom entries in the list,
27  * but if we didn't do it then there'd be no way for a relation not in the
28  * map to get in once the map is full.  Note we allow a relation to be in the
29  * map even if no pages are currently stored for it: this allows us to track
30  * its useCount & threshold, which may eventually go high enough to give it
31  * priority for page storage.
32  *
33  * The total number of pages tracked is also set by a configuration variable
34  * (MaxFSMPages).  We allocate these dynamically among the known relations,
35  * giving preference to the more-frequently-referenced relations and those
36  * with smaller tuples.  This is done by a thresholding mechanism: for each
37  * relation we keep track of a current target threshold, and allow only pages
38  * with free space >= threshold to be stored in the map.  The threshold starts
39  * at BLCKSZ/2 (a somewhat arbitrary choice) for a newly entered relation.
40  * On each GetFreeSpace reference, the threshold is moved towards the
41  * request size using a slow exponential moving average; this means that
42  * for heavily used relations, the threshold will approach the average
43  * freespace request size (average tuple size).  Whenever we run out of
44  * storage space in the map, we double the threshold of all existing relations
45  * (but not to more than BLCKSZ, so as to prevent runaway thresholds).
46  * Infrequently used relations will thus tend to have large thresholds.
47  *
48  * XXX this thresholding mechanism is experimental; need to see how well
49  * it works in practice.
50  *
51  *-------------------------------------------------------------------------
52  */
53 #include "postgres.h"
54
55 #include <limits.h>
56
57 #include "storage/freespace.h"
58 #include "storage/itemid.h"
59 #include "storage/lwlock.h"
60 #include "storage/shmem.h"
61
62
63 /*
64  * Shared free-space-map objects
65  *
66  * Note: we handle pointers to these items as pointers, not as SHMEM_OFFSETs.
67  * This assumes that all processes accessing the map will have the shared
68  * memory segment mapped at the same place in their address space.
69  */
70 typedef struct FSMHeader FSMHeader;
71 typedef struct FSMRelation FSMRelation;
72 typedef struct FSMChunk FSMChunk;
73
74 /* Header for whole map */
75 struct FSMHeader
76 {
77         HTAB       *relHash;            /* hashtable of FSMRelation entries */
78         FSMRelation *relList;           /* FSMRelations in useCount order */
79         FSMRelation *relListTail;       /* tail of FSMRelation list */
80         int                     numRels;                /* number of FSMRelations now in use */
81         FSMChunk   *freeChunks;         /* linked list of currently-free chunks */
82         int                     numFreeChunks;  /* number of free chunks */
83 };
84
85 /*
86  * Per-relation struct --- this is an entry in the shared hash table.
87  * The hash key is the RelFileNode value (hence, we look at the physical
88  * relation ID, not the logical ID, which is appropriate).
89  */
90 struct FSMRelation
91 {
92         RelFileNode key;                        /* hash key (must be first) */
93         FSMRelation *nextRel;           /* next rel in useCount order */
94         FSMRelation *priorRel;          /* prior rel in useCount order */
95         int                     useCount;               /* use count for prioritizing rels */
96         Size            threshold;              /* minimum amount of free space to keep */
97         int                     nextPage;               /* index (from 0) to start next search at */
98         int                     numPages;               /* total number of pages we have info
99                                                                  * about */
100         int                     numChunks;              /* number of FSMChunks allocated to rel */
101         FSMChunk   *relChunks;          /* linked list of page info chunks */
102         FSMChunk   *lastChunk;          /* last chunk in linked list */
103 };
104
105 /*
106  * Info about individual pages in a relation is stored in chunks to reduce
107  * allocation overhead.  Note that we allow any chunk of a relation's list
108  * to be partly full, not only the last chunk; this speeds deletion of
109  * individual page entries.  When the total number of unused slots reaches
110  * CHUNKPAGES, we compact out the unused slots so that we can return a chunk
111  * to the freelist; but there's no point in doing the compaction before that.
112  */
113
114 #define CHUNKPAGES      32                      /* each chunk can store this many pages */
115
116 struct FSMChunk
117 {
118         FSMChunk   *next;                       /* linked-list link */
119         int                     numPages;               /* number of pages described here */
120         BlockNumber pages[CHUNKPAGES];          /* page numbers within relation */
121         ItemLength      bytes[CHUNKPAGES];              /* free space available on each
122                                                                                  * page */
123 };
124
125
126 int                     MaxFSMRelations;        /* these are set by guc.c */
127 int                     MaxFSMPages;
128
129 static FSMHeader *FreeSpaceMap; /* points to FSMHeader in shared memory */
130
131
132 static FSMRelation *lookup_fsm_rel(RelFileNode *rel);
133 static FSMRelation *create_fsm_rel(RelFileNode *rel);
134 static void delete_fsm_rel(FSMRelation *fsmrel);
135 static void link_fsm_rel_after(FSMRelation *fsmrel, FSMRelation *oldrel);
136 static void unlink_fsm_rel(FSMRelation *fsmrel);
137 static void free_chunk_chain(FSMChunk *fchunk);
138 static BlockNumber find_free_space(FSMRelation *fsmrel, Size spaceNeeded);
139 static void fsm_record_free_space(FSMRelation *fsmrel, BlockNumber page,
140                                           Size spaceAvail);
141 static bool lookup_fsm_page_entry(FSMRelation *fsmrel, BlockNumber page,
142                                           FSMChunk **outChunk, int *outChunkRelIndex);
143 static bool insert_fsm_page_entry(FSMRelation *fsmrel,
144                                           BlockNumber page, Size spaceAvail,
145                                           FSMChunk *chunk, int chunkRelIndex);
146 static bool append_fsm_chunk(FSMRelation *fsmrel);
147 static bool push_fsm_page_entry(BlockNumber page, Size spaceAvail,
148                                         FSMChunk *chunk, int chunkRelIndex);
149 static void delete_fsm_page_entry(FSMRelation *fsmrel, FSMChunk *chunk,
150                                           int chunkRelIndex);
151 static void compact_fsm_page_list(FSMRelation *fsmrel);
152 static void acquire_fsm_free_space(void);
153
154
155 /*
156  * Exported routines
157  */
158
159
160 /*
161  * InitFreeSpaceMap -- Initialize the freespace module.
162  *
163  * This must be called once during shared memory initialization.
164  * It builds the empty free space map table.  FreeSpaceLock must also be
165  * initialized at some point, but is not touched here --- we assume there is
166  * no need for locking, since only the calling process can be accessing shared
167  * memory as yet.
168  */
169 void
170 InitFreeSpaceMap(void)
171 {
172         HASHCTL         info;
173         FSMChunk   *chunks,
174                            *prevchunk;
175         int                     nchunks;
176
177         /* Create table header */
178         FreeSpaceMap = (FSMHeader *) ShmemAlloc(sizeof(FSMHeader));
179         if (FreeSpaceMap == NULL)
180                 elog(FATAL, "Insufficient shared memory for free space map");
181         MemSet(FreeSpaceMap, 0, sizeof(FSMHeader));
182
183         /* Create hashtable for FSMRelations */
184         info.keysize = sizeof(RelFileNode);
185         info.entrysize = sizeof(FSMRelation);
186         info.hash = tag_hash;
187
188         FreeSpaceMap->relHash = ShmemInitHash("Free Space Map Hash",
189                                                                                   MaxFSMRelations / 10,
190                                                                                   MaxFSMRelations,
191                                                                                   &info,
192                                                                                   (HASH_ELEM | HASH_FUNCTION));
193
194         if (!FreeSpaceMap->relHash)
195                 elog(FATAL, "Insufficient shared memory for free space map");
196
197         /* Allocate FSMChunks and fill up the free-chunks list */
198         nchunks = (MaxFSMPages - 1) / CHUNKPAGES + 1;
199         FreeSpaceMap->numFreeChunks = nchunks;
200
201         chunks = (FSMChunk *) ShmemAlloc(nchunks * sizeof(FSMChunk));
202         if (chunks == NULL)
203                 elog(FATAL, "Insufficient shared memory for free space map");
204
205         prevchunk = NULL;
206         while (nchunks-- > 0)
207         {
208                 chunks->next = prevchunk;
209                 prevchunk = chunks;
210                 chunks++;
211         }
212         FreeSpaceMap->freeChunks = prevchunk;
213 }
214
215 /*
216  * Estimate amount of shmem space needed for FSM.
217  */
218 int
219 FreeSpaceShmemSize(void)
220 {
221         int                     size;
222         int                     nchunks;
223
224         /* table header */
225         size = MAXALIGN(sizeof(FSMHeader));
226
227         /* hash table, including the FSMRelation objects */
228         size += hash_estimate_size(MaxFSMRelations, sizeof(FSMRelation));
229
230         /* FSMChunk objects */
231         nchunks = (MaxFSMPages - 1) / CHUNKPAGES + 1;
232
233         size += MAXALIGN(nchunks * sizeof(FSMChunk));
234
235         return size;
236 }
237
238 /*
239  * GetPageWithFreeSpace - try to find a page in the given relation with
240  *              at least the specified amount of free space.
241  *
242  * If successful, return the block number; if not, return InvalidBlockNumber.
243  *
244  * The caller must be prepared for the possibility that the returned page
245  * will turn out to have too little space available by the time the caller
246  * gets a lock on it.  In that case, the caller should report the actual
247  * amount of free space available on that page (via RecordFreeSpace) and
248  * then try again.      If InvalidBlockNumber is returned, extend the relation.
249  */
250 BlockNumber
251 GetPageWithFreeSpace(RelFileNode *rel, Size spaceNeeded)
252 {
253         FSMRelation *fsmrel;
254         BlockNumber freepage;
255
256         LWLockAcquire(FreeSpaceLock, LW_EXCLUSIVE);
257
258         /*
259          * We always add a rel to the hashtable when it is inquired about.
260          */
261         fsmrel = create_fsm_rel(rel);
262
263         /*
264          * Adjust the threshold towards the space request.      This essentially
265          * implements an exponential moving average with an equivalent period
266          * of about 63 requests.  Ignore silly requests, however, to ensure
267          * that the average stays in bounds.
268          *
269          * In theory, if the threshold increases here we should immediately
270          * delete any pages that fall below the new threshold.  In practice it
271          * seems OK to wait until we have a need to compact space.
272          */
273         if (spaceNeeded > 0 && spaceNeeded < BLCKSZ)
274         {
275                 int                     cur_avg = (int) fsmrel->threshold;
276
277                 cur_avg += ((int) spaceNeeded - cur_avg) / 32;
278                 fsmrel->threshold = (Size) cur_avg;
279         }
280         freepage = find_free_space(fsmrel, spaceNeeded);
281         LWLockRelease(FreeSpaceLock);
282         return freepage;
283 }
284
285 /*
286  * RecordFreeSpace - record the amount of free space available on a page.
287  *
288  * The FSM is at liberty to forget about the page instead, if the amount of
289  * free space is too small to be interesting.  The only guarantee is that
290  * a subsequent request to get a page with more than spaceAvail space free
291  * will not return this page.
292  */
293 void
294 RecordFreeSpace(RelFileNode *rel, BlockNumber page, Size spaceAvail)
295 {
296         FSMRelation *fsmrel;
297
298         /* Sanity check: ensure spaceAvail will fit into ItemLength */
299         AssertArg(spaceAvail < BLCKSZ);
300
301         LWLockAcquire(FreeSpaceLock, LW_EXCLUSIVE);
302
303         /*
304          * We choose not to add rels to the hashtable unless they've been
305          * inquired about with GetPageWithFreeSpace.  Also, a Record operation
306          * does not increase the useCount or change threshold, only Get does.
307          */
308         fsmrel = lookup_fsm_rel(rel);
309         if (fsmrel)
310                 fsm_record_free_space(fsmrel, page, spaceAvail);
311         LWLockRelease(FreeSpaceLock);
312 }
313
314 /*
315  * RecordAndGetPageWithFreeSpace - combo form to save one lock and
316  *              hash table lookup cycle.
317  */
318 BlockNumber
319 RecordAndGetPageWithFreeSpace(RelFileNode *rel,
320                                                           BlockNumber oldPage,
321                                                           Size oldSpaceAvail,
322                                                           Size spaceNeeded)
323 {
324         FSMRelation *fsmrel;
325         BlockNumber freepage;
326
327         /* Sanity check: ensure spaceAvail will fit into ItemLength */
328         AssertArg(oldSpaceAvail < BLCKSZ);
329
330         LWLockAcquire(FreeSpaceLock, LW_EXCLUSIVE);
331
332         /*
333          * We always add a rel to the hashtable when it is inquired about.
334          */
335         fsmrel = create_fsm_rel(rel);
336
337         /*
338          * Adjust the threshold towards the space request, same as in
339          * GetPageWithFreeSpace.
340          *
341          * Note that we do this before storing data for oldPage, which means this
342          * isn't exactly equivalent to Record followed by Get; but it seems
343          * appropriate to adjust the threshold first.
344          */
345         if (spaceNeeded > 0 && spaceNeeded < BLCKSZ)
346         {
347                 int                     cur_avg = (int) fsmrel->threshold;
348
349                 cur_avg += ((int) spaceNeeded - cur_avg) / 32;
350                 fsmrel->threshold = (Size) cur_avg;
351         }
352         /* Do the Record */
353         fsm_record_free_space(fsmrel, oldPage, oldSpaceAvail);
354         /* Do the Get */
355         freepage = find_free_space(fsmrel, spaceNeeded);
356         LWLockRelease(FreeSpaceLock);
357         return freepage;
358 }
359
360 /*
361  * MultiRecordFreeSpace - record available-space info about multiple pages
362  *              of a relation in one call.
363  *
364  * First, the FSM must discard any entries it has for pages >= minPage.
365  * This allows obsolete info to be discarded (for example, it is used when
366  * truncating a relation).  Any entries before minPage should be kept.
367  *
368  * Second, if nPages > 0, record the page numbers and free space amounts in
369  * the given pageSpaces[] array.  As with RecordFreeSpace, the FSM is at
370  * liberty to discard some of this information.  The pageSpaces[] array must
371  * be sorted in order by blkno, and may not contain entries before minPage.
372  */
373 void
374 MultiRecordFreeSpace(RelFileNode *rel,
375                                          BlockNumber minPage,
376                                          int nPages,
377                                          PageFreeSpaceInfo *pageSpaces)
378 {
379         FSMRelation *fsmrel;
380         int                     i;
381
382         LWLockAcquire(FreeSpaceLock, LW_EXCLUSIVE);
383         fsmrel = lookup_fsm_rel(rel);
384         if (fsmrel)
385         {
386                 /*
387                  * Remove entries >= minPage
388                  */
389                 FSMChunk   *chunk;
390                 int                     chunkRelIndex;
391
392                 /* Use lookup to locate first entry >= minPage */
393                 lookup_fsm_page_entry(fsmrel, minPage, &chunk, &chunkRelIndex);
394                 /* Set free space to 0 for each page >= minPage */
395                 while (chunk)
396                 {
397                         int                     numPages = chunk->numPages;
398
399                         for (i = chunkRelIndex; i < numPages; i++)
400                                 chunk->bytes[i] = 0;
401                         chunk = chunk->next;
402                         chunkRelIndex = 0;
403                 }
404                 /* Now compact out the zeroed entries, along with any other junk */
405                 compact_fsm_page_list(fsmrel);
406
407                 /*
408                  * Add new entries, if appropriate.
409                  *
410                  * This can be much cheaper than a full fsm_record_free_space()
411                  * call because we know we are appending to the end of the relation.
412                  */
413                 for (i = 0; i < nPages; i++)
414                 {
415                         BlockNumber page = pageSpaces[i].blkno;
416                         Size            avail = pageSpaces[i].avail;
417                         FSMChunk   *chunk;
418
419                         /* Check caller provides sorted data */
420                         if (i > 0 ? (page <= pageSpaces[i-1].blkno) : (page < minPage))
421                                 elog(ERROR, "MultiRecordFreeSpace: data not in page order");
422
423                         /* Ignore pages too small to fit */
424                         if (avail < fsmrel->threshold)
425                                 continue;
426
427                         /* Get another chunk if needed */
428                         /* We may need to loop if acquire_fsm_free_space() fails */
429                         while ((chunk = fsmrel->lastChunk) == NULL ||
430                                    chunk->numPages >= CHUNKPAGES)
431                         {
432                                 if (!append_fsm_chunk(fsmrel))
433                                         acquire_fsm_free_space();
434                         }
435
436                         /* Recheck in case threshold was raised by acquire */
437                         if (avail < fsmrel->threshold)
438                                 continue;
439
440                         /* Okay to store */
441                         chunk->pages[chunk->numPages] = page;
442                         chunk->bytes[chunk->numPages] = (ItemLength) avail;
443                         chunk->numPages++;
444                         fsmrel->numPages++;
445                 }
446         }
447         LWLockRelease(FreeSpaceLock);
448 }
449
450 /*
451  * FreeSpaceMapForgetRel - forget all about a relation.
452  *
453  * This is called when a relation is deleted.  Although we could just let
454  * the rel age out of the map, it's better to reclaim and reuse the space
455  * sooner.
456  */
457 void
458 FreeSpaceMapForgetRel(RelFileNode *rel)
459 {
460         FSMRelation *fsmrel;
461
462         LWLockAcquire(FreeSpaceLock, LW_EXCLUSIVE);
463         fsmrel = lookup_fsm_rel(rel);
464         if (fsmrel)
465                 delete_fsm_rel(fsmrel);
466         LWLockRelease(FreeSpaceLock);
467 }
468
469 /*
470  * FreeSpaceMapForgetDatabase - forget all relations of a database.
471  *
472  * This is called during DROP DATABASE.  As above, might as well reclaim
473  * map space sooner instead of later.
474  *
475  * XXX when we implement tablespaces, target Oid will need to be tablespace
476  * ID not database ID.
477  */
478 void
479 FreeSpaceMapForgetDatabase(Oid dbid)
480 {
481         FSMRelation *fsmrel,
482                            *nextrel;
483
484         LWLockAcquire(FreeSpaceLock, LW_EXCLUSIVE);
485         for (fsmrel = FreeSpaceMap->relList; fsmrel; fsmrel = nextrel)
486         {
487                 nextrel = fsmrel->nextRel;              /* in case we delete it */
488                 if (fsmrel->key.tblNode == dbid)
489                         delete_fsm_rel(fsmrel);
490         }
491         LWLockRelease(FreeSpaceLock);
492 }
493
494
495 /*
496  * Internal routines.  These all assume the caller holds the FreeSpaceLock.
497  */
498
499 /*
500  * Lookup a relation in the hash table.  If not present, return NULL.
501  * The relation's useCount is not changed.
502  */
503 static FSMRelation *
504 lookup_fsm_rel(RelFileNode *rel)
505 {
506         FSMRelation *fsmrel;
507
508         fsmrel = (FSMRelation *) hash_search(FreeSpaceMap->relHash,
509                                                                                  (void *) rel,
510                                                                                  HASH_FIND,
511                                                                                  NULL);
512         if (!fsmrel)
513                 return NULL;
514
515         return fsmrel;
516 }
517
518 /*
519  * Lookup a relation in the hash table, creating an entry if not present.
520  *
521  * On successful lookup, the relation's useCount is incremented, possibly
522  * causing it to move up in the priority list.
523  */
524 static FSMRelation *
525 create_fsm_rel(RelFileNode *rel)
526 {
527         FSMRelation *fsmrel,
528                            *oldrel;
529         bool            found;
530
531         fsmrel = (FSMRelation *) hash_search(FreeSpaceMap->relHash,
532                                                                                  (void *) rel,
533                                                                                  HASH_ENTER,
534                                                                                  &found);
535         if (!fsmrel)
536                 elog(ERROR, "FreeSpaceMap hashtable out of memory");
537
538         if (!found)
539         {
540                 /* New hashtable entry, initialize it (hash_search set the key) */
541                 fsmrel->useCount = 1;
542                 fsmrel->threshold = BLCKSZ / 2; /* starting point for new entry */
543                 fsmrel->nextPage = 0;
544                 fsmrel->numPages = 0;
545                 fsmrel->numChunks = 0;
546                 fsmrel->relChunks = NULL;
547                 fsmrel->lastChunk = NULL;
548                 /* Discard lowest-priority existing rel, if we are over limit */
549                 if (FreeSpaceMap->numRels >= MaxFSMRelations)
550                         delete_fsm_rel(FreeSpaceMap->relListTail);
551
552                 /*
553                  * Add new entry in front of any others with useCount 1 (since it
554                  * is more recently used than them).
555                  */
556                 oldrel = FreeSpaceMap->relListTail;
557                 while (oldrel && oldrel->useCount <= 1)
558                         oldrel = oldrel->priorRel;
559                 link_fsm_rel_after(fsmrel, oldrel);
560                 FreeSpaceMap->numRels++;
561         }
562         else
563         {
564                 int                     myCount;
565
566                 /* Existing entry, advance its useCount */
567                 if (++(fsmrel->useCount) >= INT_MAX / 2)
568                 {
569                         /* When useCounts threaten to overflow, reduce 'em all 2X */
570                         for (oldrel = FreeSpaceMap->relList;
571                                  oldrel != NULL;
572                                  oldrel = oldrel->nextRel)
573                                 oldrel->useCount >>= 1;
574                 }
575                 /* If warranted, move it up the priority list */
576                 oldrel = fsmrel->priorRel;
577                 myCount = fsmrel->useCount;
578                 if (oldrel && oldrel->useCount <= myCount)
579                 {
580                         unlink_fsm_rel(fsmrel);
581                         while (oldrel && oldrel->useCount <= myCount)
582                                 oldrel = oldrel->priorRel;
583                         link_fsm_rel_after(fsmrel, oldrel);
584                 }
585         }
586
587         return fsmrel;
588 }
589
590 /*
591  * Remove an existing FSMRelation entry.
592  */
593 static void
594 delete_fsm_rel(FSMRelation *fsmrel)
595 {
596         FSMRelation *result;
597
598         free_chunk_chain(fsmrel->relChunks);
599         unlink_fsm_rel(fsmrel);
600         FreeSpaceMap->numRels--;
601         result = (FSMRelation *) hash_search(FreeSpaceMap->relHash,
602                                                                                  (void *) &(fsmrel->key),
603                                                                                  HASH_REMOVE,
604                                                                                  NULL);
605         if (!result)
606                 elog(ERROR, "FreeSpaceMap hashtable corrupted");
607 }
608
609 /*
610  * Link a FSMRelation into the priority list just after the given existing
611  * entry (or at the head of the list, if oldrel is NULL).
612  */
613 static void
614 link_fsm_rel_after(FSMRelation *fsmrel, FSMRelation *oldrel)
615 {
616         if (oldrel == NULL)
617         {
618                 /* insert at head */
619                 fsmrel->priorRel = NULL;
620                 fsmrel->nextRel = FreeSpaceMap->relList;
621                 FreeSpaceMap->relList = fsmrel;
622                 if (fsmrel->nextRel != NULL)
623                         fsmrel->nextRel->priorRel = fsmrel;
624                 else
625                         FreeSpaceMap->relListTail = fsmrel;
626         }
627         else
628         {
629                 /* insert after oldrel */
630                 fsmrel->priorRel = oldrel;
631                 fsmrel->nextRel = oldrel->nextRel;
632                 oldrel->nextRel = fsmrel;
633                 if (fsmrel->nextRel != NULL)
634                         fsmrel->nextRel->priorRel = fsmrel;
635                 else
636                         FreeSpaceMap->relListTail = fsmrel;
637         }
638 }
639
640 /*
641  * Delink a FSMRelation from the priority list.
642  */
643 static void
644 unlink_fsm_rel(FSMRelation *fsmrel)
645 {
646         if (fsmrel->priorRel != NULL)
647                 fsmrel->priorRel->nextRel = fsmrel->nextRel;
648         else
649                 FreeSpaceMap->relList = fsmrel->nextRel;
650         if (fsmrel->nextRel != NULL)
651                 fsmrel->nextRel->priorRel = fsmrel->priorRel;
652         else
653                 FreeSpaceMap->relListTail = fsmrel->priorRel;
654 }
655
656 /*
657  * Return all the FSMChunks in the chain starting at fchunk to the freelist.
658  * (Caller must handle unlinking them from wherever they were.)
659  */
660 static void
661 free_chunk_chain(FSMChunk *fchunk)
662 {
663         int                     nchunks;
664         FSMChunk   *lchunk;
665
666         if (fchunk == NULL)
667                 return;
668         nchunks = 1;
669         lchunk = fchunk;
670         while (lchunk->next != NULL)
671         {
672                 nchunks++;
673                 lchunk = lchunk->next;
674         }
675         lchunk->next = FreeSpaceMap->freeChunks;
676         FreeSpaceMap->freeChunks = fchunk;
677         FreeSpaceMap->numFreeChunks += nchunks;
678 }
679
680 /*
681  * Look to see if a page with at least the specified amount of space is
682  * available in the given FSMRelation.  If so, return its page number,
683  * and advance the nextPage counter so that the next inquiry will return
684  * a different page if possible.  Return InvalidBlockNumber if no success.
685  */
686 static BlockNumber
687 find_free_space(FSMRelation *fsmrel, Size spaceNeeded)
688 {
689         int                     pagesToCheck,   /* outer loop counter */
690                                 pageIndex,              /* current page index relative to relation */
691                                 chunkRelIndex;  /* current page index relative to curChunk */
692         FSMChunk   *curChunk;
693
694         pageIndex = fsmrel->nextPage;
695         /* Last operation may have left nextPage pointing past end */
696         if (pageIndex >= fsmrel->numPages)
697                 pageIndex = 0;
698         curChunk = fsmrel->relChunks;
699         chunkRelIndex = pageIndex;
700
701         for (pagesToCheck = fsmrel->numPages; pagesToCheck > 0; pagesToCheck--)
702         {
703                 /*
704                  * Make sure we are in the right chunk.  First time through, we
705                  * may have to advance through multiple chunks; subsequent loops
706                  * should do this at most once.
707                  */
708                 while (chunkRelIndex >= curChunk->numPages)
709                 {
710                         chunkRelIndex -= curChunk->numPages;
711                         curChunk = curChunk->next;
712                 }
713                 /* Check the next page */
714                 if ((Size) curChunk->bytes[chunkRelIndex] >= spaceNeeded)
715                 {
716                         fsmrel->nextPage = pageIndex + 1;
717                         return curChunk->pages[chunkRelIndex];
718                 }
719                 /* Advance pageIndex and chunkRelIndex, wrapping around if needed */
720                 if (++pageIndex >= fsmrel->numPages)
721                 {
722                         pageIndex = 0;
723                         curChunk = fsmrel->relChunks;
724                         chunkRelIndex = 0;
725                 }
726                 else
727                         chunkRelIndex++;
728         }
729
730         return InvalidBlockNumber;      /* nothing found */
731 }
732
733 /*
734  * fsm_record_free_space - guts of RecordFreeSpace, which is also used by
735  * RecordAndGetPageWithFreeSpace.
736  */
737 static void
738 fsm_record_free_space(FSMRelation *fsmrel, BlockNumber page, Size spaceAvail)
739 {
740         FSMChunk   *chunk;
741         int                     chunkRelIndex;
742
743         if (lookup_fsm_page_entry(fsmrel, page, &chunk, &chunkRelIndex))
744         {
745                 /* Found an existing entry for page; update or delete it */
746                 if (spaceAvail >= fsmrel->threshold)
747                         chunk->bytes[chunkRelIndex] = (ItemLength) spaceAvail;
748                 else
749                         delete_fsm_page_entry(fsmrel, chunk, chunkRelIndex);
750         }
751         else
752         {
753                 /*
754                  * No existing entry; add one if spaceAvail exceeds threshold.
755                  *
756                  * CORNER CASE: if we have to do acquire_fsm_free_space then our own
757                  * threshold will increase, possibly meaning that we shouldn't
758                  * store the page after all.  Loop to redo the test if that
759                  * happens.  The loop also covers the possibility that
760                  * acquire_fsm_free_space must be executed more than once to free
761                  * any space (ie, thresholds must be more than doubled).
762                  */
763                 while (spaceAvail >= fsmrel->threshold)
764                 {
765                         if (insert_fsm_page_entry(fsmrel, page, spaceAvail,
766                                                                           chunk, chunkRelIndex))
767                                 break;
768                         /* No space, acquire some and recheck threshold */
769                         acquire_fsm_free_space();
770                         if (spaceAvail < fsmrel->threshold)
771                                 break;
772
773                         /*
774                          * Need to redo the lookup since our own page list may well
775                          * have lost entries, so position is not correct anymore.
776                          */
777                         if (lookup_fsm_page_entry(fsmrel, page,
778                                                                           &chunk, &chunkRelIndex))
779                                 elog(ERROR, "fsm_record_free_space: unexpected match");
780                 }
781         }
782 }
783
784 /*
785  * Look for an entry for a specific page (block number) in a FSMRelation.
786  * Returns TRUE if a matching entry exists, else FALSE.
787  *
788  * The output arguments *outChunk, *outChunkRelIndex are set to indicate where
789  * the entry exists (if TRUE result) or could be inserted (if FALSE result).
790  * *chunk is set to NULL if there is no place to insert, ie, the entry would
791  * need to be added to a new chunk.
792  */
793 static bool
794 lookup_fsm_page_entry(FSMRelation *fsmrel, BlockNumber page,
795                                           FSMChunk **outChunk, int *outChunkRelIndex)
796 {
797         FSMChunk   *chunk;
798         int                     chunkRelIndex;
799
800         for (chunk = fsmrel->relChunks; chunk; chunk = chunk->next)
801         {
802                 int                     numPages = chunk->numPages;
803
804                 /* Can skip the chunk quickly if page must be after last in chunk */
805                 if (numPages > 0 && page <= chunk->pages[numPages - 1])
806                 {
807                         for (chunkRelIndex = 0; chunkRelIndex < numPages; chunkRelIndex++)
808                         {
809                                 if (page <= chunk->pages[chunkRelIndex])
810                                 {
811                                         *outChunk = chunk;
812                                         *outChunkRelIndex = chunkRelIndex;
813                                         return (page == chunk->pages[chunkRelIndex]);
814                                 }
815                         }
816                         /* Should not get here, given above test */
817                         Assert(false);
818                 }
819
820                 /*
821                  * If we are about to fall off the end, and there's space
822                  * available in the end chunk, return a pointer to it.
823                  */
824                 if (chunk->next == NULL && numPages < CHUNKPAGES)
825                 {
826                         *outChunk = chunk;
827                         *outChunkRelIndex = numPages;
828                         return false;
829                 }
830         }
831
832         /*
833          * Adding the page would require a new chunk (or, perhaps, compaction
834          * of available free space --- not my problem here).
835          */
836         *outChunk = NULL;
837         *outChunkRelIndex = 0;
838         return false;
839 }
840
841 /*
842  * Insert a new page entry into a FSMRelation's list at given position
843  * (chunk == NULL implies at end).
844  *
845  * If there is no space available to insert the entry, return FALSE.
846  */
847 static bool
848 insert_fsm_page_entry(FSMRelation *fsmrel, BlockNumber page, Size spaceAvail,
849                                           FSMChunk *chunk, int chunkRelIndex)
850 {
851         /* Outer loop handles retry after compacting rel's page list */
852         for (;;)
853         {
854                 if (fsmrel->numPages >= fsmrel->numChunks * CHUNKPAGES)
855                 {
856                         /* No free space within chunk list, so need another chunk */
857                         if (!append_fsm_chunk(fsmrel))
858                                 return false;   /* can't do it */
859                         if (chunk == NULL)
860                         {
861                                 /* Original search found that new page belongs at end */
862                                 chunk = fsmrel->lastChunk;
863                                 chunkRelIndex = 0;
864                         }
865                 }
866
867                 /*
868                  * Try to insert it the easy way, ie, just move down subsequent
869                  * data
870                  */
871                 if (chunk &&
872                         push_fsm_page_entry(page, spaceAvail, chunk, chunkRelIndex))
873                 {
874                         fsmrel->numPages++;
875                         fsmrel->nextPage++; /* don't return same page twice running */
876                         return true;
877                 }
878
879                 /*
880                  * There is space available, but evidently it's before the place
881                  * where the page entry needs to go.  Compact the list and try
882                  * again. This will require us to redo the search for the
883                  * appropriate place. Furthermore, compact_fsm_page_list deletes
884                  * empty end chunks, so we may need to repeat the action of
885                  * grabbing a new end chunk.
886                  */
887                 compact_fsm_page_list(fsmrel);
888                 if (lookup_fsm_page_entry(fsmrel, page, &chunk, &chunkRelIndex))
889                         elog(ERROR, "insert_fsm_page_entry: entry already exists!");
890         }
891 }
892
893 /*
894  * Add one chunk to a FSMRelation's chunk list, if possible.
895  *
896  * Returns TRUE if successful, FALSE if no space available.  Note that on
897  * success, the new chunk is easily accessible via fsmrel->lastChunk.
898  */
899 static bool
900 append_fsm_chunk(FSMRelation *fsmrel)
901 {
902         FSMChunk   *newChunk;
903
904         /* Remove a chunk from the freelist */
905         if ((newChunk = FreeSpaceMap->freeChunks) == NULL)
906                 return false;                   /* can't do it */
907         FreeSpaceMap->freeChunks = newChunk->next;
908         FreeSpaceMap->numFreeChunks--;
909
910         /* Initialize chunk to empty */
911         newChunk->next = NULL;
912         newChunk->numPages = 0;
913
914         /* Link it into FSMRelation */
915         if (fsmrel->relChunks == NULL)
916                 fsmrel->relChunks = newChunk;
917         else
918                 fsmrel->lastChunk->next = newChunk;
919         fsmrel->lastChunk = newChunk;
920         fsmrel->numChunks++;
921
922         return true;
923 }
924
925 /*
926  * Auxiliary routine for insert_fsm_page_entry: try to push entries to the
927  * right to insert at chunk/chunkRelIndex.      Return TRUE if successful.
928  * Note that the FSMRelation's own fields are not updated.
929  */
930 static bool
931 push_fsm_page_entry(BlockNumber page, Size spaceAvail,
932                                         FSMChunk *chunk, int chunkRelIndex)
933 {
934         int                     i;
935
936         if (chunk->numPages >= CHUNKPAGES)
937         {
938                 if (chunk->next == NULL)
939                         return false;           /* no space */
940                 /* try to push chunk's last item to next chunk */
941                 if (!push_fsm_page_entry(chunk->pages[CHUNKPAGES - 1],
942                                                                  chunk->bytes[CHUNKPAGES - 1],
943                                                                  chunk->next, 0))
944                         return false;
945                 /* successfully pushed it */
946                 chunk->numPages--;
947         }
948         for (i = chunk->numPages; i > chunkRelIndex; i--)
949         {
950                 chunk->pages[i] = chunk->pages[i - 1];
951                 chunk->bytes[i] = chunk->bytes[i - 1];
952         }
953         chunk->numPages++;
954         chunk->pages[chunkRelIndex] = page;
955         chunk->bytes[chunkRelIndex] = (ItemLength) spaceAvail;
956         return true;
957 }
958
959 /*
960  * Delete one page entry from a FSMRelation's list.  Compact free space
961  * in the list, but only if a chunk can be returned to the freelist.
962  */
963 static void
964 delete_fsm_page_entry(FSMRelation *fsmrel, FSMChunk *chunk, int chunkRelIndex)
965 {
966         int                     i,
967                                 lim;
968
969         Assert(chunk && chunkRelIndex >= 0 && chunkRelIndex < chunk->numPages);
970         /* Compact out space in this chunk */
971         lim = --chunk->numPages;
972         for (i = chunkRelIndex; i < lim; i++)
973         {
974                 chunk->pages[i] = chunk->pages[i + 1];
975                 chunk->bytes[i] = chunk->bytes[i + 1];
976         }
977         /* Compact the whole list if a chunk can be freed */
978         fsmrel->numPages--;
979         if (fsmrel->numPages <= (fsmrel->numChunks - 1) * CHUNKPAGES)
980                 compact_fsm_page_list(fsmrel);
981 }
982
983 /*
984  * Remove any pages with free space less than the current threshold for the
985  * FSMRelation, compact out free slots in non-last chunks, and return any
986  * completely freed chunks to the freelist.
987  */
988 static void
989 compact_fsm_page_list(FSMRelation *fsmrel)
990 {
991         Size            threshold = fsmrel->threshold;
992         FSMChunk   *srcChunk,
993                            *dstChunk;
994         int                     srcIndex,
995                                 dstIndex,
996                                 dstPages,
997                                 dstChunkCnt;
998
999         srcChunk = dstChunk = fsmrel->relChunks;
1000         srcIndex = dstIndex = 0;
1001         dstPages = 0;                           /* total pages kept */
1002         dstChunkCnt = 1;                        /* includes current dstChunk */
1003
1004         while (srcChunk != NULL)
1005         {
1006                 int                     srcPages = srcChunk->numPages;
1007
1008                 while (srcIndex < srcPages)
1009                 {
1010                         if ((Size) srcChunk->bytes[srcIndex] >= threshold)
1011                         {
1012                                 if (dstIndex >= CHUNKPAGES)
1013                                 {
1014                                         /*
1015                                          * At this point srcChunk must be pointing to a later
1016                                          * chunk, so it's OK to overwrite dstChunk->numPages.
1017                                          */
1018                                         dstChunk->numPages = dstIndex;
1019                                         dstChunk = dstChunk->next;
1020                                         dstChunkCnt++;
1021                                         dstIndex = 0;
1022                                 }
1023                                 dstChunk->pages[dstIndex] = srcChunk->pages[srcIndex];
1024                                 dstChunk->bytes[dstIndex] = srcChunk->bytes[srcIndex];
1025                                 dstIndex++;
1026                                 dstPages++;
1027                         }
1028                         srcIndex++;
1029                 }
1030                 srcChunk = srcChunk->next;
1031                 srcIndex = 0;
1032         }
1033
1034         if (dstPages == 0)
1035         {
1036                 /* No chunks to be kept at all */
1037                 fsmrel->nextPage = 0;
1038                 fsmrel->numPages = 0;
1039                 fsmrel->numChunks = 0;
1040                 fsmrel->relChunks = NULL;
1041                 fsmrel->lastChunk = NULL;
1042                 free_chunk_chain(dstChunk);
1043         }
1044         else
1045         {
1046                 /* we deliberately don't change nextPage here */
1047                 fsmrel->numPages = dstPages;
1048                 fsmrel->numChunks = dstChunkCnt;
1049                 dstChunk->numPages = dstIndex;
1050                 free_chunk_chain(dstChunk->next);
1051                 dstChunk->next = NULL;
1052                 fsmrel->lastChunk = dstChunk;
1053         }
1054 }
1055
1056 /*
1057  * Acquire some free space by raising the thresholds of all FSMRelations.
1058  *
1059  * Note there is no guarantee as to how much space will be freed by a single
1060  * invocation; caller may repeat if necessary.
1061  */
1062 static void
1063 acquire_fsm_free_space(void)
1064 {
1065         FSMRelation *fsmrel;
1066
1067         for (fsmrel = FreeSpaceMap->relList; fsmrel; fsmrel = fsmrel->nextRel)
1068         {
1069                 fsmrel->threshold *= 2;
1070                 /* Limit thresholds to BLCKSZ so they can't get ridiculously large */
1071                 if (fsmrel->threshold > BLCKSZ)
1072                         fsmrel->threshold = BLCKSZ;
1073                 /* Release any pages that don't meet the new threshold */
1074                 compact_fsm_page_list(fsmrel);
1075         }
1076 }
1077
1078
1079 #ifdef FREESPACE_DEBUG
1080 /*
1081  * Dump contents of freespace map for debugging.
1082  *
1083  * We assume caller holds the FreeSpaceLock, or is otherwise unconcerned
1084  * about other processes.
1085  */
1086 void
1087 DumpFreeSpace(void)
1088 {
1089         FSMRelation *fsmrel;
1090         FSMRelation *prevrel = NULL;
1091         int                     relNum = 0;
1092         FSMChunk   *chunk;
1093         int                     chunkRelIndex;
1094         int                     nChunks;
1095         int                     nPages;
1096
1097         for (fsmrel = FreeSpaceMap->relList; fsmrel; fsmrel = fsmrel->nextRel)
1098         {
1099                 relNum++;
1100                 fprintf(stderr, "Map %d: rel %u/%u useCount %d threshold %u nextPage %d\nMap= ",
1101                                 relNum, fsmrel->key.tblNode, fsmrel->key.relNode,
1102                                 fsmrel->useCount, fsmrel->threshold, fsmrel->nextPage);
1103                 nChunks = nPages = 0;
1104                 for (chunk = fsmrel->relChunks; chunk; chunk = chunk->next)
1105                 {
1106                         int                     numPages = chunk->numPages;
1107
1108                         nChunks++;
1109                         for (chunkRelIndex = 0; chunkRelIndex < numPages; chunkRelIndex++)
1110                         {
1111                                 nPages++;
1112                                 fprintf(stderr, " %u:%u",
1113                                                 chunk->pages[chunkRelIndex],
1114                                                 chunk->bytes[chunkRelIndex]);
1115                         }
1116                 }
1117                 fprintf(stderr, "\n");
1118                 /* Cross-check local counters and list links */
1119                 if (nPages != fsmrel->numPages)
1120                         fprintf(stderr, "DumpFreeSpace: %d pages in rel, but numPages = %d\n",
1121                                         nPages, fsmrel->numPages);
1122                 if (nChunks != fsmrel->numChunks)
1123                         fprintf(stderr, "DumpFreeSpace: %d chunks in rel, but numChunks = %d\n",
1124                                         nChunks, fsmrel->numChunks);
1125                 if (prevrel != fsmrel->priorRel)
1126                         fprintf(stderr, "DumpFreeSpace: broken list links\n");
1127                 prevrel = fsmrel;
1128         }
1129         if (prevrel != FreeSpaceMap->relListTail)
1130                 fprintf(stderr, "DumpFreeSpace: broken list links\n");
1131         /* Cross-check global counters */
1132         if (relNum != FreeSpaceMap->numRels)
1133                 fprintf(stderr, "DumpFreeSpace: %d rels in list, but numRels = %d\n",
1134                                 relNum, FreeSpaceMap->numRels);
1135         nChunks = 0;
1136         for (chunk = FreeSpaceMap->freeChunks; chunk; chunk = chunk->next)
1137                 nChunks++;
1138         if (nChunks != FreeSpaceMap->numFreeChunks)
1139                 fprintf(stderr, "DumpFreeSpace: %d chunks in list, but numFreeChunks = %d\n",
1140                                 nChunks, FreeSpaceMap->numFreeChunks);
1141 }
1142
1143 #endif   /* FREESPACE_DEBUG */