From: Tom Lane
Date: Thu, 5 Jul 2001 19:33:35 +0000 (+0000)
Subject: Don't assume that max offset number stays fixed on a page when we're
X-Git-Tag: REL9_0_0~20202
X-Git-Url: http://git.osdn.net/view?a=commitdiff_plain;h=fb0919fb83cdf0e121eccbd69ac8c57d80b64322;p=pg-rex%2Fsyncrep.git

Don't assume that max offset number stays fixed on a page when we're
not holding a pin on the page.  Use double instead of long to count
rows in relation, so that code still works for > LONG_MAX rows in rel.
---

diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index 08a4f656ff..e7a7d5c066 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/commands/analyze.c,v 1.21 2001/06/22 19:16:21 wieck Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/commands/analyze.c,v 1.22 2001/07/05 19:33:35 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -97,8 +97,8 @@ typedef struct
 } ScalarMCVItem;
 
 
-#define swapInt(a,b)    {int _tmp; _tmp=a; a=b; b=_tmp;}
-#define swapDatum(a,b)  {Datum _tmp; _tmp=a; a=b; b=_tmp;}
+#define swapInt(a,b)    do {int _tmp; _tmp=a; a=b; b=_tmp;} while(0)
+#define swapDatum(a,b)  do {Datum _tmp; _tmp=a; a=b; b=_tmp;} while(0)
 
 
 static int  MESSAGE_LEVEL;
@@ -111,20 +111,18 @@ static int *datumCmpTupnoLink;
 
 static VacAttrStats *examine_attribute(Relation onerel, int attnum);
 static int acquire_sample_rows(Relation onerel, HeapTuple *rows,
-                   int targrows, long *totalrows);
+                   int targrows, double *totalrows);
 static double random_fract(void);
 static double init_selection_state(int n);
-static long select_next_random_record(long t, int n, double *stateptr);
+static double select_next_random_record(double t, int n, double *stateptr);
 static int  compare_rows(const void *a, const void *b);
 static int  compare_scalars(const void *a, const void *b);
 static int  compare_mcvs(const void *a, const void *b);
-static OffsetNumber get_page_max_offset(Relation relation,
-                    BlockNumber blocknumber);
 static void compute_minimal_stats(VacAttrStats *stats,
-                      TupleDesc tupDesc, long totalrows,
+                      TupleDesc tupDesc, double totalrows,
                       HeapTuple *rows, int numrows);
 static void compute_scalar_stats(VacAttrStats *stats,
-                     TupleDesc tupDesc, long totalrows,
+                     TupleDesc tupDesc, double totalrows,
                      HeapTuple *rows, int numrows);
 static void update_attstats(Oid relid, int natts,
                 VacAttrStats **vacattrstats);
@@ -143,7 +141,7 @@ analyze_rel(Oid relid, VacuumStmt *vacstmt)
     VacAttrStats **vacattrstats;
     int         targrows,
                 numrows;
-    long        totalrows;
+    double      totalrows;
     HeapTuple  *rows;
     HeapTuple   tuple;
 
@@ -298,7 +296,7 @@ analyze_rel(Oid relid, VacuumStmt *vacstmt)
     if (!vacstmt->vacuum)
         vac_update_relstats(RelationGetRelid(onerel),
                             onerel->rd_nblocks,
-                            (double) totalrows,
+                            totalrows,
                             RelationGetForm(onerel)->relhasindex);
 
     /*
@@ -488,7 +486,7 @@ examine_attribute(Relation onerel, int attnum)
  */
 static int
 acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
-                    long *totalrows)
+                    double *totalrows)
 {
     int         numrows = 0;
     HeapScanDesc scan;
@@ -499,7 +497,7 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
     OffsetNumber lastoffset;
     int         numest;
     double      tuplesperpage;
-    long        t;
+    double      t;
     double      rstate;
 
     Assert(targrows > 1);
@@ -520,7 +518,7 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
      */
    if (!HeapTupleIsValid(tuple))
    {
-       *totalrows = numrows;
+       *totalrows = (double) numrows;
        return numrows;
    }
    /*
@@ -565,20 +563,22 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
    }
    tuplesperpage = (double) numest / (double) estblock;
 
-   t = numrows;                /* t is the # of records processed so far */
+   t = (double) numrows;       /* t is the # of records processed so far */
    rstate = init_selection_state(targrows);
    for (;;)
    {
        double      targpos;
        BlockNumber targblock;
+       Buffer      targbuffer;
+       Page        targpage;
        OffsetNumber targoffset,
                    maxoffset;
 
        t = select_next_random_record(t, targrows, &rstate);
        /* Try to read the t'th record in the table */
-       targpos = (double) t / tuplesperpage;
+       targpos = t / tuplesperpage;
        targblock = (BlockNumber) targpos;
-       targoffset = ((int) (targpos - targblock) * tuplesperpage) +
+       targoffset = ((int) ((targpos - targblock) * tuplesperpage)) +
            FirstOffsetNumber;
        /* Make sure we are past the last selected record */
        if (targblock <= lastblock)
@@ -595,21 +595,37 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
         */
        if (targblock >= onerel->rd_nblocks)
            break;
-       maxoffset = get_page_max_offset(onerel, targblock);
+       /*
+        * We must maintain a pin on the target page's buffer to ensure that
+        * the maxoffset value stays good (else concurrent VACUUM might
+        * delete tuples out from under us).  Hence, pin the page until we
+        * are done looking at it.  We don't maintain a lock on the page,
+        * so tuples could get added to it, but we ignore such tuples.
+        */
+       targbuffer = ReadBuffer(onerel, targblock);
+       if (!BufferIsValid(targbuffer))
+           elog(ERROR, "acquire_sample_rows: ReadBuffer(%s,%u) failed",
+                RelationGetRelationName(onerel), targblock);
+       LockBuffer(targbuffer, BUFFER_LOCK_SHARE);
+       targpage = BufferGetPage(targbuffer);
+       maxoffset = PageGetMaxOffsetNumber(targpage);
+       LockBuffer(targbuffer, BUFFER_LOCK_UNLOCK);
+
        for (;;)
        {
            HeapTupleData targtuple;
-           Buffer      targbuffer;
+           Buffer      tupbuffer;
 
            if (targoffset > maxoffset)
            {
                /* Fell off end of this page, try next */
+               ReleaseBuffer(targbuffer);
                targblock++;
                targoffset = FirstOffsetNumber;
                goto pageloop;
            }
            ItemPointerSet(&targtuple.t_self, targblock, targoffset);
-           heap_fetch(onerel, SnapshotNow, &targtuple, &targbuffer, NULL);
+           heap_fetch(onerel, SnapshotNow, &targtuple, &tupbuffer, NULL);
            if (targtuple.t_data != NULL)
            {
                /*
@@ -621,6 +637,9 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
                Assert(k >= 0 && k < targrows);
                heap_freetuple(rows[k]);
                rows[k] = heap_copytuple(&targtuple);
+               /* this releases the second pin acquired by heap_fetch: */
+               ReleaseBuffer(tupbuffer);
+               /* this releases the initial pin: */
                ReleaseBuffer(targbuffer);
                lastblock = targblock;
                lastoffset = targoffset;
@@ -639,7 +658,7 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
    /*
     * Estimate total number of valid rows in relation.
     */
-   *totalrows = (long) (onerel->rd_nblocks * tuplesperpage + 0.5);
+   *totalrows = floor((double) onerel->rd_nblocks * tuplesperpage + 0.5);
 
    return numrows;
 }
@@ -667,6 +686,12 @@ random_fract(void)
  * of the last record processed and next record to process.  The only extra
  * state needed between calls is W, a random state variable.
  *
+ * Note: the original algorithm defines t, S, numer, and denom as integers.
+ * Here we express them as doubles to avoid overflow if the number of rows
+ * in the table exceeds INT_MAX.  The algorithm should work as long as the
+ * row count does not become so large that it is not represented accurately
+ * in a double (on IEEE-math machines this would be around 2^52 rows).
+ *
  * init_selection_state computes the initial W value.
  *
  * Given that we've already processed t records (t >= n),
@@ -680,36 +705,36 @@ init_selection_state(int n)
    return exp(- log(random_fract())/n);
 }
 
-static long
-select_next_random_record(long t, int n, double *stateptr)
+static double
+select_next_random_record(double t, int n, double *stateptr)
 {
    /* The magic constant here is T from Vitter's paper */
-   if (t <= (22 * n))
+   if (t <= (22.0 * n))
    {
        /* Process records using Algorithm X until t is large enough */
        double      V,
                    quot;
 
        V = random_fract();     /* Generate V */
-       t++;
-       quot = (double) (t - n) / (double) t;
+       t += 1;
+       quot = (t - (double) n) / t;
        /* Find min S satisfying (4.1) */
        while (quot > V)
        {
-           t++;
-           quot *= (double) (t - n) / (double) t;
+           t += 1;
+           quot *= (t - (double) n) / t;
        }
    }
    else
    {
        /* Now apply Algorithm Z */
        double      W = *stateptr;
-       long        term = t - n + 1;
-       int         S;
+       double      term = t - (double) n + 1;
+       double      S;
 
        for (;;)
        {
-           long        numer,
+           double      numer,
                        numer_lim,
                        denom;
            double      U,
@@ -722,9 +747,9 @@ select_next_random_record(long t, int n, double *stateptr)
            /* Generate U and X */
            U = random_fract();
            X = t * (W - 1.0);
-           S = X;              /* S is tentatively set to floor(X) */
+           S = floor(X);       /* S is tentatively set to floor(X) */
            /* Test if U <= h(S)/cg(X) in the manner of (6.3) */
-           tmp = (double) (t + 1) / (double) term;
+           tmp = (t + 1) / term;
            lhs = exp(log(((U * tmp * tmp) * (term + S))/(t + X))/n);
            rhs = (((t + X)/(term + S)) * term)/t;
            if (lhs <= rhs)
@@ -734,20 +759,20 @@ select_next_random_record(long t, int n, double *stateptr)
            }
            /* Test if U <= f(S)/cg(X) */
            y = (((U * (t + 1))/term) * (t + S + 1))/(t + X);
-           if (n < S)
+           if ((double) n < S)
            {
                denom = t;
                numer_lim = term + S;
            }
            else
            {
-               denom = t - n + S;
+               denom = t - (double) n + S;
                numer_lim = t + 1;
            }
-           for (numer = t + S; numer >= numer_lim; numer--)
+           for (numer = t + S; numer >= numer_lim; numer -= 1)
            {
-               y *= (double) numer / (double) denom;
-               denom--;
+               y *= numer / denom;
+               denom -= 1;
            }
            W = exp(- log(random_fract())/n);   /* Generate W in advance */
            if (exp(log(y)/n) <= (t + X)/t)
@@ -783,30 +808,6 @@ compare_rows(const void *a, const void *b)
    return 0;
 }
 
-/*
- * Discover the largest valid tuple offset number on the given page
- *
- * This code probably ought to live in some other module.
- */
-static OffsetNumber
-get_page_max_offset(Relation relation, BlockNumber blocknumber)
-{
-   Buffer      buffer;
-   Page        p;
-   OffsetNumber offnum;
-
-   buffer = ReadBuffer(relation, blocknumber);
-   if (!BufferIsValid(buffer))
-       elog(ERROR, "get_page_max_offset: %s relation: ReadBuffer(%ld) failed",
-            RelationGetRelationName(relation), (long) blocknumber);
-   LockBuffer(buffer, BUFFER_LOCK_SHARE);
-   p = BufferGetPage(buffer);
-   offnum = PageGetMaxOffsetNumber(p);
-   LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-   ReleaseBuffer(buffer);
-   return offnum;
-}
-
 
 /*
  * compute_minimal_stats() -- compute minimal column statistics
@@ -825,7 +826,7 @@ get_page_max_offset(Relation relation, BlockNumber blocknumber)
  */
 static void
 compute_minimal_stats(VacAttrStats *stats,
-                      TupleDesc tupDesc, long totalrows,
+                      TupleDesc tupDesc, double totalrows,
                       HeapTuple *rows, int numrows)
 {
    int         i;
@@ -1002,7 +1003,7 @@ compute_minimal_stats(VacAttrStats *stats,
 
        if (f1 < 1)
            f1 = 1;
-       term1 = sqrt((double) totalrows / (double) numrows) * f1;
+       term1 = sqrt(totalrows / (double) numrows) * f1;
        stats->stadistinct = floor(term1 + nmultiple + 0.5);
    }
 
@@ -1104,7 +1105,7 @@ compute_minimal_stats(VacAttrStats *stats,
  */
 static void
 compute_scalar_stats(VacAttrStats *stats,
-                     TupleDesc tupDesc, long totalrows,
+                     TupleDesc tupDesc, double totalrows,
                      HeapTuple *rows, int numrows)
 {
    int         i;
@@ -1298,7 +1299,7 @@ compute_scalar_stats(VacAttrStats *stats,
 
        if (f1 < 1)
            f1 = 1;
-       term1 = sqrt((double) totalrows / (double) numrows) * f1;
+       term1 = sqrt(totalrows / (double) numrows) * f1;
        stats->stadistinct = floor(term1 + nmultiple + 0.5);
    }
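
The swapInt/swapDatum hunk near the top of the diff converts the macro bodies from bare brace
blocks to the do { ... } while(0) form, and the commit message does not spell out why.  A minimal
standalone sketch of the hazard the old form creates inside an if/else follows; the
SWAP_INT_OLD/SWAP_INT names are hypothetical and not part of the patch:

    #include <stdio.h>

    /* Old style: a bare brace block.  The semicolon a caller writes after the
     * macro becomes an empty statement, which orphans any following "else". */
    #define SWAP_INT_OLD(a,b)  {int _tmp; _tmp=a; a=b; b=_tmp;}

    /* New style: do { ... } while(0) consumes exactly one trailing semicolon,
     * so the expansion behaves like a single ordinary statement. */
    #define SWAP_INT(a,b)  do {int _tmp; _tmp=a; a=b; b=_tmp;} while(0)

    int
    main(void)
    {
        int     x = 2,
                y = 1;

        if (x > y)
            SWAP_INT(x, y);     /* expands to one statement; the else still binds */
        else
            printf("already ordered\n");

        /* The same if/else written with SWAP_INT_OLD(x, y); does not compile,
         * because the ";" after the brace block terminates the if. */
        printf("x=%d y=%d\n", x, y);
        return 0;
    }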
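
The other half of the patch replaces long with double for totalrows and for Vitter's t, S, numer,
and denom.  A small standalone check of the property that makes this safe, and of the
floor(x + 0.5) rounding now used for *totalrows, is sketched below; the numeric values are made
up for illustration and are not from the commit:

    #include <math.h>
    #include <stdio.h>

    int
    main(void)
    {
        double      t = 3000000000.0;       /* a row count already past 2^31 - 1 */
        double      nblocks = 123456.0;     /* hypothetical relation size in pages */
        double      tuplesperpage = 71.3;   /* hypothetical density estimate */

        /*
         * IEEE doubles represent every integer up to 2^53 exactly, so a counter
         * stepped by "t += 1" stays exact far beyond a 32-bit long; the comment
         * added to analyze.c quotes ~2^52 as a conservative bound.
         */
        printf("(t + 1) - t = %.1f\n", (t + 1.0) - t);      /* prints 1.0 */

        /*
         * Round a fractional row estimate to the nearest whole row, the same
         * shape as the new *totalrows computation in acquire_sample_rows().
         */
        printf("estimated rows = %.0f\n", floor(nblocks * tuplesperpage + 0.5));
        return 0;
    }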