Optimize multi-batch hash joins when the outer relation has a nonuniform
authorTom Lane <tgl@sss.pgh.pa.us>
Sat, 21 Mar 2009 00:04:40 +0000 (00:04 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Sat, 21 Mar 2009 00:04:40 +0000 (00:04 +0000)
distribution, by creating a special fast path for the (first few) most common
values of the outer relation.  Tuples having hashvalues matching the MCVs
are effectively forced to be in the first batch, so that we never write
them out to the batch temp files.

Bryce Cutt and Ramon Lawrence, with some editorialization by me.

src/backend/executor/nodeHash.c
src/backend/executor/nodeHashjoin.c
src/backend/nodes/copyfuncs.c
src/backend/nodes/outfuncs.c
src/backend/optimizer/path/costsize.c
src/backend/optimizer/plan/createplan.c
src/include/executor/hashjoin.h
src/include/executor/nodeHash.h
src/include/nodes/execnodes.h
src/include/nodes/plannodes.h

index 83be33b999a30559817fde1461882c07c42b6cec..dd97ef45725aba610b8dfa843c3367670048b27e 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/executor/nodeHash.c,v 1.117 2009/01/01 17:23:41 momjian Exp $
+ *       $PostgreSQL: pgsql/src/backend/executor/nodeHash.c,v 1.118 2009/03/21 00:04:38 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -24,6 +24,7 @@
 #include <math.h>
 #include <limits.h>
 
+#include "catalog/pg_statistic.h"
 #include "commands/tablespace.h"
 #include "executor/execdebug.h"
 #include "executor/hashjoin.h"
 #include "utils/dynahash.h"
 #include "utils/memutils.h"
 #include "utils/lsyscache.h"
+#include "utils/syscache.h"
 
 
 static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
+static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node,
+                                                                 int mcvsToUse);
+static void ExecHashSkewTableInsert(HashJoinTable hashtable,
+                                                                       TupleTableSlot *slot,
+                                                                       uint32 hashvalue,
+                                                                       int bucketNumber);
+static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
 
 
 /* ----------------------------------------------------------------
@@ -99,7 +108,20 @@ MultiExecHash(HashState *node)
                if (ExecHashGetHashValue(hashtable, econtext, hashkeys, false, false,
                                                                 &hashvalue))
                {
-                       ExecHashTableInsert(hashtable, slot, hashvalue);
+                       int             bucketNumber;
+
+                       bucketNumber = ExecHashGetSkewBucket(hashtable, hashvalue);
+                       if (bucketNumber != INVALID_SKEW_BUCKET_NO)
+                       {
+                               /* It's a skew tuple, so put it into that hash table */
+                               ExecHashSkewTableInsert(hashtable, slot, hashvalue,
+                                                                               bucketNumber);
+                       }
+                       else
+                       {
+                               /* Not subject to skew optimization, so insert normally */
+                               ExecHashTableInsert(hashtable, slot, hashvalue);
+                       }
                        hashtable->totalTuples += 1;
                }
        }
@@ -225,6 +247,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators)
        Plan       *outerNode;
        int                     nbuckets;
        int                     nbatch;
+       int                     num_skew_mcvs;
        int                     log2_nbuckets;
        int                     nkeys;
        int                     i;
@@ -239,7 +262,8 @@ ExecHashTableCreate(Hash *node, List *hashOperators)
        outerNode = outerPlan(node);
 
        ExecChooseHashTableSize(outerNode->plan_rows, outerNode->plan_width,
-                                                       &nbuckets, &nbatch);
+                                                       OidIsValid(node->skewTable),
+                                                       &nbuckets, &nbatch, &num_skew_mcvs);
 
 #ifdef HJDEBUG
        printf("nbatch = %d, nbuckets = %d\n", nbatch, nbuckets);
@@ -259,6 +283,11 @@ ExecHashTableCreate(Hash *node, List *hashOperators)
        hashtable->nbuckets = nbuckets;
        hashtable->log2_nbuckets = log2_nbuckets;
        hashtable->buckets = NULL;
+       hashtable->skewEnabled = false;
+       hashtable->skewBucket = NULL;
+       hashtable->skewBucketLen = 0;
+       hashtable->nSkewBuckets = 0;
+       hashtable->skewBucketNums = NULL;
        hashtable->nbatch = nbatch;
        hashtable->curbatch = 0;
        hashtable->nbatch_original = nbatch;
@@ -269,6 +298,9 @@ ExecHashTableCreate(Hash *node, List *hashOperators)
        hashtable->outerBatchFile = NULL;
        hashtable->spaceUsed = 0;
        hashtable->spaceAllowed = work_mem * 1024L;
