OSDN Git Service

f87eadcdec2dbd3c9ca95859ef5f4cb899e5bb28
[pg-rex/syncrep.git] / src / backend / access / nbtree / nbtutils.c
1 /*-------------------------------------------------------------------------
2  *
3  * nbtutils.c
4  *        Utility code for Postgres btree implementation.
5  *
6  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/access/nbtree/nbtutils.c
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include <time.h>
19
20 #include "access/genam.h"
21 #include "access/nbtree.h"
22 #include "access/reloptions.h"
23 #include "access/relscan.h"
24 #include "executor/execdebug.h"
25 #include "miscadmin.h"
26 #include "storage/bufmgr.h"
27 #include "storage/lwlock.h"
28 #include "storage/shmem.h"
29 #include "utils/lsyscache.h"
30
31
/* Prototypes for the static (file-local) helper routines of this file. */
32 static bool _bt_compare_scankey_args(IndexScanDesc scan, ScanKey op,
33                                                  ScanKey leftarg, ScanKey rightarg,
34                                                  bool *result);
35 static bool _bt_fix_scankey_strategy(ScanKey skey, int16 *indoption);
36 static void _bt_mark_scankey_required(ScanKey skey);
37 static bool _bt_check_rowcompare(ScanKey skey,
38                                          IndexTuple tuple, TupleDesc tupdesc,
39                                          ScanDirection dir, bool *continuescan);
40
41
42 /*
43  * _bt_mkscankey
44  *              Build an insertion scan key that contains comparison data from itup
45  *              as well as comparator routines appropriate to the key datatypes.
46  *
47  *              The result is intended for use with _bt_compare().
48  */
49 ScanKey
50 _bt_mkscankey(Relation rel, IndexTuple itup)
51 {
52         ScanKey         skey;
53         TupleDesc       itupdesc;
54         int                     natts;
55         int16      *indoption;
56         int                     i;
57
58         itupdesc = RelationGetDescr(rel);
59         natts = RelationGetNumberOfAttributes(rel);
60         indoption = rel->rd_indoption;
61
62         skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
63
64         for (i = 0; i < natts; i++)
65         {
66                 FmgrInfo   *procinfo;
67                 Datum           arg;
68                 bool            null;
69                 int                     flags;
70
71                 /*
72                  * We can use the cached (default) support procs since no cross-type
73                  * comparison can be needed.
74                  */
75                 procinfo = index_getprocinfo(rel, i + 1, BTORDER_PROC);
76                 arg = index_getattr(itup, i + 1, itupdesc, &null);
77                 flags = (null ? SK_ISNULL : 0) | (indoption[i] << SK_BT_INDOPTION_SHIFT);
78                 ScanKeyEntryInitializeWithInfo(&skey[i],
79                                                                            flags,
80                                                                            (AttrNumber) (i + 1),
81                                                                            InvalidStrategy,
82                                                                            InvalidOid,
83                                                                            rel->rd_indcollation[i],
84                                                                            procinfo,
85                                                                            arg);
86         }
87
88         return skey;
89 }
90
91 /*
92  * _bt_mkscankey_nodata
93  *              Build an insertion scan key that contains 3-way comparator routines
94  *              appropriate to the key datatypes, but no comparison data.  The
95  *              comparison data ultimately used must match the key datatypes.
96  *
97  *              The result cannot be used with _bt_compare(), unless comparison
98  *              data is first stored into the key entries.      Currently this
99  *              routine is only called by nbtsort.c and tuplesort.c, which have
100  *              their own comparison routines.
101  */
102 ScanKey
103 _bt_mkscankey_nodata(Relation rel)
104 {
105         ScanKey         skey;
106         int                     natts;
107         int16      *indoption;
108         int                     i;
109
110         natts = RelationGetNumberOfAttributes(rel);
111         indoption = rel->rd_indoption;
112
113         skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
114
115         for (i = 0; i < natts; i++)
116         {
117                 FmgrInfo   *procinfo;
118                 int                     flags;
119
120                 /*
121                  * We can use the cached (default) support procs since no cross-type
122                  * comparison can be needed.
123                  */
124                 procinfo = index_getprocinfo(rel, i + 1, BTORDER_PROC);
125                 flags = SK_ISNULL | (indoption[i] << SK_BT_INDOPTION_SHIFT);
126                 ScanKeyEntryInitializeWithInfo(&skey[i],
127                                                                            flags,
128                                                                            (AttrNumber) (i + 1),
129                                                                            InvalidStrategy,
130                                                                            InvalidOid,
131                                                                            rel->rd_indcollation[i],
132                                                                            procinfo,
133                                                                            (Datum) 0);
134         }
135
136         return skey;
137 }
138
139 /*
140  * free a scan key made by either _bt_mkscankey or _bt_mkscankey_nodata.
141  */
142 void
143 _bt_freeskey(ScanKey skey)
144 {
145         pfree(skey);
146 }
147
148 /*
149  * free a retracement stack made by _bt_search.
150  */
151 void
152 _bt_freestack(BTStack stack)
153 {
154         BTStack         ostack;
155
156         while (stack != NULL)
157         {
158                 ostack = stack;
159                 stack = stack->bts_parent;
160                 pfree(ostack);
161         }
162 }
163
164
165 /*
166  *      _bt_preprocess_keys() -- Preprocess scan keys
167  *
168  * The caller-supplied search-type keys (in scan->keyData[]) are copied to
169  * so->keyData[] with possible transformation.  scan->numberOfKeys is
170  * the number of input keys, so->numberOfKeys gets the number of output
171  * keys (possibly less, never greater).
172  *
173  * The output keys are marked with additional sk_flag bits beyond the
174  * system-standard bits supplied by the caller.  The DESC and NULLS_FIRST
175  * indoption bits for the relevant index attribute are copied into the flags.
176  * Also, for a DESC column, we commute (flip) all the sk_strategy numbers
177  * so that the index sorts in the desired direction.
178  *
179  * One key purpose of this routine is to discover how many scan keys
180  * must be satisfied to continue the scan.      It also attempts to eliminate
181  * redundant keys and detect contradictory keys.  (If the index opfamily
182  * provides incomplete sets of cross-type operators, we may fail to detect
183  * redundant or contradictory keys, but we can survive that.)
184  *
185  * The output keys must be sorted by index attribute.  Presently we expect
186  * (but verify) that the input keys are already so sorted --- this is done
187  * by group_clauses_by_indexkey() in indxpath.c.  Some reordering of the keys
188  * within each attribute may be done as a byproduct of the processing here,
189  * but no other code depends on that.
190  *
191  * The output keys are marked with flags SK_BT_REQFWD and/or SK_BT_REQBKWD
192  * if they must be satisfied in order to continue the scan forward or backward
193  * respectively.  _bt_checkkeys uses these flags.  For example, if the quals
194  * are "x = 1 AND y < 4 AND z < 5", then _bt_checkkeys will reject a tuple
195  * (1,2,7), but we must continue the scan in case there are tuples (1,3,z).
196  * But once we reach tuples like (1,4,z) we can stop scanning because no
197  * later tuples could match.  This is reflected by marking the x and y keys,
198  * but not the z key, with SK_BT_REQFWD.  In general, the keys for leading
199  * attributes with "=" keys are marked both SK_BT_REQFWD and SK_BT_REQBKWD.
200  * For the first attribute without an "=" key, any "<" and "<=" keys are
201  * marked SK_BT_REQFWD while any ">" and ">=" keys are marked SK_BT_REQBKWD.
202  * This can be seen to be correct by considering the above example.  Note
203  * in particular that if there are no keys for a given attribute, the keys for
204  * subsequent attributes can never be required; for instance "WHERE y = 4"
205  * requires a full-index scan.
206  *
207  * If possible, redundant keys are eliminated: we keep only the tightest
208  * >/>= bound and the tightest </<= bound, and if there's an = key then
209  * that's the only one returned.  (So, we return either a single = key,
210  * or one or two boundary-condition keys for each attr.)  However, if we
211  * cannot compare two keys for lack of a suitable cross-type operator,
212  * we cannot eliminate either.  If there are two such keys of the same
213  * operator strategy, the second one is just pushed into the output array
214  * without further processing here.  We may also emit both >/>= or both
215  * </<= keys if we can't compare them.  The logic about required keys still
216  * works if we don't eliminate redundant keys.
217  *
218  * As a byproduct of this work, we can detect contradictory quals such
219  * as "x = 1 AND x > 2".  If we see that, we return so->qual_ok = FALSE,
220  * indicating the scan need not be run at all since no tuples can match.
221  * (In this case we do not bother completing the output key array!)
222  * Again, missing cross-type operators might cause us to fail to prove the
223  * quals contradictory when they really are, but the scan will work correctly.
224  *
225  * Row comparison keys are currently also treated without any smarts:
226  * we just transfer them into the preprocessed array without any
227  * editorialization.  We can treat them the same as an ordinary inequality
228  * comparison on the row's first index column, for the purposes of the logic
229  * about required keys.
230  *
231  * Note: the reason we have to copy the preprocessed scan keys into private
232  * storage is that we are modifying the array based on comparisons of the
233  * key argument values, which could change on a rescan.  Therefore we can't
234  * overwrite the caller's data structure.
235  */
236 void
237 _bt_preprocess_keys(IndexScanDesc scan)
238 {
239         BTScanOpaque so = (BTScanOpaque) scan->opaque;
240         int                     numberOfKeys = scan->numberOfKeys;
241         int16      *indoption = scan->indexRelation->rd_indoption;
242         int                     new_numberOfKeys;
243         int                     numberOfEqualCols;
244         ScanKey         inkeys;
245         ScanKey         outkeys;
246         ScanKey         cur;
247         ScanKey         xform[BTMaxStrategyNumber];
248         bool            test_result;
249         int                     i,
250                                 j;
251         AttrNumber      attno;
252
253         /* initialize result variables */
254         so->qual_ok = true;
255         so->numberOfKeys = 0;
256
257         if (numberOfKeys < 1)
258                 return;                                 /* done if qual-less scan */
259
260         inkeys = scan->keyData;
261         outkeys = so->keyData;
262         cur = &inkeys[0];
263         /* we check that input keys are correctly ordered */
264         if (cur->sk_attno < 1)
265                 elog(ERROR, "btree index keys must be ordered by attribute");
266
267         /* We can short-circuit most of the work if there's just one key */
268         if (numberOfKeys == 1)
269         {
270                 /* Apply indoption to scankey (might change sk_strategy!) */
271                 if (!_bt_fix_scankey_strategy(cur, indoption))
272                         so->qual_ok = false;
                    /*
                     * Note that there is no early return above: the key is
                     * copied to the output array and counted even when
                     * qual_ok has just been set to false.
                     */
273                 memcpy(outkeys, cur, sizeof(ScanKeyData));
274                 so->numberOfKeys = 1;
275                 /* We can mark the qual as required if it's for first index col */
276                 if (cur->sk_attno == 1)
277                         _bt_mark_scankey_required(outkeys);
278                 return;
279         }
280
281         /*
282          * Otherwise, do the full set of pushups.
283          */
284         new_numberOfKeys = 0;
285         numberOfEqualCols = 0;
286
287         /*
288          * Initialize for processing of keys for attr 1.
289          *
290          * xform[i] points to the currently best scan key of strategy type i+1; it
291          * is NULL if we haven't yet found such a key for this attr.
292          */
293         attno = 1;
294         memset(xform, 0, sizeof(xform));
295
296         /*
297          * Loop iterates from 0 to numberOfKeys inclusive; we use the last pass to
298          * handle after-last-key processing.  Actual exit from the loop is at the
299          * "break" statement below.
300          */
301         for (i = 0;; cur++, i++)
302         {
                    /*
                     * On the extra final pass (i == numberOfKeys) cur points one
                     * past the last input key; every use of *cur below is
                     * guarded by an i < numberOfKeys test or comes after the
                     * "break".
                     */
303                 if (i < numberOfKeys)
304                 {
305                         /* Apply indoption to scankey (might change sk_strategy!) */
306                         if (!_bt_fix_scankey_strategy(cur, indoption))
307                         {
308                                 /* NULL can't be matched, so give up */
309                                 so->qual_ok = false;
310                                 return;
311                         }
312                 }
313
314                 /*
315                  * If we are at the end of the keys for a particular attr, finish up
316                  * processing and emit the cleaned-up keys.
317                  */
318                 if (i == numberOfKeys || cur->sk_attno != attno)
319                 {
                            /*
                             * Snapshot the count before this attribute's own "="
                             * key (if any) is added to it further down, so that
                             * the required-key test at emit time looks only at
                             * the attributes preceding this one.
                             */
320                         int                     priorNumberOfEqualCols = numberOfEqualCols;
321
322                         /* check input keys are correctly ordered */
323                         if (i < numberOfKeys && cur->sk_attno < attno)
324                                 elog(ERROR, "btree index keys must be ordered by attribute");
325
326                         /*
327                          * If = has been specified, all other keys can be eliminated as
328                          * redundant.  If we have a case like key = 1 AND key > 2, we can
329                          * set qual_ok to false and abandon further processing.
330                          *
331                          * We also have to deal with the case of "key IS NULL", which is
332                          * unsatisfiable in combination with any other index condition.
333                          * By the time we get here, that's been classified as an equality
334                          * check, and we've rejected any combination of it with a regular
335                          * equality condition; but not with other types of conditions.
336                          */
337                         if (xform[BTEqualStrategyNumber - 1])
338                         {
339                                 ScanKey         eq = xform[BTEqualStrategyNumber - 1];
340
                                    /* visit xform[] slots from the highest index down to 0 */
341                                 for (j = BTMaxStrategyNumber; --j >= 0;)
342                                 {
343                                         ScanKey         chk = xform[j];
344
                                            /* skip empty slots and the "=" slot itself */
345                                         if (!chk || j == (BTEqualStrategyNumber - 1))
346                                                 continue;
347
348                                         if (eq->sk_flags & SK_SEARCHNULL)
349                                         {
350                                                 /* IS NULL is contradictory to anything else */
351                                                 so->qual_ok = false;
352                                                 return;
353                                         }
354
                                            /*
                                             * Evaluate "eq's argument <chk's operator>
                                             * chk's argument": does the value demanded
                                             * by the "=" key satisfy the other key?
                                             */
355                                         if (_bt_compare_scankey_args(scan, chk, eq, chk,
356                                                                                                  &test_result))
357                                         {
358                                                 if (!test_result)
359                                                 {
360                                                         /* keys proven mutually contradictory */
361                                                         so->qual_ok = false;
362                                                         return;
363                                                 }
364                                                 /* else discard the redundant non-equality key */
365                                                 xform[j] = NULL;
366                                         }
367                                         /* else, cannot determine redundancy, keep both keys */
368                                 }
369                                 /* track number of attrs for which we have "=" keys */
370                                 numberOfEqualCols++;
371                         }
372
373                         /* try to keep only one of <, <= */
374                         if (xform[BTLessStrategyNumber - 1]
375                                 && xform[BTLessEqualStrategyNumber - 1])
376                         {
377                                 ScanKey         lt = xform[BTLessStrategyNumber - 1];
378                                 ScanKey         le = xform[BTLessEqualStrategyNumber - 1];
379
                                    /*
                                     * Evaluate "lt's argument <= le's argument" using
                                     * le's operator.  When true the "<" key is kept and
                                     * the "<=" key dropped; otherwise the reverse.  If
                                     * the comparison cannot be made, both stay.
                                     */
380                                 if (_bt_compare_scankey_args(scan, le, lt, le,
381                                                                                          &test_result))
382                                 {
383                                         if (test_result)
384                                                 xform[BTLessEqualStrategyNumber - 1] = NULL;
385                                         else
386                                                 xform[BTLessStrategyNumber - 1] = NULL;
387                                 }
388                         }
389
390                         /* try to keep only one of >, >= */
391                         if (xform[BTGreaterStrategyNumber - 1]
392                                 && xform[BTGreaterEqualStrategyNumber - 1])
393                         {
394                                 ScanKey         gt = xform[BTGreaterStrategyNumber - 1];
395                                 ScanKey         ge = xform[BTGreaterEqualStrategyNumber - 1];
396
                                    /*
                                     * Evaluate "gt's argument >= ge's argument" using
                                     * ge's operator.  When true the ">" key is kept and
                                     * the ">=" key dropped; otherwise the reverse.  If
                                     * the comparison cannot be made, both stay.
                                     */
397                                 if (_bt_compare_scankey_args(scan, ge, gt, ge,
398                                                                                          &test_result))
399                                 {
400                                         if (test_result)
401                                                 xform[BTGreaterEqualStrategyNumber - 1] = NULL;
402                                         else
403                                                 xform[BTGreaterStrategyNumber - 1] = NULL;
404                                 }
405                         }
406
407                         /*
408                          * Emit the cleaned-up keys into the outkeys[] array, and then
409                          * mark them if they are required.      They are required (possibly
410                          * only in one direction) if all attrs before this one had "=".
411                          */
412                         for (j = BTMaxStrategyNumber; --j >= 0;)
413                         {
414                                 if (xform[j])
415                                 {
416                                         ScanKey         outkey = &outkeys[new_numberOfKeys++];
417
418                                         memcpy(outkey, xform[j], sizeof(ScanKeyData));
419                                         if (priorNumberOfEqualCols == attno - 1)
420                                                 _bt_mark_scankey_required(outkey);
421                                 }
422                         }
423
424                         /*
425                          * Exit loop here if done.
426                          */
427                         if (i == numberOfKeys)
428                                 break;
429
430                         /* Re-initialize for new attno */
431                         attno = cur->sk_attno;
432                         memset(xform, 0, sizeof(xform));
433                 }
434
435                 /* check strategy this key's operator corresponds to */
436                 j = cur->sk_strategy - 1;
437
438                 /* if row comparison, push it directly to the output array */
439                 if (cur->sk_flags & SK_ROW_HEADER)
440                 {
441                         ScanKey         outkey = &outkeys[new_numberOfKeys++];
442
443                         memcpy(outkey, cur, sizeof(ScanKeyData));
444                         if (numberOfEqualCols == attno - 1)
445                                 _bt_mark_scankey_required(outkey);
446
447                         /*
448                          * We don't support RowCompare using equality; such a qual would
449                          * mess up the numberOfEqualCols tracking.
450                          */
451                         Assert(j != (BTEqualStrategyNumber - 1));
                            /* row-comparison keys are never stored in xform[] */
452                         continue;
453                 }
454
455                 /* have we seen one of these before? */
456                 if (xform[j] == NULL)
457                 {
458                         /* nope, so remember this scankey */
459                         xform[j] = cur;
460                 }
461                 else
462                 {
463                         /* yup, keep only the more restrictive key */
                            /*
                             * The test is "cur's argument <cur's operator>
                             * xform[j]'s argument"; a true result makes cur the
                             * new best key for this strategy.
                             */
464                         if (_bt_compare_scankey_args(scan, cur, cur, xform[j],
465                                                                                  &test_result))
466                         {
467                                 if (test_result)
468                                         xform[j] = cur;
469                                 else if (j == (BTEqualStrategyNumber - 1))
470                                 {
471                                         /* key == a && key == b, but a != b */
472                                         so->qual_ok = false;
473                                         return;
474                                 }
475                                 /* else old key is more restrictive, keep it */
476                         }
477                         else
478                         {
479                                 /*
480                                  * We can't determine which key is more restrictive.  Keep the
481                                  * previous one in xform[j] and push this one directly to the
482                                  * output array.
483                                  */
484                                 ScanKey         outkey = &outkeys[new_numberOfKeys++];
485
486                                 memcpy(outkey, cur, sizeof(ScanKeyData));
487                                 if (numberOfEqualCols == attno - 1)
488                                         _bt_mark_scankey_required(outkey);
489                         }
490                 }
491         }
492
            /* publish the number of keys actually written to so->keyData[] */
493         so->numberOfKeys = new_numberOfKeys;
494 }
495
496 /*
497  * Compare two scankey values using a specified operator.
498  *
499  * The test we want to perform is logically "leftarg op rightarg", where
500  * leftarg and rightarg are the sk_argument values in those ScanKeys, and
501  * the comparison operator is the one in the op ScanKey.  However, in
502  * cross-data-type situations we may need to look up the correct operator in
503  * the index's opfamily: it is the one having amopstrategy = op->sk_strategy
504  * and amoplefttype/amoprighttype equal to the two argument datatypes.
505  *
506  * If the opfamily doesn't supply a complete set of cross-type operators we
507  * may not be able to make the comparison.      If we can make the comparison
508  * we store the operator result in *result and return TRUE.  We return FALSE
509  * if the comparison could not be made.
510  *
511  * Note: op always points at the same ScanKey as either leftarg or rightarg.
512  * Since we don't scribble on the scankeys, this aliasing should cause no
513  * trouble.
514  *
515  * Note: this routine needs to be insensitive to any DESC option applied
516  * to the index column.  For example, "x < 4" is a tighter constraint than
517  * "x < 5" regardless of which way the index is sorted.
518  */
519 static bool
520 _bt_compare_scankey_args(IndexScanDesc scan, ScanKey op,
521                                                  ScanKey leftarg, ScanKey rightarg,
522                                                  bool *result)
523 {
524         Relation        rel = scan->indexRelation;
525         Oid                     lefttype,
526                                 righttype,
527                                 optype,
528                                 opcintype,
529                                 cmp_op;
530         StrategyNumber strat;
531
532         /*
533          * First, deal with cases where one or both args are NULL.      This should
534          * only happen when the scankeys represent IS NULL/NOT NULL conditions.
535          */
536         if ((leftarg->sk_flags | rightarg->sk_flags) & SK_ISNULL)
537         {
538                 bool            leftnull,
539                                         rightnull;
540
541                 if (leftarg->sk_flags & SK_ISNULL)
542                 {
543                         Assert(leftarg->sk_flags & (SK_SEARCHNULL | SK_SEARCHNOTNULL));
544                         leftnull = true;
545                 }
546                 else
547                         leftnull = false;
548                 if (rightarg->sk_flags & SK_ISNULL)
549                 {
550                         Assert(rightarg->sk_flags & (SK_SEARCHNULL | SK_SEARCHNOTNULL));
551                         rightnull = true;
552                 }
553                 else
554                         rightnull = false;
555
556                 /*
557                  * We treat NULL as either greater than or less than all other values.
558                  * Since true > false, the tests below work correctly for NULLS LAST
559                  * logic.  If the index is NULLS FIRST, we need to flip the strategy.
560                  */
561                 strat = op->sk_strategy;
562                 if (op->sk_flags & SK_BT_NULLS_FIRST)
563                         strat = BTCommuteStrategyNumber(strat);
564
565                 switch (strat)
566                 {
567                         case BTLessStrategyNumber:
568                                 *result = (leftnull < rightnull);
569                                 break;
570                         case BTLessEqualStrategyNumber:
571                                 *result = (leftnull <= rightnull);
572                                 break;
573                         case BTEqualStrategyNumber:
574                                 *result = (leftnull == rightnull);
575                                 break;
576                         case BTGreaterEqualStrategyNumber:
577                                 *result = (leftnull >= rightnull);
578                                 break;
579                         case BTGreaterStrategyNumber:
580                                 *result = (leftnull > rightnull);
581                                 break;
582                         default:
583                                 elog(ERROR, "unrecognized StrategyNumber: %d", (int) strat);
584                                 *result = false;        /* keep compiler quiet */
585                                 break;
586                 }
587                 return true;
588         }
589
590         /*
591          * The opfamily we need to worry about is identified by the index column.
592          */
593         Assert(leftarg->sk_attno == rightarg->sk_attno);
594
595         opcintype = rel->rd_opcintype[leftarg->sk_attno - 1];
596
597         /*
598          * Determine the actual datatypes of the ScanKey arguments.  We have to
599          * support the convention that sk_subtype == InvalidOid means the opclass
600          * input type; this is a hack to simplify life for ScanKeyInit().
601          */
602         lefttype = leftarg->sk_subtype;
603         if (lefttype == InvalidOid)
604                 lefttype = opcintype;
605         righttype = rightarg->sk_subtype;
606         if (righttype == InvalidOid)
607                 righttype = opcintype;
608         optype = op->sk_subtype;
609         if (optype == InvalidOid)
610                 optype = opcintype;
611
612         /*
613          * If leftarg and rightarg match the types expected for the "op" scankey,
614          * we can use its already-looked-up comparison function.
615          */
616         if (lefttype == opcintype && righttype == optype)
617         {
618                 *result = DatumGetBool(FunctionCall2Coll(&op->sk_func,
619                                                                                                  op->sk_collation,
620                                                                                                  leftarg->sk_argument,
621                                                                                                  rightarg->sk_argument));
622                 return true;
623         }
624
625         /*
626          * Otherwise, we need to go to the syscache to find the appropriate
627          * operator.  (This cannot result in infinite recursion, since no
628          * indexscan initiated by syscache lookup will use cross-data-type
629          * operators.)
630          *
631          * If the sk_strategy was flipped by _bt_fix_scankey_strategy, we have to
632          * un-flip it to get the correct opfamily member.
633          */
634         strat = op->sk_strategy;
635         if (op->sk_flags & SK_BT_DESC)
636                 strat = BTCommuteStrategyNumber(strat);
637
638         cmp_op = get_opfamily_member(rel->rd_opfamily[leftarg->sk_attno - 1],
639                                                                  lefttype,
640                                                                  righttype,
641                                                                  strat);
642         if (OidIsValid(cmp_op))
643         {
644                 RegProcedure cmp_proc = get_opcode(cmp_op);
645
646                 if (RegProcedureIsValid(cmp_proc))
647                 {
648                         *result = DatumGetBool(OidFunctionCall2Coll(cmp_proc,
649                                                                                                                 op->sk_collation,
650                                                                                                                 leftarg->sk_argument,
651                                                                                                          rightarg->sk_argument));
652                         return true;
653                 }
654         }
655
656         /* Can't make the comparison */
657         *result = false;                        /* suppress compiler warnings */
658         return false;
659 }
660
661 /*
662  * Adjust a scankey's strategy and flags setting as needed for indoptions.
663  *
664  * We copy the appropriate indoption value into the scankey sk_flags
665  * (shifting to avoid clobbering system-defined flag bits).  Also, if
666  * the DESC option is set, commute (flip) the operator strategy number.
667  *
668  * A secondary purpose is to check for IS NULL/NOT NULL scankeys and set up
669  * the strategy field correctly for them.
670  *
671  * Lastly, for ordinary scankeys (not IS NULL/NOT NULL), we check for a
672  * NULL comparison value.  Since all btree operators are assumed strict,
673  * a NULL means that the qual cannot be satisfied.      We return TRUE if the
674  * comparison value isn't NULL, or FALSE if the scan should be abandoned.
675  *
676  * This function is applied to the *input* scankey structure; therefore
677  * on a rescan we will be looking at already-processed scankeys.  Hence
678  * we have to be careful not to re-commute the strategy if we already did it.
679  * It's a bit ugly to modify the caller's copy of the scankey but in practice
680  * there shouldn't be any problem, since the index's indoptions are certainly
681  * not going to change while the scankey survives.
682  */
683 static bool
684 _bt_fix_scankey_strategy(ScanKey skey, int16 *indoption)
685 {
686         int                     addflags;
687
688         addflags = indoption[skey->sk_attno - 1] << SK_BT_INDOPTION_SHIFT;
689
690         /*
691          * We treat all btree operators as strict (even if they're not so marked
692          * in pg_proc). This means that it is impossible for an operator condition
693          * with a NULL comparison constant to succeed, and we can reject it right
694          * away.
695          *
696          * However, we now also support "x IS NULL" clauses as search conditions,
697          * so in that case keep going. The planner has not filled in any
698          * particular strategy in this case, so set it to BTEqualStrategyNumber
699          * --- we can treat IS NULL as an equality operator for purposes of search
700          * strategy.
701          *
702          * Likewise, "x IS NOT NULL" is supported.      We treat that as either "less
703          * than NULL" in a NULLS LAST index, or "greater than NULL" in a NULLS
704          * FIRST index.
705          *
706          * Note: someday we might have to fill in sk_collation from the index
707          * column's collation.  At the moment this is a non-issue because we'll
708          * never actually call the comparison operator on a NULL.
709          */
710         if (skey->sk_flags & SK_ISNULL)
711         {
712                 /* SK_ISNULL shouldn't be set in a row header scankey */
713                 Assert(!(skey->sk_flags & SK_ROW_HEADER));
714
715                 /* Set indoption flags in scankey (might be done already) */
716                 skey->sk_flags |= addflags;
717
718                 /* Set correct strategy for IS NULL or NOT NULL search */
719                 if (skey->sk_flags & SK_SEARCHNULL)
720                 {
721                         skey->sk_strategy = BTEqualStrategyNumber;
722                         skey->sk_subtype = InvalidOid;
723                         skey->sk_collation = InvalidOid;
724                 }
725                 else if (skey->sk_flags & SK_SEARCHNOTNULL)
726                 {
727                         if (skey->sk_flags & SK_BT_NULLS_FIRST)
728                                 skey->sk_strategy = BTGreaterStrategyNumber;
729                         else
730                                 skey->sk_strategy = BTLessStrategyNumber;
731                         skey->sk_subtype = InvalidOid;
732                         skey->sk_collation = InvalidOid;
733                 }
734                 else
735                 {
736                         /* regular qual, so it cannot be satisfied */
737                         return false;
738                 }
739
740                 /* Needn't do the rest */
741                 return true;
742         }
743
744         /* Adjust strategy for DESC, if we didn't already */
745         if ((addflags & SK_BT_DESC) && !(skey->sk_flags & SK_BT_DESC))
746                 skey->sk_strategy = BTCommuteStrategyNumber(skey->sk_strategy);
747         skey->sk_flags |= addflags;
748
749         /* If it's a row header, fix row member flags and strategies similarly */
750         if (skey->sk_flags & SK_ROW_HEADER)
751         {
752                 ScanKey         subkey = (ScanKey) DatumGetPointer(skey->sk_argument);
753
754                 for (;;)
755                 {
756                         Assert(subkey->sk_flags & SK_ROW_MEMBER);
757                         addflags = indoption[subkey->sk_attno - 1] << SK_BT_INDOPTION_SHIFT;
758                         if ((addflags & SK_BT_DESC) && !(subkey->sk_flags & SK_BT_DESC))
759                                 subkey->sk_strategy = BTCommuteStrategyNumber(subkey->sk_strategy);
760                         subkey->sk_flags |= addflags;
761                         if (subkey->sk_flags & SK_ROW_END)
762                                 break;
763                         subkey++;
764                 }
765         }
766
767         return true;
768 }
769
770 /*
771  * Mark a scankey as "required to continue the scan".
772  *
773  * Depending on the operator type, the key may be required for both scan
774  * directions or just one.      Also, if the key is a row comparison header,
775  * we have to mark the appropriate subsidiary ScanKeys as required.  In
776  * such cases, the first subsidiary key is required, but subsequent ones
777  * are required only as long as they correspond to successive index columns
778  * and match the leading column as to sort direction.
779  * Otherwise the row comparison ordering is different from the index ordering
780  * and so we can't stop the scan on the basis of those lower-order columns.
781  *
782  * Note: when we set required-key flag bits in a subsidiary scankey, we are
783  * scribbling on a data structure belonging to the index AM's caller, not on
784  * our private copy.  This should be OK because the marking will not change
785  * from scan to scan within a query, and so we'd just re-mark the same way
786  * anyway on a rescan.  Something to keep an eye on though.
787  */
788 static void
789 _bt_mark_scankey_required(ScanKey skey)
790 {
791         int                     addflags;
792
793         switch (skey->sk_strategy)
794         {
795                 case BTLessStrategyNumber:
796                 case BTLessEqualStrategyNumber:
797                         addflags = SK_BT_REQFWD;
798                         break;
799                 case BTEqualStrategyNumber:
800                         addflags = SK_BT_REQFWD | SK_BT_REQBKWD;
801                         break;
802                 case BTGreaterEqualStrategyNumber:
803                 case BTGreaterStrategyNumber:
804                         addflags = SK_BT_REQBKWD;
805                         break;
806                 default:
807                         elog(ERROR, "unrecognized StrategyNumber: %d",
808                                  (int) skey->sk_strategy);
809                         addflags = 0;           /* keep compiler quiet */
810                         break;
811         }
812
813         skey->sk_flags |= addflags;
814
815         if (skey->sk_flags & SK_ROW_HEADER)
816         {
817                 ScanKey         subkey = (ScanKey) DatumGetPointer(skey->sk_argument);
818                 AttrNumber      attno = skey->sk_attno;
819
820                 /* First subkey should be same as the header says */
821                 Assert(subkey->sk_attno == attno);
822
823                 for (;;)
824                 {
825                         Assert(subkey->sk_flags & SK_ROW_MEMBER);
826                         if (subkey->sk_attno != attno)
827                                 break;                  /* non-adjacent key, so not required */
828                         if (subkey->sk_strategy != skey->sk_strategy)
829                                 break;                  /* wrong direction, so not required */
830                         subkey->sk_flags |= addflags;
831                         if (subkey->sk_flags & SK_ROW_END)
832                                 break;
833                         subkey++;
834                         attno++;
835                 }
836         }
837 }
838
839 /*
840  * Test whether an indextuple satisfies all the scankey conditions.
841  *
842  * If so, copy its TID into scan->xs_ctup.t_self, and return TRUE.
843  * If not, return FALSE (xs_ctup is not changed).
844  *
845  * If the tuple fails to pass the qual, we also determine whether there's
846  * any need to continue the scan beyond this tuple, and set *continuescan
847  * accordingly.  See comments for _bt_preprocess_keys(), above, about how
848  * this is done.
849  *
850  * scan: index scan descriptor (containing a search-type scankey)
851  * page: buffer page containing index tuple
852  * offnum: offset number of index tuple (must be a valid item!)
853  * dir: direction we are scanning in
854  * continuescan: output parameter (will be set correctly in all cases)
855  */
856 bool
857 _bt_checkkeys(IndexScanDesc scan,
858                           Page page, OffsetNumber offnum,
859                           ScanDirection dir, bool *continuescan)
860 {
861         ItemId          iid = PageGetItemId(page, offnum);
862         bool            tuple_valid;
863         IndexTuple      tuple;
864         TupleDesc       tupdesc;
865         BTScanOpaque so;
866         int                     keysz;
867         int                     ikey;
868         ScanKey         key;
869
870         *continuescan = true;           /* default assumption */
871
872         /*
873          * If the scan specifies not to return killed tuples, then we treat a
874          * killed tuple as not passing the qual.  Most of the time, it's a win to
875          * not bother examining the tuple's index keys, but just return
876          * immediately with continuescan = true to proceed to the next tuple.
877          * However, if this is the last tuple on the page, we should check the
878          * index keys to prevent uselessly advancing to the next page.
879          */
880         if (scan->ignore_killed_tuples && ItemIdIsDead(iid))
881         {
882                 /* return immediately if there are more tuples on the page */
883                 if (ScanDirectionIsForward(dir))
884                 {
885                         if (offnum < PageGetMaxOffsetNumber(page))
886                                 return false;
887                 }
888                 else
889                 {
890                         BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
891
892                         if (offnum > P_FIRSTDATAKEY(opaque))
893                                 return false;
894                 }
895
896                 /*
897                  * OK, we want to check the keys, but we'll return FALSE even if the
898                  * tuple passes the key tests.
899                  */
900                 tuple_valid = false;
901         }
902         else
903                 tuple_valid = true;
904
905         tuple = (IndexTuple) PageGetItem(page, iid);
906
907         tupdesc = RelationGetDescr(scan->indexRelation);
908         so = (BTScanOpaque) scan->opaque;
909         keysz = so->numberOfKeys;
910
911         for (key = so->keyData, ikey = 0; ikey < keysz; key++, ikey++)
912         {
913                 Datum           datum;
914                 bool            isNull;
915                 Datum           test;
916
917                 /* row-comparison keys need special processing */
918                 if (key->sk_flags & SK_ROW_HEADER)
919                 {
920                         if (_bt_check_rowcompare(key, tuple, tupdesc, dir, continuescan))
921                                 continue;
922                         return false;
923                 }
924
925                 datum = index_getattr(tuple,
926                                                           key->sk_attno,
927                                                           tupdesc,
928                                                           &isNull);
929
930                 if (key->sk_flags & SK_ISNULL)
931                 {
932                         /* Handle IS NULL/NOT NULL tests */
933                         if (key->sk_flags & SK_SEARCHNULL)
934                         {
935                                 if (isNull)
936                                         continue;       /* tuple satisfies this qual */
937                         }
938                         else
939                         {
940                                 Assert(key->sk_flags & SK_SEARCHNOTNULL);
941                                 if (!isNull)
942                                         continue;       /* tuple satisfies this qual */
943                         }
944
945                         /*
946                          * Tuple fails this qual.  If it's a required qual for the current
947                          * scan direction, then we can conclude no further tuples will
948                          * pass, either.
949                          */
950                         if ((key->sk_flags & SK_BT_REQFWD) &&
951                                 ScanDirectionIsForward(dir))
952                                 *continuescan = false;
953                         else if ((key->sk_flags & SK_BT_REQBKWD) &&
954                                          ScanDirectionIsBackward(dir))
955                                 *continuescan = false;
956
957                         /*
958                          * In any case, this indextuple doesn't match the qual.
959                          */
960                         return false;
961                 }
962
963                 if (isNull)
964                 {
965                         if (key->sk_flags & SK_BT_NULLS_FIRST)
966                         {
967                                 /*
968                                  * Since NULLs are sorted before non-NULLs, we know we have
969                                  * reached the lower limit of the range of values for this
970                                  * index attr.  On a backward scan, we can stop if this qual
971                                  * is one of the "must match" subset.  On a forward scan,
972                                  * however, we should keep going.
973                                  */
974                                 if ((key->sk_flags & SK_BT_REQBKWD) &&
975                                         ScanDirectionIsBackward(dir))
976                                         *continuescan = false;
977                         }
978                         else
979                         {
980                                 /*
981                                  * Since NULLs are sorted after non-NULLs, we know we have
982                                  * reached the upper limit of the range of values for this
983                                  * index attr.  On a forward scan, we can stop if this qual is
984                                  * one of the "must match" subset.      On a backward scan,
985                                  * however, we should keep going.
986                                  */
987                                 if ((key->sk_flags & SK_BT_REQFWD) &&
988                                         ScanDirectionIsForward(dir))
989                                         *continuescan = false;
990                         }
991
992                         /*
993                          * In any case, this indextuple doesn't match the qual.
994                          */
995                         return false;
996                 }
997
998                 test = FunctionCall2Coll(&key->sk_func, key->sk_collation,
999                                                                  datum, key->sk_argument);
1000
1001                 if (!DatumGetBool(test))
1002                 {
1003                         /*
1004                          * Tuple fails this qual.  If it's a required qual for the current
1005                          * scan direction, then we can conclude no further tuples will
1006                          * pass, either.
1007                          *
1008                          * Note: because we stop the scan as soon as any required equality
1009                          * qual fails, it is critical that equality quals be used for the
1010                          * initial positioning in _bt_first() when they are available. See
1011                          * comments in _bt_first().
1012                          */
1013                         if ((key->sk_flags & SK_BT_REQFWD) &&
1014                                 ScanDirectionIsForward(dir))
1015                                 *continuescan = false;
1016                         else if ((key->sk_flags & SK_BT_REQBKWD) &&
1017                                          ScanDirectionIsBackward(dir))
1018                                 *continuescan = false;
1019
1020                         /*
1021                          * In any case, this indextuple doesn't match the qual.
1022                          */
1023                         return false;
1024                 }
1025         }
1026
1027         /* If we get here, the tuple passes all index quals. */
1028         if (tuple_valid)
1029                 scan->xs_ctup.t_self = tuple->t_tid;
1030
1031         return tuple_valid;
1032 }
1033
1034 /*
1035  * Test whether an indextuple satisfies a row-comparison scan condition.
1036  *
1037  * Return true if so, false if not.  If not, also clear *continuescan if
1038  * it's not possible for any future tuples in the current scan direction
1039  * to pass the qual.
1040  *
1041  * This is a subroutine for _bt_checkkeys, which see for more info.
1042  */
1043 static bool
1044 _bt_check_rowcompare(ScanKey skey, IndexTuple tuple, TupleDesc tupdesc,
1045                                          ScanDirection dir, bool *continuescan)
1046 {
1047         ScanKey         subkey = (ScanKey) DatumGetPointer(skey->sk_argument);
1048         int32           cmpresult = 0;
1049         bool            result;
1050
1051         /* First subkey should be same as the header says */
1052         Assert(subkey->sk_attno == skey->sk_attno);
1053
1054         /* Loop over columns of the row condition */
1055         for (;;)
1056         {
1057                 Datum           datum;
1058                 bool            isNull;
1059
1060                 Assert(subkey->sk_flags & SK_ROW_MEMBER);
1061
1062                 datum = index_getattr(tuple,
1063                                                           subkey->sk_attno,
1064                                                           tupdesc,
1065                                                           &isNull);
1066
1067                 if (isNull)
1068                 {
1069                         if (subkey->sk_flags & SK_BT_NULLS_FIRST)
1070                         {
1071                                 /*
1072                                  * Since NULLs are sorted before non-NULLs, we know we have
1073                                  * reached the lower limit of the range of values for this
1074                                  * index attr. On a backward scan, we can stop if this qual is
1075                                  * one of the "must match" subset.      On a forward scan,
1076                                  * however, we should keep going.
1077                                  */
1078                                 if ((subkey->sk_flags & SK_BT_REQBKWD) &&
1079                                         ScanDirectionIsBackward(dir))
1080                                         *continuescan = false;
1081                         }
1082                         else
1083                         {
1084                                 /*
1085                                  * Since NULLs are sorted after non-NULLs, we know we have
1086                                  * reached the upper limit of the range of values for this
1087                                  * index attr. On a forward scan, we can stop if this qual is
1088                                  * one of the "must match" subset.      On a backward scan,
1089                                  * however, we should keep going.
1090                                  */
1091                                 if ((subkey->sk_flags & SK_BT_REQFWD) &&
1092                                         ScanDirectionIsForward(dir))
1093                                         *continuescan = false;
1094                         }
1095
1096                         /*
1097                          * In any case, this indextuple doesn't match the qual.
1098                          */
1099                         return false;
1100                 }
1101
1102                 if (subkey->sk_flags & SK_ISNULL)
1103                 {
1104                         /*
1105                          * Unlike the simple-scankey case, this isn't a disallowed case.
1106                          * But it can never match.      If all the earlier row comparison
1107                          * columns are required for the scan direction, we can stop the
1108                          * scan, because there can't be another tuple that will succeed.
1109                          */
1110                         if (subkey != (ScanKey) DatumGetPointer(skey->sk_argument))
1111                                 subkey--;
1112                         if ((subkey->sk_flags & SK_BT_REQFWD) &&
1113                                 ScanDirectionIsForward(dir))
1114                                 *continuescan = false;
1115                         else if ((subkey->sk_flags & SK_BT_REQBKWD) &&
1116                                          ScanDirectionIsBackward(dir))
1117                                 *continuescan = false;
1118                         return false;
1119                 }
1120
1121                 /* Perform the test --- three-way comparison not bool operator */
1122                 cmpresult = DatumGetInt32(FunctionCall2Coll(&subkey->sk_func,
1123                                                                                                         subkey->sk_collation,
1124                                                                                                         datum,
1125                                                                                                         subkey->sk_argument));
1126
1127                 if (subkey->sk_flags & SK_BT_DESC)
1128                         cmpresult = -cmpresult;
1129
1130                 /* Done comparing if unequal, else advance to next column */
1131                 if (cmpresult != 0)
1132                         break;
1133
1134                 if (subkey->sk_flags & SK_ROW_END)
1135                         break;
1136                 subkey++;
1137         }
1138
1139         /*
1140          * At this point cmpresult indicates the overall result of the row
1141          * comparison, and subkey points to the deciding column (or the last
1142          * column if the result is "=").
1143          */
1144         switch (subkey->sk_strategy)
1145         {
1146                         /* EQ and NE cases aren't allowed here */
1147                 case BTLessStrategyNumber:
1148                         result = (cmpresult < 0);
1149                         break;
1150                 case BTLessEqualStrategyNumber:
1151                         result = (cmpresult <= 0);
1152                         break;
1153                 case BTGreaterEqualStrategyNumber:
1154                         result = (cmpresult >= 0);
1155                         break;
1156                 case BTGreaterStrategyNumber:
1157                         result = (cmpresult > 0);
1158                         break;
1159                 default:
1160                         elog(ERROR, "unrecognized RowCompareType: %d",
1161                                  (int) subkey->sk_strategy);
1162                         result = 0;                     /* keep compiler quiet */
1163                         break;
1164         }
1165
1166         if (!result)
1167         {
1168                 /*
1169                  * Tuple fails this qual.  If it's a required qual for the current
1170                  * scan direction, then we can conclude no further tuples will pass,
1171                  * either.      Note we have to look at the deciding column, not
1172                  * necessarily the first or last column of the row condition.
1173                  */
1174                 if ((subkey->sk_flags & SK_BT_REQFWD) &&
1175                         ScanDirectionIsForward(dir))
1176                         *continuescan = false;
1177                 else if ((subkey->sk_flags & SK_BT_REQBKWD) &&
1178                                  ScanDirectionIsBackward(dir))
1179                         *continuescan = false;
1180         }
1181
1182         return result;
1183 }
1184
1185 /*
1186  * _bt_killitems - set LP_DEAD state for items an indexscan caller has
1187  * told us were killed
1188  *
1189  * scan->so contains information about the current page and killed tuples
1190  * thereon (generally, this should only be called if so->numKilled > 0).
1191  *
1192  * The caller must have pin on so->currPos.buf, but may or may not have
1193  * read-lock, as indicated by haveLock.  Note that we assume read-lock
1194  * is sufficient for setting LP_DEAD status (which is only a hint).
1195  *
1196  * We match items by heap TID before assuming they are the right ones to
1197  * delete.      We cope with cases where items have moved right due to insertions.
1198  * If an item has moved off the current page due to a split, we'll fail to
1199  * find it and do nothing (this is not an error case --- we assume the item
1200  * will eventually get marked in a future indexscan).  Note that because we
1201  * hold pin on the target page continuously from initially reading the items
1202  * until applying this function, VACUUM cannot have deleted any items from
1203  * the page, and so there is no need to search left from the recorded offset.
1204  * (This observation also guarantees that the item is still the right one
1205  * to delete, which might otherwise be questionable since heap TIDs can get
1206  * recycled.)
1207  */
1208 void
1209 _bt_killitems(IndexScanDesc scan, bool haveLock)
1210 {
1211         BTScanOpaque so = (BTScanOpaque) scan->opaque;
1212         Page            page;
1213         BTPageOpaque opaque;
1214         OffsetNumber minoff;
1215         OffsetNumber maxoff;
1216         int                     i;
1217         bool            killedsomething = false;
1218
1219         Assert(BufferIsValid(so->currPos.buf));
1220
1221         if (!haveLock)
1222                 LockBuffer(so->currPos.buf, BT_READ);
1223
1224         page = BufferGetPage(so->currPos.buf);
1225         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1226         minoff = P_FIRSTDATAKEY(opaque);
1227         maxoff = PageGetMaxOffsetNumber(page);
1228
1229         for (i = 0; i < so->numKilled; i++)
1230         {
1231                 int                     itemIndex = so->killedItems[i];
1232                 BTScanPosItem *kitem = &so->currPos.items[itemIndex];
1233                 OffsetNumber offnum = kitem->indexOffset;
1234
1235                 Assert(itemIndex >= so->currPos.firstItem &&
1236                            itemIndex <= so->currPos.lastItem);
1237                 if (offnum < minoff)
1238                         continue;                       /* pure paranoia */
1239                 while (offnum <= maxoff)
1240                 {
1241                         ItemId          iid = PageGetItemId(page, offnum);
1242                         IndexTuple      ituple = (IndexTuple) PageGetItem(page, iid);
1243
1244                         if (ItemPointerEquals(&ituple->t_tid, &kitem->heapTid))
1245                         {
1246                                 /* found the item */
1247                                 ItemIdMarkDead(iid);
1248                                 killedsomething = true;
1249                                 break;                  /* out of inner search loop */
1250                         }
1251                         offnum = OffsetNumberNext(offnum);
1252                 }
1253         }
1254
1255         /*
1256          * Since this can be redone later if needed, it's treated the same as a
1257          * commit-hint-bit status update for heap tuples: we mark the buffer dirty
1258          * but don't make a WAL log entry.
1259          *
1260          * Whenever we mark anything LP_DEAD, we also set the page's
1261          * BTP_HAS_GARBAGE flag, which is likewise just a hint.
1262          */
1263         if (killedsomething)
1264         {
1265                 opaque->btpo_flags |= BTP_HAS_GARBAGE;
1266                 SetBufferCommitInfoNeedsSave(so->currPos.buf);
1267         }
1268
1269         if (!haveLock)
1270                 LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK);
1271
1272         /*
1273          * Always reset the scan state, so we don't look for same items on other
1274          * pages.
1275          */
1276         so->numKilled = 0;
1277 }
1278
1279
1280 /*
1281  * The following routines manage a shared-memory area in which we track
1282  * assignment of "vacuum cycle IDs" to currently-active btree vacuuming
1283  * operations.  There is a single counter which increments each time we
1284  * start a vacuum to assign it a cycle ID.      Since multiple vacuums could
1285  * be active concurrently, we have to track the cycle ID for each active
1286  * vacuum; this requires at most MaxBackends entries (usually far fewer).
1287  * We assume at most one vacuum can be active for a given index.
1288  *
1289  * Access to the shared memory area is controlled by BtreeVacuumLock.
1290  * In principle we could use a separate lmgr locktag for each index,
1291  * but a single LWLock is much cheaper, and given the short time that
1292  * the lock is ever held, the concurrency hit should be minimal.
1293  */
1294
1295 typedef struct BTOneVacInfo
1296 {
1297         LockRelId       relid;                  /* global identifier of an index */
1298         BTCycleId       cycleid;                /* cycle ID for its active VACUUM */
1299 } BTOneVacInfo;
1300
1301 typedef struct BTVacInfo
1302 {
1303         BTCycleId       cycle_ctr;              /* cycle ID most recently assigned */
1304         int                     num_vacuums;    /* number of currently active VACUUMs */
1305         int                     max_vacuums;    /* allocated length of vacuums[] array */
1306         BTOneVacInfo vacuums[1];        /* VARIABLE LENGTH ARRAY */
1307 } BTVacInfo;
1308
1309 static BTVacInfo *btvacinfo;
1310
1311
1312 /*
1313  * _bt_vacuum_cycleid --- get the active vacuum cycle ID for an index,
1314  *              or zero if there is no active VACUUM
1315  *
1316  * Note: for correct interlocking, the caller must already hold pin and
1317  * exclusive lock on each buffer it will store the cycle ID into.  This
1318  * ensures that even if a VACUUM starts immediately afterwards, it cannot
1319  * process those pages until the page split is complete.
1320  */
1321 BTCycleId
1322 _bt_vacuum_cycleid(Relation rel)
1323 {
1324         BTCycleId       result = 0;
1325         int                     i;
1326
1327         /* Share lock is enough since this is a read-only operation */
1328         LWLockAcquire(BtreeVacuumLock, LW_SHARED);
1329
1330         for (i = 0; i < btvacinfo->num_vacuums; i++)
1331         {
1332                 BTOneVacInfo *vac = &btvacinfo->vacuums[i];
1333
1334                 if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
1335                         vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
1336                 {
1337                         result = vac->cycleid;
1338                         break;
1339                 }
1340         }
1341
1342         LWLockRelease(BtreeVacuumLock);
1343         return result;
1344 }
1345
1346 /*
1347  * _bt_start_vacuum --- assign a cycle ID to a just-starting VACUUM operation
1348  *
1349  * Note: the caller must guarantee that it will eventually call
1350  * _bt_end_vacuum, else we'll permanently leak an array slot.  To ensure
1351  * that this happens even in elog(FATAL) scenarios, the appropriate coding
1352  * is not just a PG_TRY, but
1353  *              PG_ENSURE_ERROR_CLEANUP(_bt_end_vacuum_callback, PointerGetDatum(rel))
1354  */
1355 BTCycleId
1356 _bt_start_vacuum(Relation rel)
1357 {
1358         BTCycleId       result;
1359         int                     i;
1360         BTOneVacInfo *vac;
1361
1362         LWLockAcquire(BtreeVacuumLock, LW_EXCLUSIVE);
1363
1364         /*
1365          * Assign the next cycle ID, being careful to avoid zero as well as the
1366          * reserved high values.
1367          */
1368         result = ++(btvacinfo->cycle_ctr);
1369         if (result == 0 || result > MAX_BT_CYCLE_ID)
1370                 result = btvacinfo->cycle_ctr = 1;
1371
1372         /* Let's just make sure there's no entry already for this index */
1373         for (i = 0; i < btvacinfo->num_vacuums; i++)
1374         {
1375                 vac = &btvacinfo->vacuums[i];
1376                 if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
1377                         vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
1378                 {
1379                         /*
1380                          * Unlike most places in the backend, we have to explicitly
1381                          * release our LWLock before throwing an error.  This is because
1382                          * we expect _bt_end_vacuum() to be called before transaction
1383                          * abort cleanup can run to release LWLocks.
1384                          */
1385                         LWLockRelease(BtreeVacuumLock);
1386                         elog(ERROR, "multiple active vacuums for index \"%s\"",
1387                                  RelationGetRelationName(rel));
1388                 }
1389         }
1390
1391         /* OK, add an entry */
1392         if (btvacinfo->num_vacuums >= btvacinfo->max_vacuums)
1393         {
1394                 LWLockRelease(BtreeVacuumLock);
1395                 elog(ERROR, "out of btvacinfo slots");
1396         }
1397         vac = &btvacinfo->vacuums[btvacinfo->num_vacuums];
1398         vac->relid = rel->rd_lockInfo.lockRelId;
1399         vac->cycleid = result;
1400         btvacinfo->num_vacuums++;
1401
1402         LWLockRelease(BtreeVacuumLock);
1403         return result;
1404 }
1405
1406 /*
1407  * _bt_end_vacuum --- mark a btree VACUUM operation as done
1408  *
1409  * Note: this is deliberately coded not to complain if no entry is found;
1410  * this allows the caller to put PG_TRY around the start_vacuum operation.
1411  */
1412 void
1413 _bt_end_vacuum(Relation rel)
1414 {
1415         int                     i;
1416
1417         LWLockAcquire(BtreeVacuumLock, LW_EXCLUSIVE);
1418
1419         /* Find the array entry */
1420         for (i = 0; i < btvacinfo->num_vacuums; i++)
1421         {
1422                 BTOneVacInfo *vac = &btvacinfo->vacuums[i];
1423
1424                 if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
1425                         vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
1426                 {
1427                         /* Remove it by shifting down the last entry */
1428                         *vac = btvacinfo->vacuums[btvacinfo->num_vacuums - 1];
1429                         btvacinfo->num_vacuums--;
1430                         break;
1431                 }
1432         }
1433
1434         LWLockRelease(BtreeVacuumLock);
1435 }
1436
1437 /*
1438  * _bt_end_vacuum wrapped as an on_shmem_exit callback function
1439  */
1440 void
1441 _bt_end_vacuum_callback(int code, Datum arg)
1442 {
1443         _bt_end_vacuum((Relation) DatumGetPointer(arg));
1444 }
1445
1446 /*
1447  * BTreeShmemSize --- report amount of shared memory space needed
1448  */
1449 Size
1450 BTreeShmemSize(void)
1451 {
1452         Size            size;
1453
1454         size = offsetof(BTVacInfo, vacuums[0]);
1455         size = add_size(size, mul_size(MaxBackends, sizeof(BTOneVacInfo)));
1456         return size;
1457 }
1458
1459 /*
1460  * BTreeShmemInit --- initialize this module's shared memory
1461  */
1462 void
1463 BTreeShmemInit(void)
1464 {
1465         bool            found;
1466
1467         btvacinfo = (BTVacInfo *) ShmemInitStruct("BTree Vacuum State",
1468                                                                                           BTreeShmemSize(),
1469                                                                                           &found);
1470
1471         if (!IsUnderPostmaster)
1472         {
1473                 /* Initialize shared memory area */
1474                 Assert(!found);
1475
1476                 /*
1477                  * It doesn't really matter what the cycle counter starts at, but
1478                  * having it always start the same doesn't seem good.  Seed with
1479                  * low-order bits of time() instead.
1480                  */
1481                 btvacinfo->cycle_ctr = (BTCycleId) time(NULL);
1482
1483                 btvacinfo->num_vacuums = 0;
1484                 btvacinfo->max_vacuums = MaxBackends;
1485         }
1486         else
1487                 Assert(found);
1488 }
1489
1490 Datum
1491 btoptions(PG_FUNCTION_ARGS)
1492 {
1493         Datum           reloptions = PG_GETARG_DATUM(0);
1494         bool            validate = PG_GETARG_BOOL(1);
1495         bytea      *result;
1496
1497         result = default_reloptions(reloptions, validate, RELOPT_KIND_BTREE);
1498         if (result)
1499                 PG_RETURN_BYTEA_P(result);
1500         PG_RETURN_NULL();
1501 }