Source: src/backend/access/hash/hash.c from the pg-rex/syncrep.git repository
(OSDN Git Service), as of the commit "Update copyright to 2004."
1 /*-------------------------------------------------------------------------
2  *
3  * hash.c
4  *        Implementation of Margo Seltzer's Hashing package for postgres.
5  *
6  * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/access/hash/hash.c,v 1.72 2004/08/29 04:12:17 momjian Exp $
12  *
13  * NOTES
14  *        This file contains only the public interface routines.
15  *
16  *-------------------------------------------------------------------------
17  */
18
19 #include "postgres.h"
20
21 #include "access/genam.h"
22 #include "access/hash.h"
23 #include "access/heapam.h"
24 #include "access/xlogutils.h"
25 #include "catalog/index.h"
26 #include "commands/vacuum.h"
27 #include "executor/executor.h"
28 #include "miscadmin.h"
29
30
/* Working state for hashbuild and its callback */
typedef struct
{
	double		indtuples;		/* count of tuples actually indexed (null-keyed
								 * tuples are skipped and not counted) */
} HashBuildState;

/* per-tuple callback handed to IndexBuildHeapScan by hashbuild() */
static void hashbuildCallback(Relation index,
				  HeapTuple htup,
				  Datum *attdata,
				  char *nulls,
				  bool tupleIsAlive,
				  void *state);