+       hashtable->spaceUsedSkew = 0;
+       hashtable->spaceAllowedSkew =
+               hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100;
 
        /*
         * Get info about the hash functions to be used for each hash key. Also
@@ -339,6 +371,13 @@ ExecHashTableCreate(Hash *node, List *hashOperators)
        hashtable->buckets = (HashJoinTuple *)
                palloc0(nbuckets * sizeof(HashJoinTuple));
 
+       /*
+        * Set up for skew optimization, if possible and there's a need for more
+        * than one batch.  (In a one-batch join, there's no point in it.)
+        */
+       if (nbatch > 1)
+               ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);
+
        MemoryContextSwitchTo(oldcxt);
 
        return hashtable;
@@ -356,13 +395,15 @@ ExecHashTableCreate(Hash *node, List *hashOperators)
 #define NTUP_PER_BUCKET                        10
 
 void
-ExecChooseHashTableSize(double ntuples, int tupwidth,
+ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                                                int *numbuckets,
-                                               int *numbatches)
+                                               int *numbatches,
+                                               int *num_skew_mcvs)
 {
        int                     tupsize;
        double          inner_rel_bytes;
        long            hash_table_bytes;
+       long            skew_table_bytes;
        int                     nbatch;
        int                     nbuckets;
        int                     i;
@@ -386,6 +427,41 @@ ExecChooseHashTableSize(double ntuples, int tupwidth,
         */
        hash_table_bytes = work_mem * 1024L;
 
+       /*
+        * If skew optimization is possible, estimate the number of skew buckets
+        * that will fit in the memory allowed, and decrement the assumed space
+        * available for the main hash table accordingly.
+        *
+        * We make the optimistic assumption that each skew bucket will contain
+        * one inner-relation tuple.  If that turns out to be low, we will recover
+        * at runtime by reducing the number of skew buckets.
+        *
+        * hashtable->skewBucket will have up to 8 times as many HashSkewBucket
+        * pointers as the number of MCVs we allow, since ExecHashBuildSkewHash
+        * will round up to the next power of 2 and then multiply by 4 to reduce
+        * collisions.
+        */
+       if (useskew)
+       {
+               skew_table_bytes = hash_table_bytes * SKEW_WORK_MEM_PERCENT / 100;
+
+               *num_skew_mcvs = skew_table_bytes / (
+                       /* size of a hash tuple */
+                       tupsize +
+                       /* worst-case size of skewBucket[] per MCV */
+                       (8 * sizeof(HashSkewBucket *)) +
+                       /* size of skewBucketNums[] entry */
+                       sizeof(int) +
+                       /* size of skew bucket struct itself */
+                       SKEW_BUCKET_OVERHEAD
+                       );
+
+               if (*num_skew_mcvs > 0)
+                       hash_table_bytes -= skew_table_bytes;
+       }
+       else
+               *num_skew_mcvs = 0;
+
        /*
         * Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when
         * memory is filled.  Set nbatch to the smallest power of 2 that appears
@@ -813,13 +889,18 @@ ExecScanHashBucket(HashJoinState *hjstate,
        uint32          hashvalue = hjstate->hj_CurHashValue;
 
        /*
-        * hj_CurTuple is NULL to start scanning a new bucket, or the address of
-        * the last tuple returned from the current bucket.
+        * hj_CurTuple is the address of the tuple last returned from the current
+        * bucket, or NULL if it's time to start scanning a new bucket.
+        *
+        * If the tuple hashed to a skew bucket then scan the skew bucket
+        * otherwise scan the standard hashtable bucket.
         */
-       if (hashTuple == NULL)
-               hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
-       else
+       if (hashTuple != NULL)
                hashTuple = hashTuple->next;
+       else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO)
+               hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples;
+       else
+               hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
 
        while (hashTuple != NULL)
        {
@@ -889,3 +970,347 @@ ExecReScanHash(HashState *node, ExprContext *exprCtxt)
        if (((PlanState *) node)->lefttree->chgParam == NULL)
                ExecReScan(((PlanState *) node)->lefttree, exprCtxt);
 }
+
+
+/*
+ * ExecHashBuildSkewHash
+ *
+ *             Set up for skew optimization if we can identify the most common values
+ *             (MCVs) of the outer relation's join key.  We make a skew hash bucket
+ *             for the hash value of each MCV, up to the number of slots allowed
+ *             based on available memory.
+ */
+static void
+ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
+{
+       HeapTupleData   *statsTuple;
+       Datum                   *values;
+       int                             nvalues;
+       float4                  *numbers;
+       int                             nnumbers;
+
+       /* Do nothing if planner didn't identify the outer relation's join key */
+       if (!OidIsValid(node->skewTable))
+               return;
+       /* Also, do nothing if we don't have room for at least one skew bucket */
+       if (mcvsToUse <= 0)
+               return;
+
+       /*
+        * Try to find the MCV statistics for the outer relation's join key.
+        */
+       statsTuple = SearchSysCache(STATRELATT,
+                                                               ObjectIdGetDatum(node->skewTable),
+                                                               Int16GetDatum(node->skewColumn),
+                                                               0, 0);
+       if (!HeapTupleIsValid(statsTuple))
+               return;
+
+       if (get_attstatsslot(statsTuple, node->skewColType, node->skewColTypmod,
+                                                STATISTIC_KIND_MCV, InvalidOid,
+                                                &values, &nvalues,
+                                                &numbers, &nnumbers))
+       {
+               double          frac;
+               int                     nbuckets;
+               FmgrInfo   *hashfunctions;
+               int                     i;
+
+               if (mcvsToUse > nvalues)
+                       mcvsToUse = nvalues;
+
+               /*
+                * Calculate the expected fraction of outer relation that will
+                * participate in the skew optimization.  If this isn't at least
+                * SKEW_MIN_OUTER_FRACTION, don't use skew optimization.
+                */
+               frac = 0;
+               for (i = 0; i < mcvsToUse; i++)
+                       frac += numbers[i];
+               if (frac < SKEW_MIN_OUTER_FRACTION)
+               {
+                       free_attstatsslot(node->skewColType,
+                                                         values, nvalues, numbers, nnumbers);
+                       ReleaseSysCache(statsTuple);
+                       return;
+               }
+
+               /*
+                * Okay, set up the skew hashtable.
+                *
+                * skewBucket[] is an open addressing hashtable with a power of 2 size
+                * that is greater than the number of MCV values.  (This ensures there
+                * will be at least one null entry, so searches will always terminate.)
+                *
+                * Note: this code could fail if mcvsToUse exceeds INT_MAX/8, but
+                * that is not currently possible since we limit pg_statistic entries
+                * to much less than that.
+                */
+               nbuckets = 2;
+               while (nbuckets <= mcvsToUse)
+                       nbuckets <<= 1;
+               /* use two more bits just to help avoid collisions */
+               nbuckets <<= 2;
+
+               hashtable->skewEnabled = true;
+               hashtable->skewBucketLen = nbuckets;
+
+               /*
+                * We allocate the bucket memory in the hashtable's batch context.
+                * It is only needed during the first batch, and this ensures it
+                * will be automatically removed once the first batch is done.
+                */
+               hashtable->skewBucket = (HashSkewBucket **)
+                       MemoryContextAllocZero(hashtable->batchCxt,
+                                                                  nbuckets * sizeof(HashSkewBucket *));
+               hashtable->skewBucketNums = (int *)
+                       MemoryContextAllocZero(hashtable->batchCxt,
+                                                                  mcvsToUse * sizeof(int));
+
+               hashtable->spaceUsed += nbuckets * sizeof(HashSkewBucket *)
+                       + mcvsToUse * sizeof(int);
+               hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
+                       + mcvsToUse * sizeof(int);
+
+               /*
+                * Create a skew bucket for each MCV hash value.
+                *
+                * Note: it is very important that we create the buckets in order
+                * of decreasing MCV frequency.  If we have to remove some buckets,
+                * they must be removed in reverse order of creation (see notes in
+                * ExecHashRemoveNextSkewBucket) and we want the least common MCVs
+                * to be removed first.
+                */
+               hashfunctions = hashtable->outer_hashfunctions;
+
+               for (i = 0; i < mcvsToUse; i++)
+               {
+                       uint32 hashvalue;
+                       int bucket;
+
+                       hashvalue = DatumGetUInt32(FunctionCall1(&hashfunctions[0],
+                                                                                                        values[i]));
+
+                       /*
+                        * While we have not hit a hole in the hashtable and have not hit
+                        * the desired bucket, we have collided with some previous hash
+                        * value, so try the next bucket location.  NB: this code must
+                        * match ExecHashGetSkewBucket.
+                        */
+                       bucket = hashvalue & (nbuckets - 1);
+                       while (hashtable->skewBucket[bucket] != NULL &&
+                                  hashtable->skewBucket[bucket]->hashvalue != hashvalue)
+                               bucket = (bucket + 1) & (nbuckets - 1);
+
+                       /*
+                        * If we found an existing bucket with the same hashvalue,
+                        * leave it alone.  It's okay for two MCVs to share a hashvalue.
+                        */
+                       if (hashtable->skewBucket[bucket] != NULL)
+                               continue;
+
+                       /* Okay, create a new skew bucket for this hashvalue. */
+                       hashtable->skewBucket[bucket] = (HashSkewBucket *)
+                               MemoryContextAlloc(hashtable->batchCxt,
+                                                                  sizeof(HashSkewBucket));
+                       hashtable->skewBucket[bucket]->hashvalue = hashvalue;
+                       hashtable->skewBucket[bucket]->tuples = NULL;
+                       hashtable->skewBucketNums[hashtable->nSkewBuckets] = bucket;
+                       hashtable->nSkewBuckets++;
+                       hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
+                       hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
+               }
+
+               free_attstatsslot(node->skewColType,
+                                                 values, nvalues, numbers, nnumbers);
+       }
+
+       ReleaseSysCache(statsTuple);
+}
+
+/*
+ * ExecHashGetSkewBucket
+ *
+ *             Returns the index of the skew bucket for this hashvalue,
+ *             or INVALID_SKEW_BUCKET_NO if the hashvalue is not
+ *             associated with any active skew bucket.
+ */
+int
+ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
+{
+       int                     bucket;
+
+       /*
+        * Always return INVALID_SKEW_BUCKET_NO if not doing skew optimization
+        * (in particular, this happens after the initial batch is done).
+        */
+       if (!hashtable->skewEnabled)
+               return INVALID_SKEW_BUCKET_NO;
+
+       /*
+        * Since skewBucketLen is a power of 2, we can do a modulo by ANDing.
+        */
+       bucket = hashvalue & (hashtable->skewBucketLen - 1);
+
+       /*
+        * While we have not hit a hole in the hashtable and have not hit the
+        * desired bucket, we have collided with some other hash value, so try
+        * the next bucket location.
+        */
+       while (hashtable->skewBucket[bucket] != NULL &&
+                  hashtable->skewBucket[bucket]->hashvalue != hashvalue)
+               bucket = (bucket + 1) & (hashtable->skewBucketLen - 1);
+
+       /*
+        * Found the desired bucket?
+        */
+       if (hashtable->skewBucket[bucket] != NULL)
+               return bucket;
+
+       /*
+        * There must not be any hashtable entry for this hash value.
+        */
+       return INVALID_SKEW_BUCKET_NO;
+}
+
+/*
+ * ExecHashSkewTableInsert
+ *
+ *             Insert a tuple into the skew hashtable.
+ *
+ * This should generally match up with the current-batch case in
+ * ExecHashTableInsert.
+ */
+static void
+ExecHashSkewTableInsert(HashJoinTable hashtable,
+                                               TupleTableSlot *slot,
+                                               uint32 hashvalue,
+                                               int bucketNumber)
+{
+       MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
+       HashJoinTuple hashTuple;
+       int                     hashTupleSize;
+
+       /* Create the HashJoinTuple */
+       hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
+       hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
+                                                                                                  hashTupleSize);
+       hashTuple->hashvalue = hashvalue;
+       memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
+
+       /* Push it onto the front of the skew bucket's list */
+       hashTuple->next = hashtable->skewBucket[bucketNumber]->tuples;
+       hashtable->skewBucket[bucketNumber]->tuples = hashTuple;
+
+       /* Account for space used, and back off if we've used too much */
+       hashtable->spaceUsed += hashTupleSize;
+       hashtable->spaceUsedSkew += hashTupleSize;
+       while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
+               ExecHashRemoveNextSkewBucket(hashtable);
+
+       /* Check we are not over the total spaceAllowed, either */
+       if (hashtable->spaceUsed > hashtable->spaceAllowed)
+               ExecHashIncreaseNumBatches(hashtable);
+}
+
+/*
+ *             ExecHashRemoveNextSkewBucket
+ *
+ *             Remove the least valuable skew bucket by pushing its tuples into
+ *             the main hash table.
+ */
+static void
+ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
+{
+       int bucketToRemove;
+       HashSkewBucket *bucket;
+       uint32 hashvalue;
+       int bucketno;
+       int batchno;
+       HashJoinTuple hashTuple;
+
+       /* Locate the bucket to remove */
+       bucketToRemove = hashtable->skewBucketNums[hashtable->nSkewBuckets - 1];
+       bucket = hashtable->skewBucket[bucketToRemove];
+
+       /*
+        * Calculate which bucket and batch the tuples belong to in the main
+        * hashtable.  They all have the same hash value, so it's the same for all
+        * of them.  Also note that it's not possible for nbatch to increase
+        * while we are processing the tuples.
+        */
+       hashvalue = bucket->hashvalue;
+       ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
+
+       /* Process all tuples in the bucket */
+       hashTuple = bucket->tuples;
+       while (hashTuple != NULL)
+       {
+               HashJoinTuple nextHashTuple = hashTuple->next;
+               MinimalTuple tuple;
+               Size    tupleSize;
+
+               /*
+                * This code must agree with ExecHashTableInsert.  We do not use
+                * ExecHashTableInsert directly as ExecHashTableInsert expects a
+                * TupleTableSlot while we already have HashJoinTuples.
+                */
+               tuple = HJTUPLE_MINTUPLE(hashTuple);
+               tupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
+
+               /* Decide whether to put the tuple in the hash table or a temp file */
+               if (batchno == hashtable->curbatch)
+               {
+                       /* Move the tuple to the main hash table */
+                       hashTuple->next = hashtable->buckets[bucketno];
+                       hashtable->buckets[bucketno] = hashTuple;
+                       /* We have reduced skew space, but overall space doesn't change */
+                       hashtable->spaceUsedSkew -= tupleSize;
+               }
+               else
+               {
+                       /* Put the tuple into a temp file for later batches */
+                       Assert(batchno > hashtable->curbatch);
+                       ExecHashJoinSaveTuple(tuple, hashvalue,
+                                                                 &hashtable->innerBatchFile[batchno]);
+                       pfree(hashTuple);
+                       hashtable->spaceUsed -= tupleSize;
+                       hashtable->spaceUsedSkew -= tupleSize;
+               }
+
+               hashTuple = nextHashTuple;
+       }
+
+       /*
+        * Free the bucket struct itself and reset the hashtable entry to NULL.
+        *
+        * NOTE: this is not nearly as simple as it looks on the surface, because
+        * of the possibility of collisions in the hashtable.  Suppose that hash
+        * values A and B collide at a particular hashtable entry, and that A
+        * was entered first so B gets shifted to a different table entry.  If
+        * we were to remove A first then ExecHashGetSkewBucket would mistakenly
+        * start reporting that B is not in the hashtable, because it would hit
+        * the NULL before finding B.  However, we always remove entries in the
+        * reverse order of creation, so this failure cannot happen.
+        */
+       hashtable->skewBucket[bucketToRemove] = NULL;
+       hashtable->nSkewBuckets--;
+       pfree(bucket);
+       hashtable->spaceUsed -= SKEW_BUCKET_OVERHEAD;
+       hashtable->spaceUsedSkew -= SKEW_BUCKET_OVERHEAD;
+
+       /*
+        * If we have removed all skew buckets then give up on skew optimization.
+        * Release the arrays since they aren't useful any more.
+        */
+       if (hashtable->nSkewBuckets == 0)
+       {
+               hashtable->skewEnabled = false;
+               pfree(hashtable->skewBucket);
+               pfree(hashtable->skewBucketNums);
+               hashtable->skewBucket = NULL;
+               hashtable->skewBucketNums = NULL;
+               hashtable->spaceUsed -= hashtable->spaceUsedSkew;
+               hashtable->spaceUsedSkew = 0;
+       }
+}
index ad0c302e029b9ab960f53bc919312a3a41708d10..aea2ab3e4fd7867bf9f97bf7e660484bc4aee72c 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/executor/nodeHashjoin.c,v 1.97 2009/01/01 17:23:41 momjian Exp $
+ *       $PostgreSQL: pgsql/src/backend/executor/nodeHashjoin.c,v 1.98 2009/03/21 00:04:38 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -198,19 +198,23 @@ ExecHashJoin(HashJoinState *node)
                        node->hj_MatchedOuter = false;
 
                        /*
-                        * now we have an outer tuple, find the corresponding bucket for
-                        * this tuple from the hash table
+                        * Now we have an outer tuple; find the corresponding bucket for
+                        * this tuple in the main hash table or skew hash table.
                         */
                        node->hj_CurHashValue = hashvalue;
                        ExecHashGetBucketAndBatch(hashtable, hashvalue,
                                                                          &node->hj_CurBucketNo, &batchno);
