/*-------------------------------------------------------------------------
 *
 * nodeHashjoin.c
 *	  Routines to handle hash join nodes
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *	  src/backend/executor/nodeHashjoin.c
 *
 * Hash joins can participate in parallel query execution in several ways.  A
 * parallel-oblivious hash join is one where the node is unaware that it is
 * part of a parallel plan.  In this case, a copy of the inner plan is used to
 * build a copy of the hash table in every backend, and the outer plan could
 * either be built from a partial or complete path, so that the results of the
 * hash join are correspondingly either partial or complete.  A parallel-aware
 * hash join is one that behaves differently, coordinating work between
 * backends, and appears as Parallel Hash Join in EXPLAIN output.  A Parallel
 * Hash Join always appears with a Parallel Hash node.
 *
 * Parallel-aware hash joins use the same per-backend state machine to track
 * progress through the hash join algorithm as parallel-oblivious hash joins.
 * In a parallel-aware hash join, there is also a shared state machine that
 * co-operating backends use to synchronize their local state machines and
 * program counters.  The shared state machine is managed with a Barrier IPC
 * primitive.  When all attached participants arrive at a barrier, the phase
 * advances and all waiting participants are released.
 *
 * When a participant begins working on a parallel hash join, it must first
 * figure out how much progress has already been made, because participants
 * don't wait for each other to begin.  For this reason there are switch
 * statements at key points in the code where we have to synchronize our local
 * state machine with the phase, and then jump to the correct part of the
 * algorithm so that we can get started.
 *
 * One barrier called build_barrier is used to coordinate the hashing phases.
 * The phase is represented by an integer which begins at zero and increments
 * one by one, but in the code it is referred to by symbolic names as follows:
 *
 *   PHJ_BUILD_ELECTING              -- initial state
 *   PHJ_BUILD_ALLOCATING            -- one sets up the batches and table 0
 *   PHJ_BUILD_HASHING_INNER         -- all hash the inner rel
 *   PHJ_BUILD_HASHING_OUTER         -- (multi-batch only) all hash the outer
 *   PHJ_BUILD_DONE                  -- building done, probing can begin
 *
 * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
 * be used repeatedly as required to coordinate expansions in the number of
 * batches or buckets.  Their phases are as follows:
 *
 *   PHJ_GROW_BATCHES_ELECTING       -- initial state
 *   PHJ_GROW_BATCHES_ALLOCATING     -- one allocates new batches
 *   PHJ_GROW_BATCHES_REPARTITIONING -- all repartition
 *   PHJ_GROW_BATCHES_FINISHING      -- one cleans up, detects skew
 *
 *   PHJ_GROW_BUCKETS_ELECTING       -- initial state
 *   PHJ_GROW_BUCKETS_ALLOCATING     -- one allocates new buckets
 *   PHJ_GROW_BUCKETS_REINSERTING    -- all insert tuples
 *
 * If the planner got the number of batches and buckets right, those won't be
 * necessary, but on the other hand we might end up needing to expand the
 * buckets or batches multiple times while hashing the inner relation to stay
 * within our memory budget and load factor target.  For that reason it's a
 * separate pair of barriers using circular phases.
 *
 * The PHJ_BUILD_HASHING_OUTER phase is required only for multi-batch joins,
 * because we need to divide the outer relation into batches up front in order
 * to be able to process batches entirely independently.  In contrast, the
 * parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
 * batches whenever it encounters them while scanning and probing, which it
 * can do because it processes batches in serial order.
 *
 * Once PHJ_BUILD_DONE is reached, backends then split up and process
 * different batches, or gang up and work together on probing batches if there
 * aren't enough to go around.  For each batch there is a separate barrier
 * with the following phases:
 *
 *   PHJ_BATCH_ELECTING              -- initial state
 *   PHJ_BATCH_ALLOCATING            -- one allocates buckets
 *   PHJ_BATCH_LOADING               -- all load the hash table from disk
 *   PHJ_BATCH_PROBING               -- all probe
 *   PHJ_BATCH_DONE                  -- end
 *
 * Batch 0 is a special case, because it starts out in phase
 * PHJ_BATCH_PROBING; populating batch 0's hash table is done during
 * PHJ_BUILD_HASHING_INNER so we can skip loading.
 *
 * Initially we try to plan for a single-batch hash join using the combined
 * work_mem of all participants to create a large shared hash table.  If that
 * turns out either at planning or execution time to be impossible then we
 * fall back to regular work_mem sized hash tables.
 *
 * To avoid deadlocks, we never wait for any barrier unless it is known that
 * all other backends attached to it are actively executing the node or have
 * already arrived.  Practically, that means that we never return a tuple
 * while attached to a barrier, unless the barrier has reached its final
 * state.  In the slightly special case of the per-batch barrier, we return
 * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
 * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/htup_details.h"
#include "access/parallel.h"
#include "executor/executor.h"
#include "executor/hashjoin.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "utils/memutils.h"
#include "utils/sharedtuplestore.h"
/*
 * States of the ExecHashJoin state machine
 */
#define HJ_BUILD_HASHTABLE		1
#define HJ_NEED_NEW_OUTER		2
#define HJ_SCAN_BUCKET			3
#define HJ_FILL_OUTER_TUPLE		4
#define HJ_FILL_INNER_TUPLES	5
#define HJ_NEED_NEW_BATCH		6

/*
 * Null-filling the outer side means emitting outer rows paired with an
 * all-NULL inner row, so it is keyed off the presence of the null *inner*
 * slot (and vice versa for the inner side).
 */