43
44
45 /*
46  *      hashbuild() -- build a new hash index.
47  *
48  *              We use a global variable to record the fact that we're creating
49  *              a new index.  This is used to avoid high-concurrency locking,
50  *              since the index won't be visible until this transaction commits
51  *              and since building is guaranteed to be single-threaded.
52  */
53 Datum
54 hashbuild(PG_FUNCTION_ARGS)
55 {
56         Relation        heap = (Relation) PG_GETARG_POINTER(0);
57         Relation        index = (Relation) PG_GETARG_POINTER(1);
58         IndexInfo  *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
59         double          reltuples;
60         HashBuildState buildstate;
61
62         /*
63          * We expect to be called exactly once for any index relation. If
64          * that's not the case, big trouble's what we have.
65          */
66         if (RelationGetNumberOfBlocks(index) != 0)
67                 elog(ERROR, "index \"%s\" already contains data",
68                          RelationGetRelationName(index));
69
70         /* initialize the hash index metadata page */
71         _hash_metapinit(index);
72
73         /* build the index */
74         buildstate.indtuples = 0;
75
76         /* do the heap scan */
77         reltuples = IndexBuildHeapScan(heap, index, indexInfo,
78                                                                 hashbuildCallback, (void *) &buildstate);
79
80         /*
81          * Since we just counted the tuples in the heap, we update its stats
82          * in pg_class to guarantee that the planner takes advantage of the
83          * index we just created.  But, only update statistics during normal
84          * index definitions, not for indices on system catalogs created
85          * during bootstrap processing.  We must close the relations before
86          * updating statistics to guarantee that the relcache entries are
87          * flushed when we increment the command counter in UpdateStats(). But
88          * we do not release any locks on the relations; those will be held
89          * until end of transaction.
90          */
91         if (IsNormalProcessingMode())
92         {
93                 Oid                     hrelid = RelationGetRelid(heap);
94                 Oid                     irelid = RelationGetRelid(index);
95
96                 heap_close(heap, NoLock);
97                 index_close(index);
98                 UpdateStats(hrelid, reltuples);
99                 UpdateStats(irelid, buildstate.indtuples);
100         }
101
102         PG_RETURN_VOID();
103 }
104
105 /*
106  * Per-tuple callback from IndexBuildHeapScan
107  */
108 static void
109 hashbuildCallback(Relation index,
110                                   HeapTuple htup,
111                                   Datum *attdata,
112                                   char *nulls,
113                                   bool tupleIsAlive,
114                                   void *state)
115 {
116         HashBuildState *buildstate = (HashBuildState *) state;
117         IndexTuple      itup;
118         HashItem        hitem;
119         InsertIndexResult res;
120
121         /* form an index tuple and point it at the heap tuple */
122         itup = index_formtuple(RelationGetDescr(index), attdata, nulls);
123         itup->t_tid = htup->t_self;
124
125         /* Hash indexes don't index nulls, see notes in hashinsert */
126         if (IndexTupleHasNulls(itup))
127         {
128                 pfree(itup);
129                 return;
130         }
131
132         hitem = _hash_formitem(itup);
133
134         res = _hash_doinsert(index, hitem);
135
136         if (res)
137                 pfree(res);
138
139         buildstate->indtuples += 1;
140
141         pfree(hitem);
142         pfree(itup);
143 }
144
145 /*
146  *      hashinsert() -- insert an index tuple into a hash table.
147  *
148  *      Hash on the index tuple's key, find the appropriate location
149  *      for the new tuple, put it there, and return an InsertIndexResult
150  *      to the caller.
151  */
152 Datum
153 hashinsert(PG_FUNCTION_ARGS)
154 {
155         Relation        rel = (Relation) PG_GETARG_POINTER(0);
156         Datum      *datum = (Datum *) PG_GETARG_POINTER(1);
157         char       *nulls = (char *) PG_GETARG_POINTER(2);
158         ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
159
160 #ifdef NOT_USED
161         Relation        heapRel = (Relation) PG_GETARG_POINTER(4);
162         bool            checkUnique = PG_GETARG_BOOL(5);
163 #endif
164         InsertIndexResult res;
165         HashItem        hitem;
166         IndexTuple      itup;
167
168         /* generate an index tuple */
169         itup = index_formtuple(RelationGetDescr(rel), datum, nulls);
170         itup->t_tid = *ht_ctid;
171
172         /*
173          * If the single index key is null, we don't insert it into the index.
174          * Hash tables support scans on '='. Relational algebra says that A =
175          * B returns null if either A or B is null.  This means that no
176          * qualification used in an index scan could ever return true on a
177          * null attribute.      It also means that indices can't be used by ISNULL
178          * or NOTNULL scans, but that's an artifact of the strategy map
179          * architecture chosen in 1986, not of the way nulls are handled here.
180          */
181         if (IndexTupleHasNulls(itup))
182         {
183                 pfree(itup);
184                 PG_RETURN_POINTER(NULL);
185         }
186
187         hitem = _hash_formitem(itup);
188
189         res = _hash_doinsert(rel, hitem);
190
191         pfree(hitem);
192         pfree(itup);
193
194         PG_RETURN_POINTER(res);
195 }
196
197
198 /*
199  *      hashgettuple() -- Get the next tuple in the scan.
200  */
201 Datum
202 hashgettuple(PG_FUNCTION_ARGS)
203 {
204         IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
205         ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
206         HashScanOpaque so = (HashScanOpaque) scan->opaque;
207         Relation        rel = scan->indexRelation;
208         Page            page;
209         OffsetNumber offnum;
210         bool            res;
211
212         /*
213          * We hold pin but not lock on current buffer while outside the hash AM.
214          * Reacquire the read lock here.
215          */
216         if (BufferIsValid(so->hashso_curbuf))
217                 _hash_chgbufaccess(rel, so->hashso_curbuf, HASH_NOLOCK, HASH_READ);
218
219         /*
220          * If we've already initialized this scan, we can just advance it in
221          * the appropriate direction.  If we haven't done so yet, we call a
222          * routine to get the first item in the scan.
223          */
224         if (ItemPointerIsValid(&(scan->currentItemData)))
225         {
226                 /*
227                  * Check to see if we should kill the previously-fetched tuple.
228                  */
229                 if (scan->kill_prior_tuple)
230                 {
231                         /*
232                          * Yes, so mark it by setting the LP_DELETE bit in the item
233                          * flags.
234                          */
235                         offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
236                         page = BufferGetPage(so->hashso_curbuf);
237                         PageGetItemId(page, offnum)->lp_flags |= LP_DELETE;
238
239                         /*
240                          * Since this can be redone later if needed, it's treated the
241                          * same as a commit-hint-bit status update for heap tuples: we
242                          * mark the buffer dirty but don't make a WAL log entry.
243                          */
244                         SetBufferCommitInfoNeedsSave(so->hashso_curbuf);
245                 }
246
247                 /*
248                  * Now continue the scan.
249                  */
250                 res = _hash_next(scan, dir);
251         }
252         else
253                 res = _hash_first(scan, dir);
254
255         /*
256          * Skip killed tuples if asked to.
257          */
258         if (scan->ignore_killed_tuples)
259         {
260                 while (res)
261                 {
262                         offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
263                         page = BufferGetPage(so->hashso_curbuf);
264                         if (!ItemIdDeleted(PageGetItemId(page, offnum)))
265                                 break;
266                         res = _hash_next(scan, dir);
267                 }
268         }
269
270         /* Release read lock on current buffer, but keep it pinned */
271         if (BufferIsValid(so->hashso_curbuf))
272                 _hash_chgbufaccess(rel, so->hashso_curbuf, HASH_READ, HASH_NOLOCK);
273
274         PG_RETURN_BOOL(res);
275 }
276
277
278 /*
279  *      hashbeginscan() -- start a scan on a hash index
280  */
281 Datum
282 hashbeginscan(PG_FUNCTION_ARGS)
283 {
284         Relation        rel = (Relation) PG_GETARG_POINTER(0);
285         int                     keysz = PG_GETARG_INT32(1);
286         ScanKey         scankey = (ScanKey) PG_GETARG_POINTER(2);
287         IndexScanDesc scan;
288         HashScanOpaque so;
289
290         scan = RelationGetIndexScan(rel, keysz, scankey);
291         so = (HashScanOpaque) palloc(sizeof(HashScanOpaqueData));
292         so->hashso_bucket_valid = false;
293         so->hashso_bucket_blkno = 0;
294         so->hashso_curbuf = so->hashso_mrkbuf = InvalidBuffer;
295         scan->opaque = so;
296
297         /* register scan in case we change pages it's using */
298         _hash_regscan(scan);
299
300         PG_RETURN_POINTER(scan);
301 }
302
303 /*
304  *      hashrescan() -- rescan an index relation
305  */
306 Datum
307 hashrescan(PG_FUNCTION_ARGS)
308 {
309         IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
310         ScanKey         scankey = (ScanKey) PG_GETARG_POINTER(1);
311         HashScanOpaque so = (HashScanOpaque) scan->opaque;
312         Relation        rel = scan->indexRelation;
313
314         /* if we are called from beginscan, so is still NULL */
315         if (so)
316         {
317                 /* release any pins we still hold */
318                 if (BufferIsValid(so->hashso_curbuf))
319                         _hash_dropbuf(rel, so->hashso_curbuf);
320                 so->hashso_curbuf = InvalidBuffer;
321
322                 if (BufferIsValid(so->hashso_mrkbuf))
323                         _hash_dropbuf(rel, so->hashso_mrkbuf);
324                 so->hashso_mrkbuf = InvalidBuffer;
325
326                 /* release lock on bucket, too */
327                 if (so->hashso_bucket_blkno)
328                         _hash_droplock(rel, so->hashso_bucket_blkno, HASH_SHARE);
329                 so->hashso_bucket_blkno = 0;
330         }
331
332         /* set positions invalid (this will cause _hash_first call) */
333         ItemPointerSetInvalid(&(scan->currentItemData));
334         ItemPointerSetInvalid(&(scan->currentMarkData));
335
336         /* Update scan key, if a new one is given */
337         if (scankey && scan->numberOfKeys > 0)
338         {
339                 memmove(scan->keyData,
340                                 scankey,
341                                 scan->numberOfKeys * sizeof(ScanKeyData));
342                 if (so)
343                         so->hashso_bucket_valid = false;
344         }
345
346         PG_RETURN_VOID();
347 }
348
349 /*
350  *      hashendscan() -- close down a scan
351  */
352 Datum
353 hashendscan(PG_FUNCTION_ARGS)
354 {
355         IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
356         HashScanOpaque so = (HashScanOpaque) scan->opaque;
357         Relation        rel = scan->indexRelation;
358
359         /* don't need scan registered anymore */
360         _hash_dropscan(scan);
361
362         /* release any pins we still hold */
363         if (BufferIsValid(so->hashso_curbuf))
364                 _hash_dropbuf(rel, so->hashso_curbuf);
365         so->hashso_curbuf = InvalidBuffer;
366
367         if (BufferIsValid(so->hashso_mrkbuf))
368                 _hash_dropbuf(rel, so->hashso_mrkbuf);
369         so->hashso_mrkbuf = InvalidBuffer;
370
371         /* release lock on bucket, too */
372         if (so->hashso_bucket_blkno)
373                 _hash_droplock(rel, so->hashso_bucket_blkno, HASH_SHARE);
374         so->hashso_bucket_blkno = 0;
375
376         /* be tidy */
377         ItemPointerSetInvalid(&(scan->currentItemData));
378         ItemPointerSetInvalid(&(scan->currentMarkData));
379
380         pfree(so);
381         scan->opaque = NULL;
382
383         PG_RETURN_VOID();
384 }
385
386 /*
387  *      hashmarkpos() -- save current scan position
388  */
389 Datum
390 hashmarkpos(PG_FUNCTION_ARGS)
391 {
392         IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
393         HashScanOpaque so = (HashScanOpaque) scan->opaque;
394         Relation        rel = scan->indexRelation;
395
396         /* release pin on old marked data, if any */
397         if (BufferIsValid(so->hashso_mrkbuf))
398                 _hash_dropbuf(rel, so->hashso_mrkbuf);
399         so->hashso_mrkbuf = InvalidBuffer;
400         ItemPointerSetInvalid(&(scan->currentMarkData));
401
402         /* bump pin count on currentItemData and copy to currentMarkData */
403         if (ItemPointerIsValid(&(scan->currentItemData)))
404         {
405                 so->hashso_mrkbuf = _hash_getbuf(rel,
406                                                                  BufferGetBlockNumber(so->hashso_curbuf),
407                                                                                  HASH_NOLOCK);
408                 scan->currentMarkData = scan->currentItemData;
409         }
410
411         PG_RETURN_VOID();
412 }
413
414 /*
415  *      hashrestrpos() -- restore scan to last saved position
416  */
417 Datum
418 hashrestrpos(PG_FUNCTION_ARGS)
419 {
420         IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
421         HashScanOpaque so = (HashScanOpaque) scan->opaque;
422         Relation        rel = scan->indexRelation;
423
424         /* release pin on current data, if any */
425         if (BufferIsValid(so->hashso_curbuf))
426                 _hash_dropbuf(rel, so->hashso_curbuf);
427         so->hashso_curbuf = InvalidBuffer;
428         ItemPointerSetInvalid(&(scan->currentItemData));
429
430         /* bump pin count on currentMarkData and copy to currentItemData */
431         if (ItemPointerIsValid(&(scan->currentMarkData)))
432         {
433                 so->hashso_curbuf = _hash_getbuf(rel,
434                                                                  BufferGetBlockNumber(so->hashso_mrkbuf),
435                                                                                  HASH_NOLOCK);
436                 scan->currentItemData = scan->currentMarkData;
437         }
438
439         PG_RETURN_VOID();
440 }
441
/*
 * Bulk deletion of all index entries pointing to a set of heap tuples.
 * The set of target tuples is specified via a callback routine that tells
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
Datum
hashbulkdelete(PG_FUNCTION_ARGS)
{
	Relation	rel = (Relation) PG_GETARG_POINTER(0);
	IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(1);
	void	   *callback_state = (void *) PG_GETARG_POINTER(2);
	IndexBulkDeleteResult *result;
	BlockNumber num_pages;
	double		tuples_removed;
	double		num_index_tuples;	/* tuples remaining, as counted by us */
	double		orig_ntuples;	/* metapage tuple count at scan start */
	Bucket		orig_maxbucket; /* highest bucket number at scan start */
	Bucket		cur_maxbucket;	/* highest bucket we intend to scan */
	Bucket		cur_bucket;
	Buffer		metabuf;
	HashMetaPage metap;
	HashMetaPageData local_metapage;	/* private copy of the metapage */

	tuples_removed = 0;
	num_index_tuples = 0;

	/*
	 * Read the metapage to fetch original bucket and tuple counts.  Also,
	 * we keep a copy of the last-seen metapage so that we can use its
	 * hashm_spares[] values to compute bucket page addresses.  This is a
	 * bit hokey but perfectly safe, since the interesting entries in the
	 * spares array cannot change under us; and it beats rereading the
	 * metapage for each bucket.
	 */
	metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ);
	metap = (HashMetaPage) BufferGetPage(metabuf);
	_hash_checkpage(rel, (Page) metap, LH_META_PAGE);
	orig_maxbucket = metap->hashm_maxbucket;
	orig_ntuples = metap->hashm_ntuples;
	memcpy(&local_metapage, metap, sizeof(local_metapage));
	_hash_relbuf(rel, metabuf);

	/* Scan the buckets that we know exist */
	cur_bucket = 0;
	cur_maxbucket = orig_maxbucket;

	/* re-entered via goto below if a concurrent split added buckets */