+                       node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
+                                                                                                                        hashvalue);
                        node->hj_CurTuple = NULL;
 
                        /*
                         * Now we've got an outer tuple and the corresponding hash bucket,
-                        * but this tuple may not belong to the current batch.
+                        * but it might not belong to the current batch, or it might
+                        * match a skew bucket.
                         */
-                       if (batchno != hashtable->curbatch)
+                       if (batchno != hashtable->curbatch &&
+                               node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
                        {
                                /*
                                 * Need to postpone this outer tuple to a later batch. Save it
@@ -452,6 +456,7 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
 
        hjstate->hj_CurHashValue = 0;
        hjstate->hj_CurBucketNo = 0;
+       hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
        hjstate->hj_CurTuple = NULL;
 
        /*
@@ -651,6 +656,19 @@ start_over:
                        BufFileClose(hashtable->outerBatchFile[curbatch]);
                hashtable->outerBatchFile[curbatch] = NULL;
        }
+       else                                            /* we just finished the first batch */
+       {
+               /*
+                * Reset some of the skew optimization state variables, since we
+                * no longer need to consider skew tuples after the first batch.
+                * The memory context reset we are about to do will release the
+                * skew hashtable itself.
+                */
+               hashtable->skewEnabled = false;
+               hashtable->skewBucket = NULL;
+               hashtable->skewBucketNums = NULL;
+               hashtable->spaceUsedSkew = 0;
+       }
 
        /*
         * We can always skip over any batches that are completely empty on both
@@ -880,6 +898,7 @@ ExecReScanHashJoin(HashJoinState *node, ExprContext *exprCtxt)
        /* Always reset intra-tuple state */
        node->hj_CurHashValue = 0;
        node->hj_CurBucketNo = 0;
+       node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
        node->hj_CurTuple = NULL;
 
        node->js.ps.ps_TupFromTlist = false;
index 3c8bf6fb69a49918215245f2c788bdac8884ee20..f9a1efdc4476ba46a547dda375e446d73722596b 100644 (file)
@@ -15,7 +15,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/nodes/copyfuncs.c,v 1.426 2009/03/10 22:09:25 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/nodes/copyfuncs.c,v 1.427 2009/03/21 00:04:39 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -735,6 +735,10 @@ _copyHash(Hash *from)
        /*
         * copy remainder of node
         */
+       COPY_SCALAR_FIELD(skewTable);
+       COPY_SCALAR_FIELD(skewColumn);
+       COPY_SCALAR_FIELD(skewColType);
+       COPY_SCALAR_FIELD(skewColTypmod);
 
        return newnode;
 }
index d64f0ad0426b36c09b2006fff9ba48d615c3e0f4..212fc0673c27082b05afa17a8946ba730a4aaa31 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/nodes/outfuncs.c,v 1.354 2009/03/10 22:09:25 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/nodes/outfuncs.c,v 1.355 2009/03/21 00:04:39 tgl Exp $
  *
  * NOTES
  *       Every node type that can appear in stored rules' parsetrees *must*
@@ -675,6 +675,11 @@ _outHash(StringInfo str, Hash *node)
        WRITE_NODE_TYPE("HASH");
 
        _outPlanInfo(str, (Plan *) node);
+
+       WRITE_OID_FIELD(skewTable);
+       WRITE_INT_FIELD(skewColumn);
+       WRITE_OID_FIELD(skewColType);
+       WRITE_INT_FIELD(skewColTypmod);
 }
 
 static void