/* Returns true if doing null-fill on outer relation */
#define HJ_FILL_OUTER(hjstate)	((hjstate)->hj_NullInnerTupleSlot != NULL)
/* Returns true if doing null-fill on inner relation */
#define HJ_FILL_INNER(hjstate)	((hjstate)->hj_NullOuterTupleSlot != NULL)
136 static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
137 HashJoinState *hjstate,
139 static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
140 HashJoinState *hjstate,
142 static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
145 TupleTableSlot *tupleSlot);
146 static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
147 static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
148 static void ExecParallelHashJoinPartitionOuter(HashJoinState *node);
151 /* ----------------------------------------------------------------
154 * This function implements the Hybrid Hashjoin algorithm. It is marked
155 * with an always-inline attribute so that ExecHashJoin() and
156 * ExecParallelHashJoin() can inline it. Compilers that respect the
157 * attribute should create versions specialized for parallel == true and
158 * parallel == false with unnecessary branches removed.
160 * Note: the relation we build hash table on is the "inner"
161 * the other one is "outer".
162 * ----------------------------------------------------------------
164 static pg_attribute_always_inline TupleTableSlot *
165 ExecHashJoinImpl(PlanState *pstate, bool parallel)
167 HashJoinState *node = castNode(HashJoinState, pstate);
168 PlanState *outerNode;
171 ExprState *otherqual;
172 ExprContext *econtext;
173 HashJoinTable hashtable;
174 TupleTableSlot *outerTupleSlot;
177 ParallelHashJoinState *parallel_state;
180 * get information from HashJoin node
182 joinqual = node->js.joinqual;
183 otherqual = node->js.ps.qual;
184 hashNode = (HashState *) innerPlanState(node);
185 outerNode = outerPlanState(node);
186 hashtable = node->hj_HashTable;
187 econtext = node->js.ps.ps_ExprContext;
188 parallel_state = hashNode->parallel_state;
191 * Reset per-tuple memory context to free any expression evaluation
192 * storage allocated in the previous tuple cycle.
194 ResetExprContext(econtext);
197 * run the hash join state machine
202 * It's possible to iterate this loop many times before returning a
203 * tuple, in some pathological cases such as needing to move much of
204 * the current batch to a later batch. So let's check for interrupts
207 CHECK_FOR_INTERRUPTS();
209 switch (node->hj_JoinState)
211 case HJ_BUILD_HASHTABLE:
214 * First time through: build hash table for inner relation.
216 Assert(hashtable == NULL);
219 * If the outer relation is completely empty, and it's not
220 * right/full join, we can quit without building the hash
221 * table. However, for an inner join it is only a win to
222 * check this when the outer relation's startup cost is less
223 * than the projected cost of building the hash table.
224 * Otherwise it's best to build the hash table first and see
225 * if the inner relation is empty. (When it's a left join, we
226 * should always make this check, since we aren't going to be
227 * able to skip the join on the strength of an empty inner
230 * If we are rescanning the join, we make use of information
231 * gained on the previous scan: don't bother to try the
232 * prefetch if the previous scan found the outer relation
233 * nonempty. This is not 100% reliable since with new
234 * parameters the outer relation might yield different
235 * results, but it's a good heuristic.
237 * The only way to make the check is to try to fetch a tuple
238 * from the outer plan node. If we succeed, we have to stash
239 * it away for later consumption by ExecHashJoinOuterGetTuple.
241 if (HJ_FILL_INNER(node))
243 /* no chance to not build the hash table */
244 node->hj_FirstOuterTupleSlot = NULL;
249 * The empty-outer optimization is not implemented for
250 * shared hash tables, because no one participant can
251 * determine that there are no outer tuples, and it's not
252 * yet clear that it's worth the synchronization overhead
253 * of reaching consensus to figure that out. So we have
254 * to build the hash table.
256 node->hj_FirstOuterTupleSlot = NULL;
258 else if (HJ_FILL_OUTER(node) ||
259 (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
260 !node->hj_OuterNotEmpty))
262 node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
263 if (TupIsNull(node->hj_FirstOuterTupleSlot))
265 node->hj_OuterNotEmpty = false;
269 node->hj_OuterNotEmpty = true;
272 node->hj_FirstOuterTupleSlot = NULL;
275 * Create the hash table. If using Parallel Hash, then
276 * whoever gets here first will create the hash table and any
277 * later arrivals will merely attach to it.
279 hashtable = ExecHashTableCreate(hashNode,
280 node->hj_HashOperators,
281 HJ_FILL_INNER(node));
282 node->hj_HashTable = hashtable;
285 * Execute the Hash node, to build the hash table. If using
286 * Parallel Hash, then we'll try to help hashing unless we
289 hashNode->hashtable = hashtable;
290 (void) MultiExecProcNode((PlanState *) hashNode);
293 * If the inner relation is completely empty, and we're not
294 * doing a left outer join, we can quit without scanning the
297 if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
301 * need to remember whether nbatch has increased since we
302 * began scanning the outer relation
304 hashtable->nbatch_outstart = hashtable->nbatch;
307 * Reset OuterNotEmpty for scan. (It's OK if we fetched a
308 * tuple above, because ExecHashJoinOuterGetTuple will
309 * immediately set it again.)
311 node->hj_OuterNotEmpty = false;
315 Barrier *build_barrier;
317 build_barrier = ¶llel_state->build_barrier;
318 Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
319 BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
320 if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
323 * If multi-batch, we need to hash the outer relation
326 if (hashtable->nbatch > 1)
327 ExecParallelHashJoinPartitionOuter(node);
328 BarrierArriveAndWait(build_barrier,
329 WAIT_EVENT_HASH_BUILD_HASHING_OUTER);
331 Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
333 /* Each backend should now select a batch to work on. */
334 hashtable->curbatch = -1;
335 node->hj_JoinState = HJ_NEED_NEW_BATCH;
340 node->hj_JoinState = HJ_NEED_NEW_OUTER;
344 case HJ_NEED_NEW_OUTER:
347 * We don't have an outer tuple, try to get the next one
351 ExecParallelHashJoinOuterGetTuple(outerNode, node,
355 ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);
357 if (TupIsNull(outerTupleSlot))
359 /* end of batch, or maybe whole join */
360 if (HJ_FILL_INNER(node))
362 /* set up to scan for unmatched inner tuples */
363 ExecPrepHashTableForUnmatched(node);
364 node->hj_JoinState = HJ_FILL_INNER_TUPLES;
367 node->hj_JoinState = HJ_NEED_NEW_BATCH;
371 econtext->ecxt_outertuple = outerTupleSlot;
372 node->hj_MatchedOuter = false;
375 * Find the corresponding bucket for this tuple in the main
376 * hash table or skew hash table.
378 node->hj_CurHashValue = hashvalue;
379 ExecHashGetBucketAndBatch(hashtable, hashvalue,
380 &node->hj_CurBucketNo, &batchno);
381 node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
383 node->hj_CurTuple = NULL;
386 * The tuple might not belong to the current batch (where
387 * "current batch" includes the skew buckets if any).
389 if (batchno != hashtable->curbatch &&
390 node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
393 MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
397 * Need to postpone this outer tuple to a later batch.
398 * Save it in the corresponding outer-batch file.
400 Assert(parallel_state == NULL);
401 Assert(batchno > hashtable->curbatch);
402 ExecHashJoinSaveTuple(mintuple, hashvalue,
403 &hashtable->outerBatchFile[batchno]);
406 heap_free_minimal_tuple(mintuple);
408 /* Loop around, staying in HJ_NEED_NEW_OUTER state */
412 /* OK, let's scan the bucket for matches */
413 node->hj_JoinState = HJ_SCAN_BUCKET;
420 * Scan the selected hash bucket for matches to current outer
424 if (!ExecParallelScanHashBucket(node, econtext))
426 /* out of matches; check for possible outer-join fill */
427 node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
433 if (!ExecScanHashBucket(node, econtext))
435 /* out of matches; check for possible outer-join fill */
436 node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
442 * We've got a match, but still need to test non-hashed quals.
443 * ExecScanHashBucket already set up all the state needed to
446 * If we pass the qual, then save state for next call and have
447 * ExecProject form the projection, store it in the tuple
448 * table, and return the slot.
450 * Only the joinquals determine tuple match status, but all
451 * quals must pass to actually return the tuple.
453 if (joinqual == NULL || ExecQual(joinqual, econtext))
455 node->hj_MatchedOuter = true;
456 HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
458 /* In an antijoin, we never return a matched tuple */
459 if (node->js.jointype == JOIN_ANTI)
461 node->hj_JoinState = HJ_NEED_NEW_OUTER;
466 * If we only need to join to the first matching inner
467 * tuple, then consider returning this one, but after that
468 * continue with next outer tuple.
470 if (node->js.single_match)
471 node->hj_JoinState = HJ_NEED_NEW_OUTER;
473 if (otherqual == NULL || ExecQual(otherqual, econtext))
474 return ExecProject(node->js.ps.ps_ProjInfo);
476 InstrCountFiltered2(node, 1);
479 InstrCountFiltered1(node, 1);
482 case HJ_FILL_OUTER_TUPLE:
485 * The current outer tuple has run out of matches, so check
486 * whether to emit a dummy outer-join tuple. Whether we emit
487 * one or not, the next state is NEED_NEW_OUTER.
489 node->hj_JoinState = HJ_NEED_NEW_OUTER;
491 if (!node->hj_MatchedOuter &&
495 * Generate a fake join tuple with nulls for the inner
496 * tuple, and return it if it passes the non-join quals.
498 econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
500 if (otherqual == NULL || ExecQual(otherqual, econtext))
501 return ExecProject(node->js.ps.ps_ProjInfo);
503 InstrCountFiltered2(node, 1);
507 case HJ_FILL_INNER_TUPLES:
510 * We have finished a batch, but we are doing right/full join,
511 * so any unmatched inner tuples in the hashtable have to be
512 * emitted before we continue to the next batch.
514 if (!ExecScanHashTableForUnmatched(node, econtext))
516 /* no more unmatched tuples */
517 node->hj_JoinState = HJ_NEED_NEW_BATCH;
522 * Generate a fake join tuple with nulls for the outer tuple,
523 * and return it if it passes the non-join quals.
525 econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;
527 if (otherqual == NULL || ExecQual(otherqual, econtext))
528 return ExecProject(node->js.ps.ps_ProjInfo);
530 InstrCountFiltered2(node, 1);
533 case HJ_NEED_NEW_BATCH:
536 * Try to advance to next batch. Done if there are no more.
540 if (!ExecParallelHashJoinNewBatch(node))
541 return NULL; /* end of parallel-aware join */
545 if (!ExecHashJoinNewBatch(node))
546 return NULL; /* end of parallel-oblivious join */
548 node->hj_JoinState = HJ_NEED_NEW_OUTER;
552 elog(ERROR, "unrecognized hashjoin state: %d",
553 (int) node->hj_JoinState);
558 /* ----------------------------------------------------------------
561 * Parallel-oblivious version.
562 * ----------------------------------------------------------------
564 static TupleTableSlot * /* return: a tuple or NULL */
565 ExecHashJoin(PlanState *pstate)
568 * On sufficiently smart compilers this should be inlined with the
569 * parallel-aware branches removed.
571 return ExecHashJoinImpl(pstate, false);
574 /* ----------------------------------------------------------------
575 * ExecParallelHashJoin
577 * Parallel-aware version.
578 * ----------------------------------------------------------------
580 static TupleTableSlot * /* return: a tuple or NULL */
581 ExecParallelHashJoin(PlanState *pstate)
584 * On sufficiently smart compilers this should be inlined with the
585 * parallel-oblivious branches removed.
587 return ExecHashJoinImpl(pstate, true);
590 /* ----------------------------------------------------------------
593 * Init routine for HashJoin node.
594 * ----------------------------------------------------------------
597 ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
599 HashJoinState *hjstate;
609 const TupleTableSlotOps *ops;
611 /* check for unsupported flags */
612 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
615 * create state structure
617 hjstate = makeNode(HashJoinState);
618 hjstate->js.ps.plan = (Plan *) node;
619 hjstate->js.ps.state = estate;
622 * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker()
623 * where this function may be replaced with a parallel version, if we
624 * managed to launch a parallel query.
626 hjstate->js.ps.ExecProcNode = ExecHashJoin;
627 hjstate->js.jointype = node->join.jointype;
630 * Miscellaneous initialization
632 * create expression context for node
634 ExecAssignExprContext(estate, &hjstate->js.ps);
637 * initialize child nodes
639 * Note: we could suppress the REWIND flag for the inner input, which
640 * would amount to betting that the hash will be a single batch. Not
641 * clear if this would be a win or not.
643 outerNode = outerPlan(node);
644 hashNode = (Hash *) innerPlan(node);
646 outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags);
647 outerDesc = ExecGetResultType(outerPlanState(hjstate));
648 innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags);
649 innerDesc = ExecGetResultType(innerPlanState(hjstate));
652 * Initialize result slot, type and projection.
654 ExecInitResultTupleSlotTL(&hjstate->js.ps, &TTSOpsVirtual);
655 ExecAssignProjectionInfo(&hjstate->js.ps, NULL);
658 * tuple table initialization
660 ops = ExecGetResultSlotOps(outerPlanState(hjstate), NULL);
661 hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate, outerDesc,
665 * detect whether we need only consider the first matching inner tuple
667 hjstate->js.single_match = (node->join.inner_unique ||
668 node->join.jointype == JOIN_SEMI);
670 /* set up null tuples for outer joins, if needed */
671 switch (node->join.jointype)
678 hjstate->hj_NullInnerTupleSlot =
679 ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
682 hjstate->hj_NullOuterTupleSlot =
683 ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
686 hjstate->hj_NullOuterTupleSlot =
687 ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
688 hjstate->hj_NullInnerTupleSlot =
689 ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
692 elog(ERROR, "unrecognized join type: %d",
693 (int) node->join.jointype);
697 * now for some voodoo. our temporary tuple slot is actually the result
698 * tuple slot of the Hash node (which is our inner plan). we can do this
699 * because Hash nodes don't return tuples via ExecProcNode() -- instead
700 * the hash join node uses ExecScanHashBucket() to get at the contents of
701 * the hash table. -cim 6/9/91
704 HashState *hashstate = (HashState *) innerPlanState(hjstate);
705 TupleTableSlot *slot = hashstate->ps.ps_ResultTupleSlot;
707 hjstate->hj_HashTupleSlot = slot;
711 * initialize child expressions
713 hjstate->js.ps.qual =
714 ExecInitQual(node->join.plan.qual, (PlanState *) hjstate);
715 hjstate->js.joinqual =
716 ExecInitQual(node->join.joinqual, (PlanState *) hjstate);
717 hjstate->hashclauses =
718 ExecInitQual(node->hashclauses, (PlanState *) hjstate);
721 * initialize hash-specific info
723 hjstate->hj_HashTable = NULL;
724 hjstate->hj_FirstOuterTupleSlot = NULL;
726 hjstate->hj_CurHashValue = 0;
727 hjstate->hj_CurBucketNo = 0;
728 hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
729 hjstate->hj_CurTuple = NULL;
732 * Deconstruct the hash clauses into outer and inner argument values, so
733 * that we can evaluate those subexpressions separately. Also make a list
734 * of the hash operator OIDs, in preparation for looking up the hash
741 foreach(l, node->hashclauses)
743 OpExpr *hclause = lfirst_node(OpExpr, l);
745 lclauses = lappend(lclauses, ExecInitExpr(linitial(hclause->args),
746 (PlanState *) hjstate));
747 rclauses = lappend(rclauses, ExecInitExpr(lsecond(hclause->args),
748 (PlanState *) hjstate));
749 rhclauses = lappend(rhclauses, ExecInitExpr(lsecond(hclause->args),
750 innerPlanState(hjstate)));
751 hoperators = lappend_oid(hoperators, hclause->opno);
753 hjstate->hj_OuterHashKeys = lclauses;
754 hjstate->hj_InnerHashKeys = rclauses;
755 hjstate->hj_HashOperators = hoperators;
756 /* child Hash node needs to evaluate inner hash keys, too */
757 ((HashState *) innerPlanState(hjstate))->hashkeys = rhclauses;
759 hjstate->hj_JoinState = HJ_BUILD_HASHTABLE;
760 hjstate->hj_MatchedOuter = false;
761 hjstate->hj_OuterNotEmpty = false;
766 /* ----------------------------------------------------------------
769 * clean up routine for HashJoin node
770 * ----------------------------------------------------------------
773 ExecEndHashJoin(HashJoinState *node)
778 if (node->hj_HashTable)
780 ExecHashTableDestroy(node->hj_HashTable);
781 node->hj_HashTable = NULL;
785 * Free the exprcontext
787 ExecFreeExprContext(&node->js.ps);
790 * clean out the tuple table
792 ExecClearTuple(node->js.ps.ps_ResultTupleSlot);
793 ExecClearTuple(node->hj_OuterTupleSlot);
794 ExecClearTuple(node->hj_HashTupleSlot);
799 ExecEndNode(outerPlanState(node));
800 ExecEndNode(innerPlanState(node));
804 * ExecHashJoinOuterGetTuple
806 * get the next outer tuple for a parallel oblivious hashjoin: either by
807 * executing the outer plan node in the first pass, or from the temp
808 * files for the hashjoin batches.
810 * Returns a null slot if no more outer tuples (within the current batch).
812 * On success, the tuple's hash value is stored at *hashvalue --- this is
813 * either originally computed, or re-read from the temp file.
815 static TupleTableSlot *
816 ExecHashJoinOuterGetTuple(PlanState *outerNode,
817 HashJoinState *hjstate,
820 HashJoinTable hashtable = hjstate->hj_HashTable;
821 int curbatch = hashtable->curbatch;
822 TupleTableSlot *slot;
824 if (curbatch == 0) /* if it is the first pass */
827 * Check to see if first outer tuple was already fetched by
828 * ExecHashJoin() and not used yet.
830 slot = hjstate->hj_FirstOuterTupleSlot;
831 if (!TupIsNull(slot))
832 hjstate->hj_FirstOuterTupleSlot = NULL;
834 slot = ExecProcNode(outerNode);
836 while (!TupIsNull(slot))
839 * We have to compute the tuple's hash value.
841 ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
843 econtext->ecxt_outertuple = slot;
844 if (ExecHashGetHashValue(hashtable, econtext,
845 hjstate->hj_OuterHashKeys,
846 true, /* outer tuple */
847 HJ_FILL_OUTER(hjstate),
850 /* remember outer relation is not empty for possible rescan */
851 hjstate->hj_OuterNotEmpty = true;
857 * That tuple couldn't match because of a NULL, so discard it and
858 * continue with the next one.
860 slot = ExecProcNode(outerNode);
863 else if (curbatch < hashtable->nbatch)
865 BufFile *file = hashtable->outerBatchFile[curbatch];
868 * In outer-join cases, we could get here even though the batch file
874 slot = ExecHashJoinGetSavedTuple(hjstate,
877 hjstate->hj_OuterTupleSlot);
878 if (!TupIsNull(slot))
882 /* End of this batch */
887 * ExecHashJoinOuterGetTuple variant for the parallel case.
889 static TupleTableSlot *
890 ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
891 HashJoinState *hjstate,
894 HashJoinTable hashtable = hjstate->hj_HashTable;
895 int curbatch = hashtable->curbatch;
896 TupleTableSlot *slot;
899 * In the Parallel Hash case we only run the outer plan directly for
900 * single-batch hash joins. Otherwise we have to go to batch files, even
903 if (curbatch == 0 && hashtable->nbatch == 1)
905 slot = ExecProcNode(outerNode);
907 while (!TupIsNull(slot))
909 ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
911 econtext->ecxt_outertuple = slot;
912 if (ExecHashGetHashValue(hashtable, econtext,
913 hjstate->hj_OuterHashKeys,
914 true, /* outer tuple */
915 HJ_FILL_OUTER(hjstate),
920 * That tuple couldn't match because of a NULL, so discard it and
921 * continue with the next one.
923 slot = ExecProcNode(outerNode);
926 else if (curbatch < hashtable->nbatch)
930 tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
934 ExecForceStoreMinimalTuple(tuple,
935 hjstate->hj_OuterTupleSlot,
937 slot = hjstate->hj_OuterTupleSlot;
941 ExecClearTuple(hjstate->hj_OuterTupleSlot);
944 /* End of this batch */
949 * ExecHashJoinNewBatch
950 * switch to a new hashjoin batch
952 * Returns true if successful, false if there are no more batches.
955 ExecHashJoinNewBatch(HashJoinState *hjstate)
957 HashJoinTable hashtable = hjstate->hj_HashTable;
961 TupleTableSlot *slot;
964 nbatch = hashtable->nbatch;
965 curbatch = hashtable->curbatch;
970 * We no longer need the previous outer batch file; close it right
971 * away to free disk space.
973 if (hashtable->outerBatchFile[curbatch])
974 BufFileClose(hashtable->outerBatchFile[curbatch]);
975 hashtable->outerBatchFile[curbatch] = NULL;
977 else /* we just finished the first batch */
980 * Reset some of the skew optimization state variables, since we no
981 * longer need to consider skew tuples after the first batch. The
982 * memory context reset we are about to do will release the skew
985 hashtable->skewEnabled = false;
986 hashtable->skewBucket = NULL;
987 hashtable->skewBucketNums = NULL;
988 hashtable->nSkewBuckets = 0;
989 hashtable->spaceUsedSkew = 0;
993 * We can always skip over any batches that are completely empty on both
994 * sides. We can sometimes skip over batches that are empty on only one
995 * side, but there are exceptions:
997 * 1. In a left/full outer join, we have to process outer batches even if
998 * the inner batch is empty. Similarly, in a right/full outer join, we
999 * have to process inner batches even if the outer batch is empty.
1001 * 2. If we have increased nbatch since the initial estimate, we have to
1002 * scan inner batches since they might contain tuples that need to be
1003 * reassigned to later inner batches.
1005 * 3. Similarly, if we have increased nbatch since starting the outer
1006 * scan, we have to rescan outer batches in case they contain tuples that
1007 * need to be reassigned.
1010 while (curbatch < nbatch &&
1011 (hashtable->outerBatchFile[curbatch] == NULL ||
1012 hashtable->innerBatchFile[curbatch] == NULL))
1014 if (hashtable->outerBatchFile[curbatch] &&
1015 HJ_FILL_OUTER(hjstate))
1016 break; /* must process due to rule 1 */
1017 if (hashtable->innerBatchFile[curbatch] &&
1018 HJ_FILL_INNER(hjstate))
1019 break; /* must process due to rule 1 */
1020 if (hashtable->innerBatchFile[curbatch] &&
1021 nbatch != hashtable->nbatch_original)
1022 break; /* must process due to rule 2 */
1023 if (hashtable->outerBatchFile[curbatch] &&
1024 nbatch != hashtable->nbatch_outstart)
1025 break; /* must process due to rule 3 */
1026 /* We can ignore this batch. */
1027 /* Release associated temp files right away. */
1028 if (hashtable->innerBatchFile[curbatch])
1029 BufFileClose(hashtable->innerBatchFile[curbatch]);
1030 hashtable->innerBatchFile[curbatch] = NULL;
1031 if (hashtable->outerBatchFile[curbatch])
1032 BufFileClose(hashtable->outerBatchFile[curbatch]);
1033 hashtable->outerBatchFile[curbatch] = NULL;
1037 if (curbatch >= nbatch)
1038 return false; /* no more batches */
1040 hashtable->curbatch = curbatch;
1043 * Reload the hash table with the new inner batch (which could be empty)
1045 ExecHashTableReset(hashtable);
1047 innerFile = hashtable->innerBatchFile[curbatch];
1049 if (innerFile != NULL)
1051 if (BufFileSeek(innerFile, 0, 0L, SEEK_SET))
1053 (errcode_for_file_access(),
1054 errmsg("could not rewind hash-join temporary file: %m")));
1056 while ((slot = ExecHashJoinGetSavedTuple(hjstate,
1059 hjstate->hj_HashTupleSlot)))
1062 * NOTE: some tuples may be sent to future batches. Also, it is
1063 * possible for hashtable->nbatch to be increased here!
1065 ExecHashTableInsert(hashtable, slot, hashvalue);
1069 * after we build the hash table, the inner batch file is no longer
1072 BufFileClose(innerFile);
1073 hashtable->innerBatchFile[curbatch] = NULL;
1077 * Rewind outer batch file (if present), so that we can start reading it.
1079 if (hashtable->outerBatchFile[curbatch] != NULL)
1081 if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
1083 (errcode_for_file_access(),
1084 errmsg("could not rewind hash-join temporary file: %m")));
1091 * Choose a batch to work on, and attach to it. Returns true if successful,
1092 * false if there are no more batches.
1095 ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
1097 HashJoinTable hashtable = hjstate->hj_HashTable;
1102 * If we started up so late that the batch tracking array has been freed
1103 * already by ExecHashTableDetach(), then we are finished. See also
1104 * ExecParallelHashEnsureBatchAccessors().
1106 if (hashtable->batches == NULL)
1110 * If we were already attached to a batch, remember not to bother checking
1111 * it again, and detach from it (possibly freeing the hash table if we are
1114 if (hashtable->curbatch >= 0)
1116 hashtable->batches[hashtable->curbatch].done = true;
1117 ExecHashTableDetachBatch(hashtable);
1121 * Search for a batch that isn't done. We use an atomic counter to start
1122 * our search at a different batch in every participant when there are
1123 * more batches than participants.
1125 batchno = start_batchno =
1126 pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) %
1132 TupleTableSlot *slot;
1134 if (!hashtable->batches[batchno].done)
1136 SharedTuplestoreAccessor *inner_tuples;
1137 Barrier *batch_barrier =
1138 &hashtable->batches[batchno].shared->batch_barrier;
1140 switch (BarrierAttach(batch_barrier))
1142 case PHJ_BATCH_ELECTING:
1144 /* One backend allocates the hash table. */
1145 if (BarrierArriveAndWait(batch_barrier,
1146 WAIT_EVENT_HASH_BATCH_ELECTING))
1147 ExecParallelHashTableAlloc(hashtable, batchno);
1150 case PHJ_BATCH_ALLOCATING:
1151 /* Wait for allocation to complete. */
1152 BarrierArriveAndWait(batch_barrier,
1153 WAIT_EVENT_HASH_BATCH_ALLOCATING);
1156 case PHJ_BATCH_LOADING:
1157 /* Start (or join in) loading tuples. */
1158 ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
1159 inner_tuples = hashtable->batches[batchno].inner_tuples;
1160 sts_begin_parallel_scan(inner_tuples);
1161 while ((tuple = sts_parallel_scan_next(inner_tuples,
1164 ExecForceStoreMinimalTuple(tuple,
1165 hjstate->hj_HashTupleSlot,
1167 slot = hjstate->hj_HashTupleSlot;
1168 ExecParallelHashTableInsertCurrentBatch(hashtable, slot,
1171 sts_end_parallel_scan(inner_tuples);
1172 BarrierArriveAndWait(batch_barrier,
1173 WAIT_EVENT_HASH_BATCH_LOADING);
1176 case PHJ_BATCH_PROBING:
1179 * This batch is ready to probe. Return control to
1180 * caller. We stay attached to batch_barrier so that the
1181 * hash table stays alive until everyone's finished
1182 * probing it, but no participant is allowed to wait at
1183 * this barrier again (or else a deadlock could occur).
1184 * All attached participants must eventually call
1185 * BarrierArriveAndDetach() so that the final phase
1186 * PHJ_BATCH_DONE can be reached.
1188 ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
1189 sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
1192 case PHJ_BATCH_DONE:
1195 * Already done. Detach and go around again (if any
1198 BarrierDetach(batch_barrier);
1199 hashtable->batches[batchno].done = true;
1200 hashtable->curbatch = -1;
1204 elog(ERROR, "unexpected batch phase %d",
1205 BarrierPhase(batch_barrier));
1208 batchno = (batchno + 1) % hashtable->nbatch;
1209 } while (batchno != start_batchno);
1215 * ExecHashJoinSaveTuple
1216 * save a tuple to a batch file.
1218 * The data recorded in the file for each tuple is its hash value,
1219 * then the tuple in MinimalTuple format.
1221 * Note: it is important always to call this in the regular executor
1222 * context, not in a shorter-lived context; else the temp file buffers
1223 * will get messed up.
1226 ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
1229 BufFile *file = *fileptr;
1234 /* First write to this batch file, so open it. */
1235 file = BufFileCreateTemp(false);
1239 written = BufFileWrite(file, (void *) &hashvalue, sizeof(uint32));
1240 if (written != sizeof(uint32))
1242 (errcode_for_file_access(),
1243 errmsg("could not write to hash-join temporary file: %m")));
1245 written = BufFileWrite(file, (void *) tuple, tuple->t_len);
1246 if (written != tuple->t_len)
1248 (errcode_for_file_access(),
1249 errmsg("could not write to hash-join temporary file: %m")));
1253 * ExecHashJoinGetSavedTuple
1254 * read the next tuple from a batch file. Return NULL if no more.
1256 * On success, *hashvalue is set to the tuple's hash value, and the tuple
1257 * itself is stored in the given slot.
1259 static TupleTableSlot *
1260 ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
1263 TupleTableSlot *tupleSlot)
1270 * We check for interrupts here because this is typically taken as an
1271 * alternative code path to an ExecProcNode() call, which would include
1274 CHECK_FOR_INTERRUPTS();
1277 * Since both the hash value and the MinimalTuple length word are uint32,
1278 * we can read them both in one BufFileRead() call without any type
1281 nread = BufFileRead(file, (void *) header, sizeof(header));
1282 if (nread == 0) /* end of file */
1284 ExecClearTuple(tupleSlot);
1287 if (nread != sizeof(header))
1289 (errcode_for_file_access(),
1290 errmsg("could not read from hash-join temporary file: %m")));
1291 *hashvalue = header[0];
1292 tuple = (MinimalTuple) palloc(header[1]);
1293 tuple->t_len = header[1];
1294 nread = BufFileRead(file,
1295 (void *) ((char *) tuple + sizeof(uint32)),
1296 header[1] - sizeof(uint32));
1297 if (nread != header[1] - sizeof(uint32))
1299 (errcode_for_file_access(),
1300 errmsg("could not read from hash-join temporary file: %m")));
1301 ExecForceStoreMinimalTuple(tuple, tupleSlot, true);
1307 ExecReScanHashJoin(HashJoinState *node)
1310 * In a multi-batch join, we currently have to do rescans the hard way,
1311 * primarily because batch temp files may have already been released. But
1312 * if it's a single-batch join, and there is no parameter change for the
1313 * inner subnode, then we can just re-use the existing hash table without
1316 if (node->hj_HashTable != NULL)
1318 if (node->hj_HashTable->nbatch == 1 &&
1319 node->js.ps.righttree->chgParam == NULL)
1322 * Okay to reuse the hash table; needn't rescan inner, either.
1324 * However, if it's a right/full join, we'd better reset the
1325 * inner-tuple match flags contained in the table.
1327 if (HJ_FILL_INNER(node))
1328 ExecHashTableResetMatchFlags(node->hj_HashTable);
1331 * Also, we need to reset our state about the emptiness of the
1332 * outer relation, so that the new scan of the outer will update
1333 * it correctly if it turns out to be empty this time. (There's no
1334 * harm in clearing it now because ExecHashJoin won't need the
1335 * info. In the other cases, where the hash table doesn't exist
1336 * or we are destroying it, we leave this state alone because
1337 * ExecHashJoin will need it the first time through.)
1339 node->hj_OuterNotEmpty = false;
1341 /* ExecHashJoin can skip the BUILD_HASHTABLE step */
1342 node->hj_JoinState = HJ_NEED_NEW_OUTER;
1346 /* must destroy and rebuild hash table */
1347 ExecHashTableDestroy(node->hj_HashTable);
1348 node->hj_HashTable = NULL;
1349 node->hj_JoinState = HJ_BUILD_HASHTABLE;
1352 * if chgParam of subnode is not null then plan will be re-scanned
1353 * by first ExecProcNode.
1355 if (node->js.ps.righttree->chgParam == NULL)
1356 ExecReScan(node->js.ps.righttree);
1360 /* Always reset intra-tuple state */
1361 node->hj_CurHashValue = 0;
1362 node->hj_CurBucketNo = 0;
1363 node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
1364 node->hj_CurTuple = NULL;
1366 node->hj_MatchedOuter = false;
1367 node->hj_FirstOuterTupleSlot = NULL;
1370 * if chgParam of subnode is not null then plan will be re-scanned by
1371 * first ExecProcNode.
1373 if (node->js.ps.lefttree->chgParam == NULL)
1374 ExecReScan(node->js.ps.lefttree);
1378 ExecShutdownHashJoin(HashJoinState *node)
1380 if (node->hj_HashTable)
1383 * Detach from shared state before DSM memory goes away. This makes
1384 * sure that we don't have any pointers into DSM memory by the time
1385 * ExecEndHashJoin runs.
1387 ExecHashTableDetachBatch(node->hj_HashTable);
1388 ExecHashTableDetach(node->hj_HashTable);
1393 ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate)
1395 PlanState *outerState = outerPlanState(hjstate);
1396 ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
1397 HashJoinTable hashtable = hjstate->hj_HashTable;
1398 TupleTableSlot *slot;
1402 Assert(hjstate->hj_FirstOuterTupleSlot == NULL);
1404 /* Execute outer plan, writing all tuples to shared tuplestores. */
1407 slot = ExecProcNode(outerState);
1408 if (TupIsNull(slot))
1410 econtext->ecxt_outertuple = slot;
1411 if (ExecHashGetHashValue(hashtable, econtext,
1412 hjstate->hj_OuterHashKeys,
1413 true, /* outer tuple */
1414 HJ_FILL_OUTER(hjstate),
1420 MinimalTuple mintup = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1422 ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
1424 sts_puttuple(hashtable->batches[batchno].outer_tuples,
1425 &hashvalue, mintup);
1428 heap_free_minimal_tuple(mintup);
1430 CHECK_FOR_INTERRUPTS();
1433 /* Make sure all outer partitions are readable by any backend. */
1434 for (i = 0; i < hashtable->nbatch; ++i)
1435 sts_end_write(hashtable->batches[i].outer_tuples);
1439 ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
1441 shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState));
1442 shm_toc_estimate_keys(&pcxt->estimator, 1);
1446 ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
1448 int plan_node_id = state->js.ps.plan->plan_node_id;
1449 HashState *hashNode;
1450 ParallelHashJoinState *pstate;
1453 * Disable shared hash table mode if we failed to create a real DSM
1454 * segment, because that means that we don't have a DSA area to work with.
1456 if (pcxt->seg == NULL)
1459 ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
1462 * Set up the state needed to coordinate access to the shared hash
1463 * table(s), using the plan node ID as the toc key.
1465 pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState));
1466 shm_toc_insert(pcxt->toc, plan_node_id, pstate);
1469 * Set up the shared hash join state with no batches initially.
1470 * ExecHashTableCreate() will prepare at least one later and set nbatch
1471 * and space_allowed.
1474 pstate->space_allowed = 0;
1475 pstate->batches = InvalidDsaPointer;
1476 pstate->old_batches = InvalidDsaPointer;
1477 pstate->nbuckets = 0;
1478 pstate->growth = PHJ_GROWTH_OK;
1479 pstate->chunk_work_queue = InvalidDsaPointer;
1480 pg_atomic_init_u32(&pstate->distributor, 0);
1481 pstate->nparticipants = pcxt->nworkers + 1;
1482 pstate->total_tuples = 0;
1483 LWLockInitialize(&pstate->lock,
1484 LWTRANCHE_PARALLEL_HASH_JOIN);
1485 BarrierInit(&pstate->build_barrier, 0);
1486 BarrierInit(&pstate->grow_batches_barrier, 0);
1487 BarrierInit(&pstate->grow_buckets_barrier, 0);
1489 /* Set up the space we'll use for shared temporary files. */
1490 SharedFileSetInit(&pstate->fileset, pcxt->seg);
1492 /* Initialize the shared state in the hash node. */
1493 hashNode = (HashState *) innerPlanState(state);
1494 hashNode->parallel_state = pstate;
1497 /* ----------------------------------------------------------------
1498 * ExecHashJoinReInitializeDSM
1500 * Reset shared state before beginning a fresh scan.
1501 * ----------------------------------------------------------------
1504 ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt)
1506 int plan_node_id = state->js.ps.plan->plan_node_id;
1507 ParallelHashJoinState *pstate =
1508 shm_toc_lookup(cxt->toc, plan_node_id, false);
1511 * It would be possible to reuse the shared hash table in single-batch
1512 * cases by resetting and then fast-forwarding build_barrier to
1513 * PHJ_BUILD_DONE and batch 0's batch_barrier to PHJ_BATCH_PROBING, but
1514 * currently shared hash tables are already freed by now (by the last
1515 * participant to detach from the batch). We could consider keeping it
1516 * around for single-batch joins. We'd also need to adjust
1517 * finalize_plan() so that it doesn't record a dummy dependency for
1518 * Parallel Hash nodes, preventing the rescan optimization. For now we
1522 /* Detach, freeing any remaining shared memory. */
1523 if (state->hj_HashTable != NULL)
1525 ExecHashTableDetachBatch(state->hj_HashTable);
1526 ExecHashTableDetach(state->hj_HashTable);
1529 /* Clear any shared batch files. */
1530 SharedFileSetDeleteAll(&pstate->fileset);
1532 /* Reset build_barrier to PHJ_BUILD_ELECTING so we can go around again. */
1533 BarrierInit(&pstate->build_barrier, 0);
1537 ExecHashJoinInitializeWorker(HashJoinState *state,
1538 ParallelWorkerContext *pwcxt)
1540 HashState *hashNode;
1541 int plan_node_id = state->js.ps.plan->plan_node_id;
1542 ParallelHashJoinState *pstate =
1543 shm_toc_lookup(pwcxt->toc, plan_node_id, false);
1545 /* Attach to the space for shared temporary files. */
1546 SharedFileSetAttach(&pstate->fileset, pwcxt->seg);
1548 /* Attach to the shared state in the hash node. */
1549 hashNode = (HashState *) innerPlanState(state);
1550 hashNode->parallel_state = pstate;
1552 ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);