1 /*-------------------------------------------------------------------------
4 * Routines to handle hash join nodes
6 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * src/backend/executor/nodeHashjoin.c
15 * Hash joins can participate in parallel query execution in several ways. A
16 * parallel-oblivious hash join is one where the node is unaware that it is
17 * part of a parallel plan. In this case, a copy of the inner plan is used to
18 * build a copy of the hash table in every backend, and the outer plan could
19 * either be built from a partial or complete path, so that the results of the
20 * hash join are correspondingly either partial or complete. A parallel-aware
21 * hash join is one that behaves differently, coordinating work between
22 * backends, and appears as Parallel Hash Join in EXPLAIN output. A Parallel
23 * Hash Join always appears with a Parallel Hash node.
25 * Parallel-aware hash joins use the same per-backend state machine to track
26 * progress through the hash join algorithm as parallel-oblivious hash joins.
27 * In a parallel-aware hash join, there is also a shared state machine that
28 * co-operating backends use to synchronize their local state machines and
29 * program counters. The shared state machine is managed with a Barrier IPC
30 * primitive. When all attached participants arrive at a barrier, the phase
31 * advances and all waiting participants are released.
33 * When a participant begins working on a parallel hash join, it must first
34 * figure out how much progress has already been made, because participants
35 * don't wait for each other to begin. For this reason there are switch
36 * statements at key points in the code where we have to synchronize our local
37 * state machine with the phase, and then jump to the correct part of the
38 * algorithm so that we can get started.
40 * One barrier called build_barrier is used to coordinate the hashing phases.
41 * The phase is represented by an integer which begins at zero and increments
42 * one by one, but in the code it is referred to by symbolic names as follows:
44 * PHJ_BUILD_ELECTING -- initial state
45 * PHJ_BUILD_ALLOCATING -- one sets up the batches and table 0
46 * PHJ_BUILD_HASHING_INNER -- all hash the inner rel
47 * PHJ_BUILD_HASHING_OUTER -- (multi-batch only) all hash the outer
48 * PHJ_BUILD_DONE -- building done, probing can begin
50 * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
51 * be used repeatedly as required to coordinate expansions in the number of
52 * batches or buckets. Their phases are as follows:
54 * PHJ_GROW_BATCHES_ELECTING -- initial state
55 * PHJ_GROW_BATCHES_ALLOCATING -- one allocates new batches
56 * PHJ_GROW_BATCHES_REPARTITIONING -- all repartition
57 * PHJ_GROW_BATCHES_FINISHING -- one cleans up, detects skew
59 * PHJ_GROW_BUCKETS_ELECTING -- initial state
60 * PHJ_GROW_BUCKETS_ALLOCATING -- one allocates new buckets
61 * PHJ_GROW_BUCKETS_REINSERTING -- all insert tuples
63 * If the planner got the number of batches and buckets right, those won't be
64 * necessary, but on the other hand we might finish up needing to expand the
65 * buckets or batches multiple times while hashing the inner relation to stay
66 * within our memory budget and load factor target. For that reason it's a
67 * separate pair of barriers using circular phases.
69 * The PHJ_BUILD_HASHING_OUTER phase is required only for multi-batch joins,
70 * because we need to divide the outer relation into batches up front in order
71 * to be able to process batches entirely independently. In contrast, the
72 * parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
73 * batches whenever it encounters them while scanning and probing, which it
74 * can do because it processes batches in serial order.
76 * Once PHJ_BUILD_DONE is reached, backends then split up and process
77 * different batches, or gang up and work together on probing batches if there
78 * aren't enough to go around. For each batch there is a separate barrier
79 * with the following phases:
81 * PHJ_BATCH_ELECTING -- initial state
82 * PHJ_BATCH_ALLOCATING -- one allocates buckets
83 * PHJ_BATCH_LOADING -- all load the hash table from disk
84 * PHJ_BATCH_PROBING -- all probe
85 * PHJ_BATCH_DONE -- end
87 * Batch 0 is a special case, because it starts out in phase
88 * PHJ_BATCH_PROBING; populating batch 0's hash table is done during
89 * PHJ_BUILD_HASHING_INNER so we can skip loading.
91 * Initially we try to plan for a single-batch hash join using the combined
92 * work_mem of all participants to create a large shared hash table. If that
93 * turns out either at planning or execution time to be impossible then we
94 * fall back to regular work_mem sized hash tables.
96 * To avoid deadlocks, we never wait for any barrier unless it is known that
97 * all other backends attached to it are actively executing the node or have
98 * already arrived. Practically, that means that we never return a tuple
99 * while attached to a barrier, unless the barrier has reached its final
100 * state. In the slightly special case of the per-batch barrier, we return
101 * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
102 * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
 *-------------------------------------------------------------------------
 */
107 #include "postgres.h"
109 #include "access/htup_details.h"
110 #include "access/parallel.h"
111 #include "executor/executor.h"
112 #include "executor/hashjoin.h"
113 #include "executor/nodeHash.h"
114 #include "executor/nodeHashjoin.h"
115 #include "miscadmin.h"
117 #include "utils/memutils.h"
118 #include "utils/sharedtuplestore.h"
/*
 * States of the ExecHashJoin state machine
 */
#define HJ_BUILD_HASHTABLE		1	/* first call: build inner hash table */
#define HJ_NEED_NEW_OUTER		2	/* fetch the next outer tuple */
#define HJ_SCAN_BUCKET			3	/* scan selected hash bucket for matches */
#define HJ_FILL_OUTER_TUPLE		4	/* maybe emit null-extended outer tuple */
#define HJ_FILL_INNER_TUPLES	5	/* emit unmatched inner tuples (right/full) */
#define HJ_NEED_NEW_BATCH		6	/* advance to the next batch */