index 07ddf43c8d6073f5af2cc19544f73cfa778ea5b9..b07a2599bbeb3ece9c12d39020d8feecaae3e6d8 100644 (file)
@@ -54,7 +54,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/optimizer/path/costsize.c,v 1.204 2009/02/06 23:43:23 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/optimizer/path/costsize.c,v 1.205 2009/03/21 00:04:39 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -1821,6 +1821,7 @@ cost_hashjoin(HashPath *path, PlannerInfo *root, SpecialJoinInfo *sjinfo)
        int                     num_hashclauses = list_length(hashclauses);
        int                     numbuckets;
        int                     numbatches;
+       int                     num_skew_mcvs;
        double          virtualbuckets;
        Selectivity innerbucketsize;
        ListCell   *hcl;
@@ -1862,11 +1863,22 @@ cost_hashjoin(HashPath *path, PlannerInfo *root, SpecialJoinInfo *sjinfo)
                * inner_path_rows;
        run_cost += cpu_operator_cost * num_hashclauses * outer_path_rows;
 
-       /* Get hash table size that executor would use for inner relation */
+       /*
+        * Get hash table size that executor would use for inner relation.
+        *
+        * XXX for the moment, always assume that skew optimization will be
+        * performed.  As long as SKEW_WORK_MEM_PERCENT is small, it's not worth
+        * trying to determine that for sure.
+        *
+        * XXX at some point it might be interesting to try to account for skew
+        * optimization in the cost estimate, but for now, we don't.
+        */
        ExecChooseHashTableSize(inner_path_rows,
                                                        inner_path->parent->width,
+                                                       true,   /* useskew */
                                                        &numbuckets,
-                                                       &numbatches);
+                                                       &numbatches,
+                                                       &num_skew_mcvs);
        virtualbuckets = (double) numbuckets *(double) numbatches;
 
        /*
index a243ca80d2dcf2ed31f00e412093f206acc04778..be4d79f1bf2ff421260c9417270604ac0700226e 100644 (file)
@@ -10,7 +10,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/optimizer/plan/createplan.c,v 1.255 2009/01/01 17:23:44 momjian Exp $
+ *       $PostgreSQL: pgsql/src/backend/optimizer/plan/createplan.c,v 1.256 2009/03/21 00:04:39 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -112,7 +112,11 @@ static HashJoin *make_hashjoin(List *tlist,
                          List *hashclauses,
                          Plan *lefttree, Plan *righttree,
                          JoinType jointype);
-static Hash *make_hash(Plan *lefttree);
+static Hash *make_hash(Plan *lefttree,
+                                          Oid skewTable,
+                                          AttrNumber skewColumn,
+                                          Oid skewColType,
+                                          int32 skewColTypmod);
 static MergeJoin *make_mergejoin(List *tlist,
                           List *joinclauses, List *otherclauses,
                           List *mergeclauses,
@@ -1864,6 +1868,10 @@ create_hashjoin_plan(PlannerInfo *root,
        List       *joinclauses;
        List       *otherclauses;
        List       *hashclauses;
+       Oid                     skewTable = InvalidOid;
+       AttrNumber      skewColumn = InvalidAttrNumber;
+       Oid                     skewColType = InvalidOid;
+       int32           skewColTypmod = -1;
        HashJoin   *join_plan;
        Hash       *hash_plan;
 
@@ -1902,10 +1910,47 @@ create_hashjoin_plan(PlannerInfo *root,
        /* We don't want any excess columns in the hashed tuples */
        disuse_physical_tlist(inner_plan, best_path->jpath.innerjoinpath);
 
