Rewrite hashbulkdelete() to make it amenable to new bucket locking
src/backend/access/hash/hashpage.c
/*-------------------------------------------------------------------------
 *
 * hashpage.c
 *      Hash table page management code for the Postgres hash access method
 *
 * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *      $Header: /cvsroot/pgsql/src/backend/access/hash/hashpage.c,v 1.40 2003/09/02 02:18:38 tgl Exp $
 *
 * NOTES
 *      Postgres hash pages look like ordinary relation pages.  The opaque
 *      data at high addresses includes information about the page including
 *      whether a page is an overflow page or a true bucket, the bucket
 *      number, and the block numbers of the preceding and following pages
 *      in the same bucket.
 *
 *      The first page in a hash relation, page zero, is special -- it stores
 *      information describing the hash table; it is referred to as the
 *      "meta page." Pages one and higher store the actual data.
 *
 *      There are also bitmap pages, which are not manipulated here;
 *      see hashovfl.c.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/genam.h"
#include "access/hash.h"
#include "miscadmin.h"
#include "storage/lmgr.h"


/*
 *      We use high-concurrency locking on hash indices.  There are two cases in
 *      which we don't do locking.  One is when we're building the index.
 *      Since the creating transaction has not committed, no one can see
 *      the index, and there's no reason to share locks.  The second case
 *      is when we're just starting up the database system.  We use some
 *      special-purpose initialization code in the relation cache manager
 *      (see utils/cache/relcache.c) to allow us to do indexed scans on
 *      the system catalogs before we'd normally be able to.  This happens
 *      before the lock table is fully initialized, so we can't use it.
 *      Strictly speaking, this violates 2pl, but we don't do 2pl on the
 *      system catalogs anyway.
 *
 *      Note that our page locks are actual lockmanager locks, not buffer
 *      locks (as are used by btree, for example).  This is a good idea because
 *      the algorithms are not deadlock-free, and we'd better be able to detect
 *      and recover from deadlocks.
 *
 *      Another important difference from btree is that a hash indexscan
 *      retains both a lock and a buffer pin on the current index page
 *      between hashgettuple() calls (btree keeps only a buffer pin).
 *      Because of this, it's safe to do item deletions with only a regular
 *      write lock on a hash page --- there cannot be an indexscan stopped on
 *      the page being deleted, other than an indexscan of our own backend,
 *      which will be taken care of by _hash_adjscans.
 */
#define USELOCKING      (!BuildingHash && !IsInitProcessingMode())


static void _hash_setpagelock(Relation rel, BlockNumber blkno, int access);
static void _hash_unsetpagelock(Relation rel, BlockNumber blkno, int access);
static void _hash_splitbucket(Relation rel, Buffer metabuf,
                              Bucket obucket, Bucket nbucket);


/*
 *      _hash_metapinit() -- Initialize the metadata page of a hash index,
 *                      the two buckets that we begin with and the initial
 *                      bitmap page.
 */
void
_hash_metapinit(Relation rel)
{
    HashMetaPage metap;
    HashPageOpaque pageopaque;
    Buffer      metabuf;
    Buffer      buf;
    Page        pg;
    uint16      i;

    /* can't be sharing this with anyone, now... */
    if (USELOCKING)
        LockRelation(rel, AccessExclusiveLock);

    if (RelationGetNumberOfBlocks(rel) != 0)
        elog(ERROR, "cannot initialize non-empty hash index \"%s\"",
             RelationGetRelationName(rel));

    metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE);
    pg = BufferGetPage(metabuf);
    _hash_pageinit(pg, BufferGetPageSize(metabuf));

    pageopaque = (HashPageOpaque) PageGetSpecialPointer(pg);
    pageopaque->hasho_oaddr = 0;
    pageopaque->hasho_prevblkno = InvalidBlockNumber;
    pageopaque->hasho_nextblkno = InvalidBlockNumber;
    pageopaque->hasho_flag = LH_META_PAGE;
    pageopaque->hasho_bucket = -1;

    metap = (HashMetaPage) pg;

    metap->hashm_magic = HASH_MAGIC;
    metap->hashm_version = HASH_VERSION;
    metap->hashm_ntuples = 0;
    metap->hashm_nmaps = 0;
    metap->hashm_ffactor = DEFAULT_FFACTOR;
    metap->hashm_bsize = BufferGetPageSize(metabuf);
    metap->hashm_bshift = _hash_log2(metap->hashm_bsize);
    /* page size must be power of 2 */
    Assert(metap->hashm_bsize == (1 << metap->hashm_bshift));
    /* bitmap size is half of page size, to keep it also power of 2 */
    metap->hashm_bmsize = (metap->hashm_bsize >> 1);
    Assert(metap->hashm_bsize >= metap->hashm_bmsize +
           MAXALIGN(sizeof(PageHeaderData)) +
           MAXALIGN(sizeof(HashPageOpaqueData)));
    Assert((1 << BMPG_SHIFT(metap)) == (BMPG_MASK(metap) + 1));
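
    /*
     * Illustrative note (added commentary, not in the original source):
     * assuming the default 8K BLCKSZ, hashm_bsize is 8192, hashm_bshift is
     * 13, and hashm_bmsize is 4096, so each freespace bitmap page carries
     * 4096 * 8 = 32768 bits, one per overflow page it can track -- still a
     * power of 2, which is what the assertions above verify.
     */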

    metap->hashm_procid = index_getprocid(rel, 1, HASHPROC);

    /*
     * We initialize the index with two buckets, 0 and 1, occupying physical
     * blocks 1 and 2.  The first freespace bitmap page is in block 3.
     */
    metap->hashm_maxbucket = metap->hashm_lowmask = 1;  /* nbuckets - 1 */
    metap->hashm_highmask = 3;  /* (nbuckets << 1) - 1 */

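    /*
     * Illustrative sketch (added commentary, not in the original code):
     * these masks support the usual linear-hashing bucket computation,
     * roughly
     *
     *      bucket = hashvalue & hashm_highmask;
     *      if (bucket > hashm_maxbucket)
     *          bucket = bucket & hashm_lowmask;
     *
     * so with maxbucket = 1, lowmask = 1, highmask = 3, every hash value
     * initially lands in bucket 0 or bucket 1.  (The actual computation is
     * done elsewhere, e.g. by _hash_call() on the heap datum.)
     */
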
    MemSet((char *) metap->hashm_spares, 0, sizeof(metap->hashm_spares));
    MemSet((char *) metap->hashm_mapp, 0, sizeof(metap->hashm_mapp));

    metap->hashm_spares[1] = 1; /* the first bitmap page is the only spare page so far */
    metap->hashm_ovflpoint = 1;
    metap->hashm_firstfree = 0;

    /*
     * initialize the first two buckets
     */
    for (i = 0; i <= 1; i++)
    {
        buf = _hash_getbuf(rel, BUCKET_TO_BLKNO(metap, i), HASH_WRITE);
        pg = BufferGetPage(buf);
        _hash_pageinit(pg, BufferGetPageSize(buf));
        pageopaque = (HashPageOpaque) PageGetSpecialPointer(pg);
        pageopaque->hasho_oaddr = 0;
        pageopaque->hasho_prevblkno = InvalidBlockNumber;
        pageopaque->hasho_nextblkno = InvalidBlockNumber;
        pageopaque->hasho_flag = LH_BUCKET_PAGE;
        pageopaque->hasho_bucket = i;
        _hash_wrtbuf(rel, buf);
    }

    /*
     * Initialize bitmap page.  Can't do this until we
     * create the first two buckets, else smgr will complain.
     */
    _hash_initbitmap(rel, metap, 3);

    /* all done */
    _hash_wrtbuf(rel, metabuf);

    if (USELOCKING)
        UnlockRelation(rel, AccessExclusiveLock);
}

/*
 *      _hash_getbuf() -- Get a buffer by block number for read or write.
 *
 *              When this routine returns, the appropriate lock is set on the
 *              requested buffer and its reference count is correct.
 *
 *              XXX P_NEW is not used because, unlike the tree structures, we
 *              need the bucket blocks to be at certain block numbers.  we must
 *              depend on the caller to call _hash_pageinit on the block if it
 *              knows that this is a new block.
 */
Buffer
_hash_getbuf(Relation rel, BlockNumber blkno, int access)
{
    Buffer      buf;

    if (blkno == P_NEW)
        elog(ERROR, "hash AM does not use P_NEW");
    switch (access)
    {
        case HASH_WRITE:
        case HASH_READ:
            _hash_setpagelock(rel, blkno, access);
            break;
        default:
            elog(ERROR, "unrecognized hash access code: %d", access);
            break;
    }
    buf = ReadBuffer(rel, blkno);

    /* ref count and lock type are correct */
    return buf;
}
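
/*
 * Illustrative usage note (added commentary, not in the original file):
 * callers pair _hash_getbuf() with one of the release routines below,
 * e.g. in _hash_pagedel() further down:
 *
 *      buf = _hash_getbuf(rel, blkno, HASH_WRITE);
 *      ... modify the page ...
 *      _hash_wrtbuf(rel, buf);     (or _hash_relbuf() if nothing changed)
 */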

/*
 *      _hash_relbuf() -- release a locked buffer.
 */
void
_hash_relbuf(Relation rel, Buffer buf, int access)
{
    BlockNumber blkno;

    blkno = BufferGetBlockNumber(buf);

    switch (access)
    {
        case HASH_WRITE:
        case HASH_READ:
            _hash_unsetpagelock(rel, blkno, access);
            break;
        default:
            elog(ERROR, "unrecognized hash access code: %d", access);
            break;
    }

    ReleaseBuffer(buf);
}

/*
 *      _hash_wrtbuf() -- write a hash page to disk.
 *
 *              This routine releases the lock held on the buffer and our reference
 *              to it.  It is an error to call _hash_wrtbuf() without a write lock
 *              or a reference to the buffer.
 */
void
_hash_wrtbuf(Relation rel, Buffer buf)
{
    BlockNumber blkno;

    blkno = BufferGetBlockNumber(buf);
    WriteBuffer(buf);
    _hash_unsetpagelock(rel, blkno, HASH_WRITE);
}

/*
 *      _hash_wrtnorelbuf() -- write a hash page to disk, but do not release
 *                                               our reference or lock.
 *
 *              It is an error to call _hash_wrtnorelbuf() without a write lock
 *              or a reference to the buffer.
 */
void
_hash_wrtnorelbuf(Buffer buf)
{
    BlockNumber blkno;

    blkno = BufferGetBlockNumber(buf);
    WriteNoReleaseBuffer(buf);
}
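
/*
 * Added commentary (not in the original file): the three release paths
 * above differ as follows --
 *   _hash_relbuf():      drop the page lock and the buffer pin; no write.
 *   _hash_wrtbuf():      hand the dirtied buffer back to the buffer manager
 *                        (WriteBuffer releases our pin) and drop the lock.
 *   _hash_wrtnorelbuf(): dirty the buffer but keep both our pin and the
 *                        page lock, for callers that keep modifying it.
 */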

/*
 * _hash_chgbufaccess() -- Change from read to write access or vice versa.
 *
 * When changing from write to read, we assume the buffer is dirty and tell
 * bufmgr it must be written out.
 */
void
_hash_chgbufaccess(Relation rel,
                   Buffer buf,
                   int from_access,
                   int to_access)
{
    BlockNumber blkno;

    blkno = BufferGetBlockNumber(buf);

    if (from_access == HASH_WRITE)
        _hash_wrtnorelbuf(buf);

    _hash_unsetpagelock(rel, blkno, from_access);

    _hash_setpagelock(rel, blkno, to_access);
}
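
/*
 * Added usage note (not in the original file): _hash_expandtable() below
 * uses this to upgrade its read lock on the metapage to a write lock while
 * it bumps hashm_maxbucket, then downgrades back to read before doing the
 * actual bucket split.
 */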

/*
 *      _hash_pageinit() -- Initialize a new page.
 */
void
_hash_pageinit(Page page, Size size)
{
    Assert(PageIsNew(page));
    PageInit(page, size, sizeof(HashPageOpaqueData));
}
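
/*
 * Added note (not in the original file): PageInit() reserves
 * sizeof(HashPageOpaqueData) bytes of special space at the end of the page;
 * callers such as _hash_metapinit() and _hash_splitbucket() then fill in
 * the HashPageOpaque fields (bucket number, chain links, flags) themselves.
 */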

/*
 *  _hash_setpagelock() -- Acquire the requested type of lock on a page.
 */
static void
_hash_setpagelock(Relation rel,
                  BlockNumber blkno,
                  int access)
{
    if (USELOCKING)
    {
        switch (access)
        {
            case HASH_WRITE:
                LockPage(rel, blkno, ExclusiveLock);
                break;
            case HASH_READ:
                LockPage(rel, blkno, ShareLock);
                break;
            default:
                elog(ERROR, "unrecognized hash access code: %d", access);
                break;
        }
    }
}

/*
 *  _hash_unsetpagelock() -- Release the specified type of lock on a page.
 */
static void
_hash_unsetpagelock(Relation rel,
                    BlockNumber blkno,
                    int access)
{
    if (USELOCKING)
    {
        switch (access)
        {
            case HASH_WRITE:
                UnlockPage(rel, blkno, ExclusiveLock);
                break;
            case HASH_READ:
                UnlockPage(rel, blkno, ShareLock);
                break;
            default:
                elog(ERROR, "unrecognized hash access code: %d", access);
                break;
        }
    }
}

/*
 * Delete a hash index item.
 *
 * It is safe to delete an item after acquiring a regular WRITE lock on
 * the page, because no other backend can hold a READ lock on the page,
 * and that means no other backend currently has an indexscan stopped on
 * the item being deleted.  Our own backend might have such
 * an indexscan (in fact *will*, since that's how VACUUM found the item
 * in the first place), but _hash_adjscans will fix the scan position.
 */
void
_hash_pagedel(Relation rel, ItemPointer tid)
{
    Buffer      buf;
    Buffer      metabuf;
    Page        page;
    BlockNumber blkno;
    OffsetNumber offno;
    HashMetaPage metap;
    HashPageOpaque opaque;

    blkno = ItemPointerGetBlockNumber(tid);
    offno = ItemPointerGetOffsetNumber(tid);

    buf = _hash_getbuf(rel, blkno, HASH_WRITE);
    page = BufferGetPage(buf);
    _hash_checkpage(page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
    opaque = (HashPageOpaque) PageGetSpecialPointer(page);

    PageIndexTupleDelete(page, offno);

    if (PageIsEmpty(page) && (opaque->hasho_flag & LH_OVERFLOW_PAGE))
        _hash_freeovflpage(rel, buf);
    else
        _hash_wrtbuf(rel, buf);

    metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE);
    metap = (HashMetaPage) BufferGetPage(metabuf);
    _hash_checkpage((Page) metap, LH_META_PAGE);
    metap->hashm_ntuples--;
    _hash_wrtbuf(rel, metabuf);
}

/*
 * Expand the hash table by creating one new bucket.
 */
void
_hash_expandtable(Relation rel, Buffer metabuf)
{
    HashMetaPage metap;
    Bucket      old_bucket;
    Bucket      new_bucket;
    uint32      spare_ndx;

    metap = (HashMetaPage) BufferGetPage(metabuf);
    _hash_checkpage((Page) metap, LH_META_PAGE);

    _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_WRITE);

    new_bucket = ++metap->hashm_maxbucket;
    old_bucket = (new_bucket & metap->hashm_lowmask);

    if (new_bucket > metap->hashm_highmask)
    {
        /* Starting a new doubling */
        metap->hashm_lowmask = metap->hashm_highmask;
        metap->hashm_highmask = new_bucket | metap->hashm_lowmask;
    }
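
    /*
     * Illustrative example (added commentary, not in the original code):
     * suppose maxbucket was 3, so lowmask = 1 and highmask = 3.  Creating
     * new_bucket = 4 exceeds highmask, so we start a new doubling: lowmask
     * becomes 3 and highmask becomes 4 | 3 = 7.  The tuples destined for
     * the new bucket 4 are exactly those in old_bucket = 4 & 1 = 0 whose
     * hash value has bit 2 set.
     */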

    /*
     * If the split point is increasing (hashm_maxbucket's log base 2
     * increases), we need to adjust the hashm_spares[] array and
     * hashm_ovflpoint so that future overflow pages will be created beyond
     * this new batch of bucket pages.
     *
     * XXX should initialize new bucket pages to prevent out-of-order
     * page creation.
     */
    spare_ndx = _hash_log2(metap->hashm_maxbucket + 1);
    if (spare_ndx > metap->hashm_ovflpoint)
    {
        Assert(spare_ndx == metap->hashm_ovflpoint + 1);
        metap->hashm_spares[spare_ndx] = metap->hashm_spares[metap->hashm_ovflpoint];
        metap->hashm_ovflpoint = spare_ndx;
    }
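
    /*
     * Added commentary (not in the original code): hashm_spares[] counts
     * overflow (and bitmap) pages so that BUCKET_TO_BLKNO() can account for
     * them when mapping bucket numbers to block numbers; seeding the new
     * splitpoint's entry from the old one means no overflow pages are
     * charged to it yet.  Continuing the example above, going from 4 to 5
     * buckets makes spare_ndx = _hash_log2(5) = 3, one past the old
     * hashm_ovflpoint of 2.
     */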

    _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_READ);

    /* Relocate records to the new bucket */
    _hash_splitbucket(rel, metabuf, old_bucket, new_bucket);
}


/*
 * _hash_splitbucket -- split 'obucket' into 'obucket' and 'nbucket'
 *
 * We are splitting a bucket that consists of a base bucket page and zero
 * or more overflow (bucket chain) pages.  We must relocate tuples that
 * belong in the new bucket, and compress out any free space in the old
 * bucket.
 */
static void
_hash_splitbucket(Relation rel,
                  Buffer metabuf,
                  Bucket obucket,
                  Bucket nbucket)
{
    Bucket      bucket;
    Buffer      obuf;
    Buffer      nbuf;
    Buffer      ovflbuf;
    BlockNumber oblkno;
    BlockNumber nblkno;
    BlockNumber start_oblkno;
    BlockNumber start_nblkno;
    bool        null;
    Datum       datum;
    HashItem    hitem;
    HashPageOpaque oopaque;
    HashPageOpaque nopaque;
    HashMetaPage metap;
    IndexTuple  itup;
    Size        itemsz;
    OffsetNumber ooffnum;
    OffsetNumber noffnum;
    OffsetNumber omaxoffnum;
    Page        opage;
    Page        npage;
    TupleDesc   itupdesc = RelationGetDescr(rel);

    metap = (HashMetaPage) BufferGetPage(metabuf);
    _hash_checkpage((Page) metap, LH_META_PAGE);

    /* get the buffers & pages */
    start_oblkno = BUCKET_TO_BLKNO(metap, obucket);
    start_nblkno = BUCKET_TO_BLKNO(metap, nbucket);
    oblkno = start_oblkno;
    nblkno = start_nblkno;
    obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
    nbuf = _hash_getbuf(rel, nblkno, HASH_WRITE);
    opage = BufferGetPage(obuf);
    npage = BufferGetPage(nbuf);

    /* initialize the new bucket page */
    _hash_pageinit(npage, BufferGetPageSize(nbuf));
    nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
    nopaque->hasho_prevblkno = InvalidBlockNumber;
    nopaque->hasho_nextblkno = InvalidBlockNumber;
    nopaque->hasho_flag = LH_BUCKET_PAGE;
    nopaque->hasho_oaddr = 0;
    nopaque->hasho_bucket = nbucket;
    _hash_wrtnorelbuf(nbuf);

    /*
     * make sure the old bucket isn't empty.  advance 'opage' and friends
     * through the overflow bucket chain until we find a non-empty page.
     *
     * XXX we should only need this once, if we are careful to preserve the
     * invariant that overflow pages are never empty.
     */
    _hash_checkpage(opage, LH_BUCKET_PAGE);
    oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
    if (PageIsEmpty(opage))
    {
        oblkno = oopaque->hasho_nextblkno;
        _hash_relbuf(rel, obuf, HASH_WRITE);
        if (!BlockNumberIsValid(oblkno))
        {
            /*
             * the old bucket is completely empty; of course, the new
             * bucket will be as well, but since it's a base bucket page
             * we don't care.
             */
            _hash_relbuf(rel, nbuf, HASH_WRITE);
            return;
        }
        obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
        opage = BufferGetPage(obuf);
        _hash_checkpage(opage, LH_OVERFLOW_PAGE);
        if (PageIsEmpty(opage))
            elog(ERROR, "empty hash overflow page %u", oblkno);
        oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
    }

    /*
     * we are now guaranteed that 'opage' is not empty.  partition the
     * tuples in the old bucket between the old bucket and the new bucket,
     * advancing along their respective overflow bucket chains and adding
     * overflow pages as needed.
     */
    ooffnum = FirstOffsetNumber;
    omaxoffnum = PageGetMaxOffsetNumber(opage);
    for (;;)
    {
        /*
         * at each iteration through this loop, each of these variables
         * should be up-to-date: obuf opage oopaque ooffnum omaxoffnum
         */

        /* check if we're at the end of the page */
        if (ooffnum > omaxoffnum)
        {
            /* at end of page, but check for overflow page */
            oblkno = oopaque->hasho_nextblkno;
            if (BlockNumberIsValid(oblkno))
            {
                /*
                 * we ran out of tuples on this particular page, but we
                 * have more overflow pages; re-init values.
                 */
                _hash_wrtbuf(rel, obuf);
                obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
                opage = BufferGetPage(obuf);
                _hash_checkpage(opage, LH_OVERFLOW_PAGE);
                oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
                /* we're guaranteed that an ovfl page has at least 1 tuple */
                if (PageIsEmpty(opage))
                    elog(ERROR, "empty hash overflow page %u", oblkno);
                ooffnum = FirstOffsetNumber;
                omaxoffnum = PageGetMaxOffsetNumber(opage);
            }
            else
            {
                /*
                 * We're at the end of the bucket chain, so now we're
                 * really done with everything.  Before quitting, call
                 * _hash_squeezebucket to ensure the tuples remaining in the
                 * old bucket (including the overflow pages) are packed as
                 * tightly as possible.  The new bucket is already tight.
                 */
                _hash_wrtbuf(rel, obuf);
                _hash_wrtbuf(rel, nbuf);
                _hash_squeezebucket(rel, obucket, start_oblkno);
                return;
            }
        }

        /* hash on the tuple */
        hitem = (HashItem) PageGetItem(opage, PageGetItemId(opage, ooffnum));
        itup = &(hitem->hash_itup);
        datum = index_getattr(itup, 1, itupdesc, &null);
        Assert(!null);

        bucket = _hash_call(rel, metap, datum);

        if (bucket == nbucket)
        {
            /*
             * insert the tuple into the new bucket.  if it doesn't fit on
             * the current page in the new bucket, we must allocate a new
             * overflow page and place the tuple on that page instead.
             */
            itemsz = IndexTupleDSize(hitem->hash_itup)
                + (sizeof(HashItemData) - sizeof(IndexTupleData));

            itemsz = MAXALIGN(itemsz);

            if (PageGetFreeSpace(npage) < itemsz)
            {
                ovflbuf = _hash_addovflpage(rel, metabuf, nbuf);
                _hash_wrtbuf(rel, nbuf);
                nbuf = ovflbuf;
                npage = BufferGetPage(nbuf);
                _hash_checkpage(npage, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
            }
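
            /*
             * Added note (not in the original code): once we switch 'nbuf'
             * to a fresh overflow page we never return to earlier pages of
             * the new bucket, so the new bucket chain stays tightly packed
             * (which is why only the old bucket needs _hash_squeezebucket
             * at the end).
             */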

            noffnum = OffsetNumberNext(PageGetMaxOffsetNumber(npage));
            if (PageAddItem(npage, (Item) hitem, itemsz, noffnum, LP_USED)
                == InvalidOffsetNumber)
                elog(ERROR, "failed to add index item to \"%s\"",
                     RelationGetRelationName(rel));
            _hash_wrtnorelbuf(nbuf);

            /*
             * now delete the tuple from the old bucket.  after this
             * section of code, 'ooffnum' will actually point to the
             * ItemId to which we would point if we had advanced it before
             * the deletion (PageIndexTupleDelete repacks the ItemId
             * array).  this also means that 'omaxoffnum' is exactly one
             * less than it used to be, so we really can just decrement it
             * instead of calling PageGetMaxOffsetNumber.
             */
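            /*
             * Illustrative example (added commentary): if the page held
             * offsets 1..5 and we delete offset 3, the old offset-4 item
             * becomes offset 3, so leaving 'ooffnum' where it is makes the
             * next iteration examine what used to be the following tuple.
             */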
            PageIndexTupleDelete(opage, ooffnum);
            _hash_wrtnorelbuf(obuf);
            omaxoffnum = OffsetNumberPrev(omaxoffnum);

            /*
             * tidy up.  if the old page was an overflow page and it is
             * now empty, we must free it (we want to preserve the
             * invariant that overflow pages cannot be empty).
             */
            if (PageIsEmpty(opage) &&
                (oopaque->hasho_flag & LH_OVERFLOW_PAGE))
            {
                oblkno = _hash_freeovflpage(rel, obuf);

                /* check that we're not through the bucket chain */
                if (!BlockNumberIsValid(oblkno))
                {
                    _hash_wrtbuf(rel, nbuf);
                    _hash_squeezebucket(rel, obucket, start_oblkno);
                    return;
                }

                /*
                 * re-init. again, we're guaranteed that an ovfl page has
                 * at least one tuple.
                 */
                obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
                opage = BufferGetPage(obuf);
                _hash_checkpage(opage, LH_OVERFLOW_PAGE);
                oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
                if (PageIsEmpty(opage))
                    elog(ERROR, "empty hash overflow page %u", oblkno);
                ooffnum = FirstOffsetNumber;
                omaxoffnum = PageGetMaxOffsetNumber(opage);
            }
        }
        else
        {
            /*
             * the tuple stays on this page.  we didn't move anything, so
             * we didn't delete anything and therefore we don't have to
             * change 'omaxoffnum'.
             */
            Assert(bucket == obucket);
            ooffnum = OffsetNumberNext(ooffnum);
        }
    }
    /* NOTREACHED */
}