author     Tom Lane <tgl@sss.pgh.pa.us>   Thu, 5 Jul 2001 19:33:35 +0000 (19:33 +0000)
committer  Tom Lane <tgl@sss.pgh.pa.us>   Thu, 5 Jul 2001 19:33:35 +0000 (19:33 +0000)

Don't assume that max offset number stays fixed on a page when we're
not holding a pin on the page.  Use double instead of long to count
rows in relation, so that code still works for > LONG_MAX rows in rel.
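
For orientation, here is the buffer-pinning pattern the patch adopts, distilled into a
standalone helper. The function name sample_page_maxoffset is illustrative only (not part
of the commit); it assumes the same backend APIs -- ReadBuffer, LockBuffer,
PageGetMaxOffsetNumber, ReleaseBuffer -- that appear in the diff below:

#include "postgres.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
#include "utils/rel.h"

/*
 * Illustrative sketch: read a page's max offset number while keeping a pin,
 * so that a concurrent VACUUM cannot delete tuples and invalidate the
 * answer before the caller has finished using it.
 */
static OffsetNumber
sample_page_maxoffset(Relation onerel, BlockNumber targblock, Buffer *bufp)
{
	Buffer			targbuffer;
	Page			targpage;
	OffsetNumber	maxoffset;

	targbuffer = ReadBuffer(onerel, targblock);
	if (!BufferIsValid(targbuffer))
		elog(ERROR, "ReadBuffer(%s,%u) failed",
			 RelationGetRelationName(onerel), targblock);

	/* Share-lock only while reading the page header ... */
	LockBuffer(targbuffer, BUFFER_LOCK_SHARE);
	targpage = BufferGetPage(targbuffer);
	maxoffset = PageGetMaxOffsetNumber(targpage);
	LockBuffer(targbuffer, BUFFER_LOCK_UNLOCK);

	/*
	 * ... but return with the pin still held.  The caller must call
	 * ReleaseBuffer(*bufp) once it is done fetching tuples from the page.
	 * The old get_page_max_offset, removed below, dropped the pin right
	 * here, which is exactly the bug being fixed.
	 */
	*bufp = targbuffer;
	return maxoffset;
}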

src/backend/commands/analyze.c

index 08a4f65..e7a7d5c 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/commands/analyze.c,v 1.21 2001/06/22 19:16:21 wieck Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/commands/analyze.c,v 1.22 2001/07/05 19:33:35 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -97,8 +97,8 @@ typedef struct
 } ScalarMCVItem;
 
 
-#define swapInt(a,b)   {int _tmp; _tmp=a; a=b; b=_tmp;}
-#define swapDatum(a,b) {Datum _tmp; _tmp=a; a=b; b=_tmp;}
+#define swapInt(a,b)   do {int _tmp; _tmp=a; a=b; b=_tmp;} while(0)
+#define swapDatum(a,b) do {Datum _tmp; _tmp=a; a=b; b=_tmp;} while(0)
 
 
 static int MESSAGE_LEVEL;
@@ -111,20 +111,18 @@ static int *datumCmpTupnoLink;
 
 static VacAttrStats *examine_attribute(Relation onerel, int attnum);
 static int acquire_sample_rows(Relation onerel, HeapTuple *rows,
-                                                          int targrows, long *totalrows);
+                                                          int targrows, double *totalrows);
 static double random_fract(void);
 static double init_selection_state(int n);
-static long select_next_random_record(long t, int n, double *stateptr);
+static double select_next_random_record(double t, int n, double *stateptr);
 static int compare_rows(const void *a, const void *b);
 static int compare_scalars(const void *a, const void *b);
 static int compare_mcvs(const void *a, const void *b);
-static OffsetNumber get_page_max_offset(Relation relation,
-                                                                               BlockNumber blocknumber);
 static void compute_minimal_stats(VacAttrStats *stats,
-                                                                 TupleDesc tupDesc, long totalrows,
+                                                                 TupleDesc tupDesc, double totalrows,
                                                                  HeapTuple *rows, int numrows);
 static void compute_scalar_stats(VacAttrStats *stats,
-                                                                TupleDesc tupDesc, long totalrows,
+                                                                TupleDesc tupDesc, double totalrows,
                                                                 HeapTuple *rows, int numrows);
 static void update_attstats(Oid relid, int natts, VacAttrStats **vacattrstats);
 
@@ -143,7 +141,7 @@ analyze_rel(Oid relid, VacuumStmt *vacstmt)
        VacAttrStats **vacattrstats;
        int                     targrows,
                                numrows;
-       long            totalrows;
+       double          totalrows;
        HeapTuple  *rows;
        HeapTuple       tuple;
 
@@ -298,7 +296,7 @@ analyze_rel(Oid relid, VacuumStmt *vacstmt)
        if (!vacstmt->vacuum)
                vac_update_relstats(RelationGetRelid(onerel),
                                                        onerel->rd_nblocks,
-                                                       (double) totalrows,
+                                                       totalrows,
                                                        RelationGetForm(onerel)->relhasindex);
 
        /*
@@ -488,7 +486,7 @@ examine_attribute(Relation onerel, int attnum)
  */
 static int
 acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
-                                       long *totalrows)
+                                       double *totalrows)
 {
        int                     numrows = 0;
        HeapScanDesc scan;
@@ -499,7 +497,7 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
        OffsetNumber lastoffset;
        int                     numest;
        double          tuplesperpage;
-       long            t;
+       double          t;
        double          rstate;
 
        Assert(targrows > 1);
@@ -520,7 +518,7 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
         */
        if (!HeapTupleIsValid(tuple))
        {
-               *totalrows = numrows;
+               *totalrows = (double) numrows;
                return numrows;
        }
        /*
@@ -565,20 +563,22 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
        }
        tuplesperpage = (double) numest / (double) estblock;
 
-       t = numrows;                            /* t is the # of records processed so far */
+       t = (double) numrows;           /* t is the # of records processed so far */
        rstate = init_selection_state(targrows);
        for (;;)
        {
                double                  targpos;
                BlockNumber             targblock;
+               Buffer                  targbuffer;
+               Page                    targpage;
                OffsetNumber    targoffset,
                                                maxoffset;
 
                t = select_next_random_record(t, targrows, &rstate);
                /* Try to read the t'th record in the table */
-               targpos = (double) t / tuplesperpage;
+               targpos = t / tuplesperpage;
                targblock = (BlockNumber) targpos;
-               targoffset = ((int) (targpos - targblock) * tuplesperpage) + 
+               targoffset = ((int) ((targpos - targblock) * tuplesperpage)) + 
                        FirstOffsetNumber;
                /* Make sure we are past the last selected record */
                if (targblock <= lastblock)
@@ -595,21 +595,37 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
                 */
                if (targblock >= onerel->rd_nblocks)
                        break;
-               maxoffset = get_page_max_offset(onerel, targblock);
+               /*
+                * We must maintain a pin on the target page's buffer to ensure that
+                * the maxoffset value stays good (else concurrent VACUUM might
+                * delete tuples out from under us).  Hence, pin the page until we
+                * are done looking at it.  We don't maintain a lock on the page,
+                * so tuples could get added to it, but we ignore such tuples.
+                */
+               targbuffer = ReadBuffer(onerel, targblock);
+               if (!BufferIsValid(targbuffer))
+                       elog(ERROR, "acquire_sample_rows: ReadBuffer(%s,%u) failed",
+                                RelationGetRelationName(onerel), targblock);
+               LockBuffer(targbuffer, BUFFER_LOCK_SHARE);
+               targpage = BufferGetPage(targbuffer);
+               maxoffset = PageGetMaxOffsetNumber(targpage);
+               LockBuffer(targbuffer, BUFFER_LOCK_UNLOCK);
+
                for (;;)
                {
                        HeapTupleData targtuple;
-                       Buffer          targbuffer;
+                       Buffer          tupbuffer;
 
                        if (targoffset > maxoffset)
                        {
                                /* Fell off end of this page, try next */
+                               ReleaseBuffer(targbuffer);
                                targblock++;
                                targoffset = FirstOffsetNumber;
                                goto pageloop;
                        }
                        ItemPointerSet(&targtuple.t_self, targblock, targoffset);
-                       heap_fetch(onerel, SnapshotNow, &targtuple, &targbuffer, NULL);
+                       heap_fetch(onerel, SnapshotNow, &targtuple, &tupbuffer, NULL);
                        if (targtuple.t_data != NULL)
                        {
                                /*
@@ -621,6 +637,9 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
                                Assert(k >= 0 && k < targrows);
                                heap_freetuple(rows[k]);
                                rows[k] = heap_copytuple(&targtuple);
+                               /* this releases the second pin acquired by heap_fetch: */
+                               ReleaseBuffer(tupbuffer);
+                               /* this releases the initial pin: */
                                ReleaseBuffer(targbuffer);
                                lastblock = targblock;
                                lastoffset = targoffset;
@@ -639,7 +658,7 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
        /*
         * Estimate total number of valid rows in relation.
         */
-       *totalrows = (long) (onerel->rd_nblocks * tuplesperpage + 0.5);
+       *totalrows = floor((double) onerel->rd_nblocks * tuplesperpage + 0.5);
 
        return numrows;
 }
@@ -667,6 +686,12 @@ random_fract(void)
  * of the last record processed and next record to process.  The only extra
  * state needed between calls is W, a random state variable.
  *
+ * Note: the original algorithm defines t, S, numer, and denom as integers.
+ * Here we express them as doubles to avoid overflow if the number of rows
+ * in the table exceeds INT_MAX.  The algorithm should work as long as the
+ * row count does not become so large that it is not represented accurately
+ * in a double (on IEEE-math machines this would be around 2^52 rows).
+ *
  * init_selection_state computes the initial W value.
  *
  * Given that we've already processed t records (t >= n),
@@ -680,36 +705,36 @@ init_selection_state(int n)
        return exp(- log(random_fract())/n);
 }
 
-static long
-select_next_random_record(long t, int n, double *stateptr)
+static double
+select_next_random_record(double t, int n, double *stateptr)
 {
        /* The magic constant here is T from Vitter's paper */
-       if (t <= (22 * n))
+       if (t <= (22.0 * n))
        {
                /* Process records using Algorithm X until t is large enough */
                double  V,
                                quot;
 
                V = random_fract();             /* Generate V */
-               t++;
-               quot = (double) (t - n) / (double) t;
+               t += 1;
+               quot = (t - (double) n) / t;
                /* Find min S satisfying (4.1) */
                while (quot > V)
                {
-                       t++;
-                       quot *= (double) (t - n) / (double) t;
+                       t += 1;
+                       quot *= (t - (double) n) / t;
                }
        }
        else
        {
                /* Now apply Algorithm Z */
                double  W = *stateptr;
-               long    term = t - n + 1;
-               int             S;
+               double  term = t - (double) n + 1;
+               double  S;
 
                for (;;)
                {
-                       long    numer,
+                       double  numer,
                                        numer_lim,
                                        denom;
                        double  U,
@@ -722,9 +747,9 @@ select_next_random_record(long t, int n, double *stateptr)
                        /* Generate U and X */
                        U = random_fract();
                        X = t * (W - 1.0);
-                       S = X;                          /* S is tentatively set to floor(X) */
+                       S = floor(X);           /* S is tentatively set to floor(X) */
                        /* Test if U <= h(S)/cg(X) in the manner of (6.3) */
-                       tmp = (double) (t + 1) / (double) term;
+                       tmp = (t + 1) / term;
                        lhs = exp(log(((U * tmp * tmp) * (term + S))/(t + X))/n);
                        rhs = (((t + X)/(term + S)) * term)/t;
                        if (lhs <= rhs)
@@ -734,20 +759,20 @@ select_next_random_record(long t, int n, double *stateptr)
                        }
                        /* Test if U <= f(S)/cg(X) */
                        y = (((U * (t + 1))/term) * (t + S + 1))/(t + X);
-                       if (n < S)
+                       if ((double) n < S)
                        {
                                denom = t;
                                numer_lim = term + S;
                        }
                        else
                        {
-                               denom = t - n + S;
+                               denom = t - (double) n + S;
                                numer_lim = t + 1;
                        }
-                       for (numer = t + S; numer >= numer_lim; numer--)
+                       for (numer = t + S; numer >= numer_lim; numer -= 1)
                        {
-                               y *= (double) numer / (double) denom;
-                               denom--;
+                               y *= numer / denom;
+                               denom -= 1;
                        }
                        W = exp(- log(random_fract())/n); /* Generate W in advance */
                        if (exp(log(y)/n) <= (t + X)/t)
@@ -783,30 +808,6 @@ compare_rows(const void *a, const void *b)
        return 0;
 }
 
-/*
- * Discover the largest valid tuple offset number on the given page
- *
- * This code probably ought to live in some other module.
- */
-static OffsetNumber
-get_page_max_offset(Relation relation, BlockNumber blocknumber)
-{
-       Buffer          buffer;
-       Page            p;
-       OffsetNumber offnum;
-
-       buffer = ReadBuffer(relation, blocknumber);
-       if (!BufferIsValid(buffer))
-               elog(ERROR, "get_page_max_offset: %s relation: ReadBuffer(%ld) failed",
-                        RelationGetRelationName(relation), (long) blocknumber);
-       LockBuffer(buffer, BUFFER_LOCK_SHARE);
-       p = BufferGetPage(buffer);
-       offnum = PageGetMaxOffsetNumber(p);
-       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-       ReleaseBuffer(buffer);
-       return offnum;
-}
-
 
 /*
  *     compute_minimal_stats() -- compute minimal column statistics
@@ -825,7 +826,7 @@ get_page_max_offset(Relation relation, BlockNumber blocknumber)
  */
 static void
 compute_minimal_stats(VacAttrStats *stats,
-                                         TupleDesc tupDesc, long totalrows,
+                                         TupleDesc tupDesc, double totalrows,
                                          HeapTuple *rows, int numrows)
 {
        int                     i;
@@ -1002,7 +1003,7 @@ compute_minimal_stats(VacAttrStats *stats,
 
                        if (f1 < 1)
                                f1 = 1;
-                       term1 = sqrt((double) totalrows / (double) numrows) * f1;
+                       term1 = sqrt(totalrows / (double) numrows) * f1;
                        stats->stadistinct = floor(term1 + nmultiple + 0.5);
                }
 
@@ -1104,7 +1105,7 @@ compute_minimal_stats(VacAttrStats *stats,
  */
 static void
 compute_scalar_stats(VacAttrStats *stats,
-                                        TupleDesc tupDesc, long totalrows,
+                                        TupleDesc tupDesc, double totalrows,
                                         HeapTuple *rows, int numrows)
 {
        int                     i;
@@ -1298,7 +1299,7 @@ compute_scalar_stats(VacAttrStats *stats,
 
                        if (f1 < 1)
                                f1 = 1;
-                       term1 = sqrt((double) totalrows / (double) numrows) * f1;
+                       term1 = sqrt(totalrows / (double) numrows) * f1;
                        stats->stadistinct = floor(term1 + nmultiple + 0.5);
                }