/* Returns true if doing null-fill on outer relation */
#define HJ_FILL_OUTER(hjstate)	((hjstate)->hj_NullInnerTupleSlot != NULL)
/* Returns true if doing null-fill on inner relation */
#define HJ_FILL_INNER(hjstate)	((hjstate)->hj_NullOuterTupleSlot != NULL)
136 static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
137 HashJoinState *hjstate,
139 static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
140 HashJoinState *hjstate,
142 static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
145 TupleTableSlot *tupleSlot);
146 static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
147 static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
148 static void ExecParallelHashJoinPartitionOuter(HashJoinState *node);
151 /* ----------------------------------------------------------------
154 * This function implements the Hybrid Hashjoin algorithm. It is marked
155 * with an always-inline attribute so that ExecHashJoin() and
156 * ExecParallelHashJoin() can inline it. Compilers that respect the
157 * attribute should create versions specialized for parallel == true and
158 * parallel == false with unnecessary branches removed.
160 * Note: the relation we build hash table on is the "inner"
161 * the other one is "outer".
162 * ----------------------------------------------------------------
164 static pg_attribute_always_inline TupleTableSlot *
165 ExecHashJoinImpl(PlanState *pstate, bool parallel)
167 HashJoinState *node = castNode(HashJoinState, pstate);
168 PlanState *outerNode;
171 ExprState *otherqual;
172 ExprContext *econtext;
173 HashJoinTable hashtable;
174 TupleTableSlot *outerTupleSlot;
177 ParallelHashJoinState *parallel_state;
180 * get information from HashJoin node
182 joinqual = node->js.joinqual;
183 otherqual = node->js.ps.qual;
184 hashNode = (HashState *) innerPlanState(node);
185 outerNode = outerPlanState(node);
186 hashtable = node->hj_HashTable;
187 econtext = node->js.ps.ps_ExprContext;
188 parallel_state = hashNode->parallel_state;
191 * Reset per-tuple memory context to free any expression evaluation
192 * storage allocated in the previous tuple cycle.
194 ResetExprContext(econtext);
197 * run the hash join state machine
202 * It's possible to iterate this loop many times before returning a
203 * tuple, in some pathological cases such as needing to move much of
204 * the current batch to a later batch. So let's check for interrupts
207 CHECK_FOR_INTERRUPTS();
209 switch (node->hj_JoinState)
211 case HJ_BUILD_HASHTABLE:
214 * First time through: build hash table for inner relation.
216 Assert(hashtable == NULL);
219 * If the outer relation is completely empty, and it's not
220 * right/full join, we can quit without building the hash
221 * table. However, for an inner join it is only a win to
222 * check this when the outer relation's startup cost is less
223 * than the projected cost of building the hash table.
224 * Otherwise it's best to build the hash table first and see
225 * if the inner relation is empty. (When it's a left join, we
226 * should always make this check, since we aren't going to be
227 * able to skip the join on the strength of an empty inner
230 * If we are rescanning the join, we make use of information
231 * gained on the previous scan: don't bother to try the
232 * prefetch if the previous scan found the outer relation
233 * nonempty. This is not 100% reliable since with new
234 * parameters the outer relation might yield different
235 * results, but it's a good heuristic.
237 * The only way to make the check is to try to fetch a tuple
238 * from the outer plan node. If we succeed, we have to stash
239 * it away for later consumption by ExecHashJoinOuterGetTuple.
241 if (HJ_FILL_INNER(node))
243 /* no chance to not build the hash table */
244 node->hj_FirstOuterTupleSlot = NULL;
249 * The empty-outer optimization is not implemented for
250 * shared hash tables, because no one participant can
251 * determine that there are no outer tuples, and it's not
252 * yet clear that it's worth the synchronization overhead
253 * of reaching consensus to figure that out. So we have
254 * to build the hash table.
256 node->hj_FirstOuterTupleSlot = NULL;
258 else if (HJ_FILL_OUTER(node) ||
259 (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
260 !node->hj_OuterNotEmpty))
262 node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
263 if (TupIsNull(node->hj_FirstOuterTupleSlot))
265 node->hj_OuterNotEmpty = false;
269 node->hj_OuterNotEmpty = true;
272 node->hj_FirstOuterTupleSlot = NULL;
275 * Create the hash table. If using Parallel Hash, then
276 * whoever gets here first will create the hash table and any
277 * later arrivals will merely attach to it.
279 hashtable = ExecHashTableCreate(hashNode,
280 node->hj_HashOperators,
281 HJ_FILL_INNER(node));
282 node->hj_HashTable = hashtable;
285 * Execute the Hash node, to build the hash table. If using
286 * Parallel Hash, then we'll try to help hashing unless we
289 hashNode->hashtable = hashtable;
290 (void) MultiExecProcNode((PlanState *) hashNode);
293 * If the inner relation is completely empty, and we're not
294 * doing a left outer join, we can quit without scanning the
297 if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
301 * need to remember whether nbatch has increased since we
302 * began scanning the outer relation
304 hashtable->nbatch_outstart = hashtable->nbatch;
307 * Reset OuterNotEmpty for scan. (It's OK if we fetched a
308 * tuple above, because ExecHashJoinOuterGetTuple will
309 * immediately set it again.)
311 node->hj_OuterNotEmpty = false;
315 Barrier *build_barrier;
317 build_barrier = ¶llel_state->build_barrier;
318 Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
319 BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
320 if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
323 * If multi-batch, we need to hash the outer relation
326 if (hashtable->nbatch > 1)
327 ExecParallelHashJoinPartitionOuter(node);
328 BarrierArriveAndWait(build_barrier,
329 WAIT_EVENT_HASH_BUILD_HASHING_OUTER);
331 Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
333 /* Each backend should now select a batch to work on. */
334 hashtable->curbatch = -1;
335 node->hj_JoinState = HJ_NEED_NEW_BATCH;
340 node->hj_JoinState = HJ_NEED_NEW_OUTER;
344 case HJ_NEED_NEW_OUTER:
347 * We don't have an outer tuple, try to get the next one
351 ExecParallelHashJoinOuterGetTuple(outerNode, node,
355 ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);
357 if (TupIsNull(outerTupleSlot))
359 /* end of batch, or maybe whole join */
360 if (HJ_FILL_INNER(node))
362 /* set up to scan for unmatched inner tuples */
363 ExecPrepHashTableForUnmatched(node);
364 node->hj_JoinState = HJ_FILL_INNER_TUPLES;
367 node->hj_JoinState = HJ_NEED_NEW_BATCH;
371 econtext->ecxt_outertuple = outerTupleSlot;
372 node->hj_MatchedOuter = false;
375 * Find the corresponding bucket for this tuple in the main
376 * hash table or skew hash table.
378 node->hj_CurHashValue = hashvalue;
379 ExecHashGetBucketAndBatch(hashtable, hashvalue,
380 &node->hj_CurBucketNo, &batchno);
381 node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
383 node->hj_CurTuple = NULL;
386 * The tuple might not belong to the current batch (where
387 * "current batch" includes the skew buckets if any).
389 if (batchno != hashtable->curbatch &&
390 node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
393 * Need to postpone this outer tuple to a later batch.
394 * Save it in the corresponding outer-batch file.
396 Assert(parallel_state == NULL);
397 Assert(batchno > hashtable->curbatch);
398 ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot),
400 &hashtable->outerBatchFile[batchno]);
402 /* Loop around, staying in HJ_NEED_NEW_OUTER state */
406 /* OK, let's scan the bucket for matches */
407 node->hj_JoinState = HJ_SCAN_BUCKET;
414 * Scan the selected hash bucket for matches to current outer
418 if (!ExecParallelScanHashBucket(node, econtext))
420 /* out of matches; check for possible outer-join fill */
421 node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
427 if (!ExecScanHashBucket(node, econtext))
429 /* out of matches; check for possible outer-join fill */
430 node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
436 * We've got a match, but still need to test non-hashed quals.
437 * ExecScanHashBucket already set up all the state needed to
440 * If we pass the qual, then save state for next call and have
441 * ExecProject form the projection, store it in the tuple
442 * table, and return the slot.
444 * Only the joinquals determine tuple match status, but all
445 * quals must pass to actually return the tuple.
447 if (joinqual == NULL || ExecQual(joinqual, econtext))
449 node->hj_MatchedOuter = true;
450 HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
452 /* In an antijoin, we never return a matched tuple */
453 if (node->js.jointype == JOIN_ANTI)
455 node->hj_JoinState = HJ_NEED_NEW_OUTER;
460 * If we only need to join to the first matching inner
461 * tuple, then consider returning this one, but after that
462 * continue with next outer tuple.
464 if (node->js.single_match)
465 node->hj_JoinState = HJ_NEED_NEW_OUTER;
467 if (otherqual == NULL || ExecQual(otherqual, econtext))
468 return ExecProject(node->js.ps.ps_ProjInfo);
470 InstrCountFiltered2(node, 1);
473 InstrCountFiltered1(node, 1);
476 case HJ_FILL_OUTER_TUPLE:
479 * The current outer tuple has run out of matches, so check
480 * whether to emit a dummy outer-join tuple. Whether we emit
481 * one or not, the next state is NEED_NEW_OUTER.
483 node->hj_JoinState = HJ_NEED_NEW_OUTER;
485 if (!node->hj_MatchedOuter &&
489 * Generate a fake join tuple with nulls for the inner
490 * tuple, and return it if it passes the non-join quals.
492 econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
494 if (otherqual == NULL || ExecQual(otherqual, econtext))
495 return ExecProject(node->js.ps.ps_ProjInfo);
497 InstrCountFiltered2(node, 1);
501 case HJ_FILL_INNER_TUPLES:
504 * We have finished a batch, but we are doing right/full join,
505 * so any unmatched inner tuples in the hashtable have to be
506 * emitted before we continue to the next batch.
508 if (!ExecScanHashTableForUnmatched(node, econtext))
510 /* no more unmatched tuples */
511 node->hj_JoinState = HJ_NEED_NEW_BATCH;
516 * Generate a fake join tuple with nulls for the outer tuple,
517 * and return it if it passes the non-join quals.
519 econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;
521 if (otherqual == NULL || ExecQual(otherqual, econtext))
522 return ExecProject(node->js.ps.ps_ProjInfo);
524 InstrCountFiltered2(node, 1);
527 case HJ_NEED_NEW_BATCH:
530 * Try to advance to next batch. Done if there are no more.
534 if (!ExecParallelHashJoinNewBatch(node))
535 return NULL; /* end of parallel-aware join */
539 if (!ExecHashJoinNewBatch(node))
540 return NULL; /* end of parallel-oblivious join */
542 node->hj_JoinState = HJ_NEED_NEW_OUTER;
546 elog(ERROR, "unrecognized hashjoin state: %d",
547 (int) node->hj_JoinState);
552 /* ----------------------------------------------------------------
555 * Parallel-oblivious version.
556 * ----------------------------------------------------------------
558 static TupleTableSlot * /* return: a tuple or NULL */
559 ExecHashJoin(PlanState *pstate)
562 * On sufficiently smart compilers this should be inlined with the
563 * parallel-aware branches removed.
565 return ExecHashJoinImpl(pstate, false);
568 /* ----------------------------------------------------------------
569 * ExecParallelHashJoin
571 * Parallel-aware version.
572 * ----------------------------------------------------------------
574 static TupleTableSlot * /* return: a tuple or NULL */
575 ExecParallelHashJoin(PlanState *pstate)
578 * On sufficiently smart compilers this should be inlined with the
579 * parallel-oblivious branches removed.
581 return ExecHashJoinImpl(pstate, true);
584 /* ----------------------------------------------------------------
587 * Init routine for HashJoin node.
588 * ----------------------------------------------------------------
591 ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
593 HashJoinState *hjstate;
603 /* check for unsupported flags */
604 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
607 * create state structure
609 hjstate = makeNode(HashJoinState);
610 hjstate->js.ps.plan = (Plan *) node;
611 hjstate->js.ps.state = estate;
614 * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker()
615 * where this function may be replaced with a parallel version, if we
616 * managed to launch a parallel query.
618 hjstate->js.ps.ExecProcNode = ExecHashJoin;
619 hjstate->js.jointype = node->join.jointype;
622 * Miscellaneous initialization
624 * create expression context for node
626 ExecAssignExprContext(estate, &hjstate->js.ps);
629 * initialize child nodes
631 * Note: we could suppress the REWIND flag for the inner input, which
632 * would amount to betting that the hash will be a single batch. Not
633 * clear if this would be a win or not.
635 outerNode = outerPlan(node);
636 hashNode = (Hash *) innerPlan(node);
638 outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags);
639 outerDesc = ExecGetResultType(outerPlanState(hjstate));
640 innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags);
641 innerDesc = ExecGetResultType(innerPlanState(hjstate));
644 * Initialize result slot, type and projection.
646 ExecInitResultTupleSlotTL(estate, &hjstate->js.ps);
647 ExecAssignProjectionInfo(&hjstate->js.ps, NULL);
650 * tuple table initialization
652 hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate, outerDesc);
655 * detect whether we need only consider the first matching inner tuple
657 hjstate->js.single_match = (node->join.inner_unique ||
658 node->join.jointype == JOIN_SEMI);
660 /* set up null tuples for outer joins, if needed */
661 switch (node->join.jointype)
668 hjstate->hj_NullInnerTupleSlot =
669 ExecInitNullTupleSlot(estate, innerDesc);
672 hjstate->hj_NullOuterTupleSlot =
673 ExecInitNullTupleSlot(estate, outerDesc);
676 hjstate->hj_NullOuterTupleSlot =
677 ExecInitNullTupleSlot(estate, outerDesc);
678 hjstate->hj_NullInnerTupleSlot =
679 ExecInitNullTupleSlot(estate, innerDesc);
682 elog(ERROR, "unrecognized join type: %d",
683 (int) node->join.jointype);
687 * now for some voodoo. our temporary tuple slot is actually the result
688 * tuple slot of the Hash node (which is our inner plan). we can do this
689 * because Hash nodes don't return tuples via ExecProcNode() -- instead
690 * the hash join node uses ExecScanHashBucket() to get at the contents of
691 * the hash table. -cim 6/9/91
694 HashState *hashstate = (HashState *) innerPlanState(hjstate);
695 TupleTableSlot *slot = hashstate->ps.ps_ResultTupleSlot;
697 hjstate->hj_HashTupleSlot = slot;
701 * initialize child expressions
703 hjstate->js.ps.qual =
704 ExecInitQual(node->join.plan.qual, (PlanState *) hjstate);
705 hjstate->js.joinqual =
706 ExecInitQual(node->join.joinqual, (PlanState *) hjstate);
707 hjstate->hashclauses =
708 ExecInitQual(node->hashclauses, (PlanState *) hjstate);
711 * initialize hash-specific info
713 hjstate->hj_HashTable = NULL;
714 hjstate->hj_FirstOuterTupleSlot = NULL;
716 hjstate->hj_CurHashValue = 0;
717 hjstate->hj_CurBucketNo = 0;
718 hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
719 hjstate->hj_CurTuple = NULL;
722 * Deconstruct the hash clauses into outer and inner argument values, so
723 * that we can evaluate those subexpressions separately. Also make a list
724 * of the hash operator OIDs, in preparation for looking up the hash
730 foreach(l, node->hashclauses)
732 OpExpr *hclause = lfirst_node(OpExpr, l);
734 lclauses = lappend(lclauses, ExecInitExpr(linitial(hclause->args),
735 (PlanState *) hjstate));
736 rclauses = lappend(rclauses, ExecInitExpr(lsecond(hclause->args),
737 (PlanState *) hjstate));
738 hoperators = lappend_oid(hoperators, hclause->opno);
740 hjstate->hj_OuterHashKeys = lclauses;
741 hjstate->hj_InnerHashKeys = rclauses;
742 hjstate->hj_HashOperators = hoperators;
743 /* child Hash node needs to evaluate inner hash keys, too */
744 ((HashState *) innerPlanState(hjstate))->hashkeys = rclauses;
746 hjstate->hj_JoinState = HJ_BUILD_HASHTABLE;
747 hjstate->hj_MatchedOuter = false;
748 hjstate->hj_OuterNotEmpty = false;
753 /* ----------------------------------------------------------------
756 * clean up routine for HashJoin node
757 * ----------------------------------------------------------------
760 ExecEndHashJoin(HashJoinState *node)
765 if (node->hj_HashTable)
767 ExecHashTableDestroy(node->hj_HashTable);
768 node->hj_HashTable = NULL;
772 * Free the exprcontext
774 ExecFreeExprContext(&node->js.ps);
777 * clean out the tuple table
779 ExecClearTuple(node->js.ps.ps_ResultTupleSlot);
780 ExecClearTuple(node->hj_OuterTupleSlot);
781 ExecClearTuple(node->hj_HashTupleSlot);
786 ExecEndNode(outerPlanState(node));
787 ExecEndNode(innerPlanState(node));
791 * ExecHashJoinOuterGetTuple
793 * get the next outer tuple for a parallel oblivious hashjoin: either by
794 * executing the outer plan node in the first pass, or from the temp
795 * files for the hashjoin batches.
797 * Returns a null slot if no more outer tuples (within the current batch).
799 * On success, the tuple's hash value is stored at *hashvalue --- this is
800 * either originally computed, or re-read from the temp file.
802 static TupleTableSlot *
803 ExecHashJoinOuterGetTuple(PlanState *outerNode,
804 HashJoinState *hjstate,
807 HashJoinTable hashtable = hjstate->hj_HashTable;
808 int curbatch = hashtable->curbatch;
809 TupleTableSlot *slot;
811 if (curbatch == 0) /* if it is the first pass */
814 * Check to see if first outer tuple was already fetched by
815 * ExecHashJoin() and not used yet.
817 slot = hjstate->hj_FirstOuterTupleSlot;
818 if (!TupIsNull(slot))
819 hjstate->hj_FirstOuterTupleSlot = NULL;
821 slot = ExecProcNode(outerNode);
823 while (!TupIsNull(slot))
826 * We have to compute the tuple's hash value.
828 ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
830 econtext->ecxt_outertuple = slot;
831 if (ExecHashGetHashValue(hashtable, econtext,
832 hjstate->hj_OuterHashKeys,
833 true, /* outer tuple */
834 HJ_FILL_OUTER(hjstate),
837 /* remember outer relation is not empty for possible rescan */
838 hjstate->hj_OuterNotEmpty = true;
844 * That tuple couldn't match because of a NULL, so discard it and
845 * continue with the next one.
847 slot = ExecProcNode(outerNode);
850 else if (curbatch < hashtable->nbatch)
852 BufFile *file = hashtable->outerBatchFile[curbatch];
855 * In outer-join cases, we could get here even though the batch file
861 slot = ExecHashJoinGetSavedTuple(hjstate,
864 hjstate->hj_OuterTupleSlot);
865 if (!TupIsNull(slot))
869 /* End of this batch */
874 * ExecHashJoinOuterGetTuple variant for the parallel case.
876 static TupleTableSlot *
877 ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
878 HashJoinState *hjstate,
881 HashJoinTable hashtable = hjstate->hj_HashTable;
882 int curbatch = hashtable->curbatch;
883 TupleTableSlot *slot;
886 * In the Parallel Hash case we only run the outer plan directly for
887 * single-batch hash joins. Otherwise we have to go to batch files, even
890 if (curbatch == 0 && hashtable->nbatch == 1)
892 slot = ExecProcNode(outerNode);
894 while (!TupIsNull(slot))
896 ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
898 econtext->ecxt_outertuple = slot;
899 if (ExecHashGetHashValue(hashtable, econtext,
900 hjstate->hj_OuterHashKeys,
901 true, /* outer tuple */
902 HJ_FILL_OUTER(hjstate),
907 * That tuple couldn't match because of a NULL, so discard it and
908 * continue with the next one.
910 slot = ExecProcNode(outerNode);
913 else if (curbatch < hashtable->nbatch)
917 tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
921 slot = ExecStoreMinimalTuple(tuple,
922 hjstate->hj_OuterTupleSlot,
927 ExecClearTuple(hjstate->hj_OuterTupleSlot);
930 /* End of this batch */
935 * ExecHashJoinNewBatch
936 * switch to a new hashjoin batch
938 * Returns true if successful, false if there are no more batches.
941 ExecHashJoinNewBatch(HashJoinState *hjstate)
943 HashJoinTable hashtable = hjstate->hj_HashTable;
947 TupleTableSlot *slot;
950 nbatch = hashtable->nbatch;
951 curbatch = hashtable->curbatch;
956 * We no longer need the previous outer batch file; close it right
957 * away to free disk space.
959 if (hashtable->outerBatchFile[curbatch])
960 BufFileClose(hashtable->outerBatchFile[curbatch]);
961 hashtable->outerBatchFile[curbatch] = NULL;
963 else /* we just finished the first batch */
966 * Reset some of the skew optimization state variables, since we no
967 * longer need to consider skew tuples after the first batch. The
968 * memory context reset we are about to do will release the skew
971 hashtable->skewEnabled = false;
972 hashtable->skewBucket = NULL;
973 hashtable->skewBucketNums = NULL;
974 hashtable->nSkewBuckets = 0;
975 hashtable->spaceUsedSkew = 0;
979 * We can always skip over any batches that are completely empty on both
980 * sides. We can sometimes skip over batches that are empty on only one
981 * side, but there are exceptions:
983 * 1. In a left/full outer join, we have to process outer batches even if
984 * the inner batch is empty. Similarly, in a right/full outer join, we
985 * have to process inner batches even if the outer batch is empty.
987 * 2. If we have increased nbatch since the initial estimate, we have to
988 * scan inner batches since they might contain tuples that need to be
989 * reassigned to later inner batches.
991 * 3. Similarly, if we have increased nbatch since starting the outer
992 * scan, we have to rescan outer batches in case they contain tuples that
993 * need to be reassigned.
996 while (curbatch < nbatch &&
997 (hashtable->outerBatchFile[curbatch] == NULL ||
998 hashtable->innerBatchFile[curbatch] == NULL))
1000 if (hashtable->outerBatchFile[curbatch] &&
1001 HJ_FILL_OUTER(hjstate))
1002 break; /* must process due to rule 1 */
1003 if (hashtable->innerBatchFile[curbatch] &&
1004 HJ_FILL_INNER(hjstate))
1005 break; /* must process due to rule 1 */
1006 if (hashtable->innerBatchFile[curbatch] &&
1007 nbatch != hashtable->nbatch_original)
1008 break; /* must process due to rule 2 */
1009 if (hashtable->outerBatchFile[curbatch] &&
1010 nbatch != hashtable->nbatch_outstart)
1011 break; /* must process due to rule 3 */
1012 /* We can ignore this batch. */
1013 /* Release associated temp files right away. */
1014 if (hashtable->innerBatchFile[curbatch])
1015 BufFileClose(hashtable->innerBatchFile[curbatch]);
1016 hashtable->innerBatchFile[curbatch] = NULL;
1017 if (hashtable->outerBatchFile[curbatch])
1018 BufFileClose(hashtable->outerBatchFile[curbatch]);
1019 hashtable->outerBatchFile[curbatch] = NULL;
1023 if (curbatch >= nbatch)
1024 return false; /* no more batches */
1026 hashtable->curbatch = curbatch;
1029 * Reload the hash table with the new inner batch (which could be empty)
1031 ExecHashTableReset(hashtable);
1033 innerFile = hashtable->innerBatchFile[curbatch];
1035 if (innerFile != NULL)
1037 if (BufFileSeek(innerFile, 0, 0L, SEEK_SET))
1039 (errcode_for_file_access(),
1040 errmsg("could not rewind hash-join temporary file: %m")));
1042 while ((slot = ExecHashJoinGetSavedTuple(hjstate,
1045 hjstate->hj_HashTupleSlot)))
1048 * NOTE: some tuples may be sent to future batches. Also, it is
1049 * possible for hashtable->nbatch to be increased here!
1051 ExecHashTableInsert(hashtable, slot, hashvalue);
1055 * after we build the hash table, the inner batch file is no longer
1058 BufFileClose(innerFile);
1059 hashtable->innerBatchFile[curbatch] = NULL;
1063 * Rewind outer batch file (if present), so that we can start reading it.
1065 if (hashtable->outerBatchFile[curbatch] != NULL)
1067 if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
1069 (errcode_for_file_access(),
1070 errmsg("could not rewind hash-join temporary file: %m")));
1077 * Choose a batch to work on, and attach to it. Returns true if successful,
1078 * false if there are no more batches.
1081 ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
1083 HashJoinTable hashtable = hjstate->hj_HashTable;
1088 * If we started up so late that the batch tracking array has been freed
1089 * already by ExecHashTableDetach(), then we are finished. See also
1090 * ExecParallelHashEnsureBatchAccessors().
1092 if (hashtable->batches == NULL)
1096 * If we were already attached to a batch, remember not to bother checking
1097 * it again, and detach from it (possibly freeing the hash table if we are
1100 if (hashtable->curbatch >= 0)
1102 hashtable->batches[hashtable->curbatch].done = true;
1103 ExecHashTableDetachBatch(hashtable);
1107 * Search for a batch that isn't done. We use an atomic counter to start
1108 * our search at a different batch in every participant when there are
1109 * more batches than participants.
1111 batchno = start_batchno =
1112 pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) %
1118 TupleTableSlot *slot;
1120 if (!hashtable->batches[batchno].done)
1122 SharedTuplestoreAccessor *inner_tuples;
1123 Barrier *batch_barrier =
1124 &hashtable->batches[batchno].shared->batch_barrier;
1126 switch (BarrierAttach(batch_barrier))
1128 case PHJ_BATCH_ELECTING:
1130 /* One backend allocates the hash table. */
1131 if (BarrierArriveAndWait(batch_barrier,
1132 WAIT_EVENT_HASH_BATCH_ELECTING))
1133 ExecParallelHashTableAlloc(hashtable, batchno);
1136 case PHJ_BATCH_ALLOCATING:
1137 /* Wait for allocation to complete. */
1138 BarrierArriveAndWait(batch_barrier,
1139 WAIT_EVENT_HASH_BATCH_ALLOCATING);
1142 case PHJ_BATCH_LOADING:
1143 /* Start (or join in) loading tuples. */
1144 ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
1145 inner_tuples = hashtable->batches[batchno].inner_tuples;
1146 sts_begin_parallel_scan(inner_tuples);
1147 while ((tuple = sts_parallel_scan_next(inner_tuples,
1150 slot = ExecStoreMinimalTuple(tuple,
1151 hjstate->hj_HashTupleSlot,
1153 ExecParallelHashTableInsertCurrentBatch(hashtable, slot,
1156 sts_end_parallel_scan(inner_tuples);
1157 BarrierArriveAndWait(batch_barrier,
1158 WAIT_EVENT_HASH_BATCH_LOADING);
1161 case PHJ_BATCH_PROBING:
1164 * This batch is ready to probe. Return control to
1165 * caller. We stay attached to batch_barrier so that the
1166 * hash table stays alive until everyone's finished
1167 * probing it, but no participant is allowed to wait at
1168 * this barrier again (or else a deadlock could occur).
1169 * All attached participants must eventually call
1170 * BarrierArriveAndDetach() so that the final phase
1171 * PHJ_BATCH_DONE can be reached.
1173 ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
1174 sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
1177 case PHJ_BATCH_DONE:
1180 * Already done. Detach and go around again (if any
1183 BarrierDetach(batch_barrier);
1184 hashtable->batches[batchno].done = true;
1185 hashtable->curbatch = -1;
1189 elog(ERROR, "unexpected batch phase %d",
1190 BarrierPhase(batch_barrier));
1193 batchno = (batchno + 1) % hashtable->nbatch;
1194 } while (batchno != start_batchno);
1200 * ExecHashJoinSaveTuple
1201 * save a tuple to a batch file.
1203 * The data recorded in the file for each tuple is its hash value,
1204 * then the tuple in MinimalTuple format.
1206 * Note: it is important always to call this in the regular executor
1207 * context, not in a shorter-lived context; else the temp file buffers
1208 * will get messed up.
1211 ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
1214 BufFile *file = *fileptr;
1219 /* First write to this batch file, so open it. */
1220 file = BufFileCreateTemp(false);
1224 written = BufFileWrite(file, (void *) &hashvalue, sizeof(uint32));
1225 if (written != sizeof(uint32))
1227 (errcode_for_file_access(),
1228 errmsg("could not write to hash-join temporary file: %m")));
1230 written = BufFileWrite(file, (void *) tuple, tuple->t_len);
1231 if (written != tuple->t_len)
1233 (errcode_for_file_access(),
1234 errmsg("could not write to hash-join temporary file: %m")));
1238 * ExecHashJoinGetSavedTuple
1239 * read the next tuple from a batch file. Return NULL if no more.
1241 * On success, *hashvalue is set to the tuple's hash value, and the tuple
1242 * itself is stored in the given slot.
1244 static TupleTableSlot *
1245 ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
1248 TupleTableSlot *tupleSlot)
1255 * We check for interrupts here because this is typically taken as an
1256 * alternative code path to an ExecProcNode() call, which would include
1259 CHECK_FOR_INTERRUPTS();
1262 * Since both the hash value and the MinimalTuple length word are uint32,
1263 * we can read them both in one BufFileRead() call without any type
1266 nread = BufFileRead(file, (void *) header, sizeof(header));
1267 if (nread == 0) /* end of file */
1269 ExecClearTuple(tupleSlot);
1272 if (nread != sizeof(header))
1274 (errcode_for_file_access(),
1275 errmsg("could not read from hash-join temporary file: %m")));
1276 *hashvalue = header[0];
1277 tuple = (MinimalTuple) palloc(header[1]);
1278 tuple->t_len = header[1];
1279 nread = BufFileRead(file,
1280 (void *) ((char *) tuple + sizeof(uint32)),
1281 header[1] - sizeof(uint32));
1282 if (nread != header[1] - sizeof(uint32))
1284 (errcode_for_file_access(),
1285 errmsg("could not read from hash-join temporary file: %m")));
1286 return ExecStoreMinimalTuple(tuple, tupleSlot, true);
1291 ExecReScanHashJoin(HashJoinState *node)
1294 * In a multi-batch join, we currently have to do rescans the hard way,
1295 * primarily because batch temp files may have already been released. But
1296 * if it's a single-batch join, and there is no parameter change for the
1297 * inner subnode, then we can just re-use the existing hash table without
1300 if (node->hj_HashTable != NULL)
1302 if (node->hj_HashTable->nbatch == 1 &&
1303 node->js.ps.righttree->chgParam == NULL)
1306 * Okay to reuse the hash table; needn't rescan inner, either.
1308 * However, if it's a right/full join, we'd better reset the
1309 * inner-tuple match flags contained in the table.
1311 if (HJ_FILL_INNER(node))
1312 ExecHashTableResetMatchFlags(node->hj_HashTable);
1315 * Also, we need to reset our state about the emptiness of the
1316 * outer relation, so that the new scan of the outer will update
1317 * it correctly if it turns out to be empty this time. (There's no
1318 * harm in clearing it now because ExecHashJoin won't need the
1319 * info. In the other cases, where the hash table doesn't exist
1320 * or we are destroying it, we leave this state alone because
1321 * ExecHashJoin will need it the first time through.)
1323 node->hj_OuterNotEmpty = false;
1325 /* ExecHashJoin can skip the BUILD_HASHTABLE step */
1326 node->hj_JoinState = HJ_NEED_NEW_OUTER;
1330 /* must destroy and rebuild hash table */
1331 ExecHashTableDestroy(node->hj_HashTable);
1332 node->hj_HashTable = NULL;
1333 node->hj_JoinState = HJ_BUILD_HASHTABLE;
1336 * if chgParam of subnode is not null then plan will be re-scanned
1337 * by first ExecProcNode.
1339 if (node->js.ps.righttree->chgParam == NULL)
1340 ExecReScan(node->js.ps.righttree);
1344 /* Always reset intra-tuple state */
1345 node->hj_CurHashValue = 0;
1346 node->hj_CurBucketNo = 0;
1347 node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
1348 node->hj_CurTuple = NULL;
1350 node->hj_MatchedOuter = false;
1351 node->hj_FirstOuterTupleSlot = NULL;
1354 * if chgParam of subnode is not null then plan will be re-scanned by
1355 * first ExecProcNode.
1357 if (node->js.ps.lefttree->chgParam == NULL)
1358 ExecReScan(node->js.ps.lefttree);
1362 ExecShutdownHashJoin(HashJoinState *node)
1364 if (node->hj_HashTable)
1367 * Detach from shared state before DSM memory goes away. This makes
1368 * sure that we don't have any pointers into DSM memory by the time
1369 * ExecEndHashJoin runs.
1371 ExecHashTableDetachBatch(node->hj_HashTable);
1372 ExecHashTableDetach(node->hj_HashTable);
1377 ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate)
1379 PlanState *outerState = outerPlanState(hjstate);
1380 ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
1381 HashJoinTable hashtable = hjstate->hj_HashTable;
1382 TupleTableSlot *slot;
1386 Assert(hjstate->hj_FirstOuterTupleSlot == NULL);
1388 /* Execute outer plan, writing all tuples to shared tuplestores. */
1391 slot = ExecProcNode(outerState);
1392 if (TupIsNull(slot))
1394 econtext->ecxt_outertuple = slot;
1395 if (ExecHashGetHashValue(hashtable, econtext,
1396 hjstate->hj_OuterHashKeys,
1397 true, /* outer tuple */
1398 false, /* outer join, currently unsupported */
1404 ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
1406 sts_puttuple(hashtable->batches[batchno].outer_tuples,
1407 &hashvalue, ExecFetchSlotMinimalTuple(slot));
1409 CHECK_FOR_INTERRUPTS();
1412 /* Make sure all outer partitions are readable by any backend. */
1413 for (i = 0; i < hashtable->nbatch; ++i)
1414 sts_end_write(hashtable->batches[i].outer_tuples);
1418 ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
1420 shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState));
1421 shm_toc_estimate_keys(&pcxt->estimator, 1);
1425 ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
1427 int plan_node_id = state->js.ps.plan->plan_node_id;
1428 HashState *hashNode;
1429 ParallelHashJoinState *pstate;
1432 * Disable shared hash table mode if we failed to create a real DSM
1433 * segment, because that means that we don't have a DSA area to work with.
1435 if (pcxt->seg == NULL)
1438 ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
1441 * Set up the state needed to coordinate access to the shared hash
1442 * table(s), using the plan node ID as the toc key.
1444 pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState));
1445 shm_toc_insert(pcxt->toc, plan_node_id, pstate);
1448 * Set up the shared hash join state with no batches initially.
1449 * ExecHashTableCreate() will prepare at least one later and set nbatch
1450 * and space_allowed.
1453 pstate->space_allowed = 0;
1454 pstate->batches = InvalidDsaPointer;
1455 pstate->old_batches = InvalidDsaPointer;
1456 pstate->nbuckets = 0;
1457 pstate->growth = PHJ_GROWTH_OK;
1458 pstate->chunk_work_queue = InvalidDsaPointer;
1459 pg_atomic_init_u32(&pstate->distributor, 0);
1460 pstate->nparticipants = pcxt->nworkers + 1;
1461 pstate->total_tuples = 0;
1462 LWLockInitialize(&pstate->lock,
1463 LWTRANCHE_PARALLEL_HASH_JOIN);
1464 BarrierInit(&pstate->build_barrier, 0);
1465 BarrierInit(&pstate->grow_batches_barrier, 0);
1466 BarrierInit(&pstate->grow_buckets_barrier, 0);
1468 /* Set up the space we'll use for shared temporary files. */
1469 SharedFileSetInit(&pstate->fileset, pcxt->seg);
1471 /* Initialize the shared state in the hash node. */
1472 hashNode = (HashState *) innerPlanState(state);
1473 hashNode->parallel_state = pstate;
1476 /* ----------------------------------------------------------------
1477 * ExecHashJoinReInitializeDSM
1479 * Reset shared state before beginning a fresh scan.
1480 * ----------------------------------------------------------------
1483 ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt)
1485 int plan_node_id = state->js.ps.plan->plan_node_id;
1486 ParallelHashJoinState *pstate =
1487 shm_toc_lookup(cxt->toc, plan_node_id, false);
1490 * It would be possible to reuse the shared hash table in single-batch
1491 * cases by resetting and then fast-forwarding build_barrier to
1492 * PHJ_BUILD_DONE and batch 0's batch_barrier to PHJ_BATCH_PROBING, but
1493 * currently shared hash tables are already freed by now (by the last
1494 * participant to detach from the batch). We could consider keeping it
1495 * around for single-batch joins. We'd also need to adjust
1496 * finalize_plan() so that it doesn't record a dummy dependency for
1497 * Parallel Hash nodes, preventing the rescan optimization. For now we
1501 /* Detach, freeing any remaining shared memory. */
1502 if (state->hj_HashTable != NULL)
1504 ExecHashTableDetachBatch(state->hj_HashTable);
1505 ExecHashTableDetach(state->hj_HashTable);
1508 /* Clear any shared batch files. */
1509 SharedFileSetDeleteAll(&pstate->fileset);
1511 /* Reset build_barrier to PHJ_BUILD_ELECTING so we can go around again. */
1512 BarrierInit(&pstate->build_barrier, 0);
1516 ExecHashJoinInitializeWorker(HashJoinState *state,
1517 ParallelWorkerContext *pwcxt)
1519 HashState *hashNode;
1520 int plan_node_id = state->js.ps.plan->plan_node_id;
1521 ParallelHashJoinState *pstate =
1522 shm_toc_lookup(pwcxt->toc, plan_node_id, false);
1524 /* Attach to the space for shared temporary files. */
1525 SharedFileSetAttach(&pstate->fileset, pwcxt->seg);
1527 /* Attach to the shared state in the hash node. */
1528 hashNode = (HashState *) innerPlanState(state);
1529 hashNode->parallel_state = pstate;
1531 ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);