+       /*
+        * If there is a single join clause and we can identify the outer
+        * variable as a simple column reference, supply its identity for
+        * possible use in skew optimization.  (Note: in principle we could
+        * do skew optimization with multiple join clauses, but we'd have to
+        * be able to determine the most common combinations of outer values,
+        * which we don't currently have enough stats for.)
+        */
+       if (list_length(hashclauses) == 1)
+       {
+               OpExpr     *clause = (OpExpr *) linitial(hashclauses);
+               Node       *node;
+
+               Assert(is_opclause(clause));
+               node = (Node *) linitial(clause->args);
+               if (IsA(node, RelabelType))
+                       node = (Node *) ((RelabelType *) node)->arg;
+               if (IsA(node, Var))
+               {
+                       Var        *var = (Var *) node;
+                       RangeTblEntry *rte;
+
+                       rte = root->simple_rte_array[var->varno];
+                       if (rte->rtekind == RTE_RELATION)
+                       {
+                               skewTable = rte->relid;
+                               skewColumn = var->varattno;
+                               skewColType = var->vartype;
+                               skewColTypmod = var->vartypmod;
+                       }
+               }
+       }
+
        /*
         * Build the hash node and hash join node.
         */
-       hash_plan = make_hash(inner_plan);
+       hash_plan = make_hash(inner_plan,
+                                                 skewTable,
+                                                 skewColumn,
+                                                 skewColType,
+                                                 skewColTypmod);
        join_plan = make_hashjoin(tlist,
                                                          joinclauses,
                                                          otherclauses,
@@ -2713,7 +2758,11 @@ make_hashjoin(List *tlist,
 }
 
 static Hash *
-make_hash(Plan *lefttree)
+make_hash(Plan *lefttree,
+                 Oid skewTable,
+                 AttrNumber skewColumn,
+                 Oid skewColType,
+                 int32 skewColTypmod)
 {
        Hash       *node = makeNode(Hash);
        Plan       *plan = &node->plan;
@@ -2730,6 +2779,11 @@ make_hash(Plan *lefttree)
        plan->lefttree = lefttree;
        plan->righttree = NULL;
 
+       node->skewTable = skewTable;
+       node->skewColumn = skewColumn;
+       node->skewColType = skewColType;
+       node->skewColTypmod = skewColTypmod;
+
        return node;
 }
 
index 40a5244ad472a97a22c6ba7e6ea924aaa7e52920..5b18282a646af5afb9a6038625f18c0d21ea1099 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/executor/hashjoin.h,v 1.49 2009/01/01 17:23:59 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/executor/hashjoin.h,v 1.50 2009/03/21 00:04:40 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -72,6 +72,36 @@ typedef struct HashJoinTupleData
 #define HJTUPLE_MINTUPLE(hjtup)  \
        ((MinimalTuple) ((char *) (hjtup) + HJTUPLE_OVERHEAD))
 
+/*
+ * If the outer relation's distribution is sufficiently nonuniform, we attempt
+ * to optimize the join by treating the hash values corresponding to the outer
+ * relation's MCVs specially.  Inner relation tuples matching these hash
+ * values go into the "skew" hashtable instead of the main hashtable, and
+ * outer relation tuples with these hash values are matched against that
+ * table instead of the main one.  Thus, tuples with these hash values are
+ * effectively handled as part of the first batch and will never go to disk.
+ * The skew hashtable is limited to SKEW_WORK_MEM_PERCENT of the total memory
+ * allowed for the join; while building the hashtables, we decrease the number
+ * of MCVs being specially treated if needed to stay under this limit.
+ *
+ * Note: you might wonder why we look at the outer relation stats for this,
+ * rather than the inner.  One reason is that the outer relation is typically
+ * bigger, so we get more I/O savings by optimizing for its most common values.
+ * Also, for similarly-sized relations, the planner prefers to put the more
+ * uniformly distributed relation on the inside, so we're more likely to find
+ * interesting skew in the outer relation.
+ */
+typedef struct HashSkewBucket
+{
+       uint32          hashvalue;              /* common hash value */
+       HashJoinTuple tuples;           /* linked list of inner-relation tuples */
+} HashSkewBucket;
+
+#define SKEW_BUCKET_OVERHEAD  MAXALIGN(sizeof(HashSkewBucket))
+#define INVALID_SKEW_BUCKET_NO  (-1)
+#define SKEW_WORK_MEM_PERCENT  2
+#define SKEW_MIN_OUTER_FRACTION  0.01
+
 
 typedef struct HashJoinTableData
 {
@@ -82,6 +112,12 @@ typedef struct HashJoinTableData
        struct HashJoinTupleData **buckets;
        /* buckets array is per-batch storage, as are all the tuples */
 
+       bool            skewEnabled;    /* are we using skew optimization? */
+       HashSkewBucket **skewBucket;    /* hashtable of skew buckets */
+       int                     skewBucketLen;  /* size of skewBucket array (a power of 2!) */
+       int                     nSkewBuckets;   /* number of active skew buckets */
+       int                *skewBucketNums;     /* array indexes of active skew buckets */
+
        int                     nbatch;                 /* number of batches */
        int                     curbatch;               /* current batch #; 0 during 1st pass */
 
@@ -113,6 +149,8 @@ typedef struct HashJoinTableData
 
        Size            spaceUsed;              /* memory space currently used by tuples */
        Size            spaceAllowed;   /* upper limit for space used */
+       Size            spaceUsedSkew;  /* skew hash table's current space usage */
+       Size            spaceAllowedSkew;       /* upper limit for skew hashtable */
 
        MemoryContext hashCxt;          /* context for whole-hash-join storage */
        MemoryContext batchCxt;         /* context for this-batch-only storage */
index ae08880d6d360d5db8c17ef05d1b9ec8e71f04a9..7c8ca568a6d5c7c3b6991b15ce436aabc9b64921 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/executor/nodeHash.h,v 1.46 2009/01/01 17:23:59 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/executor/nodeHash.h,v 1.47 2009/03/21 00:04:40 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -41,8 +41,10 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 extern HashJoinTuple ExecScanHashBucket(HashJoinState *hjstate,
                                   ExprContext *econtext);
 extern void ExecHashTableReset(HashJoinTable hashtable);
-extern void ExecChooseHashTableSize(double ntuples, int tupwidth,
+extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                                                int *numbuckets,
-                                               int *numbatches);
+                                               int *numbatches,
+                                               int *num_skew_mcvs);
+extern int     ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
 
 #endif   /* NODEHASH_H */
index 8d87ec19e1d09bb365eb3bb9ea598076d57675e7..996efa8f87dd1925445855f0dacb0d51ff5145b0 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/nodes/execnodes.h,v 1.201 2009/01/12 05:10:45 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/nodes/execnodes.h,v 1.202 2009/03/21 00:04:40 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -1374,11 +1374,12 @@ typedef struct MergeJoinState
  *             hj_HashTable                    hash table for the hashjoin
  *                                                             (NULL if table not built yet)
  *             hj_CurHashValue                 hash value for current outer tuple
- *             hj_CurBucketNo                  bucket# for current outer tuple
+ *             hj_CurBucketNo                  regular bucket# for current outer tuple
+ *             hj_CurSkewBucketNo              skew bucket# for current outer tuple
  *             hj_CurTuple                             last inner tuple matched to current outer
  *                                                             tuple, or NULL if starting search
- *                                                             (CurHashValue, CurBucketNo and CurTuple are
- *                                                              undefined if OuterTupleSlot is empty!)
+ *                                                             (hj_CurXXX variables are undefined if
+ *                                                             OuterTupleSlot is empty!)
  *             hj_OuterHashKeys                the outer hash keys in the hashjoin condition
  *             hj_InnerHashKeys                the inner hash keys in the hashjoin condition
  *             hj_HashOperators                the join operators in the hashjoin condition
@@ -1403,6 +1404,7 @@ typedef struct HashJoinState
        HashJoinTable hj_HashTable;
        uint32          hj_CurHashValue;
        int                     hj_CurBucketNo;
+       int                     hj_CurSkewBucketNo;
        HashJoinTuple hj_CurTuple;
        List       *hj_OuterHashKeys;           /* list of ExprState nodes */
        List       *hj_InnerHashKeys;           /* list of ExprState nodes */
index 12742b57e55288014f205e009afe19986501cabe..9caf0895e4e4c8255c0785586c0ab8f55ebdbaad 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/nodes/plannodes.h,v 1.108 2009/01/01 17:24:00 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/nodes/plannodes.h,v 1.109 2009/03/21 00:04:40 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -460,7 +460,7 @@ typedef struct MergeJoin
 } MergeJoin;
 
 /* ----------------
- *             hash join (probe) node
+ *             hash join node
  * ----------------
  */
 typedef struct HashJoin
@@ -567,11 +567,20 @@ typedef struct Unique
 
 /* ----------------
  *             hash build node
+ *
+ * If the executor is supposed to try to apply skew join optimization, then
+ * skewTable/skewColumn identify the outer relation's join key column, from
+ * which the relevant MCV statistics can be fetched.  Also, its type
+ * information is provided to save a lookup.
  * ----------------
  */
 typedef struct Hash
 {
        Plan            plan;
+       Oid                     skewTable;              /* outer join key's table OID, or InvalidOid */
+       AttrNumber      skewColumn;             /* outer join key's column #, or zero */
+       Oid                     skewColType;    /* datatype of the outer key column */
+       int32           skewColTypmod;  /* typmod of the outer key column */
        /* all other info is in the parent HashJoin node */
 } Hash;