Optimize multi-batch hash joins when the outer relation has a nonuniform
distribution, by creating a special fast path for the (first few) most common
values of the outer relation.  Tuples having hashvalues matching the MCVs
are effectively forced to be in the first batch, so that we never write
them out to the batch temp files.

author    Tom Lane <tgl@sss.pgh.pa.us>  Sat, 21 Mar 2009 00:04:40 +0000 (00:04 +0000)
committer Tom Lane <tgl@sss.pgh.pa.us>  Sat, 21 Mar 2009 00:04:40 +0000 (00:04 +0000)

Bryce Cutt and Ramon Lawrence, with some editorialization by me.
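
For orientation before the diffs, here is a minimal, self-contained sketch -- not
the committed code -- of the lookup scheme the skew hashtable uses: an
open-addressing table whose length is a power of 2, probed linearly, keyed by
the hash values of the outer relation's MCVs.  The struct, function names,
table length, and sample hash values below are invented for illustration; the
real logic is in ExecHashBuildSkewHash and ExecHashGetSkewBucket in nodeHash.c
below.

/*
 * Standalone sketch of the skew hashtable's open-addressing scheme.
 * Everything here (names, table length, sample hash values) is illustrative.
 */
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>

#define INVALID_SKEW_BUCKET_NO (-1)

typedef struct SkewSlot
{
    int      used;       /* 0 = empty slot, so probes always terminate */
    uint32_t hashvalue;  /* hash value of one outer-relation MCV */
} SkewSlot;

/* Insert a hash value, stepping past slots taken by other hash values. */
static void
skew_insert(SkewSlot *table, int len, uint32_t hashvalue)
{
    int bucket = hashvalue & (len - 1);

    while (table[bucket].used && table[bucket].hashvalue != hashvalue)
        bucket = (bucket + 1) & (len - 1);
    table[bucket].used = 1;
    table[bucket].hashvalue = hashvalue;
}

/* Return the slot index for a hash value, or INVALID_SKEW_BUCKET_NO. */
static int
skew_lookup(const SkewSlot *table, int len, uint32_t hashvalue)
{
    int bucket = hashvalue & (len - 1);

    while (table[bucket].used && table[bucket].hashvalue != hashvalue)
        bucket = (bucket + 1) & (len - 1);
    return table[bucket].used ? bucket : INVALID_SKEW_BUCKET_NO;
}

int
main(void)
{
    int       len = 16;        /* power of 2, strictly greater than #MCVs */
    SkewSlot *table = calloc(len, sizeof(SkewSlot));
    uint32_t  mcv_hashes[] = {0xdeadbeef, 0x12345678, 0xdeadbeef + 16};

    for (int i = 0; i < 3; i++)
        skew_insert(table, len, mcv_hashes[i]);

    printf("0xdeadbeef -> bucket %d\n", skew_lookup(table, len, 0xdeadbeef));
    printf("0xcafebabe -> bucket %d\n", skew_lookup(table, len, 0xcafebabe));
    free(table);
    return 0;
}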

src/backend/executor/nodeHash.c
src/backend/executor/nodeHashjoin.c
src/backend/nodes/copyfuncs.c
src/backend/nodes/outfuncs.c
src/backend/optimizer/path/costsize.c
src/backend/optimizer/plan/createplan.c
src/include/executor/hashjoin.h
src/include/executor/nodeHash.h
src/include/nodes/execnodes.h
src/include/nodes/plannodes.h

src/backend/executor/nodeHash.c
index 83be33b..dd97ef4 100644
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/executor/nodeHash.c,v 1.117 2009/01/01 17:23:41 momjian Exp $
+ *       $PostgreSQL: pgsql/src/backend/executor/nodeHash.c,v 1.118 2009/03/21 00:04:38 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -24,6 +24,7 @@
 #include <math.h>
 #include <limits.h>
 
+#include "catalog/pg_statistic.h"
 #include "commands/tablespace.h"
 #include "executor/execdebug.h"
 #include "executor/hashjoin.h"
 #include "utils/dynahash.h"
 #include "utils/memutils.h"
 #include "utils/lsyscache.h"
+#include "utils/syscache.h"
 
 
 static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
+static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node,
+                                                                 int mcvsToUse);
+static void ExecHashSkewTableInsert(HashJoinTable hashtable,
+                                                                       TupleTableSlot *slot,
+                                                                       uint32 hashvalue,
+                                                                       int bucketNumber);
+static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
 
 
 /* ----------------------------------------------------------------
@@ -99,7 +108,20 @@ MultiExecHash(HashState *node)
                if (ExecHashGetHashValue(hashtable, econtext, hashkeys, false, false,
                                                                 &hashvalue))
                {
-                       ExecHashTableInsert(hashtable, slot, hashvalue);
+                       int             bucketNumber;
+
+                       bucketNumber = ExecHashGetSkewBucket(hashtable, hashvalue);
+                       if (bucketNumber != INVALID_SKEW_BUCKET_NO)
+                       {
+                               /* It's a skew tuple, so put it into that hash table */
+                               ExecHashSkewTableInsert(hashtable, slot, hashvalue,
+                                                                               bucketNumber);
+                       }
+                       else
+                       {
+                               /* Not subject to skew optimization, so insert normally */
+                               ExecHashTableInsert(hashtable, slot, hashvalue);
+                       }
                        hashtable->totalTuples += 1;
                }
        }
@@ -225,6 +247,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators)
        Plan       *outerNode;
        int                     nbuckets;
        int                     nbatch;
+       int                     num_skew_mcvs;
        int                     log2_nbuckets;
        int                     nkeys;
        int                     i;
@@ -239,7 +262,8 @@ ExecHashTableCreate(Hash *node, List *hashOperators)
        outerNode = outerPlan(node);
 
        ExecChooseHashTableSize(outerNode->plan_rows, outerNode->plan_width,
-                                                       &nbuckets, &nbatch);
+                                                       OidIsValid(node->skewTable),
+                                                       &nbuckets, &nbatch, &num_skew_mcvs);
 
 #ifdef HJDEBUG
        printf("nbatch = %d, nbuckets = %d\n", nbatch, nbuckets);
@@ -259,6 +283,11 @@ ExecHashTableCreate(Hash *node, List *hashOperators)
        hashtable->nbuckets = nbuckets;
        hashtable->log2_nbuckets = log2_nbuckets;
        hashtable->buckets = NULL;
+       hashtable->skewEnabled = false;
+       hashtable->skewBucket = NULL;
+       hashtable->skewBucketLen = 0;
+       hashtable->nSkewBuckets = 0;
+       hashtable->skewBucketNums = NULL;
        hashtable->nbatch = nbatch;
        hashtable->curbatch = 0;
        hashtable->nbatch_original = nbatch;
@@ -269,6 +298,9 @@ ExecHashTableCreate(Hash *node, List *hashOperators)
        hashtable->outerBatchFile = NULL;
        hashtable->spaceUsed = 0;
        hashtable->spaceAllowed = work_mem * 1024L;
+       hashtable->spaceUsedSkew = 0;
+       hashtable->spaceAllowedSkew =
+               hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100;
 
        /*
         * Get info about the hash functions to be used for each hash key. Also
@@ -339,6 +371,13 @@ ExecHashTableCreate(Hash *node, List *hashOperators)
        hashtable->buckets = (HashJoinTuple *)
                palloc0(nbuckets * sizeof(HashJoinTuple));
 
+       /*
+        * Set up for skew optimization, if possible and there's a need for more
+        * than one batch.  (In a one-batch join, there's no point in it.)
+        */
+       if (nbatch > 1)
+               ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);
+
        MemoryContextSwitchTo(oldcxt);
 
        return hashtable;
@@ -356,13 +395,15 @@ ExecHashTableCreate(Hash *node, List *hashOperators)
 #define NTUP_PER_BUCKET                        10
 
 void
-ExecChooseHashTableSize(double ntuples, int tupwidth,
+ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                                                int *numbuckets,
-                                               int *numbatches)
+                                               int *numbatches,
+                                               int *num_skew_mcvs)
 {
        int                     tupsize;
        double          inner_rel_bytes;
        long            hash_table_bytes;
+       long            skew_table_bytes;
        int                     nbatch;
        int                     nbuckets;
        int                     i;
@@ -387,6 +428,41 @@ ExecChooseHashTableSize(double ntuples, int tupwidth,
        hash_table_bytes = work_mem * 1024L;
 
        /*
+        * If skew optimization is possible, estimate the number of skew buckets
+        * that will fit in the memory allowed, and decrement the assumed space
+        * available for the main hash table accordingly.
+        *
+        * We make the optimistic assumption that each skew bucket will contain
+        * one inner-relation tuple.  If that turns out to be low, we will recover
+        * at runtime by reducing the number of skew buckets.
+        *
+        * hashtable->skewBucket will have up to 8 times as many HashSkewBucket
+        * pointers as the number of MCVs we allow, since ExecHashBuildSkewHash
+        * will round up to the next power of 2 and then multiply by 4 to reduce
+        * collisions.
+        */
+       if (useskew)
+       {
+               skew_table_bytes = hash_table_bytes * SKEW_WORK_MEM_PERCENT / 100;
+
+               *num_skew_mcvs = skew_table_bytes / (
+                       /* size of a hash tuple */
+                       tupsize +
+                       /* worst-case size of skewBucket[] per MCV */
+                       (8 * sizeof(HashSkewBucket *)) +
+                       /* size of skewBucketNums[] entry */
+                       sizeof(int) +
+                       /* size of skew bucket struct itself */
+                       SKEW_BUCKET_OVERHEAD
+                       );
+
+               if (*num_skew_mcvs > 0)
+                       hash_table_bytes -= skew_table_bytes;
+       }
+       else
+               *num_skew_mcvs = 0;
+
+       /*
         * Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when
         * memory is filled.  Set nbatch to the smallest power of 2 that appears
         * sufficient.
@@ -813,13 +889,18 @@ ExecScanHashBucket(HashJoinState *hjstate,
        uint32          hashvalue = hjstate->hj_CurHashValue;
 
        /*
-        * hj_CurTuple is NULL to start scanning a new bucket, or the address of
-        * the last tuple returned from the current bucket.
+        * hj_CurTuple is the address of the tuple last returned from the current
+        * bucket, or NULL if it's time to start scanning a new bucket.
+        *
+        * If the tuple hashed to a skew bucket then scan the skew bucket
+        * otherwise scan the standard hashtable bucket.
         */
-       if (hashTuple == NULL)
-               hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
-       else
+       if (hashTuple != NULL)
                hashTuple = hashTuple->next;
+       else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO)
+               hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples;
+       else
+               hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
 
        while (hashTuple != NULL)
        {
@@ -889,3 +970,347 @@ ExecReScanHash(HashState *node, ExprContext *exprCtxt)
        if (((PlanState *) node)->lefttree->chgParam == NULL)
                ExecReScan(((PlanState *) node)->lefttree, exprCtxt);
 }
+
+
+/*
+ * ExecHashBuildSkewHash
+ *
+ *             Set up for skew optimization if we can identify the most common values
+ *             (MCVs) of the outer relation's join key.  We make a skew hash bucket
+ *             for the hash value of each MCV, up to the number of slots allowed
+ *             based on available memory.
+ */
+static void
+ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
+{
+       HeapTupleData   *statsTuple;
+       Datum                   *values;
+       int                             nvalues;
+       float4                  *numbers;
+       int                             nnumbers;
+
+       /* Do nothing if planner didn't identify the outer relation's join key */
+       if (!OidIsValid(node->skewTable))
+               return;
+       /* Also, do nothing if we don't have room for at least one skew bucket */
+       if (mcvsToUse <= 0)
+               return;
+
+       /*
+        * Try to find the MCV statistics for the outer relation's join key.
+        */
+       statsTuple = SearchSysCache(STATRELATT,
+                                                               ObjectIdGetDatum(node->skewTable),
+                                                               Int16GetDatum(node->skewColumn),
+                                                               0, 0);
+       if (!HeapTupleIsValid(statsTuple))
+               return;
+
+       if (get_attstatsslot(statsTuple, node->skewColType, node->skewColTypmod,
+                                                STATISTIC_KIND_MCV, InvalidOid,
+                                                &values, &nvalues,
+                                                &numbers, &nnumbers))
+       {
+               double          frac;
+               int                     nbuckets;
+               FmgrInfo   *hashfunctions;
+               int                     i;
+
+               if (mcvsToUse > nvalues)
+                       mcvsToUse = nvalues;
+
+               /*
+                * Calculate the expected fraction of outer relation that will
+                * participate in the skew optimization.  If this isn't at least
+                * SKEW_MIN_OUTER_FRACTION, don't use skew optimization.
+                */
+               frac = 0;
+               for (i = 0; i < mcvsToUse; i++)
+                       frac += numbers[i];
+               if (frac < SKEW_MIN_OUTER_FRACTION)
+               {
+                       free_attstatsslot(node->skewColType,
+                                                         values, nvalues, numbers, nnumbers);
+                       ReleaseSysCache(statsTuple);
+                       return;
+               }
+
+               /*
+                * Okay, set up the skew hashtable.
+                *
+                * skewBucket[] is an open addressing hashtable with a power of 2 size
+                * that is greater than the number of MCV values.  (This ensures there
+                * will be at least one null entry, so searches will always terminate.)
+                *
+                * Note: this code could fail if mcvsToUse exceeds INT_MAX/8, but
+                * that is not currently possible since we limit pg_statistic entries
+                * to much less than that.
+                */
+               nbuckets = 2;
+               while (nbuckets <= mcvsToUse)
+                       nbuckets <<= 1;
+               /* use two more bits just to help avoid collisions */
+               nbuckets <<= 2;
+
+               hashtable->skewEnabled = true;
+               hashtable->skewBucketLen = nbuckets;
+
+               /*
+                * We allocate the bucket memory in the hashtable's batch context.
+                * It is only needed during the first batch, and this ensures it
+                * will be automatically removed once the first batch is done.
+                */
+               hashtable->skewBucket = (HashSkewBucket **)
+                       MemoryContextAllocZero(hashtable->batchCxt,
+                                                                  nbuckets * sizeof(HashSkewBucket *));
+               hashtable->skewBucketNums = (int *)
+                       MemoryContextAllocZero(hashtable->batchCxt,
+                                                                  mcvsToUse * sizeof(int));
+
+               hashtable->spaceUsed += nbuckets * sizeof(HashSkewBucket *)
+                       + mcvsToUse * sizeof(int);
+               hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
+                       + mcvsToUse * sizeof(int);
+
+               /*
+                * Create a skew bucket for each MCV hash value.
+                *
+                * Note: it is very important that we create the buckets in order
+                * of decreasing MCV frequency.  If we have to remove some buckets,
+                * they must be removed in reverse order of creation (see notes in
+                * ExecHashRemoveNextSkewBucket) and we want the least common MCVs
+                * to be removed first.
+                */
+               hashfunctions = hashtable->outer_hashfunctions;
+
+               for (i = 0; i < mcvsToUse; i++)
+               {
+                       uint32 hashvalue;
+                       int bucket;
+
+                       hashvalue = DatumGetUInt32(FunctionCall1(&hashfunctions[0],
+                                                                                                        values[i]));
+
+                       /*
+                        * While we have not hit a hole in the hashtable and have not hit
+                        * the desired bucket, we have collided with some previous hash
+                        * value, so try the next bucket location.  NB: this code must
+                        * match ExecHashGetSkewBucket.
+                        */
+                       bucket = hashvalue & (nbuckets - 1);
+                       while (hashtable->skewBucket[bucket] != NULL &&
+                                  hashtable->skewBucket[bucket]->hashvalue != hashvalue)
+                               bucket = (bucket + 1) & (nbuckets - 1);
+
+                       /*
+                        * If we found an existing bucket with the same hashvalue,
+                        * leave it alone.  It's okay for two MCVs to share a hashvalue.
+                        */
+                       if (hashtable->skewBucket[bucket] != NULL)
+                               continue;
+
+                       /* Okay, create a new skew bucket for this hashvalue. */
+                       hashtable->skewBucket[bucket] = (HashSkewBucket *)
+                               MemoryContextAlloc(hashtable->batchCxt,
+                                                                  sizeof(HashSkewBucket));
+                       hashtable->skewBucket[bucket]->hashvalue = hashvalue;
+                       hashtable->skewBucket[bucket]->tuples = NULL;
+                       hashtable->skewBucketNums[hashtable->nSkewBuckets] = bucket;
+                       hashtable->nSkewBuckets++;
+                       hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
+                       hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
+               }
+
+               free_attstatsslot(node->skewColType,
+                                                 values, nvalues, numbers, nnumbers);
+       }
+
+       ReleaseSysCache(statsTuple);
+}
+
+/*
+ * ExecHashGetSkewBucket
+ *
+ *             Returns the index of the skew bucket for this hashvalue,
+ *             or INVALID_SKEW_BUCKET_NO if the hashvalue is not
+ *             associated with any active skew bucket.
+ */
+int
+ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
+{
+       int                     bucket;
+
+       /*
+        * Always return INVALID_SKEW_BUCKET_NO if not doing skew optimization
+        * (in particular, this happens after the initial batch is done).
+        */
+       if (!hashtable->skewEnabled)
+               return INVALID_SKEW_BUCKET_NO;
+
+       /*
+        * Since skewBucketLen is a power of 2, we can do a modulo by ANDing.
+        */
+       bucket = hashvalue & (hashtable->skewBucketLen - 1);
+
+       /*
+        * While we have not hit a hole in the hashtable and have not hit the
+        * desired bucket, we have collided with some other hash value, so try
+        * the next bucket location.
+        */
+       while (hashtable->skewBucket[bucket] != NULL &&
+                  hashtable->skewBucket[bucket]->hashvalue != hashvalue)
+               bucket = (bucket + 1) & (hashtable->skewBucketLen - 1);
+
+       /*
+        * Found the desired bucket?
+        */
+       if (hashtable->skewBucket[bucket] != NULL)
+               return bucket;
+
+       /*
+        * There must not be any hashtable entry for this hash value.
+        */
+       return INVALID_SKEW_BUCKET_NO;
+}
+
+/*
+ * ExecHashSkewTableInsert
+ *
+ *             Insert a tuple into the skew hashtable.
+ *
+ * This should generally match up with the current-batch case in
+ * ExecHashTableInsert.
+ */
+static void
+ExecHashSkewTableInsert(HashJoinTable hashtable,
+                                               TupleTableSlot *slot,
+                                               uint32 hashvalue,
+                                               int bucketNumber)
+{
+       MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
+       HashJoinTuple hashTuple;
+       int                     hashTupleSize;
+
+       /* Create the HashJoinTuple */
+       hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
+       hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
+                                                                                                  hashTupleSize);
+       hashTuple->hashvalue = hashvalue;
+       memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
+
+       /* Push it onto the front of the skew bucket's list */
+       hashTuple->next = hashtable->skewBucket[bucketNumber]->tuples;
+       hashtable->skewBucket[bucketNumber]->tuples = hashTuple;
+
+       /* Account for space used, and back off if we've used too much */
+       hashtable->spaceUsed += hashTupleSize;
+       hashtable->spaceUsedSkew += hashTupleSize;
+       while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
+               ExecHashRemoveNextSkewBucket(hashtable);
+
+       /* Check we are not over the total spaceAllowed, either */
+       if (hashtable->spaceUsed > hashtable->spaceAllowed)
+               ExecHashIncreaseNumBatches(hashtable);
+}
+
+/*
+ *             ExecHashRemoveNextSkewBucket
+ *
+ *             Remove the least valuable skew bucket by pushing its tuples into
+ *             the main hash table.
+ */
+static void
+ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
+{
+       int bucketToRemove;
+       HashSkewBucket *bucket;
+       uint32 hashvalue;
+       int bucketno;
+       int batchno;
+       HashJoinTuple hashTuple;
+
+       /* Locate the bucket to remove */
+       bucketToRemove = hashtable->skewBucketNums[hashtable->nSkewBuckets - 1];
+       bucket = hashtable->skewBucket[bucketToRemove];
+
+       /*
+        * Calculate which bucket and batch the tuples belong to in the main
+        * hashtable.  They all have the same hash value, so it's the same for all
+        * of them.  Also note that it's not possible for nbatch to increase
+        * while we are processing the tuples.
+        */
+       hashvalue = bucket->hashvalue;
+       ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
+
+       /* Process all tuples in the bucket */
+       hashTuple = bucket->tuples;
+       while (hashTuple != NULL)
+       {
+               HashJoinTuple nextHashTuple = hashTuple->next;
+               MinimalTuple tuple;
+               Size    tupleSize;
+
+               /*
+                * This code must agree with ExecHashTableInsert.  We do not use
+                * ExecHashTableInsert directly as ExecHashTableInsert expects a
+                * TupleTableSlot while we already have HashJoinTuples.
+                */
+               tuple = HJTUPLE_MINTUPLE(hashTuple);
+               tupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
+
+               /* Decide whether to put the tuple in the hash table or a temp file */
+               if (batchno == hashtable->curbatch)
+               {
+                       /* Move the tuple to the main hash table */
+                       hashTuple->next = hashtable->buckets[bucketno];
+                       hashtable->buckets[bucketno] = hashTuple;
+                       /* We have reduced skew space, but overall space doesn't change */
+                       hashtable->spaceUsedSkew -= tupleSize;
+               }
+               else
+               {
+                       /* Put the tuple into a temp file for later batches */
+                       Assert(batchno > hashtable->curbatch);
+                       ExecHashJoinSaveTuple(tuple, hashvalue,
+                                                                 &hashtable->innerBatchFile[batchno]);
+                       pfree(hashTuple);
+                       hashtable->spaceUsed -= tupleSize;
+                       hashtable->spaceUsedSkew -= tupleSize;
+               }
+
+               hashTuple = nextHashTuple;
+       }
+
+       /*
+        * Free the bucket struct itself and reset the hashtable entry to NULL.
+        *
+        * NOTE: this is not nearly as simple as it looks on the surface, because
+        * of the possibility of collisions in the hashtable.  Suppose that hash
+        * values A and B collide at a particular hashtable entry, and that A
+        * was entered first so B gets shifted to a different table entry.  If
+        * we were to remove A first then ExecHashGetSkewBucket would mistakenly
+        * start reporting that B is not in the hashtable, because it would hit
+        * the NULL before finding B.  However, we always remove entries in the
+        * reverse order of creation, so this failure cannot happen.
+        */
+       hashtable->skewBucket[bucketToRemove] = NULL;
+       hashtable->nSkewBuckets--;
+       pfree(bucket);
+       hashtable->spaceUsed -= SKEW_BUCKET_OVERHEAD;
+       hashtable->spaceUsedSkew -= SKEW_BUCKET_OVERHEAD;
+
+       /*
+        * If we have removed all skew buckets then give up on skew optimization.
+        * Release the arrays since they aren't useful any more.
+        */
+       if (hashtable->nSkewBuckets == 0)
+       {
+               hashtable->skewEnabled = false;
+               pfree(hashtable->skewBucket);
+               pfree(hashtable->skewBucketNums);
+               hashtable->skewBucket = NULL;
+               hashtable->skewBucketNums = NULL;
+               hashtable->spaceUsed -= hashtable->spaceUsedSkew;
+               hashtable->spaceUsedSkew = 0;
+       }
+}
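
To make the num_skew_mcvs estimate in ExecChooseHashTableSize above concrete,
here is a small standalone calculation.  The work_mem setting, tuple size, and
bucket overhead used here are assumed values chosen only for illustration; the
real code derives them from the plan width and from SKEW_BUCKET_OVERHEAD.

/*
 * Illustrative arithmetic only (not PostgreSQL code): roughly how many MCVs
 * the 2% skew budget can cover, under assumed sizes.
 */
#include <stdio.h>

int
main(void)
{
    long work_mem_kb = 1024;                      /* assume work_mem = 1 MB */
    long hash_table_bytes = work_mem_kb * 1024L;
    long skew_table_bytes = hash_table_bytes * 2 / 100;  /* SKEW_WORK_MEM_PERCENT */

    long tupsize = 64;                            /* assumed per-tuple size */
    long bucket_ptrs = 8 * (long) sizeof(void *); /* worst-case skewBucket[] share */
    long bucket_num = (long) sizeof(int);         /* skewBucketNums[] entry */
    long bucket_overhead = 16;                    /* assumed SKEW_BUCKET_OVERHEAD */

    long per_mcv = tupsize + bucket_ptrs + bucket_num + bucket_overhead;

    printf("skew budget = %ld bytes, per-MCV cost = %ld bytes, num_skew_mcvs = %ld\n",
           skew_table_bytes, per_mcv, skew_table_bytes / per_mcv);
    return 0;
}
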
src/backend/executor/nodeHashjoin.c
index ad0c302..aea2ab3 100644
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/executor/nodeHashjoin.c,v 1.97 2009/01/01 17:23:41 momjian Exp $
+ *       $PostgreSQL: pgsql/src/backend/executor/nodeHashjoin.c,v 1.98 2009/03/21 00:04:38 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -198,19 +198,23 @@ ExecHashJoin(HashJoinState *node)
                        node->hj_MatchedOuter = false;
 
                        /*
-                        * now we have an outer tuple, find the corresponding bucket for
-                        * this tuple from the hash table
+                        * Now we have an outer tuple; find the corresponding bucket for
+                        * this tuple in the main hash table or skew hash table.
                         */
                        node->hj_CurHashValue = hashvalue;
                        ExecHashGetBucketAndBatch(hashtable, hashvalue,
                                                                          &node->hj_CurBucketNo, &batchno);
+                       node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
+                                                                                                                        hashvalue);
                        node->hj_CurTuple = NULL;
 
                        /*
                         * Now we've got an outer tuple and the corresponding hash bucket,
-                        * but this tuple may not belong to the current batch.
+                        * but it might not belong to the current batch, or it might
+                        * match a skew bucket.
                         */
-                       if (batchno != hashtable->curbatch)
+                       if (batchno != hashtable->curbatch &&
+                               node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
                        {
                                /*
                                 * Need to postpone this outer tuple to a later batch. Save it
@@ -452,6 +456,7 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
 
        hjstate->hj_CurHashValue = 0;
        hjstate->hj_CurBucketNo = 0;
+       hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
        hjstate->hj_CurTuple = NULL;
 
        /*
@@ -651,6 +656,19 @@ start_over:
                        BufFileClose(hashtable->outerBatchFile[curbatch]);
                hashtable->outerBatchFile[curbatch] = NULL;
        }
+       else                                            /* we just finished the first batch */
+       {
+               /*
+                * Reset some of the skew optimization state variables, since we
+                * no longer need to consider skew tuples after the first batch.
+                * The memory context reset we are about to do will release the
+                * skew hashtable itself.
+                */
+               hashtable->skewEnabled = false;
+               hashtable->skewBucket = NULL;
+               hashtable->skewBucketNums = NULL;
+               hashtable->spaceUsedSkew = 0;
+       }
 
        /*
         * We can always skip over any batches that are completely empty on both
@@ -880,6 +898,7 @@ ExecReScanHashJoin(HashJoinState *node, ExprContext *exprCtxt)
        /* Always reset intra-tuple state */
        node->hj_CurHashValue = 0;
        node->hj_CurBucketNo = 0;
+       node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
        node->hj_CurTuple = NULL;
 
        node->js.ps.ps_TupFromTlist = false;
src/backend/nodes/copyfuncs.c
index 3c8bf6f..f9a1efd 100644
@@ -15,7 +15,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/nodes/copyfuncs.c,v 1.426 2009/03/10 22:09:25 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/nodes/copyfuncs.c,v 1.427 2009/03/21 00:04:39 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -735,6 +735,10 @@ _copyHash(Hash *from)
        /*
         * copy remainder of node
         */
+       COPY_SCALAR_FIELD(skewTable);
+       COPY_SCALAR_FIELD(skewColumn);
+       COPY_SCALAR_FIELD(skewColType);
+       COPY_SCALAR_FIELD(skewColTypmod);
 
        return newnode;
 }
src/backend/nodes/outfuncs.c
index d64f0ad..212fc06 100644
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/nodes/outfuncs.c,v 1.354 2009/03/10 22:09:25 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/nodes/outfuncs.c,v 1.355 2009/03/21 00:04:39 tgl Exp $
  *
  * NOTES
  *       Every node type that can appear in stored rules' parsetrees *must*
@@ -675,6 +675,11 @@ _outHash(StringInfo str, Hash *node)
        WRITE_NODE_TYPE("HASH");
 
        _outPlanInfo(str, (Plan *) node);
+
+       WRITE_OID_FIELD(skewTable);
+       WRITE_INT_FIELD(skewColumn);
+       WRITE_OID_FIELD(skewColType);
+       WRITE_INT_FIELD(skewColTypmod);
 }
 
 static void
src/backend/optimizer/path/costsize.c
index 07ddf43..b07a259 100644
@@ -54,7 +54,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/optimizer/path/costsize.c,v 1.204 2009/02/06 23:43:23 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/optimizer/path/costsize.c,v 1.205 2009/03/21 00:04:39 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -1821,6 +1821,7 @@ cost_hashjoin(HashPath *path, PlannerInfo *root, SpecialJoinInfo *sjinfo)
        int                     num_hashclauses = list_length(hashclauses);
        int                     numbuckets;
        int                     numbatches;
+       int                     num_skew_mcvs;
        double          virtualbuckets;
        Selectivity innerbucketsize;
        ListCell   *hcl;
@@ -1862,11 +1863,22 @@ cost_hashjoin(HashPath *path, PlannerInfo *root, SpecialJoinInfo *sjinfo)
                * inner_path_rows;
        run_cost += cpu_operator_cost * num_hashclauses * outer_path_rows;
 
-       /* Get hash table size that executor would use for inner relation */
+       /*
+        * Get hash table size that executor would use for inner relation.
+        *
+        * XXX for the moment, always assume that skew optimization will be
+        * performed.  As long as SKEW_WORK_MEM_PERCENT is small, it's not worth
+        * trying to determine that for sure.
+        *
+        * XXX at some point it might be interesting to try to account for skew
+        * optimization in the cost estimate, but for now, we don't.
+        */
        ExecChooseHashTableSize(inner_path_rows,
                                                        inner_path->parent->width,
+                                                       true,   /* useskew */
                                                        &numbuckets,
-                                                       &numbatches);
+                                                       &numbatches,
+                                                       &num_skew_mcvs);
        virtualbuckets = (double) numbuckets *(double) numbatches;
 
        /*
src/backend/optimizer/plan/createplan.c
index a243ca8..be4d79f 100644
@@ -10,7 +10,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/optimizer/plan/createplan.c,v 1.255 2009/01/01 17:23:44 momjian Exp $
+ *       $PostgreSQL: pgsql/src/backend/optimizer/plan/createplan.c,v 1.256 2009/03/21 00:04:39 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -112,7 +112,11 @@ static HashJoin *make_hashjoin(List *tlist,
                          List *hashclauses,
                          Plan *lefttree, Plan *righttree,
                          JoinType jointype);
-static Hash *make_hash(Plan *lefttree);
+static Hash *make_hash(Plan *lefttree,
+                                          Oid skewTable,
+                                          AttrNumber skewColumn,
+                                          Oid skewColType,
+                                          int32 skewColTypmod);
 static MergeJoin *make_mergejoin(List *tlist,
                           List *joinclauses, List *otherclauses,
                           List *mergeclauses,
@@ -1864,6 +1868,10 @@ create_hashjoin_plan(PlannerInfo *root,
        List       *joinclauses;
        List       *otherclauses;
        List       *hashclauses;
+       Oid                     skewTable = InvalidOid;
+       AttrNumber      skewColumn = InvalidAttrNumber;
+       Oid                     skewColType = InvalidOid;
+       int32           skewColTypmod = -1;
        HashJoin   *join_plan;
        Hash       *hash_plan;
 
@@ -1903,9 +1911,46 @@ create_hashjoin_plan(PlannerInfo *root,
        disuse_physical_tlist(inner_plan, best_path->jpath.innerjoinpath);
 
        /*
+        * If there is a single join clause and we can identify the outer
+        * variable as a simple column reference, supply its identity for
+        * possible use in skew optimization.  (Note: in principle we could
+        * do skew optimization with multiple join clauses, but we'd have to
+        * be able to determine the most common combinations of outer values,
+        * which we don't currently have enough stats for.)
+        */
+       if (list_length(hashclauses) == 1)
+       {
+               OpExpr     *clause = (OpExpr *) linitial(hashclauses);
+               Node       *node;
+
+               Assert(is_opclause(clause));
+               node = (Node *) linitial(clause->args);
+               if (IsA(node, RelabelType))
+                       node = (Node *) ((RelabelType *) node)->arg;
+               if (IsA(node, Var))
+               {
+                       Var        *var = (Var *) node;
+                       RangeTblEntry *rte;
+
+                       rte = root->simple_rte_array[var->varno];
+                       if (rte->rtekind == RTE_RELATION)
+                       {
+                               skewTable = rte->relid;
+                               skewColumn = var->varattno;
+                               skewColType = var->vartype;
+                               skewColTypmod = var->vartypmod;
+                       }
+               }
+       }
+
+       /*
         * Build the hash node and hash join node.
         */
-       hash_plan = make_hash(inner_plan);
+       hash_plan = make_hash(inner_plan,
+                                                 skewTable,
+                                                 skewColumn,
+                                                 skewColType,
+                                                 skewColTypmod);
        join_plan = make_hashjoin(tlist,
                                                          joinclauses,
                                                          otherclauses,
@@ -2713,7 +2758,11 @@ make_hashjoin(List *tlist,
 }
 
 static Hash *
-make_hash(Plan *lefttree)
+make_hash(Plan *lefttree,
+                 Oid skewTable,
+                 AttrNumber skewColumn,
+                 Oid skewColType,
+                 int32 skewColTypmod)
 {
        Hash       *node = makeNode(Hash);
        Plan       *plan = &node->plan;
@@ -2730,6 +2779,11 @@ make_hash(Plan *lefttree)
        plan->lefttree = lefttree;
        plan->righttree = NULL;
 
+       node->skewTable = skewTable;
+       node->skewColumn = skewColumn;
+       node->skewColType = skewColType;
+       node->skewColTypmod = skewColTypmod;
+
        return node;
 }
 
src/include/executor/hashjoin.h
index 40a5244..5b18282 100644
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/executor/hashjoin.h,v 1.49 2009/01/01 17:23:59 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/executor/hashjoin.h,v 1.50 2009/03/21 00:04:40 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -72,6 +72,36 @@ typedef struct HashJoinTupleData
 #define HJTUPLE_MINTUPLE(hjtup)  \
        ((MinimalTuple) ((char *) (hjtup) + HJTUPLE_OVERHEAD))
 
+/*
+ * If the outer relation's distribution is sufficiently nonuniform, we attempt
+ * to optimize the join by treating the hash values corresponding to the outer
+ * relation's MCVs specially.  Inner relation tuples matching these hash
+ * values go into the "skew" hashtable instead of the main hashtable, and
+ * outer relation tuples with these hash values are matched against that
+ * table instead of the main one.  Thus, tuples with these hash values are
+ * effectively handled as part of the first batch and will never go to disk.
+ * The skew hashtable is limited to SKEW_WORK_MEM_PERCENT of the total memory
+ * allowed for the join; while building the hashtables, we decrease the number
+ * of MCVs being specially treated if needed to stay under this limit.
+ *
+ * Note: you might wonder why we look at the outer relation stats for this,
+ * rather than the inner.  One reason is that the outer relation is typically
+ * bigger, so we get more I/O savings by optimizing for its most common values.
+ * Also, for similarly-sized relations, the planner prefers to put the more
+ * uniformly distributed relation on the inside, so we're more likely to find
+ * interesting skew in the outer relation.
+ */
+typedef struct HashSkewBucket
+{
+       uint32          hashvalue;              /* common hash value */
+       HashJoinTuple tuples;           /* linked list of inner-relation tuples */
+} HashSkewBucket;
+
+#define SKEW_BUCKET_OVERHEAD  MAXALIGN(sizeof(HashSkewBucket))
+#define INVALID_SKEW_BUCKET_NO  (-1)
+#define SKEW_WORK_MEM_PERCENT  2
+#define SKEW_MIN_OUTER_FRACTION  0.01
+
 
 typedef struct HashJoinTableData
 {
@@ -82,6 +112,12 @@ typedef struct HashJoinTableData
        struct HashJoinTupleData **buckets;
        /* buckets array is per-batch storage, as are all the tuples */
 
+       bool            skewEnabled;    /* are we using skew optimization? */
+       HashSkewBucket **skewBucket;    /* hashtable of skew buckets */
+       int                     skewBucketLen;  /* size of skewBucket array (a power of 2!) */
+       int                     nSkewBuckets;   /* number of active skew buckets */
+       int                *skewBucketNums;     /* array indexes of active skew buckets */
+
        int                     nbatch;                 /* number of batches */
        int                     curbatch;               /* current batch #; 0 during 1st pass */
 
@@ -113,6 +149,8 @@ typedef struct HashJoinTableData
 
        Size            spaceUsed;              /* memory space currently used by tuples */
        Size            spaceAllowed;   /* upper limit for space used */
+       Size            spaceUsedSkew;  /* skew hash table's current space usage */
+       Size            spaceAllowedSkew;       /* upper limit for skew hashtable */
 
        MemoryContext hashCxt;          /* context for whole-hash-join storage */
        MemoryContext batchCxt;         /* context for this-batch-only storage */
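
A brief sketch of how the two tuning constants above interact, using invented
MCV frequencies: the executor sums the frequencies of the MCVs that fit within
the SKEW_WORK_MEM_PERCENT budget and enables the optimization only if they
cover at least SKEW_MIN_OUTER_FRACTION (1%) of the outer relation.  This is a
simplified illustration, not the code in ExecHashBuildSkewHash.

/*
 * Sketch of the "is skew worth it?" test, with made-up MCV frequencies.
 */
#include <stdio.h>

#define SKEW_MIN_OUTER_FRACTION 0.01

int
main(void)
{
    /* hypothetical MCV frequencies from pg_statistic, most common first */
    double mcv_freqs[] = {0.20, 0.05, 0.01, 0.004};
    int    mcvs_to_use = 3;     /* assume the memory budget allows only three */
    double frac = 0.0;

    for (int i = 0; i < mcvs_to_use; i++)
        frac += mcv_freqs[i];

    if (frac >= SKEW_MIN_OUTER_FRACTION)
        printf("use skew optimization: MCVs cover %.1f%% of outer rows\n", frac * 100.0);
    else
        printf("skip skew optimization: only %.1f%% coverage\n", frac * 100.0);
    return 0;
}
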
src/include/executor/nodeHash.h
index ae08880..7c8ca56 100644
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/executor/nodeHash.h,v 1.46 2009/01/01 17:23:59 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/executor/nodeHash.h,v 1.47 2009/03/21 00:04:40 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -41,8 +41,10 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 extern HashJoinTuple ExecScanHashBucket(HashJoinState *hjstate,
                                   ExprContext *econtext);
 extern void ExecHashTableReset(HashJoinTable hashtable);
-extern void ExecChooseHashTableSize(double ntuples, int tupwidth,
+extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                                                int *numbuckets,
-                                               int *numbatches);
+                                               int *numbatches,
+                                               int *num_skew_mcvs);
+extern int     ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
 
 #endif   /* NODEHASH_H */
src/include/nodes/execnodes.h
index 8d87ec1..996efa8 100644
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/nodes/execnodes.h,v 1.201 2009/01/12 05:10:45 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/nodes/execnodes.h,v 1.202 2009/03/21 00:04:40 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -1374,11 +1374,12 @@ typedef struct MergeJoinState
  *             hj_HashTable                    hash table for the hashjoin
  *                                                             (NULL if table not built yet)
  *             hj_CurHashValue                 hash value for current outer tuple
- *             hj_CurBucketNo                  bucket# for current outer tuple
+ *             hj_CurBucketNo                  regular bucket# for current outer tuple
+ *             hj_CurSkewBucketNo              skew bucket# for current outer tuple
  *             hj_CurTuple                             last inner tuple matched to current outer
  *                                                             tuple, or NULL if starting search
- *                                                             (CurHashValue, CurBucketNo and CurTuple are
- *                                                              undefined if OuterTupleSlot is empty!)
+ *                                                             (hj_CurXXX variables are undefined if
+ *                                                             OuterTupleSlot is empty!)
  *             hj_OuterHashKeys                the outer hash keys in the hashjoin condition
  *             hj_InnerHashKeys                the inner hash keys in the hashjoin condition
  *             hj_HashOperators                the join operators in the hashjoin condition
@@ -1403,6 +1404,7 @@ typedef struct HashJoinState
        HashJoinTable hj_HashTable;
        uint32          hj_CurHashValue;
        int                     hj_CurBucketNo;
+       int                     hj_CurSkewBucketNo;
        HashJoinTuple hj_CurTuple;
        List       *hj_OuterHashKeys;           /* list of ExprState nodes */
        List       *hj_InnerHashKeys;           /* list of ExprState nodes */
src/include/nodes/plannodes.h
index 12742b5..9caf089 100644
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/nodes/plannodes.h,v 1.108 2009/01/01 17:24:00 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/nodes/plannodes.h,v 1.109 2009/03/21 00:04:40 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -460,7 +460,7 @@ typedef struct MergeJoin
 } MergeJoin;
 
 /* ----------------
- *             hash join (probe) node
+ *             hash join node
  * ----------------
  */
 typedef struct HashJoin
@@ -567,11 +567,20 @@ typedef struct Unique
 
 /* ----------------
  *             hash build node
+ *
+ * If the executor is supposed to try to apply skew join optimization, then
+ * skewTable/skewColumn identify the outer relation's join key column, from
+ * which the relevant MCV statistics can be fetched.  Also, its type
+ * information is provided to save a lookup.
  * ----------------
  */
 typedef struct Hash
 {
        Plan            plan;
+       Oid                     skewTable;              /* outer join key's table OID, or InvalidOid */
+       AttrNumber      skewColumn;             /* outer join key's column #, or zero */
+       Oid                     skewColType;    /* datatype of the outer key column */
+       int32           skewColTypmod;  /* typmod of the outer key column */
        /* all other info is in the parent HashJoin node */
 } Hash;