/*-------------------------------------------------------------------------
 *
 * nodeHash.c
 *	  Routines to hash relations for hashjoin
 *
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeHash.c
 *
 *-------------------------------------------------------------------------
 */
/*
 * INTERFACE ROUTINES
 *		MultiExecHash	- generate an in-memory hash table of the relation
 *		ExecInitHash	- initialize node and subnodes
 *		ExecEndHash		- shutdown node and subnodes
 */

#include "postgres.h"

#include <math.h>
#include <limits.h>

#include "access/htup_details.h"
#include "catalog/pg_statistic.h"
#include "commands/tablespace.h"
#include "executor/execdebug.h"
#include "executor/hashjoin.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "miscadmin.h"
#include "utils/dynahash.h"
#include "utils/memutils.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"


static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable);
static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node,
					  int mcvsToUse);
static void ExecHashSkewTableInsert(HashJoinTable hashtable,
						TupleTableSlot *slot,
						uint32 hashvalue,
						int bucketNumber);
static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);

static void *dense_alloc(HashJoinTable hashtable, Size size);

/* ----------------------------------------------------------------
 *		ExecHash
 *
 *		stub for pro forma compliance
 * ----------------------------------------------------------------
 */
TupleTableSlot *
ExecHash(HashState *node)
{
	elog(ERROR, "Hash node does not support ExecProcNode call convention");
	return NULL;
}

/* ----------------------------------------------------------------
 *		MultiExecHash
 *
 *		build hash table for hashjoin, doing partitioning if more
 *		than one batch is required.
 * ----------------------------------------------------------------
 */
Node *
MultiExecHash(HashState *node)
{
	PlanState  *outerNode;
	List	   *hashkeys;
	HashJoinTable hashtable;
	TupleTableSlot *slot;
	ExprContext *econtext;
	uint32		hashvalue;

	/* must provide our own instrumentation support */
	if (node->ps.instrument)
		InstrStartNode(node->ps.instrument);

	/*
	 * get state info from node
	 */
	outerNode = outerPlanState(node);
	hashtable = node->hashtable;

	/*
	 * set expression context
	 */
	hashkeys = node->hashkeys;
	econtext = node->ps.ps_ExprContext;

	/*
	 * get all inner tuples and insert into the hash table (or temp files)
	 */
	for (;;)
	{
		slot = ExecProcNode(outerNode);
		if (TupIsNull(slot))
			break;
		/* We have to compute the hash value */
		econtext->ecxt_innertuple = slot;
		if (ExecHashGetHashValue(hashtable, econtext, hashkeys,
								 false, hashtable->keepNulls,
								 &hashvalue))
		{
			int			bucketNumber;

			bucketNumber = ExecHashGetSkewBucket(hashtable, hashvalue);
			if (bucketNumber != INVALID_SKEW_BUCKET_NO)
			{
				/* It's a skew tuple, so put it into that hash table */
				ExecHashSkewTableInsert(hashtable, slot, hashvalue,
										bucketNumber);
				hashtable->skewTuples += 1;
			}
			else
			{
				/* Not subject to skew optimization, so insert normally */
				ExecHashTableInsert(hashtable, slot, hashvalue);
			}
			hashtable->totalTuples += 1;
		}
	}

	/* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
	if (hashtable->nbuckets != hashtable->nbuckets_optimal)
		ExecHashIncreaseNumBuckets(hashtable);

	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
	hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
	if (hashtable->spaceUsed > hashtable->spacePeak)
		hashtable->spacePeak = hashtable->spaceUsed;

	/* must provide our own instrumentation support */
	if (node->ps.instrument)
		InstrStopNode(node->ps.instrument, hashtable->totalTuples);

	/*
	 * We do not return the hash table directly because it's not a subtype of
	 * Node, and so would violate the MultiExecProcNode API.  Instead, our
	 * parent Hashjoin node is expected to know how to fish it out of our node
	 * state.  Ugly but not really worth cleaning up, since Hashjoin knows
	 * quite a bit more about Hash besides that.
	 */
	return NULL;
}
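
/*
 * For example (a sketch of the caller's side, not something this file
 * defines), nodeHashjoin.c builds and remembers the table itself before
 * invoking us:
 *
 *		hashtable = ExecHashTableCreate(...);
 *		hashNode->hashtable = hashtable;
 *		(void) MultiExecProcNode((PlanState *) hashNode);
 *
 * so by the time MultiExecHash returns NULL, the join already holds the
 * pointer it needs.
 */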

/* ----------------------------------------------------------------
 *		ExecInitHash
 *
 *		Init routine for Hash node
 * ----------------------------------------------------------------
 */
HashState *
ExecInitHash(Hash *node, EState *estate, int eflags)
{
	HashState  *hashstate;

	/* check for unsupported flags */
	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));

	/*
	 * create state structure
	 */
	hashstate = makeNode(HashState);
	hashstate->ps.plan = (Plan *) node;
	hashstate->ps.state = estate;
	hashstate->hashtable = NULL;
	hashstate->hashkeys = NIL;	/* will be set by parent HashJoin */

	/*
	 * Miscellaneous initialization
	 *
	 * create expression context for node
	 */
	ExecAssignExprContext(estate, &hashstate->ps);

	/*
	 * initialize our result slot
	 */
	ExecInitResultTupleSlot(estate, &hashstate->ps);

	/*
	 * initialize child expressions
	 */
	hashstate->ps.targetlist = (List *)
		ExecInitExpr((Expr *) node->plan.targetlist,
					 (PlanState *) hashstate);
	hashstate->ps.qual = (List *)
		ExecInitExpr((Expr *) node->plan.qual,
					 (PlanState *) hashstate);

	/*
	 * initialize child nodes
	 */
	outerPlanState(hashstate) = ExecInitNode(outerPlan(node), estate, eflags);

	/*
	 * initialize tuple type. no need to initialize projection info because
	 * this node doesn't do projections
	 */
	ExecAssignResultTypeFromTL(&hashstate->ps);
	hashstate->ps.ps_ProjInfo = NULL;

	return hashstate;
}

/* ----------------------------------------------------------------
 *		ExecEndHash
 *
 *		clean up routine for Hash node
 * ----------------------------------------------------------------
 */
void
ExecEndHash(HashState *node)
{
	PlanState  *outerPlan;

	/*
	 * free exprcontext
	 */
	ExecFreeExprContext(&node->ps);

	/*
	 * shut down the subplan
	 */
	outerPlan = outerPlanState(node);
	ExecEndNode(outerPlan);
}


/* ----------------------------------------------------------------
 *		ExecHashTableCreate
 *
 *		create an empty hashtable data structure for hashjoin.
 * ----------------------------------------------------------------
 */
HashJoinTable
ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
{
	HashJoinTable hashtable;
	Plan	   *outerNode;
	int			nbuckets;
	int			nbatch;
	int			num_skew_mcvs;
	int			log2_nbuckets;
	int			nkeys;
	int			i;
	ListCell   *ho;
	MemoryContext oldcxt;

	/*
	 * Get information about the size of the relation to be hashed (it's the
	 * "outer" subtree of this node, but the inner relation of the hashjoin).
	 * Compute the appropriate size of the hash table.
	 */
	outerNode = outerPlan(node);

	ExecChooseHashTableSize(outerNode->plan_rows, outerNode->plan_width,
							OidIsValid(node->skewTable),
							&nbuckets, &nbatch, &num_skew_mcvs);

#ifdef HJDEBUG
	printf("nbatch = %d, nbuckets = %d\n", nbatch, nbuckets);
#endif

	/* nbuckets must be a power of 2 */
	log2_nbuckets = my_log2(nbuckets);
	Assert(nbuckets == (1 << log2_nbuckets));

	/*
	 * Initialize the hash table control block.
	 *
	 * The hashtable control block is just palloc'd from the executor's
	 * per-query memory context.
	 */
	hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData));
	hashtable->nbuckets = nbuckets;
	hashtable->nbuckets_original = nbuckets;
	hashtable->nbuckets_optimal = nbuckets;
	hashtable->log2_nbuckets = log2_nbuckets;
	hashtable->log2_nbuckets_optimal = log2_nbuckets;
	hashtable->buckets = NULL;
	hashtable->keepNulls = keepNulls;
	hashtable->skewEnabled = false;
	hashtable->skewBucket = NULL;
	hashtable->skewBucketLen = 0;
	hashtable->nSkewBuckets = 0;
	hashtable->skewBucketNums = NULL;
	hashtable->nbatch = nbatch;
	hashtable->curbatch = 0;
	hashtable->nbatch_original = nbatch;
	hashtable->nbatch_outstart = nbatch;
	hashtable->growEnabled = true;
	hashtable->totalTuples = 0;
	hashtable->skewTuples = 0;
	hashtable->innerBatchFile = NULL;
	hashtable->outerBatchFile = NULL;
	hashtable->spaceUsed = 0;
	hashtable->spacePeak = 0;
	hashtable->spaceAllowed = work_mem * 1024L;
	hashtable->spaceUsedSkew = 0;
	hashtable->spaceAllowedSkew =
		hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100;
	hashtable->chunks = NULL;

	/*
	 * Get info about the hash functions to be used for each hash key. Also
	 * remember whether the join operators are strict.
	 */
	nkeys = list_length(hashOperators);
	hashtable->outer_hashfunctions =
		(FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
	hashtable->inner_hashfunctions =
		(FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
	hashtable->hashStrict = (bool *) palloc(nkeys * sizeof(bool));
	i = 0;
	foreach(ho, hashOperators)
	{
		Oid			hashop = lfirst_oid(ho);
		Oid			left_hashfn;
		Oid			right_hashfn;

		if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn))
			elog(ERROR, "could not find hash function for hash operator %u",
				 hashop);
		fmgr_info(left_hashfn, &hashtable->outer_hashfunctions[i]);
		fmgr_info(right_hashfn, &hashtable->inner_hashfunctions[i]);
		hashtable->hashStrict[i] = op_strict(hashop);
		i++;
	}

	/*
	 * Create temporary memory contexts in which to keep the hashtable working
	 * storage.  See notes in executor/hashjoin.h.
	 */
	hashtable->hashCxt = AllocSetContextCreate(CurrentMemoryContext,
											   "HashTableContext",
											   ALLOCSET_DEFAULT_MINSIZE,
											   ALLOCSET_DEFAULT_INITSIZE,
											   ALLOCSET_DEFAULT_MAXSIZE);

	hashtable->batchCxt = AllocSetContextCreate(hashtable->hashCxt,
												"HashBatchContext",
												ALLOCSET_DEFAULT_MINSIZE,
												ALLOCSET_DEFAULT_INITSIZE,
												ALLOCSET_DEFAULT_MAXSIZE);

	/* Allocate data that will live for the life of the hashjoin */

	oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);

	if (nbatch > 1)
	{
		/*
		 * allocate and initialize the file arrays in hashCxt
		 */
		hashtable->innerBatchFile = (BufFile **)
			palloc0(nbatch * sizeof(BufFile *));
		hashtable->outerBatchFile = (BufFile **)
			palloc0(nbatch * sizeof(BufFile *));
		/* The files will not be opened until needed... */
		/* ... but make sure we have temp tablespaces established for them */
		PrepareTempTablespaces();
	}

	/*
	 * Prepare context for the first-scan space allocations; allocate the
	 * hashbucket array therein, and set each bucket "empty".
	 */
	MemoryContextSwitchTo(hashtable->batchCxt);

	hashtable->buckets = (HashJoinTuple *)
		palloc0(nbuckets * sizeof(HashJoinTuple));

	/*
	 * Set up for skew optimization, if possible and there's a need for more
	 * than one batch.  (In a one-batch join, there's no point in it.)
	 */
	if (nbatch > 1)
		ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);

	MemoryContextSwitchTo(oldcxt);

	return hashtable;
}
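
/*
 * The memory context hierarchy this sets up (see executor/hashjoin.h for
 * the authoritative description) looks like:
 *
 *		per-query context
 *			hashCxt		- lives for the whole join: batch file arrays, etc.
 *				batchCxt	- reset between batches; holds the bucket array
 *							  and the dense-allocated tuple chunks
 */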


/*
 * Compute appropriate size for hashtable given the estimated size of the
 * relation to be hashed (number of rows and average row width).
 *
 * This is exported so that the planner's costsize.c can use it.
 */

/* Target bucket loading (tuples per bucket) */
#define NTUP_PER_BUCKET			1

void
ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
						int *numbuckets,
						int *numbatches,
						int *num_skew_mcvs)
{
	int			tupsize;
	double		inner_rel_bytes;
	long		bucket_bytes;
	long		hash_table_bytes;
	long		skew_table_bytes;
	long		max_pointers;
	long		mppow2;
	int			nbatch = 1;
	int			nbuckets;
	double		dbuckets;

	/* Force a plausible relation size if no info */
	if (ntuples <= 0.0)
		ntuples = 1000.0;

	/*
	 * Estimate tupsize based on footprint of tuple in hashtable... note this
	 * does not allow for any palloc overhead.  The manipulations of spaceUsed
	 * don't count palloc overhead either.
	 */
	tupsize = HJTUPLE_OVERHEAD +
		MAXALIGN(SizeofMinimalTupleHeader) +
		MAXALIGN(tupwidth);
	inner_rel_bytes = ntuples * tupsize;

	/*
	 * Target in-memory hashtable size is work_mem kilobytes.
	 */
	hash_table_bytes = work_mem * 1024L;

	/*
	 * If skew optimization is possible, estimate the number of skew buckets
	 * that will fit in the memory allowed, and decrement the assumed space
	 * available for the main hash table accordingly.
	 *
	 * We make the optimistic assumption that each skew bucket will contain
	 * one inner-relation tuple.  If that turns out to be low, we will recover
	 * at runtime by reducing the number of skew buckets.
	 *
	 * hashtable->skewBucket will have up to 8 times as many HashSkewBucket
	 * pointers as the number of MCVs we allow, since ExecHashBuildSkewHash
	 * will round up to the next power of 2 and then multiply by 4 to reduce
	 * collisions.
	 */
	if (useskew)
	{
		skew_table_bytes = hash_table_bytes * SKEW_WORK_MEM_PERCENT / 100;

		/*----------
		 * Divisor is:
		 * size of a hash tuple +
		 * worst-case size of skewBucket[] per MCV +
		 * size of skewBucketNums[] entry +
		 * size of skew bucket struct itself
		 *----------
		 */
		*num_skew_mcvs = skew_table_bytes / (tupsize +
											 (8 * sizeof(HashSkewBucket *)) +
											 sizeof(int) +
											 SKEW_BUCKET_OVERHEAD);
		if (*num_skew_mcvs > 0)
			hash_table_bytes -= skew_table_bytes;
	}
	else
		*num_skew_mcvs = 0;

	/*
	 * Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when
	 * memory is filled, assuming a single batch; but limit the value so that
	 * the pointer arrays we'll try to allocate do not exceed work_mem nor
	 * MaxAllocSize.
	 *
	 * Note that both nbuckets and nbatch must be powers of 2 to make
	 * ExecHashGetBucketAndBatch fast.
	 */
	max_pointers = (work_mem * 1024L) / sizeof(HashJoinTuple);
	max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple));
	/* If max_pointers isn't a power of 2, must round it down to one */
	mppow2 = 1L << my_log2(max_pointers);
	if (max_pointers != mppow2)
		max_pointers = mppow2 / 2;

	/* Also ensure we avoid integer overflow in nbatch and nbuckets */
	/* (this step is redundant given the current value of MaxAllocSize) */
	max_pointers = Min(max_pointers, INT_MAX / 2);

	dbuckets = ceil(ntuples / NTUP_PER_BUCKET);
	dbuckets = Min(dbuckets, max_pointers);
	nbuckets = (int) dbuckets;
	/* don't let nbuckets be really small, though ... */
	nbuckets = Max(nbuckets, 1024);
	/* ... and force it to be a power of 2. */
	nbuckets = 1 << my_log2(nbuckets);

	/*
	 * If there's not enough space to store the projected number of tuples and
	 * the required bucket headers, we will need multiple batches.
	 */
	bucket_bytes = sizeof(HashJoinTuple) * nbuckets;
	if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
	{
		/* We'll need multiple batches */
		long		lbuckets;
		double		dbatch;
		int			minbatch;
		long		bucket_size;

		/*
		 * Estimate the number of buckets we'll want to have when work_mem is
		 * entirely full.  Each bucket will contain a bucket pointer plus
		 * NTUP_PER_BUCKET tuples, whose projected size already includes
		 * overhead for the hash code, pointer to the next tuple, etc.
		 */
		bucket_size = (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple));
		lbuckets = 1L << my_log2(hash_table_bytes / bucket_size);
		lbuckets = Min(lbuckets, max_pointers);
		nbuckets = (int) lbuckets;
		nbuckets = 1 << my_log2(nbuckets);
		bucket_bytes = nbuckets * sizeof(HashJoinTuple);

		/*
		 * Buckets are simple pointers to hashjoin tuples, while tupsize
		 * includes the pointer, hash code, and MinimalTupleData.  So buckets
		 * should never really exceed 25% of work_mem (even for
		 * NTUP_PER_BUCKET=1); except maybe for work_mem values that are not
		 * 2^N bytes, where we might get more because of doubling. So let's
		 * look for 50% here.
		 */
		Assert(bucket_bytes <= hash_table_bytes / 2);

		/* Calculate required number of batches. */
		dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes));
		dbatch = Min(dbatch, max_pointers);
		minbatch = (int) dbatch;
		nbatch = 2;
		while (nbatch < minbatch)
			nbatch <<= 1;
	}

	Assert(nbuckets > 0);
	Assert(nbatch > 0);

	*numbuckets = nbuckets;
	*numbatches = nbatch;
}
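
/*
 * Worked example (illustrative numbers only, assuming 8-byte pointers and
 * useskew = false): suppose ntuples = 1,000,000 and tupsize comes out to 64
 * bytes, so inner_rel_bytes is 64MB.  With work_mem = 4096 (4MB),
 * bucket_size = 64 * NTUP_PER_BUCKET + 8 = 72, giving
 * nbuckets = 1 << my_log2(4194304 / 72) = 65536 (bucket_bytes = 512KB),
 * and dbatch = ceil(64MB / (4MB - 512KB)) = 19, which rounds up to
 * nbatch = 32.
 */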


/* ----------------------------------------------------------------
 *		ExecHashTableDestroy
 *
 *		destroy a hash table
 * ----------------------------------------------------------------
 */
void
ExecHashTableDestroy(HashJoinTable hashtable)
{
	int			i;

	/*
	 * Make sure all the temp files are closed.  We skip batch 0, since it
	 * can't have any temp files (and the arrays might not even exist if
	 * nbatch is only 1).
	 */
	for (i = 1; i < hashtable->nbatch; i++)
	{
		if (hashtable->innerBatchFile[i])
			BufFileClose(hashtable->innerBatchFile[i]);
		if (hashtable->outerBatchFile[i])
			BufFileClose(hashtable->outerBatchFile[i]);
	}

	/* Release working memory (batchCxt is a child, so it goes away too) */
	MemoryContextDelete(hashtable->hashCxt);

	/* And drop the control block */
	pfree(hashtable);
}

/*
 * ExecHashIncreaseNumBatches
 *		increase the original number of batches in order to reduce
 *		current memory consumption
 */
static void
ExecHashIncreaseNumBatches(HashJoinTable hashtable)
{
	int			oldnbatch = hashtable->nbatch;
	int			curbatch = hashtable->curbatch;
	int			nbatch;
	MemoryContext oldcxt;
	long		ninmemory;
	long		nfreed;
	HashMemoryChunk oldchunks;

	/* do nothing if we've decided to shut off growth */
	if (!hashtable->growEnabled)
		return;

	/* safety check to avoid overflow */
	if (oldnbatch > Min(INT_MAX / 2, MaxAllocSize / (sizeof(void *) * 2)))
		return;

	nbatch = oldnbatch * 2;
	Assert(nbatch > 1);

#ifdef HJDEBUG
	printf("Increasing nbatch to %d because space = %lu\n",
		   nbatch, (unsigned long) hashtable->spaceUsed);
#endif

	oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);

	if (hashtable->innerBatchFile == NULL)
	{
		/* we had no file arrays before */
		hashtable->innerBatchFile = (BufFile **)
			palloc0(nbatch * sizeof(BufFile *));
		hashtable->outerBatchFile = (BufFile **)
			palloc0(nbatch * sizeof(BufFile *));
		/* time to establish the temp tablespaces, too */
		PrepareTempTablespaces();
	}
	else
	{
		/* enlarge arrays and zero out added entries */
		hashtable->innerBatchFile = (BufFile **)
			repalloc(hashtable->innerBatchFile, nbatch * sizeof(BufFile *));
		hashtable->outerBatchFile = (BufFile **)
			repalloc(hashtable->outerBatchFile, nbatch * sizeof(BufFile *));
		MemSet(hashtable->innerBatchFile + oldnbatch, 0,
			   (nbatch - oldnbatch) * sizeof(BufFile *));
		MemSet(hashtable->outerBatchFile + oldnbatch, 0,
			   (nbatch - oldnbatch) * sizeof(BufFile *));
	}

	MemoryContextSwitchTo(oldcxt);

	hashtable->nbatch = nbatch;

	/*
	 * Scan through the existing hash table entries and dump out any that are
	 * no longer of the current batch.
	 */
	ninmemory = nfreed = 0;

	/* If we know we need to resize nbuckets, we can do it while rebatching. */
	if (hashtable->nbuckets_optimal != hashtable->nbuckets)
	{
		/* we never decrease the number of buckets */
		Assert(hashtable->nbuckets_optimal > hashtable->nbuckets);

		hashtable->nbuckets = hashtable->nbuckets_optimal;
		hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;

		hashtable->buckets = repalloc(hashtable->buckets,
								sizeof(HashJoinTuple) * hashtable->nbuckets);
	}

	/*
	 * We will scan through the chunks directly, so that we can reset the
	 * buckets now and not have to keep track which tuples in the buckets have
	 * already been processed. We will free the old chunks as we go.
	 */
	memset(hashtable->buckets, 0, sizeof(HashJoinTuple) * hashtable->nbuckets);
	oldchunks = hashtable->chunks;
	hashtable->chunks = NULL;

	/* so, let's scan through the old chunks, and all tuples in each chunk */
	while (oldchunks != NULL)
	{
		HashMemoryChunk nextchunk = oldchunks->next;

		/* position within the buffer (up to oldchunks->used) */
		size_t		idx = 0;

		/* process all tuples stored in this chunk (and then free it) */
		while (idx < oldchunks->used)
		{
			HashJoinTuple hashTuple = (HashJoinTuple) (oldchunks->data + idx);
			MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
			int			hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
			int			bucketno;
			int			batchno;

			ninmemory++;
			ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
									  &bucketno, &batchno);

			if (batchno == curbatch)
			{
				/* keep tuple in memory - copy it into the new chunk */
				HashJoinTuple copyTuple;

				copyTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
				memcpy(copyTuple, hashTuple, hashTupleSize);

				/* and add it back to the appropriate bucket */
				copyTuple->next = hashtable->buckets[bucketno];
				hashtable->buckets[bucketno] = copyTuple;
			}
			else
			{
				/* dump it out */
				Assert(batchno > curbatch);
				ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
									  hashTuple->hashvalue,
									  &hashtable->innerBatchFile[batchno]);

				hashtable->spaceUsed -= hashTupleSize;
				nfreed++;
			}

			/* next tuple in this chunk */
			idx += MAXALIGN(hashTupleSize);
		}

		/* we're done with this chunk - free it and proceed to the next one */
		pfree(oldchunks);
		oldchunks = nextchunk;
	}

#ifdef HJDEBUG
	printf("Freed %ld of %ld tuples, space now %lu\n",
		   nfreed, ninmemory, (unsigned long) hashtable->spaceUsed);
#endif

	/*
	 * If we dumped out either all or none of the tuples in the table, disable
	 * further expansion of nbatch.  This situation implies that we have
	 * enough tuples of identical hashvalues to overflow spaceAllowed.
	 * Increasing nbatch will not fix it since there's no way to subdivide the
	 * group any more finely. We have to just gut it out and hope the server
	 * has enough RAM.
	 */
	if (nfreed == 0 || nfreed == ninmemory)
	{
		hashtable->growEnabled = false;
#ifdef HJDEBUG
		printf("Disabling further increase of nbatch\n");
#endif
	}
}
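
/*
 * For instance, with log2_nbuckets = 10, a tuple whose hash value has
 * (hashvalue >> 10) & 3 == 1 is in batch 1 while nbatch = 4; after one
 * doubling (nbatch = 8) it lands in batch 1 or batch 5, depending on bit 12
 * of the hash value.  Since the batch number can only stay the same or
 * grow, rebatching never has to move tuples into a batch that has already
 * been processed.
 */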

/*
 * ExecHashIncreaseNumBuckets
 *		increase the original number of buckets in order to reduce
 *		number of tuples per bucket
 */
static void
ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
{
	HashMemoryChunk chunk;

	/* do nothing if not an increase (it's called increase for a reason) */
	if (hashtable->nbuckets >= hashtable->nbuckets_optimal)
		return;

#ifdef HJDEBUG
	printf("Increasing nbuckets %d => %d\n",
		   hashtable->nbuckets, hashtable->nbuckets_optimal);
#endif

	hashtable->nbuckets = hashtable->nbuckets_optimal;
	hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;

	Assert(hashtable->nbuckets > 1);
	Assert(hashtable->nbuckets <= (INT_MAX / 2));
	Assert(hashtable->nbuckets == (1 << hashtable->log2_nbuckets));

	/*
	 * Just reallocate the proper number of buckets - we don't need to walk
	 * through them - we can walk the dense-allocated chunks (just like in
	 * ExecHashIncreaseNumBatches, but without all the copying into new
	 * chunks)
	 */
	hashtable->buckets =
		(HashJoinTuple *) repalloc(hashtable->buckets,
								hashtable->nbuckets * sizeof(HashJoinTuple));

	memset(hashtable->buckets, 0, hashtable->nbuckets * sizeof(HashJoinTuple));

	/* scan through all tuples in all chunks to rebuild the hash table */
	for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next)
	{
		/* process all tuples stored in this chunk */
		size_t		idx = 0;

		while (idx < chunk->used)
		{
			HashJoinTuple hashTuple = (HashJoinTuple) (chunk->data + idx);
			int			bucketno;
			int			batchno;

			ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
									  &bucketno, &batchno);

			/* add the tuple to the proper bucket */
			hashTuple->next = hashtable->buckets[bucketno];
			hashtable->buckets[bucketno] = hashTuple;

			/* advance index past the tuple */
			idx += MAXALIGN(HJTUPLE_OVERHEAD +
							HJTUPLE_MINTUPLE(hashTuple)->t_len);
		}
	}

#ifdef HJDEBUG
	/* average over the tuples currently in memory (all of batch 0 here) */
	printf("Nbuckets increased to %d, average items per bucket %.1f\n",
		   hashtable->nbuckets,
		   (hashtable->totalTuples - hashtable->skewTuples) / hashtable->nbuckets);
#endif
}


/*
 * ExecHashTableInsert
 *		insert a tuple into the hash table depending on the hash value
 *		it may just go to a temp file for later batches
 *
 * Note: the passed TupleTableSlot may contain a regular, minimal, or virtual
 * tuple; the minimal case in particular is certain to happen while reloading
 * tuples from batch files.  We could save some cycles in the regular-tuple
 * case by not forcing the slot contents into minimal form; not clear if it's
 * worth the messiness required.
 */
void
ExecHashTableInsert(HashJoinTable hashtable,
					TupleTableSlot *slot,
					uint32 hashvalue)
{
	MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
	int			bucketno;
	int			batchno;

	ExecHashGetBucketAndBatch(hashtable, hashvalue,
							  &bucketno, &batchno);

	/*
	 * decide whether to put the tuple in the hash table or a temp file
	 */
	if (batchno == hashtable->curbatch)
	{
		/*
		 * put the tuple in hash table
		 */
		HashJoinTuple hashTuple;
		int			hashTupleSize;
		double		ntuples = (hashtable->totalTuples - hashtable->skewTuples);

		/* Create the HashJoinTuple */
		hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
		hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);

		hashTuple->hashvalue = hashvalue;
		memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);

		/*
		 * We always reset the tuple-matched flag on insertion.  This is okay
		 * even when reloading a tuple from a batch file, since the tuple
		 * could not possibly have been matched to an outer tuple before it
		 * went into the batch file.
		 */
		HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));

		/* Push it onto the front of the bucket's list */
		hashTuple->next = hashtable->buckets[bucketno];
		hashtable->buckets[bucketno] = hashTuple;

		/*
		 * Increase the (optimal) number of buckets if we just exceeded the
		 * NTUP_PER_BUCKET threshold, but only when there's still a single
		 * batch.
		 */
		if (hashtable->nbatch == 1 &&
			ntuples > (hashtable->nbuckets_optimal * NTUP_PER_BUCKET))
		{
			/* Guard against integer overflow and alloc size overflow */
			if (hashtable->nbuckets_optimal <= INT_MAX / 2 &&
				hashtable->nbuckets_optimal * 2 <= MaxAllocSize / sizeof(HashJoinTuple))
			{
				hashtable->nbuckets_optimal *= 2;
				hashtable->log2_nbuckets_optimal += 1;
			}
		}

		/* Account for space used, and back off if we've used too much */
		hashtable->spaceUsed += hashTupleSize;
		if (hashtable->spaceUsed > hashtable->spacePeak)
			hashtable->spacePeak = hashtable->spaceUsed;
		if (hashtable->spaceUsed +
			hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
			> hashtable->spaceAllowed)
			ExecHashIncreaseNumBatches(hashtable);
	}
	else
	{
		/*
		 * put the tuple into a temp file for later batches
		 */
		Assert(batchno > hashtable->curbatch);
		ExecHashJoinSaveTuple(tuple,
							  hashvalue,
							  &hashtable->innerBatchFile[batchno]);
	}
}

/*
 * ExecHashGetHashValue
 *		Compute the hash value for a tuple
 *
 * The tuple to be tested must be in either econtext->ecxt_outertuple or
 * econtext->ecxt_innertuple.  Vars in the hashkeys expressions should have
 * varno either OUTER_VAR or INNER_VAR.
 *
 * A TRUE result means the tuple's hash value has been successfully computed
 * and stored at *hashvalue.  A FALSE result means the tuple cannot match
 * because it contains a null attribute, and hence it should be discarded
 * immediately.  (If keep_nulls is true then FALSE is never returned.)
 */
bool
ExecHashGetHashValue(HashJoinTable hashtable,
					 ExprContext *econtext,
					 List *hashkeys,
					 bool outer_tuple,
					 bool keep_nulls,
					 uint32 *hashvalue)
{
	uint32		hashkey = 0;
	FmgrInfo   *hashfunctions;
	ListCell   *hk;
	int			i = 0;
	MemoryContext oldContext;

	/*
	 * We reset the eval context each time to reclaim any memory leaked in the
	 * hashkey expressions.
	 */
	ResetExprContext(econtext);

	oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);

	if (outer_tuple)
		hashfunctions = hashtable->outer_hashfunctions;
	else
		hashfunctions = hashtable->inner_hashfunctions;

	foreach(hk, hashkeys)
	{
		ExprState  *keyexpr = (ExprState *) lfirst(hk);
		Datum		keyval;
		bool		isNull;

		/* rotate hashkey left 1 bit at each step */
		hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);

		/*
		 * Get the join attribute value of the tuple
		 */
		keyval = ExecEvalExpr(keyexpr, econtext, &isNull, NULL);

		/*
		 * If the attribute is NULL, and the join operator is strict, then
		 * this tuple cannot pass the join qual so we can reject it
		 * immediately (unless we're scanning the outside of an outer join, in
		 * which case we must not reject it).  Otherwise we act like the
		 * hashcode of NULL is zero (this will support operators that act like
		 * IS NOT DISTINCT, though not any more-random behavior).  We treat
		 * the hash support function as strict even if the operator is not.
		 *
		 * Note: currently, all hashjoinable operators must be strict since
		 * the hash index AM assumes that.  However, it takes so little extra
		 * code here to allow non-strict that we may as well do it.
		 */
		if (isNull)
		{
			if (hashtable->hashStrict[i] && !keep_nulls)
			{
				MemoryContextSwitchTo(oldContext);
				return false;	/* cannot match */
			}
			/* else, leave hashkey unmodified, equivalent to hashcode 0 */
		}
		else
		{
			/* Compute the hash function */
			uint32		hkey;

			hkey = DatumGetUInt32(FunctionCall1(&hashfunctions[i], keyval));
			hashkey ^= hkey;
		}

		i++;
	}

	MemoryContextSwitchTo(oldContext);

	*hashvalue = hashkey;
	return true;
}
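
/*
 * For a two-key join, the combining scheme above works out to
 * hashkey = rotate_left_1(h(key1)) XOR h(key2): each per-key hash is
 * offset from the next by a one-bit rotation, so tuples whose key values
 * are merely swapped are unlikely to collide the way a plain XOR of the
 * two hashes would make them.
 */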

/*
 * ExecHashGetBucketAndBatch
 *		Determine the bucket number and batch number for a hash value
 *
 * Note: on-the-fly increases of nbatch must not change the bucket number
 * for a given hash code (since we don't move tuples to different hash
 * chains), and must only cause the batch number to remain the same or
 * increase.  Our algorithm is
 *		bucketno = hashvalue MOD nbuckets
 *		batchno = (hashvalue DIV nbuckets) MOD nbatch
 * where nbuckets and nbatch are both expected to be powers of 2, so we can
 * do the computations by shifting and masking.  (This assumes that all hash
 * functions are good about randomizing all their output bits, else we are
 * likely to have very skewed bucket or batch occupancy.)
 *
 * nbuckets and log2_nbuckets may change while nbatch == 1 because of dynamic
 * bucket count growth.  Once we start batching, the value is fixed and does
 * not change over the course of the join (making it possible to compute batch
 * number the way we do here).
 *
 * nbatch is always a power of 2; we increase it only by doubling it.  This
 * effectively adds one more bit to the top of the batchno.
 */
void
ExecHashGetBucketAndBatch(HashJoinTable hashtable,
						  uint32 hashvalue,
						  int *bucketno,
						  int *batchno)
{
	uint32		nbuckets = (uint32) hashtable->nbuckets;
	uint32		nbatch = (uint32) hashtable->nbatch;

	if (nbatch > 1)
	{
		/* we can do MOD by masking, DIV by shifting */
		*bucketno = hashvalue & (nbuckets - 1);
		*batchno = (hashvalue >> hashtable->log2_nbuckets) & (nbatch - 1);
	}
	else
	{
		*bucketno = hashvalue & (nbuckets - 1);
		*batchno = 0;
	}
}
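
/*
 * Example: with nbuckets = 1024 (log2_nbuckets = 10) and nbatch = 4, the
 * hash value 0xDEADBEEF maps to bucketno = 0xDEADBEEF & 1023 = 751 and
 * batchno = (0xDEADBEEF >> 10) & 3 = 3.
 */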

/*
 * ExecScanHashBucket
 *		scan a hash bucket for matches to the current outer tuple
 *
 * The current outer tuple must be stored in econtext->ecxt_outertuple.
 *
 * On success, the inner tuple is stored into hjstate->hj_CurTuple and
 * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
 * for the latter.
 */
bool
ExecScanHashBucket(HashJoinState *hjstate,
				   ExprContext *econtext)
{
	List	   *hjclauses = hjstate->hashclauses;
	HashJoinTable hashtable = hjstate->hj_HashTable;
	HashJoinTuple hashTuple = hjstate->hj_CurTuple;
	uint32		hashvalue = hjstate->hj_CurHashValue;

	/*
	 * hj_CurTuple is the address of the tuple last returned from the current
	 * bucket, or NULL if it's time to start scanning a new bucket.
	 *
	 * If the tuple hashed to a skew bucket then scan the skew bucket
	 * otherwise scan the standard hashtable bucket.
	 */
	if (hashTuple != NULL)
		hashTuple = hashTuple->next;
	else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO)
		hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples;
	else
		hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];

	while (hashTuple != NULL)
	{
		if (hashTuple->hashvalue == hashvalue)
		{
			TupleTableSlot *inntuple;

			/* insert hashtable's tuple into exec slot so ExecQual sees it */
			inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
											 hjstate->hj_HashTupleSlot,
											 false);	/* do not pfree */
			econtext->ecxt_innertuple = inntuple;

			/* reset temp memory each time to avoid leaks from qual expr */
			ResetExprContext(econtext);

			if (ExecQual(hjclauses, econtext, false))
			{
				hjstate->hj_CurTuple = hashTuple;
				return true;
			}
		}

		hashTuple = hashTuple->next;
	}

	/*
	 * no match
	 */
	return false;
}

/*
 * ExecPrepHashTableForUnmatched
 *		set up for a series of ExecScanHashTableForUnmatched calls
 */
void
ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
{
	/*----------
	 * During this scan we use the HashJoinState fields as follows:
	 *
	 * hj_CurBucketNo: next regular bucket to scan
	 * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums)
	 * hj_CurTuple: last tuple returned, or NULL to start next bucket
	 *----------
	 */
	hjstate->hj_CurBucketNo = 0;
	hjstate->hj_CurSkewBucketNo = 0;
	hjstate->hj_CurTuple = NULL;
}

/*
 * ExecScanHashTableForUnmatched
 *		scan the hash table for unmatched inner tuples
 *
 * On success, the inner tuple is stored into hjstate->hj_CurTuple and
 * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
 * for the latter.
 */
bool
ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	HashJoinTuple hashTuple = hjstate->hj_CurTuple;

	for (;;)
	{
		/*
		 * hj_CurTuple is the address of the tuple last returned from the
		 * current bucket, or NULL if it's time to start scanning a new
		 * bucket.
		 */
		if (hashTuple != NULL)
			hashTuple = hashTuple->next;
		else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
		{
			hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
			hjstate->hj_CurBucketNo++;
		}
		else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
		{
			int			j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];

			hashTuple = hashtable->skewBucket[j]->tuples;
			hjstate->hj_CurSkewBucketNo++;
		}
		else
			break;				/* finished all buckets */

		while (hashTuple != NULL)
		{
			if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
			{
				TupleTableSlot *inntuple;

				/* insert hashtable's tuple into exec slot */
				inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
												 hjstate->hj_HashTupleSlot,
												 false);	/* do not pfree */
				econtext->ecxt_innertuple = inntuple;

				/*
				 * Reset temp memory each time; although this function doesn't
				 * do any qual eval, the caller will, so let's keep it
				 * parallel to ExecScanHashBucket.
				 */
				ResetExprContext(econtext);

				hjstate->hj_CurTuple = hashTuple;
				return true;
			}

			hashTuple = hashTuple->next;
		}
	}

	/*
	 * no more unmatched tuples
	 */
	return false;
}
1209
1210 /*
1211  * ExecHashTableReset
1212  *
1213  *              reset hash table header for new batch
1214  */
1215 void
1216 ExecHashTableReset(HashJoinTable hashtable)
1217 {
1218         MemoryContext oldcxt;
1219         int                     nbuckets = hashtable->nbuckets;
1220
1221         /*
1222          * Release all the hash buckets and tuples acquired in the prior pass, and
1223          * reinitialize the context for a new pass.
1224          */
1225         MemoryContextReset(hashtable->batchCxt);
1226         oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
1227
1228         /* Reallocate and reinitialize the hash bucket headers. */
1229         hashtable->buckets = (HashJoinTuple *)
1230                 palloc0(nbuckets * sizeof(HashJoinTuple));
1231
1232         hashtable->spaceUsed = 0;
1233
1234         MemoryContextSwitchTo(oldcxt);
1235
1236         /* Forget the chunks (the memory was freed by the context reset above). */
1237         hashtable->chunks = NULL;
1238 }
1239
1240 /*
1241  * ExecHashTableResetMatchFlags
1242  *              Clear all the HeapTupleHeaderHasMatch flags in the table
1243  */
1244 void
1245 ExecHashTableResetMatchFlags(HashJoinTable hashtable)
1246 {
1247         HashJoinTuple tuple;
1248         int                     i;
1249
1250         /* Reset all flags in the main table ... */
1251         for (i = 0; i < hashtable->nbuckets; i++)
1252         {
1253                 for (tuple = hashtable->buckets[i]; tuple != NULL; tuple = tuple->next)
1254                         HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
1255         }
1256
1257         /* ... and the same for the skew buckets, if any */
1258         for (i = 0; i < hashtable->nSkewBuckets; i++)
1259         {
1260                 int                     j = hashtable->skewBucketNums[i];
1261                 HashSkewBucket *skewBucket = hashtable->skewBucket[j];
1262
1263                 for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next)
1264                         HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
1265         }
1266 }
1267
1268
1269 void
1270 ExecReScanHash(HashState *node)
1271 {
1272         /*
1273          * if chgParam of subnode is not null then plan will be re-scanned by
1274          * first ExecProcNode.
1275          */
1276         if (node->ps.lefttree->chgParam == NULL)
1277                 ExecReScan(node->ps.lefttree);
1278 }
1279
1280
1281 /*
1282  * ExecHashBuildSkewHash
1283  *
1284  *              Set up for skew optimization if we can identify the most common values
1285  *              (MCVs) of the outer relation's join key.  We make a skew hash bucket
1286  *              for the hash value of each MCV, up to the number of slots allowed
1287  *              based on available memory.
1288  */
1289 static void
1290 ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
1291 {
1292         HeapTupleData *statsTuple;
1293         Datum      *values;
1294         int                     nvalues;
1295         float4     *numbers;
1296         int                     nnumbers;
1297
1298         /* Do nothing if planner didn't identify the outer relation's join key */
1299         if (!OidIsValid(node->skewTable))
1300                 return;
1301         /* Also, do nothing if we don't have room for at least one skew bucket */
1302         if (mcvsToUse <= 0)
1303                 return;
1304
1305         /*
1306          * Try to find the MCV statistics for the outer relation's join key.
1307          */
1308         statsTuple = SearchSysCache3(STATRELATTINH,
1309                                                                  ObjectIdGetDatum(node->skewTable),
1310                                                                  Int16GetDatum(node->skewColumn),
1311                                                                  BoolGetDatum(node->skewInherit));
1312         if (!HeapTupleIsValid(statsTuple))
1313                 return;
1314
1315         if (get_attstatsslot(statsTuple, node->skewColType, node->skewColTypmod,
1316                                                  STATISTIC_KIND_MCV, InvalidOid,
1317                                                  NULL,
1318                                                  &values, &nvalues,
1319                                                  &numbers, &nnumbers))
1320         {
1321                 double          frac;
1322                 int                     nbuckets;
1323                 FmgrInfo   *hashfunctions;
1324                 int                     i;
1325
1326                 if (mcvsToUse > nvalues)
1327                         mcvsToUse = nvalues;
1328
1329                 /*
1330                  * Calculate the expected fraction of the outer relation that will
1331                  * participate in the skew optimization.  If this isn't at least
1332                  * SKEW_MIN_OUTER_FRACTION, don't use the skew optimization.
1333                  */
1334                 frac = 0;
1335                 for (i = 0; i < mcvsToUse; i++)
1336                         frac += numbers[i];
1337                 if (frac < SKEW_MIN_OUTER_FRACTION)
1338                 {
1339                         free_attstatsslot(node->skewColType,
1340                                                           values, nvalues, numbers, nnumbers);
1341                         ReleaseSysCache(statsTuple);
1342                         return;
1343                 }
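                /*
                 * Illustrative numbers (not from the source): if the mcvsToUse most
                 * common values together cover only 0.5% of the outer relation,
                 * hardly any outer tuples would hit the skew buckets, so the
                 * bookkeeping would cost more than it saves; SKEW_MIN_OUTER_FRACTION
                 * draws that line.
                 */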
1344
1345                 /*
1346                  * Okay, set up the skew hashtable.
1347                  *
1348                  * skewBucket[] is an open addressing hashtable with a power of 2 size
1349                  * that is greater than the number of MCV values.  (This ensures there
1350                  * will be at least one null entry, so searches will always
1351                  * terminate.)
1352                  *
1353                  * Note: this code could fail if mcvsToUse exceeds INT_MAX/8 or
1354                  * MaxAllocSize/sizeof(void *)/8, but that is not currently possible
1355                  * since we limit pg_statistic entries to much less than that.
1356                  */
1357                 nbuckets = 2;
1358                 while (nbuckets <= mcvsToUse)
1359                         nbuckets <<= 1;
1360                 /* use two more bits just to help avoid collisions */
1361                 nbuckets <<= 2;
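                /*
                 * Worked example (illustrative values): with mcvsToUse = 100, the
                 * doubling loop stops at 128, the first power of 2 above 100, and
                 * the two extra bits widen that to 512 slots, so the open-addressing
                 * table stays under about 20% full.
                 */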
1362
1363                 hashtable->skewEnabled = true;
1364                 hashtable->skewBucketLen = nbuckets;
1365
1366                 /*
1367                  * We allocate the bucket memory in the hashtable's batch context. It
1368                  * is only needed during the first batch, and this ensures it will be
1369                  * automatically removed once the first batch is done.
1370                  */
1371                 hashtable->skewBucket = (HashSkewBucket **)
1372                         MemoryContextAllocZero(hashtable->batchCxt,
1373                                                                    nbuckets * sizeof(HashSkewBucket *));
1374                 hashtable->skewBucketNums = (int *)
1375                         MemoryContextAllocZero(hashtable->batchCxt,
1376                                                                    mcvsToUse * sizeof(int));
1377
1378                 hashtable->spaceUsed += nbuckets * sizeof(HashSkewBucket *)
1379                         + mcvsToUse * sizeof(int);
1380                 hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
1381                         + mcvsToUse * sizeof(int);
1382                 if (hashtable->spaceUsed > hashtable->spacePeak)
1383                         hashtable->spacePeak = hashtable->spaceUsed;
1384
1385                 /*
1386                  * Create a skew bucket for each MCV hash value.
1387                  *
1388                  * Note: it is very important that we create the buckets in order of
1389                  * decreasing MCV frequency.  If we have to remove some buckets, they
1390                  * must be removed in reverse order of creation (see notes in
1391                  * ExecHashRemoveNextSkewBucket) and we want the least common MCVs to
1392                  * be removed first.
1393                  */
1394                 hashfunctions = hashtable->outer_hashfunctions;
1395
1396                 for (i = 0; i < mcvsToUse; i++)
1397                 {
1398                         uint32          hashvalue;
1399                         int                     bucket;
1400
1401                         hashvalue = DatumGetUInt32(FunctionCall1(&hashfunctions[0],
1402                                                                                                          values[i]));
1403
1404                         /*
1405                          * While we have not hit a hole in the hashtable and have not hit
1406                          * the desired bucket, we have collided with some previous hash
1407                          * value, so try the next bucket location.  NB: this code must
1408                          * match ExecHashGetSkewBucket.
1409                          */
1410                         bucket = hashvalue & (nbuckets - 1);
1411                         while (hashtable->skewBucket[bucket] != NULL &&
1412                                    hashtable->skewBucket[bucket]->hashvalue != hashvalue)
1413                                 bucket = (bucket + 1) & (nbuckets - 1);
1414
1415                         /*
1416                          * If we found an existing bucket with the same hashvalue, leave
1417                          * it alone.  It's okay for two MCVs to share a hashvalue.
1418                          */
1419                         if (hashtable->skewBucket[bucket] != NULL)
1420                                 continue;
1421
1422                         /* Okay, create a new skew bucket for this hashvalue. */
1423                         hashtable->skewBucket[bucket] = (HashSkewBucket *)
1424                                 MemoryContextAlloc(hashtable->batchCxt,
1425                                                                    sizeof(HashSkewBucket));
1426                         hashtable->skewBucket[bucket]->hashvalue = hashvalue;
1427                         hashtable->skewBucket[bucket]->tuples = NULL;
1428                         hashtable->skewBucketNums[hashtable->nSkewBuckets] = bucket;
1429                         hashtable->nSkewBuckets++;
1430                         hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
1431                         hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
1432                         if (hashtable->spaceUsed > hashtable->spacePeak)
1433                                 hashtable->spacePeak = hashtable->spaceUsed;
1434                 }
1435
1436                 free_attstatsslot(node->skewColType,
1437                                                   values, nvalues, numbers, nnumbers);
1438         }
1439
1440         ReleaseSysCache(statsTuple);
1441 }
1442
1443 /*
1444  * ExecHashGetSkewBucket
1445  *
1446  *              Returns the index of the skew bucket for this hashvalue,
1447  *              or INVALID_SKEW_BUCKET_NO if the hashvalue is not
1448  *              associated with any active skew bucket.
1449  */
1450 int
1451 ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
1452 {
1453         int                     bucket;
1454
1455         /*
1456          * Always return INVALID_SKEW_BUCKET_NO if not doing skew optimization (in
1457          * particular, this happens after the initial batch is done).
1458          */
1459         if (!hashtable->skewEnabled)
1460                 return INVALID_SKEW_BUCKET_NO;
1461
1462         /*
1463          * Since skewBucketLen is a power of 2, we can do a modulo by ANDing.
1464          */
1465         bucket = hashvalue & (hashtable->skewBucketLen - 1);
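        /*
         * For example, with skewBucketLen = 512 the mask is 511 (nine one-bits),
         * so "hashvalue & 511" yields the same result as "hashvalue % 512"
         * without a division.
         */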
1466
1467         /*
1468          * While we have not hit a hole in the hashtable and have not hit the
1469          * desired bucket, we have collided with some other hash value, so try the
1470          * next bucket location.
1471          */
1472         while (hashtable->skewBucket[bucket] != NULL &&
1473                    hashtable->skewBucket[bucket]->hashvalue != hashvalue)
1474                 bucket = (bucket + 1) & (hashtable->skewBucketLen - 1);
1475
1476         /*
1477          * Found the desired bucket?
1478          */
1479         if (hashtable->skewBucket[bucket] != NULL)
1480                 return bucket;
1481
1482         /*
1483          * We hit a NULL entry, so there is no skew bucket for this hash value.
1484          */
1485         return INVALID_SKEW_BUCKET_NO;
1486 }
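/*
 * To illustrate the probe sequence (hypothetical values, assuming an 8-slot
 * skew table): say hash values A and B both map to slot 3 (A & 7 == B & 7
 * == 3) and A was inserted first, pushing B to slot 4.  Then:
 *
 *              lookup(A): slot 3 holds A               -> return 3
 *              lookup(B): slot 3 holds A, probe slot 4 -> return 4
 *              lookup(C): C & 7 == 3, slots 3 and 4 hold A and B,
 *                         slot 5 is NULL               -> INVALID_SKEW_BUCKET_NO
 */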
1487
1488 /*
1489  * ExecHashSkewTableInsert
1490  *
1491  *              Insert a tuple into the skew hashtable.
1492  *
1493  * This should generally match up with the current-batch case in
1494  * ExecHashTableInsert.
1495  */
1496 static void
1497 ExecHashSkewTableInsert(HashJoinTable hashtable,
1498                                                 TupleTableSlot *slot,
1499                                                 uint32 hashvalue,
1500                                                 int bucketNumber)
1501 {
1502         MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
1503         HashJoinTuple hashTuple;
1504         int                     hashTupleSize;
1505
1506         /* Create the HashJoinTuple */
1507         hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
1508         hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
1509                                                                                                    hashTupleSize);
1510         hashTuple->hashvalue = hashvalue;
1511         memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1512         HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1513
1514         /* Push it onto the front of the skew bucket's list */
1515         hashTuple->next = hashtable->skewBucket[bucketNumber]->tuples;
1516         hashtable->skewBucket[bucketNumber]->tuples = hashTuple;
1517
1518         /* Account for space used, and back off if we've used too much */
1519         hashtable->spaceUsed += hashTupleSize;
1520         hashtable->spaceUsedSkew += hashTupleSize;
1521         if (hashtable->spaceUsed > hashtable->spacePeak)
1522                 hashtable->spacePeak = hashtable->spaceUsed;
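        /*
         * Each call below evicts the least-frequent remaining MCV's bucket into
         * the main table or a batch file (see ExecHashRemoveNextSkewBucket),
         * reducing spaceUsedSkew, so the loop is guaranteed to terminate.
         */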
1523         while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
1524                 ExecHashRemoveNextSkewBucket(hashtable);
1525
1526         /* Check we are not over the total spaceAllowed, either */
1527         if (hashtable->spaceUsed > hashtable->spaceAllowed)
1528                 ExecHashIncreaseNumBatches(hashtable);
1529 }
1530
1531 /*
1532  *              ExecHashRemoveNextSkewBucket
1533  *
1534  *              Remove the least valuable skew bucket by pushing its tuples into
1535  *              the main hash table.
1536  */
1537 static void
1538 ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
1539 {
1540         int                     bucketToRemove;
1541         HashSkewBucket *bucket;
1542         uint32          hashvalue;
1543         int                     bucketno;
1544         int                     batchno;
1545         HashJoinTuple hashTuple;
1546
1547         /* Locate the bucket to remove */
1548         bucketToRemove = hashtable->skewBucketNums[hashtable->nSkewBuckets - 1];
1549         bucket = hashtable->skewBucket[bucketToRemove];
1550
1551         /*
1552          * Calculate which bucket and batch the tuples belong to in the main
1553          * hashtable.  They all have the same hash value, so it's the same for all
1554          * of them.  Also note that it's not possible for nbatch to increase while
1555          * we are processing the tuples.
1556          */
1557         hashvalue = bucket->hashvalue;
1558         ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
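        /*
         * For instance (illustrative sizes): with nbuckets = 1024 and nbatch = 4,
         * ExecHashGetBucketAndBatch derives bucketno from the low 10 bits of the
         * hash and batchno from the next 2 bits, so every tuple in this skew
         * bucket maps to the same (bucketno, batchno) pair.
         */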
1559
1560         /* Process all tuples in the bucket */
1561         hashTuple = bucket->tuples;
1562         while (hashTuple != NULL)
1563         {
1564                 HashJoinTuple nextHashTuple = hashTuple->next;
1565                 MinimalTuple tuple;
1566                 Size            tupleSize;
1567
1568                 /*
1569                  * This code must agree with ExecHashTableInsert.  We do not call
1570                  * ExecHashTableInsert directly because it expects a TupleTableSlot,
1571                  * whereas we already have HashJoinTuples.
1572                  */
1573                 tuple = HJTUPLE_MINTUPLE(hashTuple);
1574                 tupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
1575
1576                 /* Decide whether to put the tuple in the hash table or a temp file */
1577                 if (batchno == hashtable->curbatch)
1578                 {
1579                         /* Move the tuple to the main hash table */
1580                         hashTuple->next = hashtable->buckets[bucketno];
1581                         hashtable->buckets[bucketno] = hashTuple;
1582                         /* We have reduced skew space, but overall space doesn't change */
1583                         hashtable->spaceUsedSkew -= tupleSize;
1584                 }
1585                 else
1586                 {
1587                         /* Put the tuple into a temp file for later batches */
1588                         Assert(batchno > hashtable->curbatch);
1589                         ExecHashJoinSaveTuple(tuple, hashvalue,
1590                                                                   &hashtable->innerBatchFile[batchno]);
1591                         pfree(hashTuple);
1592                         hashtable->spaceUsed -= tupleSize;
1593                         hashtable->spaceUsedSkew -= tupleSize;
1594                 }
1595
1596                 hashTuple = nextHashTuple;
1597         }
1598
1599         /*
1600          * Free the bucket struct itself and reset the hashtable entry to NULL.
1601          *
1602          * NOTE: this is not nearly as simple as it looks on the surface, because
1603          * of the possibility of collisions in the hashtable.  Suppose that hash
1604          * values A and B collide at a particular hashtable entry, and that A was
1605          * entered first so B gets shifted to a different table entry.  If we were
1606          * to remove A first then ExecHashGetSkewBucket would mistakenly start
1607          * reporting that B is not in the hashtable, because it would hit the NULL
1608          * before finding B.  However, we always remove entries in the reverse
1609          * order of creation, so this failure cannot happen.
1610          */
1611         hashtable->skewBucket[bucketToRemove] = NULL;
1612         hashtable->nSkewBuckets--;
1613         pfree(bucket);
1614         hashtable->spaceUsed -= SKEW_BUCKET_OVERHEAD;
1615         hashtable->spaceUsedSkew -= SKEW_BUCKET_OVERHEAD;
1616
1617         /*
1618          * If we have removed all skew buckets then give up on skew optimization.
1619          * Release the arrays since they aren't useful any more.
1620          */
1621         if (hashtable->nSkewBuckets == 0)
1622         {
1623                 hashtable->skewEnabled = false;
1624                 pfree(hashtable->skewBucket);
1625                 pfree(hashtable->skewBucketNums);
1626                 hashtable->skewBucket = NULL;
1627                 hashtable->skewBucketNums = NULL;
1628                 hashtable->spaceUsed -= hashtable->spaceUsedSkew;
1629                 hashtable->spaceUsedSkew = 0;
1630         }
1631 }
1632
1633 /*
1634  * Allocate 'size' bytes from the currently active HashMemoryChunk; start a new chunk if needed.
1635  */
1636 static void *
1637 dense_alloc(HashJoinTable hashtable, Size size)
1638 {
1639         HashMemoryChunk newChunk;
1640         char       *ptr;
1641
1642         /* just in case the size is not already aligned properly */
1643         size = MAXALIGN(size);
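        /*
         * For example, on a platform whose maximum alignment is 8 bytes,
         * MAXALIGN rounds a 13-byte request up to 16, so each tuple stored in a
         * chunk starts on an aligned boundary.
         */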
1644
1645         /*
1646          * If the tuple is larger than 1/4 of the chunk size, allocate it in
1647          * its own dedicated chunk.
1648          */
1649         if (size > HASH_CHUNK_THRESHOLD)
1650         {
1651                 /* allocate a dedicated chunk and link it into the chunk list */
1652                 newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt,
1653                                                                  offsetof(HashMemoryChunkData, data) + size);
1654                 newChunk->maxlen = size;
1655                 newChunk->used = 0;
1656                 newChunk->ntuples = 0;
1657
1658                 /*
1659                  * Add this chunk to the list after the first existing chunk, so that
1660                  * we don't lose the remaining space in the "current" chunk.
1661                  */
1662                 if (hashtable->chunks != NULL)
1663                 {
1664                         newChunk->next = hashtable->chunks->next;
1665                         hashtable->chunks->next = newChunk;
1666                 }
1667                 else
1668                 {
1669                         newChunk->next = hashtable->chunks;
1670                         hashtable->chunks = newChunk;
1671                 }
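                /*
                 * Illustrative list shapes (C2 being the current head chunk):
                 * linking the oversized chunk X after the head turns "C2 -> C1"
                 * into "C2 -> X -> C1".  Linking X at the head instead would make
                 * the full chunk X "current" and strand C2's remaining free space.
                 */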
1672
1673                 newChunk->used += size;
1674                 newChunk->ntuples += 1;
1675
1676                 return newChunk->data;
1677         }
1678
1679         /*
1680          * See if we have enough space for it in the current chunk (if any). If
1681          * not, allocate a fresh chunk.
1682          */
1683         if ((hashtable->chunks == NULL) ||
1684                 (hashtable->chunks->maxlen - hashtable->chunks->used) < size)
1685         {
1686                 /* allocate new chunk and put it at the beginning of the list */
1687                 newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt,
1688                                           offsetof(HashMemoryChunkData, data) + HASH_CHUNK_SIZE);
1689
1690                 newChunk->maxlen = HASH_CHUNK_SIZE;
1691                 newChunk->used = size;
1692                 newChunk->ntuples = 1;
1693
1694                 newChunk->next = hashtable->chunks;
1695                 hashtable->chunks = newChunk;
1696
1697                 return newChunk->data;
1698         }
1699
1700         /* There is enough space in the current chunk; add the tuple there */
1701         ptr = hashtable->chunks->data + hashtable->chunks->used;
1702         hashtable->chunks->used += size;
1703         hashtable->chunks->ntuples += 1;
1704
1705         /* return pointer to the start of the tuple memory */
1706         return ptr;
1707 }
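/*
 * A minimal usage sketch (mirroring the current-batch path of
 * ExecHashTableInsert earlier in this file): callers obtain tuple storage
 * from dense_alloc rather than palloc'ing each tuple separately:
 *
 *              HashJoinTuple hashTuple = (HashJoinTuple)
 *                      dense_alloc(hashtable, HJTUPLE_OVERHEAD + tuple->t_len);
 *
 *              hashTuple->hashvalue = hashvalue;
 *              memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
 */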