* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/utils/sort/tuplesort.c,v 1.64 2006/03/08 16:59:03 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/utils/sort/tuplesort.c,v 1.65 2006/03/10 23:19:00 tgl Exp $
*
*-------------------------------------------------------------------------
*/
* have not yet exhausted that run. mergenext[i] is the memtuples index
* of the next pre-read tuple (next to be loaded into the heap) for tape
* i, or 0 if we are out of pre-read tuples. mergelast[i] similarly
- * points to the last pre-read tuple from each tape. mergeavailmem[i] is
- * the amount of unused space allocated for tape i. mergefreelist and
- * mergefirstfree keep track of unused locations in the memtuples[] array.
- * The memtuples[].tupindex fields link together pre-read tuples for each
- * tape as well as recycled locations in mergefreelist. It is OK to use 0
- * as a null link in these lists, because memtuples[0] is part of the
- * merge heap and is never a pre-read tuple. mergeslotsfree counts the
- * total number of free memtuples[] slots, both those in the freelist and
- * those beyond mergefirstfree.
+ * points to the last pre-read tuple from each tape. mergeavailslots[i]
+ * is the number of unused memtuples[] slots reserved for tape i, and
+ * mergeavailmem[i] is the amount of unused space allocated for tape i.
+ * mergefreelist and mergefirstfree keep track of unused locations in the
+ * memtuples[] array. The memtuples[].tupindex fields link together
+ * pre-read tuples for each tape as well as recycled locations in
+ * mergefreelist. It is OK to use 0 as a null link in these lists, because
+ * memtuples[0] is part of the merge heap and is never a pre-read tuple.
*/
- bool *mergeactive; /* Active input run source? */
+ bool *mergeactive; /* active input run source? */
int *mergenext; /* first preread tuple for each source */
int *mergelast; /* last preread tuple for each source */
- long *mergeavailmem; /* availMem for prereading tapes */
- long spacePerTape; /* actual per-tape target usage */
+ int *mergeavailslots; /* slots left for prereading each tape */
+ long *mergeavailmem; /* availMem for prereading each tape */
int mergefreelist; /* head of freelist of recycled slots */
int mergefirstfree; /* first slot never used in this merge */
- int mergeslotsfree; /* number of free slots during merge */
/*
* Variables for Algorithm D. Note that destTape is a "logical" tape
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
static void mergepreread(Tuplesortstate *state);
+static void mergeprereadone(Tuplesortstate *state, int srcTape);
static void dumptuples(Tuplesortstate *state, bool alltuples);
static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
int tupleindex, bool checkIndex);
{
/*
* out of preloaded data on this tape, try to read more
+ *
+ * Unlike mergeonerun(), we only preload from the single
+ * tape that's run dry. See mergepreread() comments.
*/
- mergepreread(state);
+ mergeprereadone(state, srcTape);
/*
* if still no data, we've reached end of run on this tape
/* put the now-unused memtuples entry on the freelist */
newtup->tupindex = state->mergefreelist;
state->mergefreelist = tupIndex;
- state->mergeslotsfree++;
+ state->mergeavailslots[srcTape]++;
return true;
}
return false;
state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
state->mergenext = (int *) palloc0(maxTapes * sizeof(int));
state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
+ state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int));
state->mergeavailmem = (long *) palloc0(maxTapes * sizeof(long));
state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
/* put the now-unused memtuples entry on the freelist */
tup->tupindex = state->mergefreelist;
state->mergefreelist = tupIndex;
- state->mergeslotsfree++;
+ state->mergeavailslots[srcTape]++;
}
/*
int activeTapes;
int tapenum;
int srcTape;
+ int slotsPerTape;
+ long spacePerTape;
/* Heap should be empty here */
Assert(state->memtupcount == 0);
- /* Clear merge-pass state variables */
- memset(state->mergeactive, 0, state->maxTapes * sizeof(*state->mergeactive));
- memset(state->mergenext, 0, state->maxTapes * sizeof(*state->mergenext));
- memset(state->mergelast, 0, state->maxTapes * sizeof(*state->mergelast));
- memset(state->mergeavailmem, 0, state->maxTapes * sizeof(*state->mergeavailmem));
- state->mergefreelist = 0; /* nothing in the freelist */
- state->mergefirstfree = state->maxTapes; /* 1st slot avail for preread */
- state->mergeslotsfree = state->memtupsize - state->mergefirstfree;
- Assert(state->mergeslotsfree >= state->maxTapes);
-
/* Adjust run counts and mark the active tapes */
+ memset(state->mergeactive, 0,
+ state->maxTapes * sizeof(*state->mergeactive));
activeTapes = 0;
for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
{
}
state->activeTapes = activeTapes;
+ /* Clear merge-pass state variables */
+ memset(state->mergenext, 0,
+ state->maxTapes * sizeof(*state->mergenext));
+ memset(state->mergelast, 0,
+ state->maxTapes * sizeof(*state->mergelast));
+ state->mergefreelist = 0; /* nothing in the freelist */
+ state->mergefirstfree = activeTapes; /* 1st slot avail for preread */
+
/*
* Initialize space allocation to let each active input tape have an equal
* share of preread space.
*/
Assert(activeTapes > 0);
- state->spacePerTape = state->availMem / activeTapes;
+ slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
+ Assert(slotsPerTape > 0);
+ spacePerTape = state->availMem / activeTapes;
for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
{
if (state->mergeactive[srcTape])
- state->mergeavailmem[srcTape] = state->spacePerTape;
+ {
+ state->mergeavailslots[srcTape] = slotsPerTape;
+ state->mergeavailmem[srcTape] = spacePerTape;
+ }
}
/*
/* put the now-unused memtuples entry on the freelist */
tup->tupindex = state->mergefreelist;
state->mergefreelist = tupIndex;
- state->mergeslotsfree++;
+ state->mergeavailslots[srcTape]++;
}
}
}
* active source tape until the tape's run is exhausted or it has used up
* its fair share of available memory. In any case, we guarantee that there
* is at least one preread tuple available from each unexhausted input tape.
+ *
+ * We invoke this routine at the start of a merge pass for initial load,
+ * and then whenever any tape's preread data runs out. Note that we load
+ * as much data as possible from all tapes, not just the one that ran out.
+ * This is because logtape.c works best with a usage pattern that alternates
+ * between reading a lot of data and writing a lot of data, so whenever we
+ * are forced to read, we should fill working memory completely.
+ *
+ * In FINALMERGE state, we *don't* use this routine, but instead just preread
+ * from the single tape that ran dry. There's no read/write alternation in
+ * that state and so no point in scanning through all the tapes to fix one.
+ * (Moreover, there may be quite a lot of inactive tapes in that state, since
+ * we might have had many fewer runs than tapes. In a regular tape-to-tape
+ * merge we can expect most of the tapes to be active.)
*/
static void
mergepreread(Tuplesortstate *state)
{
int srcTape;
+
+ for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
+ mergeprereadone(state, srcTape);
+}
+
+/*
+ * mergeprereadone - load tuples from one merge input tape
+ *
+ * Read tuples from the specified tape until it has used up its free memory
+ * or array slots; but ensure that we have at least one tuple, if any are
+ * to be had.
+ */
+static void
+mergeprereadone(Tuplesortstate *state, int srcTape)
+{
unsigned int tuplen;
SortTuple stup;
int tupIndex;
long priorAvail,
spaceUsed;
- for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
+ if (!state->mergeactive[srcTape])
+ return; /* tape's run is already exhausted */
+ priorAvail = state->availMem;
+ state->availMem = state->mergeavailmem[srcTape];
+ while ((state->mergeavailslots[srcTape] > 0 && !LACKMEM(state)) ||
+ state->mergenext[srcTape] == 0)
{
- if (!state->mergeactive[srcTape])
- continue;
-
- /*
- * Skip reading from any tape that still has at least half of its
- * target memory filled with tuples (threshold fraction may need
- * adjustment?). This avoids reading just a few tuples when the
- * incoming runs are not being consumed evenly.
- */
- if (state->mergenext[srcTape] != 0 &&
- state->mergeavailmem[srcTape] <= state->spacePerTape / 2)
- continue;
-
- /*
- * Read tuples from this tape until it has used up its free memory,
- * or we are low on memtuples slots; but ensure that we have at least
- * one tuple.
- */
- priorAvail = state->availMem;
- state->availMem = state->mergeavailmem[srcTape];
- while ((!LACKMEM(state) && state->mergeslotsfree > state->tapeRange) ||
- state->mergenext[srcTape] == 0)
+ /* read next tuple, if any */
+ if ((tuplen = getlen(state, srcTape, true)) == 0)
{
- /* read next tuple, if any */
- if ((tuplen = getlen(state, srcTape, true)) == 0)
- {
- state->mergeactive[srcTape] = false;
- break;
- }
- READTUP(state, &stup, srcTape, tuplen);
- /* find a free slot in memtuples[] for it */
- tupIndex = state->mergefreelist;
- if (tupIndex)
- state->mergefreelist = state->memtuples[tupIndex].tupindex;
- else
- {
- tupIndex = state->mergefirstfree++;
- Assert(tupIndex < state->memtupsize);
- }
- state->mergeslotsfree--;
- /* store tuple, append to list for its tape */
- stup.tupindex = 0;
- state->memtuples[tupIndex] = stup;
- if (state->mergelast[srcTape])
- state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex;
- else
- state->mergenext[srcTape] = tupIndex;
- state->mergelast[srcTape] = tupIndex;
+ state->mergeactive[srcTape] = false;
+ break;
}
- /* update per-tape and global availmem counts */
- spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
- state->mergeavailmem[srcTape] = state->availMem;
- state->availMem = priorAvail - spaceUsed;
+ READTUP(state, &stup, srcTape, tuplen);
+ /* find a free slot in memtuples[] for it */
+ tupIndex = state->mergefreelist;
+ if (tupIndex)
+ state->mergefreelist = state->memtuples[tupIndex].tupindex;
+ else
+ {
+ tupIndex = state->mergefirstfree++;
+ Assert(tupIndex < state->memtupsize);
+ }
+ state->mergeavailslots[srcTape]--;
+ /* store tuple, append to list for its tape */
+ stup.tupindex = 0;
+ state->memtuples[tupIndex] = stup;
+ if (state->mergelast[srcTape])
+ state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex;
+ else
+ state->mergenext[srcTape] = tupIndex;
+ state->mergelast[srcTape] = tupIndex;
}
+ /* update per-tape and global availmem counts */
+ spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
+ state->mergeavailmem[srcTape] = state->availMem;
+ state->availMem = priorAvail - spaceUsed;
}
/*