loop_top:
	while (cur_bucket <= cur_maxbucket)
	{
		BlockNumber bucket_blkno;
		BlockNumber blkno;
		bool		bucket_dirty = false;

		/* Get address of bucket's start page */
		bucket_blkno = BUCKET_TO_BLKNO(&local_metapage, cur_bucket);

		/* Exclusive-lock the bucket so we can shrink it */
		_hash_getlock(rel, bucket_blkno, HASH_EXCLUSIVE);

		/* Shouldn't have any active scans locally, either */
		if (_hash_has_active_scan(rel, cur_bucket))
			elog(ERROR, "hash index has active scan during VACUUM");

		/* Scan each page in bucket (primary page plus overflow chain) */
		blkno = bucket_blkno;
		while (BlockNumberIsValid(blkno))
		{
			Buffer		buf;
			Page		page;
			HashPageOpaque opaque;
			OffsetNumber offno;
			OffsetNumber maxoffno;
			bool		page_dirty = false;

			/* allow cancellation / cost-based vacuum throttling */
			vacuum_delay_point();

			buf = _hash_getbuf(rel, blkno, HASH_WRITE);
			page = BufferGetPage(buf);
			_hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
			opaque = (HashPageOpaque) PageGetSpecialPointer(page);
			Assert(opaque->hasho_bucket == cur_bucket);

			/* Scan each tuple in page */
			offno = FirstOffsetNumber;
			maxoffno = PageGetMaxOffsetNumber(page);
			while (offno <= maxoffno)
			{
				HashItem	hitem;
				ItemPointer htup;

				hitem = (HashItem) PageGetItem(page,
											   PageGetItemId(page, offno));
				htup = &(hitem->hash_itup.t_tid);
				if (callback(htup, callback_state))
				{
					/* delete the item from the page */
					PageIndexTupleDelete(page, offno);
					bucket_dirty = page_dirty = true;

					/* don't increment offno, instead decrement maxoffno */
					maxoffno = OffsetNumberPrev(maxoffno);

					tuples_removed += 1;
				}
				else
				{
					offno = OffsetNumberNext(offno);

					num_index_tuples += 1;
				}
			}

			/*
			 * Write page if needed, advance to next page.  (The next-page
			 * link is read before releasing the buffer.)
			 */
			blkno = opaque->hasho_nextblkno;

			if (page_dirty)
				_hash_wrtbuf(rel, buf);
			else
				_hash_relbuf(rel, buf);
		}

		/* If we deleted anything, try to compact free space */
		if (bucket_dirty)
			_hash_squeezebucket(rel, cur_bucket, bucket_blkno);

		/* Release bucket lock */
		_hash_droplock(rel, bucket_blkno, HASH_EXCLUSIVE);

		/* Advance to next bucket */
		cur_bucket++;
	}

	/* Write-lock metapage and check for split since we started */
	metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE);
	metap = (HashMetaPage) BufferGetPage(metabuf);
	_hash_checkpage(rel, (Page) metap, LH_META_PAGE);

	if (cur_maxbucket != metap->hashm_maxbucket)
	{
		/* There's been a split, so process the additional bucket(s) */
		cur_maxbucket = metap->hashm_maxbucket;
		memcpy(&local_metapage, metap, sizeof(local_metapage));
		_hash_relbuf(rel, metabuf);
		goto loop_top;
	}

	/* Okay, we're really done.  Update tuple count in metapage. */

	if (orig_maxbucket == metap->hashm_maxbucket &&
		orig_ntuples == metap->hashm_ntuples)
	{
		/*
		 * No one has split or inserted anything since start of scan,
		 * so believe our count as gospel.
		 */
		metap->hashm_ntuples = num_index_tuples;
	}
	else
	{
		/*
		 * Otherwise, our count is untrustworthy since we may have
		 * double-scanned tuples in split buckets.  Proceed by
		 * dead-reckoning.
		 */
		if (metap->hashm_ntuples > tuples_removed)
			metap->hashm_ntuples -= tuples_removed;
		else
			metap->hashm_ntuples = 0;
		num_index_tuples = metap->hashm_ntuples;
	}

	_hash_wrtbuf(rel, metabuf);

	/* return statistics */
	num_pages = RelationGetNumberOfBlocks(rel);

	result = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
	result->num_pages = num_pages;
	result->num_index_tuples = num_index_tuples;
	result->tuples_removed = tuples_removed;

	PG_RETURN_POINTER(result);
}
629
630
/* WAL redo entry point -- hash indexes are not WAL-logged in this release */
void
hash_redo(XLogRecPtr lsn, XLogRecord *record)
{
	elog(PANIC, "hash_redo: unimplemented");
}
636
/* WAL undo entry point -- hash indexes are not WAL-logged in this release */
void
hash_undo(XLogRecPtr lsn, XLogRecord *record)
{
	elog(PANIC, "hash_undo: unimplemented");
}
642
/* WAL record description routine -- intentionally empty (no hash WAL records) */
void
hash_desc(char *buf, uint8 xl_info, char *rec)
{
}