/*-------------------------------------------------------------------------
 *
 * nodeHash.c
 *        Routines to hash relations for hashjoin
 *
 * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *        src/backend/executor/nodeHash.c
 *
 *-------------------------------------------------------------------------
 */
/*
 * INTERFACE ROUTINES
 *              MultiExecHash   - generate an in-memory hash table of the relation
 *              ExecInitHash    - initialize node and subnodes
 *              ExecEndHash             - shutdown node and subnodes
 */

#include "postgres.h"

#include <math.h>
#include <limits.h>

#include "access/htup_details.h"
#include "catalog/pg_statistic.h"
#include "commands/tablespace.h"
#include "executor/execdebug.h"
#include "executor/hashjoin.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "miscadmin.h"
#include "utils/dynahash.h"
#include "utils/memutils.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"


static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node,
                                          int mcvsToUse);
static void ExecHashSkewTableInsert(HashJoinTable hashtable,
                                                TupleTableSlot *slot,
                                                uint32 hashvalue,
                                                int bucketNumber);
static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);

static void *dense_alloc(HashJoinTable hashtable, Size size);

/* ----------------------------------------------------------------
 *              ExecHash
 *
 *              stub for pro forma compliance
 * ----------------------------------------------------------------
 */
TupleTableSlot *
ExecHash(HashState *node)
{
        elog(ERROR, "Hash node does not support ExecProcNode call convention");
        return NULL;
}

/* ----------------------------------------------------------------
 *              MultiExecHash
 *
 *              build hash table for hashjoin, doing partitioning if more
 *              than one batch is required.
 * ----------------------------------------------------------------
 */
Node *
MultiExecHash(HashState *node)
{
        PlanState  *outerNode;
        List       *hashkeys;
        HashJoinTable hashtable;
        TupleTableSlot *slot;
        ExprContext *econtext;
        uint32          hashvalue;

        /* must provide our own instrumentation support */
        if (node->ps.instrument)
                InstrStartNode(node->ps.instrument);

        /*
         * get state info from node
         */
        outerNode = outerPlanState(node);
        hashtable = node->hashtable;

        /*
         * set expression context
         */
        hashkeys = node->hashkeys;
        econtext = node->ps.ps_ExprContext;

        /*
         * get all inner tuples and insert into the hash table (or temp files)
         */
        for (;;)
        {
                slot = ExecProcNode(outerNode);
                if (TupIsNull(slot))
                        break;
                /* We have to compute the hash value */
                econtext->ecxt_innertuple = slot;
                if (ExecHashGetHashValue(hashtable, econtext, hashkeys,
                                                                 false, hashtable->keepNulls,
                                                                 &hashvalue))
                {
                        int                     bucketNumber;

                        bucketNumber = ExecHashGetSkewBucket(hashtable, hashvalue);
                        if (bucketNumber != INVALID_SKEW_BUCKET_NO)
                        {
                                /* It's a skew tuple, so put it into that hash table */
                                ExecHashSkewTableInsert(hashtable, slot, hashvalue,
                                                                                bucketNumber);
                        }
                        else
                        {
                                /* Not subject to skew optimization, so insert normally */
                                ExecHashTableInsert(hashtable, slot, hashvalue);
                        }
                        hashtable->totalTuples += 1;
                }
        }

        /* must provide our own instrumentation support */
        if (node->ps.instrument)
                InstrStopNode(node->ps.instrument, hashtable->totalTuples);

        /*
         * We do not return the hash table directly because it's not a subtype of
         * Node, and so would violate the MultiExecProcNode API.  Instead, our
         * parent Hashjoin node is expected to know how to fish it out of our node
         * state.  Ugly but not really worth cleaning up, since Hashjoin knows
         * quite a bit more about Hash besides that.
         */
        return NULL;
}
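
/*
 * Illustrative sketch (an addition, not part of the upstream comments): the
 * parent hash join in nodeHashjoin.c creates the table, stores it into our
 * node state, and then runs us purely for the side effect of filling it,
 * roughly:
 *
 *              hashNode->hashtable = hashtable;
 *              (void) MultiExecProcNode((PlanState *) hashNode);
 *
 * which is why returning NULL above loses nothing.
 */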

/* ----------------------------------------------------------------
 *              ExecInitHash
 *
 *              Init routine for Hash node
 * ----------------------------------------------------------------
 */
HashState *
ExecInitHash(Hash *node, EState *estate, int eflags)
{
        HashState  *hashstate;

        /* check for unsupported flags */
        Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));

        /*
         * create state structure
         */
        hashstate = makeNode(HashState);
        hashstate->ps.plan = (Plan *) node;
        hashstate->ps.state = estate;
        hashstate->hashtable = NULL;
        hashstate->hashkeys = NIL;      /* will be set by parent HashJoin */

        /*
         * Miscellaneous initialization
         *
         * create expression context for node
         */
        ExecAssignExprContext(estate, &hashstate->ps);

        /*
         * initialize our result slot
         */
        ExecInitResultTupleSlot(estate, &hashstate->ps);

        /*
         * initialize child expressions
         */
        hashstate->ps.targetlist = (List *)
                ExecInitExpr((Expr *) node->plan.targetlist,
                                         (PlanState *) hashstate);
        hashstate->ps.qual = (List *)
                ExecInitExpr((Expr *) node->plan.qual,
                                         (PlanState *) hashstate);

        /*
         * initialize child nodes
         */
        outerPlanState(hashstate) = ExecInitNode(outerPlan(node), estate, eflags);

        /*
         * initialize tuple type. no need to initialize projection info because
         * this node doesn't do projections
         */
        ExecAssignResultTypeFromTL(&hashstate->ps);
        hashstate->ps.ps_ProjInfo = NULL;

        return hashstate;
}

/* ----------------------------------------------------------------
 *              ExecEndHash
 *
 *              clean up routine for Hash node
 * ----------------------------------------------------------------
 */
void
ExecEndHash(HashState *node)
{
        PlanState  *outerPlan;

        /*
         * free exprcontext
         */
        ExecFreeExprContext(&node->ps);

        /*
         * shut down the subplan
         */
        outerPlan = outerPlanState(node);
        ExecEndNode(outerPlan);
}


/* ----------------------------------------------------------------
 *              ExecHashTableCreate
 *
 *              create an empty hashtable data structure for hashjoin.
 * ----------------------------------------------------------------
 */
HashJoinTable
ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
{
        HashJoinTable hashtable;
        Plan       *outerNode;
        int                     nbuckets;
        int                     nbatch;
        int                     num_skew_mcvs;
        int                     log2_nbuckets;
        int                     nkeys;
        int                     i;
        ListCell   *ho;
        MemoryContext oldcxt;

        /*
         * Get information about the size of the relation to be hashed (it's the
         * "outer" subtree of this node, but the inner relation of the hashjoin).
         * Compute the appropriate size of the hash table.
         */
        outerNode = outerPlan(node);

        ExecChooseHashTableSize(outerNode->plan_rows, outerNode->plan_width,
                                                        OidIsValid(node->skewTable),
                                                        &nbuckets, &nbatch, &num_skew_mcvs);

#ifdef HJDEBUG
        printf("nbatch = %d, nbuckets = %d\n", nbatch, nbuckets);
#endif

        /* nbuckets must be a power of 2 */
        log2_nbuckets = my_log2(nbuckets);
        Assert(nbuckets == (1 << log2_nbuckets));

        /*
         * Initialize the hash table control block.
         *
         * The hashtable control block is just palloc'd from the executor's
         * per-query memory context.
         */
        hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData));
        hashtable->nbuckets = nbuckets;
        hashtable->log2_nbuckets = log2_nbuckets;
        hashtable->buckets = NULL;
        hashtable->keepNulls = keepNulls;
        hashtable->skewEnabled = false;
        hashtable->skewBucket = NULL;
        hashtable->skewBucketLen = 0;
        hashtable->nSkewBuckets = 0;
        hashtable->skewBucketNums = NULL;
        hashtable->nbatch = nbatch;
        hashtable->curbatch = 0;
        hashtable->nbatch_original = nbatch;
        hashtable->nbatch_outstart = nbatch;
        hashtable->growEnabled = true;
        hashtable->totalTuples = 0;
        hashtable->innerBatchFile = NULL;
        hashtable->outerBatchFile = NULL;
        hashtable->spaceUsed = 0;
        hashtable->spacePeak = 0;
        hashtable->spaceAllowed = work_mem * 1024L;
        hashtable->spaceUsedSkew = 0;
        hashtable->spaceAllowedSkew =
                hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100;
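
        /*
         * Illustrative numbers (an addition, assuming SKEW_WORK_MEM_PERCENT is
         * 2 as defined in hashjoin.h): with work_mem = 4096 (4MB), spaceAllowed
         * is 4194304 bytes and spaceAllowedSkew comes to 83886 bytes, i.e.
         * roughly 82KB reserved for the skew hashtable.
         */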
        hashtable->chunks = NULL;

        /*
         * Get info about the hash functions to be used for each hash key. Also
         * remember whether the join operators are strict.
         */
        nkeys = list_length(hashOperators);
        hashtable->outer_hashfunctions =
                (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
        hashtable->inner_hashfunctions =
                (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
        hashtable->hashStrict = (bool *) palloc(nkeys * sizeof(bool));
        i = 0;
        foreach(ho, hashOperators)
        {
                Oid                     hashop = lfirst_oid(ho);
                Oid                     left_hashfn;
                Oid                     right_hashfn;

                if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn))
                        elog(ERROR, "could not find hash function for hash operator %u",
                                 hashop);
                fmgr_info(left_hashfn, &hashtable->outer_hashfunctions[i]);
                fmgr_info(right_hashfn, &hashtable->inner_hashfunctions[i]);
                hashtable->hashStrict[i] = op_strict(hashop);
                i++;
        }

        /*
         * Create temporary memory contexts in which to keep the hashtable working
         * storage.  See notes in executor/hashjoin.h.
         */
        hashtable->hashCxt = AllocSetContextCreate(CurrentMemoryContext,
                                                                                           "HashTableContext",
                                                                                           ALLOCSET_DEFAULT_MINSIZE,
                                                                                           ALLOCSET_DEFAULT_INITSIZE,
                                                                                           ALLOCSET_DEFAULT_MAXSIZE);

        hashtable->batchCxt = AllocSetContextCreate(hashtable->hashCxt,
                                                                                                "HashBatchContext",
                                                                                                ALLOCSET_DEFAULT_MINSIZE,
                                                                                                ALLOCSET_DEFAULT_INITSIZE,
                                                                                                ALLOCSET_DEFAULT_MAXSIZE);

        /* Allocate data that will live for the life of the hashjoin */

        oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);

        if (nbatch > 1)
        {
                /*
                 * allocate and initialize the file arrays in hashCxt
                 */
                hashtable->innerBatchFile = (BufFile **)
                        palloc0(nbatch * sizeof(BufFile *));
                hashtable->outerBatchFile = (BufFile **)
                        palloc0(nbatch * sizeof(BufFile *));
                /* The files will not be opened until needed... */
                /* ... but make sure we have temp tablespaces established for them */
                PrepareTempTablespaces();
        }

        /*
         * Prepare context for the first-scan space allocations; allocate the
         * hashbucket array therein, and set each bucket "empty".
         */
        MemoryContextSwitchTo(hashtable->batchCxt);

        hashtable->buckets = (HashJoinTuple *)
                palloc0(nbuckets * sizeof(HashJoinTuple));

        /*
         * Set up for skew optimization, if possible and there's a need for more
         * than one batch.  (In a one-batch join, there's no point in it.)
         */
        if (nbatch > 1)
                ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);

        MemoryContextSwitchTo(oldcxt);

        return hashtable;
}


/*
 * Compute appropriate size for hashtable given the estimated size of the
 * relation to be hashed (number of rows and average row width).
 *
 * This is exported so that the planner's costsize.c can use it.
 */

/* Target bucket loading (tuples per bucket) */
#define NTUP_PER_BUCKET                 1

void
ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                                                int *numbuckets,
                                                int *numbatches,
                                                int *num_skew_mcvs)
{
        int                     tupsize;
        double          inner_rel_bytes;
        long            bucket_bytes;
        long            hash_table_bytes;
        long            skew_table_bytes;
        long            max_pointers;
        int                     nbatch = 1;
        int                     nbuckets;
        double          dbuckets;

        /* Force a plausible relation size if no info */
        if (ntuples <= 0.0)
                ntuples = 1000.0;

        /*
         * Estimate tupsize based on footprint of tuple in hashtable... note this
         * does not allow for any palloc overhead.  The manipulations of spaceUsed
         * don't count palloc overhead either.
         */
        tupsize = HJTUPLE_OVERHEAD +
                MAXALIGN(sizeof(MinimalTupleData)) +
                MAXALIGN(tupwidth);
        inner_rel_bytes = ntuples * tupsize;

        /*
         * Target in-memory hashtable size is work_mem kilobytes.
         */
        hash_table_bytes = work_mem * 1024L;

        /*
         * If skew optimization is possible, estimate the number of skew buckets
         * that will fit in the memory allowed, and decrement the assumed space
         * available for the main hash table accordingly.
         *
         * We make the optimistic assumption that each skew bucket will contain
         * one inner-relation tuple.  If that turns out to be low, we will recover
         * at runtime by reducing the number of skew buckets.
         *
         * hashtable->skewBucket will have up to 8 times as many HashSkewBucket
         * pointers as the number of MCVs we allow, since ExecHashBuildSkewHash
         * will round up to the next power of 2 and then multiply by 4 to reduce
         * collisions.
         */
        if (useskew)
        {
                skew_table_bytes = hash_table_bytes * SKEW_WORK_MEM_PERCENT / 100;

                /*----------
                 * Divisor is:
                 * size of a hash tuple +
                 * worst-case size of skewBucket[] per MCV +
                 * size of skewBucketNums[] entry +
                 * size of skew bucket struct itself
                 *----------
                 */
                *num_skew_mcvs = skew_table_bytes / (tupsize +
                                                                                         (8 * sizeof(HashSkewBucket *)) +
                                                                                         sizeof(int) +
                                                                                         SKEW_BUCKET_OVERHEAD);
                if (*num_skew_mcvs > 0)
                        hash_table_bytes -= skew_table_bytes;
        }
        else
                *num_skew_mcvs = 0;

        /*
         * Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when
         * memory is filled, assuming a single batch.  The Min() step limits the
         * results so that the pointer arrays we'll try to allocate do not exceed
         * work_mem.
         */
        max_pointers = (work_mem * 1024L) / sizeof(void *);
        /* also ensure we avoid integer overflow in nbatch and nbuckets */
        max_pointers = Min(max_pointers, INT_MAX / 2);
        dbuckets = ceil(ntuples / NTUP_PER_BUCKET);
        dbuckets = Min(dbuckets, max_pointers);
        nbuckets = Max((int) dbuckets, 1024);
        nbuckets = 1 << my_log2(nbuckets);
        bucket_bytes = sizeof(HashJoinTuple) * nbuckets;

        /*
         * If there's not enough space to store the projected number of tuples
         * and the required bucket headers, we will need multiple batches.
         */
        if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
        {
                /* We'll need multiple batches */
                long            lbuckets;
                double          dbatch;
                int                     minbatch;
                long            bucket_size;

                /*
                 * Estimate the number of buckets we'll want to have when work_mem
                 * is entirely full.  Each bucket will contain a bucket pointer plus
                 * NTUP_PER_BUCKET tuples, whose projected size already includes
                 * overhead for the hash code, pointer to the next tuple, etc.
                 */
                bucket_size = (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple));
                lbuckets = 1 << my_log2(hash_table_bytes / bucket_size);
                lbuckets = Min(lbuckets, max_pointers);
                nbuckets = (int) lbuckets;
                bucket_bytes = nbuckets * sizeof(HashJoinTuple);

                /*
                 * Buckets are simple pointers to hashjoin tuples, while tupsize
                 * includes the pointer, hash code, and MinimalTupleData.  So buckets
                 * should never really exceed 25% of work_mem (even for
                 * NTUP_PER_BUCKET=1); except maybe for work_mem values that are not
                 * 2^N bytes, where we might get more because of doubling.  So let's
                 * look for 50% here.
                 */
                Assert(bucket_bytes <= hash_table_bytes / 2);

                /* Calculate required number of batches. */
                dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes));
                dbatch = Min(dbatch, max_pointers);
                minbatch = (int) dbatch;
                nbatch = 2;
                while (nbatch < minbatch)
                        nbatch <<= 1;
        }

        *numbuckets = nbuckets;
        *numbatches = nbatch;
}
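
/*
 * Worked example (an addition, using assumed round numbers): take work_mem
 * = 1024 (1MB), useskew = false, ntuples = 100000, and suppose tupsize
 * comes out to 64 bytes.  Then inner_rel_bytes is 6.4MB, which cannot fit,
 * so we take the multi-batch path: bucket_size = 64 + sizeof(HashJoinTuple)
 * = 72 on a 64-bit build, lbuckets = 1 << my_log2(1048576 / 72) = 16384,
 * bucket_bytes = 128KB, and dbatch = ceil(6400000 / (1048576 - 131072)) =
 * 7, which the doubling loop rounds up to nbatch = 8.
 */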


/* ----------------------------------------------------------------
 *              ExecHashTableDestroy
 *
 *              destroy a hash table
 * ----------------------------------------------------------------
 */
void
ExecHashTableDestroy(HashJoinTable hashtable)
{
        int                     i;

        /*
         * Make sure all the temp files are closed.  We skip batch 0, since it
         * can't have any temp files (and the arrays might not even exist if
         * nbatch is only 1).
         */
        for (i = 1; i < hashtable->nbatch; i++)
        {
                if (hashtable->innerBatchFile[i])
                        BufFileClose(hashtable->innerBatchFile[i]);
                if (hashtable->outerBatchFile[i])
                        BufFileClose(hashtable->outerBatchFile[i]);
        }

        /* Release working memory (batchCxt is a child, so it goes away too) */
        MemoryContextDelete(hashtable->hashCxt);

        /* And drop the control block */
        pfree(hashtable);
}

/*
 * ExecHashIncreaseNumBatches
 *              increase the original number of batches in order to reduce
 *              current memory consumption
 */
static void
ExecHashIncreaseNumBatches(HashJoinTable hashtable)
{
        int                     oldnbatch = hashtable->nbatch;
        int                     curbatch = hashtable->curbatch;
        int                     nbatch;
        MemoryContext oldcxt;
        long            ninmemory;
        long            nfreed;
        HashMemoryChunk oldchunks;

        /* do nothing if we've decided to shut off growth */
        if (!hashtable->growEnabled)
                return;

        /* safety check to avoid overflow */
        if (oldnbatch > Min(INT_MAX / 2, MaxAllocSize / (sizeof(void *) * 2)))
                return;

        nbatch = oldnbatch * 2;
        Assert(nbatch > 1);

#ifdef HJDEBUG
        printf("Increasing nbatch to %d because space = %lu\n",
                   nbatch, (unsigned long) hashtable->spaceUsed);
#endif

        oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);

        if (hashtable->innerBatchFile == NULL)
        {
                /* we had no file arrays before */
                hashtable->innerBatchFile = (BufFile **)
                        palloc0(nbatch * sizeof(BufFile *));
                hashtable->outerBatchFile = (BufFile **)
                        palloc0(nbatch * sizeof(BufFile *));
                /* time to establish the temp tablespaces, too */
                PrepareTempTablespaces();
        }
        else
        {
                /* enlarge arrays and zero out added entries */
                hashtable->innerBatchFile = (BufFile **)
                        repalloc(hashtable->innerBatchFile, nbatch * sizeof(BufFile *));
                hashtable->outerBatchFile = (BufFile **)
                        repalloc(hashtable->outerBatchFile, nbatch * sizeof(BufFile *));
                MemSet(hashtable->innerBatchFile + oldnbatch, 0,
                           (nbatch - oldnbatch) * sizeof(BufFile *));
                MemSet(hashtable->outerBatchFile + oldnbatch, 0,
                           (nbatch - oldnbatch) * sizeof(BufFile *));
        }

        MemoryContextSwitchTo(oldcxt);

        hashtable->nbatch = nbatch;

        /*
         * Scan through the existing hash table entries and dump out any that are
         * no longer of the current batch.
         */
        ninmemory = nfreed = 0;

        /*
         * We will scan through the chunks directly, so that we can reset the
         * buckets now and not have to keep track which tuples in the buckets have
         * already been processed. We will free the old chunks as we go.
         */
        memset(hashtable->buckets, 0, sizeof(HashJoinTuple) * hashtable->nbuckets);
        oldchunks = hashtable->chunks;
        hashtable->chunks = NULL;

        /* so, let's scan through the old chunks, and all tuples in each chunk */
        while (oldchunks != NULL)
        {
                HashMemoryChunk nextchunk = oldchunks->next;
                /* position within the buffer (up to oldchunks->used) */
                size_t          idx = 0;

                /* process all tuples stored in this chunk (and then free it) */
                while (idx < oldchunks->used)
                {
                        HashJoinTuple hashTuple = (HashJoinTuple) (oldchunks->data + idx);
                        MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
                        int                     hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
                        int                     bucketno;
                        int                     batchno;

                        ninmemory++;
                        ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
                                                                          &bucketno, &batchno);

                        if (batchno == curbatch)
                        {
                                /* keep tuple in memory - copy it into the new chunk */
                                HashJoinTuple copyTuple =
                                        (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
                                memcpy(copyTuple, hashTuple, hashTupleSize);

                                /* and add it back to the appropriate bucket */
                                copyTuple->next = hashtable->buckets[bucketno];
                                hashtable->buckets[bucketno] = copyTuple;
                        }
                        else
                        {
                                /* dump it out */
                                Assert(batchno > curbatch);
                                ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
                                                                          hashTuple->hashvalue,
                                                                          &hashtable->innerBatchFile[batchno]);

                                hashtable->spaceUsed -= hashTupleSize;
                                nfreed++;
                        }

                        /* next tuple in this chunk */
                        idx += MAXALIGN(hashTupleSize);
                }

                /* we're done with this chunk - free it and proceed to the next one */
                pfree(oldchunks);
                oldchunks = nextchunk;
        }

#ifdef HJDEBUG
        printf("Freed %ld of %ld tuples, space now %lu\n",
                   nfreed, ninmemory, (unsigned long) hashtable->spaceUsed);
#endif

        /*
         * If we dumped out either all or none of the tuples in the table, disable
         * further expansion of nbatch.  This situation implies that we have
         * enough tuples of identical hashvalues to overflow spaceAllowed.
         * Increasing nbatch will not fix it since there's no way to subdivide the
         * group any more finely. We have to just gut it out and hope the server
         * has enough RAM.
         */
        if (nfreed == 0 || nfreed == ninmemory)
        {
                hashtable->growEnabled = false;
#ifdef HJDEBUG
                printf("Disabling further increase of nbatch\n");
#endif
        }
}
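
/*
 * Illustrative note (an addition): because nbatch only ever doubles,
 * ExecHashGetBucketAndBatch exposes one more bit of the hashvalue each
 * time.  A tuple in batch 1 of 4 (batchno bits "01") must land in batch 1
 * or 5 of 8 ("001" or "101"); repartitioning therefore either keeps a tuple
 * in the current batch or pushes it later, never earlier, which is what the
 * Assert(batchno > curbatch) above relies on.
 */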

/*
 * ExecHashTableInsert
 *              insert a tuple into the hash table; depending on the hash value,
 *              it may instead go to a temp file for later batches
 *
 * Note: the passed TupleTableSlot may contain a regular, minimal, or virtual
 * tuple; the minimal case in particular is certain to happen while reloading
 * tuples from batch files.  We could save some cycles in the regular-tuple
 * case by not forcing the slot contents into minimal form; not clear if it's
 * worth the messiness required.
 */
void
ExecHashTableInsert(HashJoinTable hashtable,
                                        TupleTableSlot *slot,
                                        uint32 hashvalue)
{
        MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
        int                     bucketno;
        int                     batchno;

        ExecHashGetBucketAndBatch(hashtable, hashvalue,
                                                          &bucketno, &batchno);

        /*
         * decide whether to put the tuple in the hash table or a temp file
         */
        if (batchno == hashtable->curbatch)
        {
                /*
                 * put the tuple in hash table
                 */
                HashJoinTuple hashTuple;
                int                     hashTupleSize;

                /* Create the HashJoinTuple */
                hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
                hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);

                hashTuple->hashvalue = hashvalue;
                memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);

                /*
                 * We always reset the tuple-matched flag on insertion.  This is okay
                 * even when reloading a tuple from a batch file, since the tuple
                 * could not possibly have been matched to an outer tuple before it
                 * went into the batch file.
                 */
                HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));

                /* Push it onto the front of the bucket's list */
                hashTuple->next = hashtable->buckets[bucketno];
                hashtable->buckets[bucketno] = hashTuple;

                /* Account for space used, and back off if we've used too much */
                hashtable->spaceUsed += hashTupleSize;
                if (hashtable->spaceUsed > hashtable->spacePeak)
                        hashtable->spacePeak = hashtable->spaceUsed;
                if (hashtable->spaceUsed + hashtable->nbuckets * sizeof(HashJoinTuple)
                        > hashtable->spaceAllowed)
                        ExecHashIncreaseNumBatches(hashtable);
        }
        else
        {
                /*
                 * put the tuple into a temp file for later batches
                 */
                Assert(batchno > hashtable->curbatch);
                ExecHashJoinSaveTuple(tuple,
                                                          hashvalue,
                                                          &hashtable->innerBatchFile[batchno]);
        }
}
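
/*
 * Illustrative numbers (an addition): with work_mem = 4096 (4MB) and
 * nbuckets = 16384, the bucket header array costs 16384 *
 * sizeof(HashJoinTuple) = 128KB on a 64-bit build, so the check above
 * starts doubling nbatch once tuple storage passes roughly 4MB - 128KB.
 */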

/*
 * ExecHashGetHashValue
 *              Compute the hash value for a tuple
 *
 * The tuple to be tested must be in either econtext->ecxt_outertuple or
 * econtext->ecxt_innertuple.  Vars in the hashkeys expressions should have
 * varno either OUTER_VAR or INNER_VAR.
 *
 * A TRUE result means the tuple's hash value has been successfully computed
 * and stored at *hashvalue.  A FALSE result means the tuple cannot match
 * because it contains a null attribute, and hence it should be discarded
 * immediately.  (If keep_nulls is true then FALSE is never returned.)
 */
bool
ExecHashGetHashValue(HashJoinTable hashtable,
                                         ExprContext *econtext,
                                         List *hashkeys,
                                         bool outer_tuple,
                                         bool keep_nulls,
                                         uint32 *hashvalue)
{
        uint32          hashkey = 0;
        FmgrInfo   *hashfunctions;
        ListCell   *hk;
        int                     i = 0;
        MemoryContext oldContext;

        /*
         * We reset the eval context each time to reclaim any memory leaked in the
         * hashkey expressions.
         */
        ResetExprContext(econtext);

        oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);

        if (outer_tuple)
                hashfunctions = hashtable->outer_hashfunctions;
        else
                hashfunctions = hashtable->inner_hashfunctions;

        foreach(hk, hashkeys)
        {
                ExprState  *keyexpr = (ExprState *) lfirst(hk);
                Datum           keyval;
                bool            isNull;

                /* rotate hashkey left 1 bit at each step */
                hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);

                /*
                 * Get the join attribute value of the tuple
                 */
                keyval = ExecEvalExpr(keyexpr, econtext, &isNull, NULL);

                /*
                 * If the attribute is NULL, and the join operator is strict, then
                 * this tuple cannot pass the join qual so we can reject it
                 * immediately (unless we're scanning the outside of an outer join, in
                 * which case we must not reject it).  Otherwise we act like the
                 * hashcode of NULL is zero (this will support operators that act like
                 * IS NOT DISTINCT, though not any more-random behavior).  We treat
                 * the hash support function as strict even if the operator is not.
                 *
                 * Note: currently, all hashjoinable operators must be strict since
                 * the hash index AM assumes that.  However, it takes so little extra
                 * code here to allow non-strict that we may as well do it.
                 */
                if (isNull)
                {
                        if (hashtable->hashStrict[i] && !keep_nulls)
                        {
                                MemoryContextSwitchTo(oldContext);
                                return false;   /* cannot match */
                        }
                        /* else, leave hashkey unmodified, equivalent to hashcode 0 */
                }
                else
                {
                        /* Compute the hash function */
                        uint32          hkey;

                        hkey = DatumGetUInt32(FunctionCall1(&hashfunctions[i], keyval));
                        hashkey ^= hkey;
                }

                i++;
        }

        MemoryContextSwitchTo(oldContext);

        *hashvalue = hashkey;
        return true;
}
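
/*
 * Worked example (an addition): for a two-key join the loop above computes
 *
 *              hashkey = rot1(rot1(0) ^ hash(key1)) ^ hash(key2)
 *                      = rot1(hash(key1)) ^ hash(key2)
 *
 * where rot1() is the 1-bit left rotate coded above.  The rotation keeps
 * equal values appearing in different key positions from cancelling each
 * other out under XOR.
 */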

/*
 * ExecHashGetBucketAndBatch
 *              Determine the bucket number and batch number for a hash value
 *
 * Note: on-the-fly increases of nbatch must not change the bucket number
 * for a given hash code (since we don't move tuples to different hash
 * chains), and must only cause the batch number to remain the same or
 * increase.  Our algorithm is
 *              bucketno = hashvalue MOD nbuckets
 *              batchno = (hashvalue DIV nbuckets) MOD nbatch
 * where nbuckets and nbatch are both expected to be powers of 2, so we can
 * do the computations by shifting and masking.  (This assumes that all hash
 * functions are good about randomizing all their output bits, else we are
 * likely to have very skewed bucket or batch occupancy.)
 *
 * nbuckets doesn't change over the course of the join.
 *
 * nbatch is always a power of 2; we increase it only by doubling it.  This
 * effectively adds one more bit to the top of the batchno.
 */
void
ExecHashGetBucketAndBatch(HashJoinTable hashtable,
                                                  uint32 hashvalue,
                                                  int *bucketno,
                                                  int *batchno)
{
        uint32          nbuckets = (uint32) hashtable->nbuckets;
        uint32          nbatch = (uint32) hashtable->nbatch;

        if (nbatch > 1)
        {
                /* we can do MOD by masking, DIV by shifting */
                *bucketno = hashvalue & (nbuckets - 1);
                *batchno = (hashvalue >> hashtable->log2_nbuckets) & (nbatch - 1);
        }
        else
        {
                *bucketno = hashvalue & (nbuckets - 1);
                *batchno = 0;
        }
}
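
/*
 * Worked example (an addition): with nbuckets = 1024 (log2_nbuckets = 10)
 * and nbatch = 4, hashvalue = 7000 gives bucketno = 7000 & 1023 = 856 and
 * batchno = (7000 >> 10) & 3 = 6 & 3 = 2.  Doubling nbatch to 8 leaves
 * bucketno unchanged and recomputes batchno as 6 & 7 = 6, i.e. same or
 * later, never earlier.
 */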

/*
 * ExecScanHashBucket
 *              scan a hash bucket for matches to the current outer tuple
 *
 * The current outer tuple must be stored in econtext->ecxt_outertuple.
 *
 * On success, the inner tuple is stored into hjstate->hj_CurTuple and
 * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
 * for the latter.
 */
bool
ExecScanHashBucket(HashJoinState *hjstate,
                                   ExprContext *econtext)
{
        List       *hjclauses = hjstate->hashclauses;
        HashJoinTable hashtable = hjstate->hj_HashTable;
        HashJoinTuple hashTuple = hjstate->hj_CurTuple;
        uint32          hashvalue = hjstate->hj_CurHashValue;

        /*
         * hj_CurTuple is the address of the tuple last returned from the current
         * bucket, or NULL if it's time to start scanning a new bucket.
         *
         * If the tuple hashed to a skew bucket then scan the skew bucket;
         * otherwise scan the standard hashtable bucket.
         */
        if (hashTuple != NULL)
                hashTuple = hashTuple->next;
        else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO)
                hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples;
        else
                hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];

        while (hashTuple != NULL)
        {
                if (hashTuple->hashvalue == hashvalue)
                {
                        TupleTableSlot *inntuple;

                        /* insert hashtable's tuple into exec slot so ExecQual sees it */
                        inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
                                                                                         hjstate->hj_HashTupleSlot,
                                                                                         false);        /* do not pfree */
                        econtext->ecxt_innertuple = inntuple;

                        /* reset temp memory each time to avoid leaks from qual expr */
                        ResetExprContext(econtext);

                        if (ExecQual(hjclauses, econtext, false))
                        {
                                hjstate->hj_CurTuple = hashTuple;
                                return true;
                        }
                }

                hashTuple = hashTuple->next;
        }

        /*
         * no match
         */
        return false;
}

/*
 * ExecPrepHashTableForUnmatched
 *              set up for a series of ExecScanHashTableForUnmatched calls
 */
void
ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
{
        /*----------
         * During this scan we use the HashJoinState fields as follows:
         *
         * hj_CurBucketNo: next regular bucket to scan
         * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums)
         * hj_CurTuple: last tuple returned, or NULL to start next bucket
         *----------
         */
        hjstate->hj_CurBucketNo = 0;
        hjstate->hj_CurSkewBucketNo = 0;
        hjstate->hj_CurTuple = NULL;
}

/*
 * ExecScanHashTableForUnmatched
 *              scan the hash table for unmatched inner tuples
 *
 * On success, the inner tuple is stored into hjstate->hj_CurTuple and
 * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
 * for the latter.
 */
bool
ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
{
        HashJoinTable hashtable = hjstate->hj_HashTable;
        HashJoinTuple hashTuple = hjstate->hj_CurTuple;

        for (;;)
        {
                /*
                 * hj_CurTuple is the address of the tuple last returned from the
                 * current bucket, or NULL if it's time to start scanning a new
                 * bucket.
                 */
                if (hashTuple != NULL)
                        hashTuple = hashTuple->next;
                else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
                {
                        hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
                        hjstate->hj_CurBucketNo++;
                }
                else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
                {
                        int                     j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];

                        hashTuple = hashtable->skewBucket[j]->tuples;
                        hjstate->hj_CurSkewBucketNo++;
                }
                else
                        break;                          /* finished all buckets */

                while (hashTuple != NULL)
                {
                        if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
                        {
                                TupleTableSlot *inntuple;

                                /* insert hashtable's tuple into exec slot */
                                inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
                                                                                                 hjstate->hj_HashTupleSlot,
                                                                                                 false);                /* do not pfree */
                                econtext->ecxt_innertuple = inntuple;

                                /*
                                 * Reset temp memory each time; although this function doesn't
                                 * do any qual eval, the caller will, so let's keep it
                                 * parallel to ExecScanHashBucket.
                                 */
                                ResetExprContext(econtext);

                                hjstate->hj_CurTuple = hashTuple;
                                return true;
                        }

                        hashTuple = hashTuple->next;
                }
        }

        /*
         * no more unmatched tuples
         */
        return false;
}

/*
 * ExecHashTableReset
 *
 *              reset hash table header for new batch
 */
void
ExecHashTableReset(HashJoinTable hashtable)
{
        MemoryContext oldcxt;
        int                     nbuckets = hashtable->nbuckets;

        /*
         * Release all the hash buckets and tuples acquired in the prior pass, and
         * reinitialize the context for a new pass.
         */
        MemoryContextReset(hashtable->batchCxt);
        oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);

        /* Reallocate and reinitialize the hash bucket headers. */
        hashtable->buckets = (HashJoinTuple *)
                palloc0(nbuckets * sizeof(HashJoinTuple));

        hashtable->spaceUsed = 0;

        MemoryContextSwitchTo(oldcxt);

        /* Forget the chunks (the memory was freed by the context reset above). */
        hashtable->chunks = NULL;
}

/*
 * ExecHashTableResetMatchFlags
 *              Clear all the HeapTupleHeaderHasMatch flags in the table
 */
void
ExecHashTableResetMatchFlags(HashJoinTable hashtable)
{
        HashJoinTuple tuple;
        int                     i;

        /* Reset all flags in the main table ... */
        for (i = 0; i < hashtable->nbuckets; i++)
        {
                for (tuple = hashtable->buckets[i]; tuple != NULL; tuple = tuple->next)
                        HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
        }

        /* ... and the same for the skew buckets, if any */
        for (i = 0; i < hashtable->nSkewBuckets; i++)
        {
                int                     j = hashtable->skewBucketNums[i];
                HashSkewBucket *skewBucket = hashtable->skewBucket[j];

                for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next)
                        HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
        }
}


void
ExecReScanHash(HashState *node)
{
        /*
         * If chgParam of the subnode is not null, then the plan will be
         * re-scanned by the first ExecProcNode.
         */
        if (node->ps.lefttree->chgParam == NULL)
                ExecReScan(node->ps.lefttree);
}


/*
 * ExecHashBuildSkewHash
 *
 *              Set up for skew optimization if we can identify the most common values
 *              (MCVs) of the outer relation's join key.  We make a skew hash bucket
 *              for the hash value of each MCV, up to the number of slots allowed
 *              based on available memory.
 */
static void
ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
{
        HeapTupleData *statsTuple;
        Datum      *values;
        int                     nvalues;
        float4     *numbers;
        int                     nnumbers;

        /* Do nothing if planner didn't identify the outer relation's join key */
        if (!OidIsValid(node->skewTable))
                return;
        /* Also, do nothing if we don't have room for at least one skew bucket */
        if (mcvsToUse <= 0)
                return;

        /*
         * Try to find the MCV statistics for the outer relation's join key.
         */
        statsTuple = SearchSysCache3(STATRELATTINH,
                                                                 ObjectIdGetDatum(node->skewTable),
                                                                 Int16GetDatum(node->skewColumn),
                                                                 BoolGetDatum(node->skewInherit));
        if (!HeapTupleIsValid(statsTuple))
                return;

        if (get_attstatsslot(statsTuple, node->skewColType, node->skewColTypmod,
                                                 STATISTIC_KIND_MCV, InvalidOid,
                                                 NULL,
                                                 &values, &nvalues,
                                                 &numbers, &nnumbers))
        {
                double          frac;
                int                     nbuckets;
                FmgrInfo   *hashfunctions;
                int                     i;

                if (mcvsToUse > nvalues)
                        mcvsToUse = nvalues;

                /*
                 * Calculate the expected fraction of outer relation that will
                 * participate in the skew optimization.  If this isn't at least
                 * SKEW_MIN_OUTER_FRACTION, don't use skew optimization.
                 */
                frac = 0;
                for (i = 0; i < mcvsToUse; i++)
                        frac += numbers[i];
                if (frac < SKEW_MIN_OUTER_FRACTION)
                {
                        free_attstatsslot(node->skewColType,
                                                          values, nvalues, numbers, nnumbers);
                        ReleaseSysCache(statsTuple);
                        return;
                }

                /*
                 * Okay, set up the skew hashtable.
                 *
                 * skewBucket[] is an open addressing hashtable with a power of 2 size
                 * that is greater than the number of MCV values.  (This ensures there
                 * will be at least one null entry, so searches will always
                 * terminate.)
                 *
                 * Note: this code could fail if mcvsToUse exceeds INT_MAX/8 or
                 * MaxAllocSize/sizeof(void *)/8, but that is not currently possible
                 * since we limit pg_statistic entries to much less than that.
                 */
                nbuckets = 2;
                while (nbuckets <= mcvsToUse)
                        nbuckets <<= 1;
                /* use two more bits just to help avoid collisions */
                nbuckets <<= 2;
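
                /*
                 * Illustrative numbers (an addition): for mcvsToUse = 100 the loop
                 * above stops at 128, the smallest power of 2 exceeding 100, and
                 * the two extra shifts yield 512 slots -- within the 8x worst case
                 * assumed in ExecChooseHashTableSize.
                 */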
1221
1222                 hashtable->skewEnabled = true;
1223                 hashtable->skewBucketLen = nbuckets;
1224
1225                 /*
1226                  * We allocate the bucket memory in the hashtable's batch context. It
1227                  * is only needed during the first batch, and this ensures it will be
1228                  * automatically removed once the first batch is done.
1229                  */
1230                 hashtable->skewBucket = (HashSkewBucket **)
1231                         MemoryContextAllocZero(hashtable->batchCxt,
1232                                                                    nbuckets * sizeof(HashSkewBucket *));
1233                 hashtable->skewBucketNums = (int *)
1234                         MemoryContextAllocZero(hashtable->batchCxt,
1235                                                                    mcvsToUse * sizeof(int));
1236
1237                 hashtable->spaceUsed += nbuckets * sizeof(HashSkewBucket *)
1238                         + mcvsToUse * sizeof(int);
1239                 hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
1240                         + mcvsToUse * sizeof(int);
1241                 if (hashtable->spaceUsed > hashtable->spacePeak)
1242                         hashtable->spacePeak = hashtable->spaceUsed;
1243
1244                 /*
1245                  * Create a skew bucket for each MCV hash value.
1246                  *
1247                  * Note: it is very important that we create the buckets in order of
1248                  * decreasing MCV frequency.  If we have to remove some buckets, they
1249                  * must be removed in reverse order of creation (see notes in
1250                  * ExecHashRemoveNextSkewBucket) and we want the least common MCVs to
1251                  * be removed first.
1252                  */
1253                 hashfunctions = hashtable->outer_hashfunctions;
1254
1255                 for (i = 0; i < mcvsToUse; i++)
1256                 {
1257                         uint32          hashvalue;
1258                         int                     bucket;
1259
1260                         hashvalue = DatumGetUInt32(FunctionCall1(&hashfunctions[0],
1261                                                                                                          values[i]));
1262
1263                         /*
1264                          * While we have not hit a hole in the hashtable and have not hit
1265                          * the desired bucket, we have collided with some previous hash
1266                          * value, so try the next bucket location.  NB: this code must
1267                          * match ExecHashGetSkewBucket.
1268                          */
1269                         bucket = hashvalue & (nbuckets - 1);
1270                         while (hashtable->skewBucket[bucket] != NULL &&
1271                                    hashtable->skewBucket[bucket]->hashvalue != hashvalue)
1272                                 bucket = (bucket + 1) & (nbuckets - 1);
1273
1274                         /*
1275                          * If we found an existing bucket with the same hashvalue, leave
1276                          * it alone.  It's okay for two MCVs to share a hashvalue.
1277                          */
1278                         if (hashtable->skewBucket[bucket] != NULL)
1279                                 continue;
1280
1281                         /* Okay, create a new skew bucket for this hashvalue. */
1282                         hashtable->skewBucket[bucket] = (HashSkewBucket *)
1283                                 MemoryContextAlloc(hashtable->batchCxt,
1284                                                                    sizeof(HashSkewBucket));
1285                         hashtable->skewBucket[bucket]->hashvalue = hashvalue;
1286                         hashtable->skewBucket[bucket]->tuples = NULL;
1287                         hashtable->skewBucketNums[hashtable->nSkewBuckets] = bucket;
1288                         hashtable->nSkewBuckets++;
1289                         hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
1290                         hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
1291                         if (hashtable->spaceUsed > hashtable->spacePeak)
1292                                 hashtable->spacePeak = hashtable->spaceUsed;
1293                 }
1294
1295                 free_attstatsslot(node->skewColType,
1296                                                   values, nvalues, numbers, nnumbers);
1297         }
1298
1299         ReleaseSysCache(statsTuple);
1300 }
1301
1302 /*
1303  * ExecHashGetSkewBucket
1304  *
1305  *              Returns the index of the skew bucket for this hashvalue,
1306  *              or INVALID_SKEW_BUCKET_NO if the hashvalue is not
1307  *              associated with any active skew bucket.
1308  */
1309 int
1310 ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
1311 {
1312         int                     bucket;
1313
1314         /*
1315          * Always return INVALID_SKEW_BUCKET_NO if not doing skew optimization (in
1316          * particular, this happens after the initial batch is done).
1317          */
1318         if (!hashtable->skewEnabled)
1319                 return INVALID_SKEW_BUCKET_NO;
1320
1321         /*
1322          * Since skewBucketLen is a power of 2, we can do a modulo by ANDing.
1323          */
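        /*
         * Worked example (hypothetical values): with skewBucketLen = 8 the mask
         * is 7 (binary 0111), so hashvalue 43 (binary 101011) gives 43 & 7 = 3,
         * the same as 43 % 8.  This only works because skewBucketLen is kept a
         * power of 2.
         */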
1324         bucket = hashvalue & (hashtable->skewBucketLen - 1);
1325
1326         /*
1327          * While we have not hit a hole in the hashtable and have not hit the
1328          * desired bucket, we have collided with some other hash value, so try the
1329          * next bucket location.
1330          */
1331         while (hashtable->skewBucket[bucket] != NULL &&
1332                    hashtable->skewBucket[bucket]->hashvalue != hashvalue)
1333                 bucket = (bucket + 1) & (hashtable->skewBucketLen - 1);
1334
1335         /*
1336          * Found the desired bucket?
1337          */
1338         if (hashtable->skewBucket[bucket] != NULL)
1339                 return bucket;
1340
1341         /*
1342          * There must not be any hashtable entry for this hash value.
1343          */
1344         return INVALID_SKEW_BUCKET_NO;
1345 }
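/*
 * Minimal usage sketch (it mirrors the pattern in MultiExecHash, shown here
 * only for orientation): route a tuple to the skew table when an active skew
 * bucket exists for its hash value, else insert normally.
 *
 *              bucketNumber = ExecHashGetSkewBucket(hashtable, hashvalue);
 *              if (bucketNumber != INVALID_SKEW_BUCKET_NO)
 *                      ExecHashSkewTableInsert(hashtable, slot, hashvalue, bucketNumber);
 *              else
 *                      ExecHashTableInsert(hashtable, slot, hashvalue);
 */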
1346
1347 /*
1348  * ExecHashSkewTableInsert
1349  *
1350  *              Insert a tuple into the skew hashtable.
1351  *
1352  * This should generally match up with the current-batch case in
1353  * ExecHashTableInsert.
1354  */
1355 static void
1356 ExecHashSkewTableInsert(HashJoinTable hashtable,
1357                                                 TupleTableSlot *slot,
1358                                                 uint32 hashvalue,
1359                                                 int bucketNumber)
1360 {
1361         MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
1362         HashJoinTuple hashTuple;
1363         int                     hashTupleSize;
1364
1365         /* Create the HashJoinTuple */
1366         hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
1367         hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
1368                                                                                                    hashTupleSize);
1369         hashTuple->hashvalue = hashvalue;
1370         memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1371         HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1372
1373         /* Push it onto the front of the skew bucket's list */
1374         hashTuple->next = hashtable->skewBucket[bucketNumber]->tuples;
1375         hashtable->skewBucket[bucketNumber]->tuples = hashTuple;
1376
1377         /* Account for space used, and back off if we've used too much */
1378         hashtable->spaceUsed += hashTupleSize;
1379         hashtable->spaceUsedSkew += hashTupleSize;
1380         if (hashtable->spaceUsed > hashtable->spacePeak)
1381                 hashtable->spacePeak = hashtable->spaceUsed;
1382         while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
1383                 ExecHashRemoveNextSkewBucket(hashtable);
1384
1385         /* Check we are not over the total spaceAllowed, either */
1386         if (hashtable->spaceUsed > hashtable->spaceAllowed)
1387                 ExecHashIncreaseNumBatches(hashtable);
1388 }
1389
1390 /*
1391  *              ExecHashRemoveNextSkewBucket
1392  *
1393  *              Remove the least valuable skew bucket by pushing its tuples into
1394  *              the main hash table.
1395  */
1396 static void
1397 ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
1398 {
1399         int                     bucketToRemove;
1400         HashSkewBucket *bucket;
1401         uint32          hashvalue;
1402         int                     bucketno;
1403         int                     batchno;
1404         HashJoinTuple hashTuple;
1405
1406         /* Locate the bucket to remove */
1407         bucketToRemove = hashtable->skewBucketNums[hashtable->nSkewBuckets - 1];
1408         bucket = hashtable->skewBucket[bucketToRemove];
1409
1410         /*
1411          * Calculate which bucket and batch the tuples belong to in the main
1412          * hashtable.  They all have the same hash value, so it's the same for all
1413          * of them.  Also note that it's not possible for nbatch to increase while
1414          * we are processing the tuples.
1415          */
1416         hashvalue = bucket->hashvalue;
1417         ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
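        /*
         * For orientation, a sketch of what ExecHashGetBucketAndBatch (defined
         * earlier in this file) computes when nbatch > 1: the low bits of the
         * hash value pick the bucket and the next bits pick the batch, roughly
         *
         *              bucketno = hashvalue & (nbuckets - 1);
         *              batchno = (hashvalue >> log2_nbuckets) & (nbatch - 1);
         */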
1418
1419         /* Process all tuples in the bucket */
1420         hashTuple = bucket->tuples;
1421         while (hashTuple != NULL)
1422         {
1423                 HashJoinTuple nextHashTuple = hashTuple->next;
1424                 MinimalTuple tuple;
1425                 Size            tupleSize;
1426
1427                 /*
1428                  * This code must agree with ExecHashTableInsert.  We do not call
1429                  * ExecHashTableInsert directly because it expects a TupleTableSlot,
1430                  * while we already have HashJoinTuples.
1431                  */
1432                 tuple = HJTUPLE_MINTUPLE(hashTuple);
1433                 tupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
1434
1435                 /* Decide whether to put the tuple in the hash table or a temp file */
1436                 if (batchno == hashtable->curbatch)
1437                 {
1438                         /* Move the tuple to the main hash table */
1439                         hashTuple->next = hashtable->buckets[bucketno];
1440                         hashtable->buckets[bucketno] = hashTuple;
1441                         /* We have reduced skew space, but overall space doesn't change */
1442                         hashtable->spaceUsedSkew -= tupleSize;
1443                 }
1444                 else
1445                 {
1446                         /* Put the tuple into a temp file for later batches */
1447                         Assert(batchno > hashtable->curbatch);
1448                         ExecHashJoinSaveTuple(tuple, hashvalue,
1449                                                                   &hashtable->innerBatchFile[batchno]);
1450                         pfree(hashTuple);
1451                         hashtable->spaceUsed -= tupleSize;
1452                         hashtable->spaceUsedSkew -= tupleSize;
1453                 }
1454
1455                 hashTuple = nextHashTuple;
1456         }
1457
1458         /*
1459          * Free the bucket struct itself and reset the hashtable entry to NULL.
1460          *
1461          * NOTE: this is not nearly as simple as it looks on the surface, because
1462          * of the possibility of collisions in the hashtable.  Suppose that hash
1463          * values A and B collide at a particular hashtable entry, and that A was
1464          * entered first so B gets shifted to a different table entry.  If we were
1465          * to remove A first then ExecHashGetSkewBucket would mistakenly start
1466          * reporting that B is not in the hashtable, because it would hit the NULL
1467          * before finding B.  However, we always remove entries in the reverse
1468          * order of creation, so this failure cannot happen.
1469          */
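        /*
         * Illustration with made-up slot numbers: if A and B both hash to slot
         * 2 and A was created first, A sits in slot 2 and B probed onward to
         * slot 3.  Removing B first is safe; removing A first would leave a
         * hole at slot 2 that makes lookups for B give up early.
         */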
1470         hashtable->skewBucket[bucketToRemove] = NULL;
1471         hashtable->nSkewBuckets--;
1472         pfree(bucket);
1473         hashtable->spaceUsed -= SKEW_BUCKET_OVERHEAD;
1474         hashtable->spaceUsedSkew -= SKEW_BUCKET_OVERHEAD;
1475
1476         /*
1477          * If we have removed all skew buckets then give up on skew optimization.
1478          * Release the arrays since they aren't useful any more.
1479          */
1480         if (hashtable->nSkewBuckets == 0)
1481         {
1482                 hashtable->skewEnabled = false;
1483                 pfree(hashtable->skewBucket);
1484                 pfree(hashtable->skewBucketNums);
1485                 hashtable->skewBucket = NULL;
1486                 hashtable->skewBucketNums = NULL;
1487                 hashtable->spaceUsed -= hashtable->spaceUsedSkew;
1488                 hashtable->spaceUsedSkew = 0;
1489         }
1490 }
1491
1492 /*
1493  * Allocate 'size' bytes from the currently active HashMemoryChunk,
      * starting a new chunk if the current one lacks room.
1494  */
1495 static void *
1496 dense_alloc(HashJoinTable hashtable, Size size)
1497 {
1498         HashMemoryChunk newChunk;
1499         char       *ptr;
1500
1501         /* just in case the size is not already aligned properly */
1502         size = MAXALIGN(size);
1503
1504         /*
1505          * If tuple size is larger than 1/4 of chunk size, allocate a separate
1506          * chunk.
1507          */
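        /*
         * For scale (values assumed from hashjoin.h, not defined here):
         * HASH_CHUNK_SIZE is 32 kB and HASH_CHUNK_THRESHOLD is a quarter of
         * that, so tuples over roughly 8 kB get a chunk of their own.
         */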
1508         if (size > HASH_CHUNK_THRESHOLD)
1509         {
1510                 /* allocate a dedicated chunk for this oversized tuple */
1511                 newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt,
1512                                                                   offsetof(HashMemoryChunkData, data) + size);
1513                 newChunk->maxlen = size;
1514                 newChunk->used = 0;
1515                 newChunk->ntuples = 0;
1516
1517                 /*
1518                  * Add this chunk to the list after the first existing chunk, so that
1519                  * we don't lose the remaining space in the "current" chunk.
1520                  */
1521                 if (hashtable->chunks != NULL)
1522                 {
1523                         newChunk->next = hashtable->chunks->next;
1524                         hashtable->chunks->next = newChunk;
1525                 }
1526                 else
1527                 {
1528                         newChunk->next = hashtable->chunks;
1529                         hashtable->chunks = newChunk;
1530                 }
1531
1532                 newChunk->used += size;
1533                 newChunk->ntuples += 1;
1534
1535                 return newChunk->data;
1536         }
1537
1538         /*
1539          * See if we have enough space for it in the current chunk (if any).
1540          * If not, allocate a fresh chunk.
1541          */
1542         if ((hashtable->chunks == NULL) ||
1543                 (hashtable->chunks->maxlen - hashtable->chunks->used) < size)
1544         {
1545                 /* allocate new chunk and put it at the beginning of the list */
1546                 newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt,
1547                                            offsetof(HashMemoryChunkData, data) + HASH_CHUNK_SIZE);
1548
1549                 newChunk->maxlen = HASH_CHUNK_SIZE;
1550                 newChunk->used = size;
1551                 newChunk->ntuples = 1;
1552
1553                 newChunk->next = hashtable->chunks;
1554                 hashtable->chunks = newChunk;
1555
1556                 return newChunk->data;
1557         }
1558
1559         /* There is enough space in the current chunk, let's add the tuple */
1560         ptr = hashtable->chunks->data + hashtable->chunks->used;
1561         hashtable->chunks->used += size;
1562         hashtable->chunks->ntuples += 1;
1563
1564         /* return pointer to the start of the tuple memory */
1565         return ptr;
1566 }
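
/*
 * Usage sketch for dense_alloc (it mirrors the call site in
 * ExecHashTableInsert; shown here only as illustration):
 *
 *              HashJoinTuple hashTuple;
 *
 *              hashTuple = (HashJoinTuple) dense_alloc(hashtable,
 *                                                                                              HJTUPLE_OVERHEAD + tuple->t_len);
 *              hashTuple->hashvalue = hashvalue;
 *              memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
 *
 * Tuples allocated this way are never pfree'd individually: the chunks are
 * carved out of batchCxt and are released wholesale when that context is
 * reset or destroyed.
 */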