* nodeHashjoin.c
* Routines to handle hash join nodes
*
- * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/executor/nodeHashjoin.c,v 1.60 2004/01/07 18:56:26 neilc Exp $
+ * src/backend/executor/nodeHashjoin.c
+ *
+ * PARALLELISM
+ *
+ * Hash joins can participate in parallel query execution in several ways. A
+ * parallel-oblivious hash join is one where the node is unaware that it is
+ * part of a parallel plan. In this case, a copy of the inner plan is used to
+ * build a copy of the hash table in every backend, and the outer plan could
+ * either be built from a partial or complete path, so that the results of the
+ * hash join are correspondingly either partial or complete. A parallel-aware
+ * hash join is one that behaves differently, coordinating work between
+ * backends, and appears as Parallel Hash Join in EXPLAIN output. A Parallel
+ * Hash Join always appears with a Parallel Hash node.
+ *
+ * Parallel-aware hash joins use the same per-backend state machine to track
+ * progress through the hash join algorithm as parallel-oblivious hash joins.
+ * In a parallel-aware hash join, there is also a shared state machine that
+ * co-operating backends use to synchronize their local state machines and
+ * program counters. The shared state machine is managed with a Barrier IPC
+ * primitive. When all attached participants arrive at a barrier, the phase
+ * advances and all waiting participants are released.
+ *
+ * When a participant begins working on a parallel hash join, it must first
+ * figure out how much progress has already been made, because participants
+ * don't wait for each other to begin. For this reason there are switch
+ * statements at key points in the code where we have to synchronize our local
+ * state machine with the phase, and then jump to the correct part of the
+ * algorithm so that we can get started.
+ *
+ * One barrier called build_barrier is used to coordinate the hashing phases.
+ * The phase is represented by an integer which begins at zero and increments
+ * one by one, but in the code it is referred to by symbolic names as follows:
+ *
+ * PHJ_BUILD_ELECTING -- initial state
+ * PHJ_BUILD_ALLOCATING -- one sets up the batches and table 0
+ * PHJ_BUILD_HASHING_INNER -- all hash the inner rel
+ * PHJ_BUILD_HASHING_OUTER -- (multi-batch only) all hash the outer
+ * PHJ_BUILD_DONE -- building done, probing can begin
+ *
+ * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
+ * be used repeatedly as required to coordinate expansions in the number of
+ * batches or buckets. Their phases are as follows:
+ *
+ * PHJ_GROW_BATCHES_ELECTING -- initial state
+ * PHJ_GROW_BATCHES_ALLOCATING -- one allocates new batches
+ * PHJ_GROW_BATCHES_REPARTITIONING -- all repartition
+ * PHJ_GROW_BATCHES_FINISHING -- one cleans up, detects skew
+ *
+ * PHJ_GROW_BUCKETS_ELECTING -- initial state
+ * PHJ_GROW_BUCKETS_ALLOCATING -- one allocates new buckets
+ * PHJ_GROW_BUCKETS_REINSERTING -- all insert tuples
+ *
+ * If the planner got the number of batches and buckets right, those won't be
+ * necessary, but on the other hand we might finish up needing to expand the
+ * buckets or batches multiple times while hashing the inner relation to stay
+ * within our memory budget and load factor target. For that reason it's a
+ * separate pair of barriers using circular phases.
+ *
+ * The PHJ_BUILD_HASHING_OUTER phase is required only for multi-batch joins,
+ * because we need to divide the outer relation into batches up front in order
+ * to be able to process batches entirely independently. In contrast, the
+ * parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
+ * batches whenever it encounters them while scanning and probing, which it
+ * can do because it processes batches in serial order.
+ *
+ * Once PHJ_BUILD_DONE is reached, backends then split up and process
+ * different batches, or gang up and work together on probing batches if there
+ * aren't enough to go around. For each batch there is a separate barrier
+ * with the following phases:
+ *
+ * PHJ_BATCH_ELECTING -- initial state
+ * PHJ_BATCH_ALLOCATING -- one allocates buckets
+ * PHJ_BATCH_LOADING -- all load the hash table from disk
+ * PHJ_BATCH_PROBING -- all probe
+ * PHJ_BATCH_DONE -- end
+ *
+ * Batch 0 is a special case, because it starts out in phase
+ * PHJ_BATCH_PROBING; populating batch 0's hash table is done during
+ * PHJ_BUILD_HASHING_INNER so we can skip loading.
+ *
+ * Initially we try to plan for a single-batch hash join using the combined
+ * work_mem of all participants to create a large shared hash table. If that
+ * turns out either at planning or execution time to be impossible then we
+ * fall back to regular work_mem sized hash tables.
+ *
+ * To avoid deadlocks, we never wait for any barrier unless it is known that
+ * all other backends attached to it are actively executing the node or have
+ * already arrived. Practically, that means that we never return a tuple
+ * while attached to a barrier, unless the barrier has reached its final
+ * state. In the slightly special case of the per-batch barrier, we return
+ * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
+ * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
+#include "access/htup_details.h"
+#include "access/parallel.h"
#include "executor/executor.h"
+#include "executor/hashjoin.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
-#include "optimizer/clauses.h"
+#include "miscadmin.h"
+#include "pgstat.h"
#include "utils/memutils.h"
+#include "utils/sharedtuplestore.h"
-static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *node,
- HashJoinState *hjstate);
+/*
+ * States of the ExecHashJoin state machine
+ */
+#define HJ_BUILD_HASHTABLE 1
+#define HJ_NEED_NEW_OUTER 2
+#define HJ_SCAN_BUCKET 3
+#define HJ_FILL_OUTER_TUPLE 4
+#define HJ_FILL_INNER_TUPLES 5
+#define HJ_NEED_NEW_BATCH 6
+
+/* Returns true if doing null-fill on outer relation */
+#define HJ_FILL_OUTER(hjstate) ((hjstate)->hj_NullInnerTupleSlot != NULL)
+/* Returns true if doing null-fill on inner relation */
+#define HJ_FILL_INNER(hjstate) ((hjstate)->hj_NullOuterTupleSlot != NULL)
+
+static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
+ HashJoinState *hjstate,
+ uint32 *hashvalue);
+static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
+ HashJoinState *hjstate,
+ uint32 *hashvalue);
static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
BufFile *file,
+ uint32 *hashvalue,
TupleTableSlot *tupleSlot);
-static int ExecHashJoinNewBatch(HashJoinState *hjstate);
+static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
+static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
+static void ExecParallelHashJoinPartitionOuter(HashJoinState *node);
/* ----------------------------------------------------------------
- * ExecHashJoin
+ * ExecHashJoinImpl
*
- * This function implements the Hybrid Hashjoin algorithm.
- * recursive partitioning remains to be added.
- * Note: the relation we build hash table on is the inner
- * the other one is outer.
+ * This function implements the Hybrid Hashjoin algorithm. It is marked
+ * with an always-inline attribute so that ExecHashJoin() and
+ * ExecParallelHashJoin() can inline it. Compilers that respect the
+ * attribute should create versions specialized for parallel == true and
+ * parallel == false with unnecessary branches removed.
+ *
+ * Note: the relation we build hash table on is the "inner"
+ * the other one is "outer".
* ----------------------------------------------------------------
*/
-TupleTableSlot * /* return: a tuple or NULL */
-ExecHashJoin(HashJoinState *node)
+static pg_attribute_always_inline TupleTableSlot *
+ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
- EState *estate;
+ HashJoinState *node = castNode(HashJoinState, pstate);
PlanState *outerNode;
HashState *hashNode;
- List *hjclauses;
- List *outerkeys;
- List *joinqual;
- List *otherqual;
- ScanDirection dir;
- TupleTableSlot *inntuple;
+ ExprState *joinqual;
+ ExprState *otherqual;
ExprContext *econtext;
- ExprDoneCond isDone;
HashJoinTable hashtable;
- HeapTuple curtuple;
TupleTableSlot *outerTupleSlot;
- int i;
+ uint32 hashvalue;
+ int batchno;
+ ParallelHashJoinState *parallel_state;
/*
* get information from HashJoin node
*/
- hjclauses = node->hashclauses;
- estate = node->js.ps.state;
joinqual = node->js.joinqual;
otherqual = node->js.ps.qual;
hashNode = (HashState *) innerPlanState(node);
outerNode = outerPlanState(node);
- dir = estate->es_direction;
-
- /*
- * get information from HashJoin state
- */
hashtable = node->hj_HashTable;
- outerkeys = node->hj_OuterHashKeys;
econtext = node->js.ps.ps_ExprContext;
-
- /*
- * Check to see if we're still projecting out tuples from a previous
- * join tuple (because there is a function-returning-set in the
- * projection expressions). If so, try to project another one.
- */
- if (node->js.ps.ps_TupFromTlist)
- {
- TupleTableSlot *result;
-
- result = ExecProject(node->js.ps.ps_ProjInfo, &isDone);
- if (isDone == ExprMultipleResult)
- return result;
- /* Done with that source tuple... */
- node->js.ps.ps_TupFromTlist = false;
- }
-
- /*
- * If we're doing an IN join, we want to return at most one row per
- * outer tuple; so we can stop scanning the inner scan if we matched
- * on the previous try.
- */
- if (node->js.jointype == JOIN_IN &&
- node->hj_MatchedOuter)
- node->hj_NeedNewOuter = true;
+ parallel_state = hashNode->parallel_state;
/*
* Reset per-tuple memory context to free any expression evaluation
- * storage allocated in the previous tuple cycle. Note this can't
- * happen until we're done projecting out tuples from a join tuple.
+ * storage allocated in the previous tuple cycle.
*/
ResetExprContext(econtext);
/*
- * if this is the first call, build the hash table for inner relation
+ * run the hash join state machine
*/
- if (!node->hj_hashdone)
+ for (;;)
{
/*
- * create the hash table
+ * It's possible to iterate this loop many times before returning a
+ * tuple, in some pathological cases such as needing to move much of
+ * the current batch to a later batch. So let's check for interrupts
+ * each time through.
*/
- Assert(hashtable == NULL);
- hashtable = ExecHashTableCreate((Hash *) hashNode->ps.plan,
- node->hj_HashOperators);
- node->hj_HashTable = hashtable;
+ CHECK_FOR_INTERRUPTS();
- /*
- * execute the Hash node, to build the hash table
- */
- hashNode->hashtable = hashtable;
- (void) ExecProcNode((PlanState *) hashNode);
+ switch (node->hj_JoinState)
+ {
+ case HJ_BUILD_HASHTABLE:
- /*
- * Open temp files for outer batches, if needed. Note that file
- * buffers are palloc'd in regular executor context.
- */
- for (i = 0; i < hashtable->nbatch; i++)
- hashtable->outerBatchFile[i] = BufFileCreateTemp(false);
+ /*
+ * First time through: build hash table for inner relation.
+ */
+ Assert(hashtable == NULL);
- node->hj_hashdone = true;
- }
+ /*
+ * If the outer relation is completely empty, and it's not
+ * right/full join, we can quit without building the hash
+ * table. However, for an inner join it is only a win to
+ * check this when the outer relation's startup cost is less
+ * than the projected cost of building the hash table.
+ * Otherwise it's best to build the hash table first and see
+ * if the inner relation is empty. (When it's a left join, we
+ * should always make this check, since we aren't going to be
+ * able to skip the join on the strength of an empty inner
+ * relation anyway.)
+ *
+ * If we are rescanning the join, we make use of information
+ * gained on the previous scan: don't bother to try the
+ * prefetch if the previous scan found the outer relation
+ * nonempty. This is not 100% reliable since with new
+ * parameters the outer relation might yield different
+ * results, but it's a good heuristic.
+ *
+ * The only way to make the check is to try to fetch a tuple
+ * from the outer plan node. If we succeed, we have to stash
+ * it away for later consumption by ExecHashJoinOuterGetTuple.
+ */
+ if (HJ_FILL_INNER(node))
+ {
+ /* no chance to not build the hash table */
+ node->hj_FirstOuterTupleSlot = NULL;
+ }
+ else if (parallel)
+ {
+ /*
+ * The empty-outer optimization is not implemented for
+ * shared hash tables, because no one participant can
+ * determine that there are no outer tuples, and it's not
+ * yet clear that it's worth the synchronization overhead
+ * of reaching consensus to figure that out. So we have
+ * to build the hash table.
+ */
+ node->hj_FirstOuterTupleSlot = NULL;
+ }
+ else if (HJ_FILL_OUTER(node) ||
+ (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
+ !node->hj_OuterNotEmpty))
+ {
+ node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
+ if (TupIsNull(node->hj_FirstOuterTupleSlot))
+ {
+ node->hj_OuterNotEmpty = false;
+ return NULL;
+ }
+ else
+ node->hj_OuterNotEmpty = true;
+ }
+ else
+ node->hj_FirstOuterTupleSlot = NULL;
- /*
- * Now get an outer tuple and probe into the hash table for matches
- */
- outerTupleSlot = node->js.ps.ps_OuterTupleSlot;
+ /*
+ * Create the hash table. If using Parallel Hash, then
+ * whoever gets here first will create the hash table and any
+ * later arrivals will merely attach to it.
+ */
+ hashtable = ExecHashTableCreate(hashNode,
+ node->hj_HashOperators,
+ HJ_FILL_INNER(node));
+ node->hj_HashTable = hashtable;
- for (;;)
- {
- /*
- * If we don't have an outer tuple, get the next one
- */
- if (node->hj_NeedNewOuter)
- {
- outerTupleSlot = ExecHashJoinOuterGetTuple(outerNode,
- node);
- if (TupIsNull(outerTupleSlot))
- {
- /* end of join */
- return NULL;
- }
+ /*
+ * Execute the Hash node, to build the hash table. If using
+ * Parallel Hash, then we'll try to help hashing unless we
+ * arrived too late.
+ */
+ hashNode->hashtable = hashtable;
+ (void) MultiExecProcNode((PlanState *) hashNode);
- node->js.ps.ps_OuterTupleSlot = outerTupleSlot;
- econtext->ecxt_outertuple = outerTupleSlot;
- node->hj_NeedNewOuter = false;
- node->hj_MatchedOuter = false;
+ /*
+ * If the inner relation is completely empty, and we're not
+ * doing a left outer join, we can quit without scanning the
+ * outer relation.
+ */
+ if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
+ return NULL;
- /*
- * now we have an outer tuple, find the corresponding bucket
- * for this tuple from the hash table
- */
- node->hj_CurBucketNo = ExecHashGetBucket(hashtable, econtext,
- outerkeys);
- node->hj_CurTuple = NULL;
+ /*
+ * need to remember whether nbatch has increased since we
+ * began scanning the outer relation
+ */
+ hashtable->nbatch_outstart = hashtable->nbatch;
- /*
- * Now we've got an outer tuple and the corresponding hash
- * bucket, but this tuple may not belong to the current batch.
- * This need only be checked in the first pass.
- */
- if (hashtable->curbatch == 0)
- {
- int batchno = ExecHashGetBatch(node->hj_CurBucketNo,
- hashtable);
+ /*
+ * Reset OuterNotEmpty for scan. (It's OK if we fetched a
+ * tuple above, because ExecHashJoinOuterGetTuple will
+ * immediately set it again.)
+ */
+ node->hj_OuterNotEmpty = false;
+
+ if (parallel)
+ {
+ Barrier *build_barrier;
+
+ build_barrier = ¶llel_state->build_barrier;
+ Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
+ BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
+ if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
+ {
+ /*
+ * If multi-batch, we need to hash the outer relation
+ * up front.
+ */
+ if (hashtable->nbatch > 1)
+ ExecParallelHashJoinPartitionOuter(node);
+ BarrierArriveAndWait(build_barrier,
+ WAIT_EVENT_HASH_BUILD_HASHING_OUTER);
+ }
+ Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
+
+ /* Each backend should now select a batch to work on. */
+ hashtable->curbatch = -1;
+ node->hj_JoinState = HJ_NEED_NEW_BATCH;
+
+ continue;
+ }
+ else
+ node->hj_JoinState = HJ_NEED_NEW_OUTER;
- if (batchno >= 0)
+ /* FALL THRU */
+
+ case HJ_NEED_NEW_OUTER:
+
+ /*
+ * We don't have an outer tuple, try to get the next one
+ */
+ if (parallel)
+ outerTupleSlot =
+ ExecParallelHashJoinOuterGetTuple(outerNode, node,
+ &hashvalue);
+ else
+ outerTupleSlot =
+ ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);
+
+ if (TupIsNull(outerTupleSlot))
{
+ /* end of batch, or maybe whole join */
+ if (HJ_FILL_INNER(node))
+ {
+ /* set up to scan for unmatched inner tuples */
+ ExecPrepHashTableForUnmatched(node);
+ node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ }
+ else
+ node->hj_JoinState = HJ_NEED_NEW_BATCH;
+ continue;
+ }
+
+ econtext->ecxt_outertuple = outerTupleSlot;
+ node->hj_MatchedOuter = false;
+
+ /*
+ * Find the corresponding bucket for this tuple in the main
+ * hash table or skew hash table.
+ */
+ node->hj_CurHashValue = hashvalue;
+ ExecHashGetBucketAndBatch(hashtable, hashvalue,
+ &node->hj_CurBucketNo, &batchno);
+ node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
+ hashvalue);
+ node->hj_CurTuple = NULL;
+
+ /*
+ * The tuple might not belong to the current batch (where
+ * "current batch" includes the skew buckets if any).
+ */
+ if (batchno != hashtable->curbatch &&
+ node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
+ {
+ bool shouldFree;
+ MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
+ &shouldFree);
+
/*
* Need to postpone this outer tuple to a later batch.
* Save it in the corresponding outer-batch file.
*/
- hashtable->outerBatchSize[batchno]++;
- ExecHashJoinSaveTuple(outerTupleSlot->val,
- hashtable->outerBatchFile[batchno]);
- node->hj_NeedNewOuter = true;
- continue; /* loop around for a new outer tuple */
- }
- }
- }
+ Assert(parallel_state == NULL);
+ Assert(batchno > hashtable->curbatch);
+ ExecHashJoinSaveTuple(mintuple, hashvalue,
+ &hashtable->outerBatchFile[batchno]);
- /*
- * OK, scan the selected hash bucket for matches
- */
- for (;;)
- {
- curtuple = ExecScanHashBucket(node,
- hjclauses,
- econtext);
- if (curtuple == NULL)
- break; /* out of matches */
+ if (shouldFree)
+ heap_free_minimal_tuple(mintuple);
- /*
- * we've got a match, but still need to test non-hashed quals
- */
- inntuple = ExecStoreTuple(curtuple,
- node->hj_HashTupleSlot,
- InvalidBuffer,
- false); /* don't pfree this tuple */
- econtext->ecxt_innertuple = inntuple;
+ /* Loop around, staying in HJ_NEED_NEW_OUTER state */
+ continue;
+ }
- /* reset temp memory each time to avoid leaks from qual expr */
- ResetExprContext(econtext);
+ /* OK, let's scan the bucket for matches */
+ node->hj_JoinState = HJ_SCAN_BUCKET;
- /*
- * if we pass the qual, then save state for next call and have
- * ExecProject form the projection, store it in the tuple
- * table, and return the slot.
- *
- * Only the joinquals determine MatchedOuter status, but all
- * quals must pass to actually return the tuple.
- */
- if (ExecQual(joinqual, econtext, false))
- {
- node->hj_MatchedOuter = true;
+ /* FALL THRU */
+
+ case HJ_SCAN_BUCKET:
- if (otherqual == NIL || ExecQual(otherqual, econtext, false))
+ /*
+ * Scan the selected hash bucket for matches to current outer
+ */
+ if (parallel)
{
- TupleTableSlot *result;
+ if (!ExecParallelScanHashBucket(node, econtext))
+ {
+ /* out of matches; check for possible outer-join fill */
+ node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
+ continue;
+ }
+ }
+ else
+ {
+ if (!ExecScanHashBucket(node, econtext))
+ {
+ /* out of matches; check for possible outer-join fill */
+ node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
+ continue;
+ }
+ }
- result = ExecProject(node->js.ps.ps_ProjInfo, &isDone);
+ /*
+ * We've got a match, but still need to test non-hashed quals.
+ * ExecScanHashBucket already set up all the state needed to
+ * call ExecQual.
+ *
+ * If we pass the qual, then save state for next call and have
+ * ExecProject form the projection, store it in the tuple
+ * table, and return the slot.
+ *
+ * Only the joinquals determine tuple match status, but all
+ * quals must pass to actually return the tuple.
+ */
+ if (joinqual == NULL || ExecQual(joinqual, econtext))
+ {
+ node->hj_MatchedOuter = true;
+ HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
- if (isDone != ExprEndResult)
+ /* In an antijoin, we never return a matched tuple */
+ if (node->js.jointype == JOIN_ANTI)
{
- node->js.ps.ps_TupFromTlist =
- (isDone == ExprMultipleResult);
- return result;
+ node->hj_JoinState = HJ_NEED_NEW_OUTER;
+ continue;
}
+
+ /*
+ * If we only need to join to the first matching inner
+ * tuple, then consider returning this one, but after that
+ * continue with next outer tuple.
+ */
+ if (node->js.single_match)
+ node->hj_JoinState = HJ_NEED_NEW_OUTER;
+
+ if (otherqual == NULL || ExecQual(otherqual, econtext))
+ return ExecProject(node->js.ps.ps_ProjInfo);
+ else
+ InstrCountFiltered2(node, 1);
}
+ else
+ InstrCountFiltered1(node, 1);
+ break;
+
+ case HJ_FILL_OUTER_TUPLE:
/*
- * If we didn't return a tuple, may need to set
- * NeedNewOuter
+ * The current outer tuple has run out of matches, so check
+ * whether to emit a dummy outer-join tuple. Whether we emit
+ * one or not, the next state is NEED_NEW_OUTER.
*/
- if (node->js.jointype == JOIN_IN)
+ node->hj_JoinState = HJ_NEED_NEW_OUTER;
+
+ if (!node->hj_MatchedOuter &&
+ HJ_FILL_OUTER(node))
{
- node->hj_NeedNewOuter = true;
- break; /* out of loop over hash bucket */
+ /*
+ * Generate a fake join tuple with nulls for the inner
+ * tuple, and return it if it passes the non-join quals.
+ */
+ econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
+
+ if (otherqual == NULL || ExecQual(otherqual, econtext))
+ return ExecProject(node->js.ps.ps_ProjInfo);
+ else
+ InstrCountFiltered2(node, 1);
}
- }
- }
+ break;
- /*
- * Now the current outer tuple has run out of matches, so check
- * whether to emit a dummy outer-join tuple. If not, loop around
- * to get a new outer tuple.
- */
- node->hj_NeedNewOuter = true;
+ case HJ_FILL_INNER_TUPLES:
- if (!node->hj_MatchedOuter &&
- node->js.jointype == JOIN_LEFT)
- {
- /*
- * We are doing an outer join and there were no join matches
- * for this outer tuple. Generate a fake join tuple with
- * nulls for the inner tuple, and return it if it passes the
- * non-join quals.
- */
- econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
+ /*
+ * We have finished a batch, but we are doing right/full join,
+ * so any unmatched inner tuples in the hashtable have to be
+ * emitted before we continue to the next batch.
+ */
+ if (!ExecScanHashTableForUnmatched(node, econtext))
+ {
+ /* no more unmatched tuples */
+ node->hj_JoinState = HJ_NEED_NEW_BATCH;
+ continue;
+ }
- if (ExecQual(otherqual, econtext, false))
- {
/*
- * qualification was satisfied so we project and return
- * the slot containing the result tuple using
- * ExecProject().
+ * Generate a fake join tuple with nulls for the outer tuple,
+ * and return it if it passes the non-join quals.
*/
- TupleTableSlot *result;
+ econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;
+
+ if (otherqual == NULL || ExecQual(otherqual, econtext))
+ return ExecProject(node->js.ps.ps_ProjInfo);
+ else
+ InstrCountFiltered2(node, 1);
+ break;
- result = ExecProject(node->js.ps.ps_ProjInfo, &isDone);
+ case HJ_NEED_NEW_BATCH:
- if (isDone != ExprEndResult)
+ /*
+ * Try to advance to next batch. Done if there are no more.
+ */
+ if (parallel)
{
- node->js.ps.ps_TupFromTlist =
- (isDone == ExprMultipleResult);
- return result;
+ if (!ExecParallelHashJoinNewBatch(node))
+ return NULL; /* end of parallel-aware join */
}
- }
+ else
+ {
+ if (!ExecHashJoinNewBatch(node))
+ return NULL; /* end of parallel-oblivious join */
+ }
+ node->hj_JoinState = HJ_NEED_NEW_OUTER;
+ break;
+
+ default:
+ elog(ERROR, "unrecognized hashjoin state: %d",
+ (int) node->hj_JoinState);
}
}
}
+/* ----------------------------------------------------------------
+ * ExecHashJoin
+ *
+ * Parallel-oblivious version.
+ * ----------------------------------------------------------------
+ */
+static TupleTableSlot * /* return: a tuple or NULL */
+ExecHashJoin(PlanState *pstate)
+{
+ /*
+ * On sufficiently smart compilers this should be inlined with the
+ * parallel-aware branches removed.
+ */
+ return ExecHashJoinImpl(pstate, false);
+}
+
+/* ----------------------------------------------------------------
+ * ExecParallelHashJoin
+ *
+ * Parallel-aware version.
+ * ----------------------------------------------------------------
+ */
+static TupleTableSlot * /* return: a tuple or NULL */
+ExecParallelHashJoin(PlanState *pstate)
+{
+ /*
+ * On sufficiently smart compilers this should be inlined with the
+ * parallel-oblivious branches removed.
+ */
+ return ExecHashJoinImpl(pstate, true);
+}
+
/* ----------------------------------------------------------------
* ExecInitHashJoin
*
* ----------------------------------------------------------------
*/
HashJoinState *
-ExecInitHashJoin(HashJoin *node, EState *estate)
+ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
{
HashJoinState *hjstate;
Plan *outerNode;
Hash *hashNode;
List *lclauses;
List *rclauses;
+ List *rhclauses;
List *hoperators;
- List *hcl;
+ TupleDesc outerDesc,
+ innerDesc;
+ ListCell *l;
+ const TupleTableSlotOps *ops;
+
+ /* check for unsupported flags */
+ Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
/*
* create state structure
hjstate->js.ps.plan = (Plan *) node;
hjstate->js.ps.state = estate;
+ /*
+ * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker()
+ * where this function may be replaced with a parallel version, if we
+ * managed to launch a parallel query.
+ */
+ hjstate->js.ps.ExecProcNode = ExecHashJoin;
+ hjstate->js.jointype = node->join.jointype;
+
/*
* Miscellaneous initialization
*
*/
ExecAssignExprContext(estate, &hjstate->js.ps);
- /*
- * initialize child expressions
- */
- hjstate->js.ps.targetlist = (List *)
- ExecInitExpr((Expr *) node->join.plan.targetlist,
- (PlanState *) hjstate);
- hjstate->js.ps.qual = (List *)
- ExecInitExpr((Expr *) node->join.plan.qual,
- (PlanState *) hjstate);
- hjstate->js.jointype = node->join.jointype;
- hjstate->js.joinqual = (List *)
- ExecInitExpr((Expr *) node->join.joinqual,
- (PlanState *) hjstate);
- hjstate->hashclauses = (List *)
- ExecInitExpr((Expr *) node->hashclauses,
- (PlanState *) hjstate);
-
/*
* initialize child nodes
+ *
+ * Note: we could suppress the REWIND flag for the inner input, which
+ * would amount to betting that the hash will be a single batch. Not
+ * clear if this would be a win or not.
*/
outerNode = outerPlan(node);
hashNode = (Hash *) innerPlan(node);
- outerPlanState(hjstate) = ExecInitNode(outerNode, estate);
- innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate);
+ outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags);
+ outerDesc = ExecGetResultType(outerPlanState(hjstate));
+ innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags);
+ innerDesc = ExecGetResultType(innerPlanState(hjstate));
-#define HASHJOIN_NSLOTS 3
+ /*
+ * Initialize result slot, type and projection.
+ */
+ ExecInitResultTupleSlotTL(&hjstate->js.ps, &TTSOpsVirtual);
+ ExecAssignProjectionInfo(&hjstate->js.ps, NULL);
/*
* tuple table initialization
*/
- ExecInitResultTupleSlot(estate, &hjstate->js.ps);
- hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate);
+ ops = ExecGetResultSlotOps(outerPlanState(hjstate), NULL);
+ hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate, outerDesc,
+ ops);
+
+ /*
+ * detect whether we need only consider the first matching inner tuple
+ */
+ hjstate->js.single_match = (node->join.inner_unique ||
+ node->join.jointype == JOIN_SEMI);
+ /* set up null tuples for outer joins, if needed */
switch (node->join.jointype)
{
case JOIN_INNER:
- case JOIN_IN:
+ case JOIN_SEMI:
break;
case JOIN_LEFT:
+ case JOIN_ANTI:
+ hjstate->hj_NullInnerTupleSlot =
+ ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
+ break;
+ case JOIN_RIGHT:
+ hjstate->hj_NullOuterTupleSlot =
+ ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
+ break;
+ case JOIN_FULL:
+ hjstate->hj_NullOuterTupleSlot =
+ ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
hjstate->hj_NullInnerTupleSlot =
- ExecInitNullTupleSlot(estate,
- ExecGetResultType(innerPlanState(hjstate)));
+ ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
break;
default:
elog(ERROR, "unrecognized join type: %d",
}
/*
- * now for some voodoo. our temporary tuple slot is actually the
- * result tuple slot of the Hash node (which is our inner plan). we
- * do this because Hash nodes don't return tuples via ExecProcNode()
- * -- instead the hash join node uses ExecScanHashBucket() to get at
- * the contents of the hash table. -cim 6/9/91
+ * now for some voodoo. our temporary tuple slot is actually the result
+ * tuple slot of the Hash node (which is our inner plan). we can do this
+ * because Hash nodes don't return tuples via ExecProcNode() -- instead
+ * the hash join node uses ExecScanHashBucket() to get at the contents of
+ * the hash table. -cim 6/9/91
*/
{
HashState *hashstate = (HashState *) innerPlanState(hjstate);
}
/*
- * initialize tuple type and projection info
+ * initialize child expressions
*/
- ExecAssignResultTypeFromTL(&hjstate->js.ps);
- ExecAssignProjectionInfo(&hjstate->js.ps);
-
- ExecSetSlotDescriptor(hjstate->hj_OuterTupleSlot,
- ExecGetResultType(outerPlanState(hjstate)),
- false);
+ hjstate->js.ps.qual =
+ ExecInitQual(node->join.plan.qual, (PlanState *) hjstate);
+ hjstate->js.joinqual =
+ ExecInitQual(node->join.joinqual, (PlanState *) hjstate);
+ hjstate->hashclauses =
+ ExecInitQual(node->hashclauses, (PlanState *) hjstate);
/*
* initialize hash-specific info
*/
-
- hjstate->hj_hashdone = false;
hjstate->hj_HashTable = NULL;
+ hjstate->hj_FirstOuterTupleSlot = NULL;
+ hjstate->hj_CurHashValue = 0;
hjstate->hj_CurBucketNo = 0;
+ hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
hjstate->hj_CurTuple = NULL;
/*
- * Deconstruct the hash clauses into outer and inner argument values,
- * so that we can evaluate those subexpressions separately. Also make
- * a list of the hash operator OIDs, in preparation for looking up the
- * hash functions to use.
+ * Deconstruct the hash clauses into outer and inner argument values, so
+ * that we can evaluate those subexpressions separately. Also make a list
+ * of the hash operator OIDs, in preparation for looking up the hash
+ * functions to use.
*/
lclauses = NIL;
rclauses = NIL;
+ rhclauses = NIL;
hoperators = NIL;
- foreach(hcl, hjstate->hashclauses)
+ foreach(l, node->hashclauses)
{
- FuncExprState *fstate = (FuncExprState *) lfirst(hcl);
- OpExpr *hclause;
-
- Assert(IsA(fstate, FuncExprState));
- hclause = (OpExpr *) fstate->xprstate.expr;
- Assert(IsA(hclause, OpExpr));
- lclauses = lappend(lclauses, lfirst(fstate->args));
- rclauses = lappend(rclauses, lsecond(fstate->args));
- hoperators = lappendo(hoperators, hclause->opno);
+ OpExpr *hclause = lfirst_node(OpExpr, l);
+
+ lclauses = lappend(lclauses, ExecInitExpr(linitial(hclause->args),
+ (PlanState *) hjstate));
+ rclauses = lappend(rclauses, ExecInitExpr(lsecond(hclause->args),
+ (PlanState *) hjstate));
+ rhclauses = lappend(rhclauses, ExecInitExpr(lsecond(hclause->args),
+ innerPlanState(hjstate)));
+ hoperators = lappend_oid(hoperators, hclause->opno);
}
hjstate->hj_OuterHashKeys = lclauses;
hjstate->hj_InnerHashKeys = rclauses;
hjstate->hj_HashOperators = hoperators;
/* child Hash node needs to evaluate inner hash keys, too */
- ((HashState *) innerPlanState(hjstate))->hashkeys = rclauses;
+ ((HashState *) innerPlanState(hjstate))->hashkeys = rhclauses;
- hjstate->js.ps.ps_OuterTupleSlot = NULL;
- hjstate->js.ps.ps_TupFromTlist = false;
- hjstate->hj_NeedNewOuter = true;
+ hjstate->hj_JoinState = HJ_BUILD_HASHTABLE;
hjstate->hj_MatchedOuter = false;
+ hjstate->hj_OuterNotEmpty = false;
return hjstate;
}
-int
-ExecCountSlotsHashJoin(HashJoin *node)
-{
- return ExecCountSlotsNode(outerPlan(node)) +
- ExecCountSlotsNode(innerPlan(node)) +
- HASHJOIN_NSLOTS;
-}
-
/* ----------------------------------------------------------------
* ExecEndHashJoin
*
ExecEndNode(innerPlanState(node));
}
-/* ----------------------------------------------------------------
- * ExecHashJoinOuterGetTuple
+/*
+ * ExecHashJoinOuterGetTuple
*
- * get the next outer tuple for hashjoin: either by
- * executing a plan node as in the first pass, or from
- * the tmp files for the hashjoin batches.
- * ----------------------------------------------------------------
+ * get the next outer tuple for a parallel oblivious hashjoin: either by
+ * executing the outer plan node in the first pass, or from the temp
+ * files for the hashjoin batches.
+ *
+ * Returns a null slot if no more outer tuples (within the current batch).
+ *
+ * On success, the tuple's hash value is stored at *hashvalue --- this is
+ * either originally computed, or re-read from the temp file.
*/
-
static TupleTableSlot *
-ExecHashJoinOuterGetTuple(PlanState *node, HashJoinState *hjstate)
+ExecHashJoinOuterGetTuple(PlanState *outerNode,
+ HashJoinState *hjstate,
+ uint32 *hashvalue)
{
HashJoinTable hashtable = hjstate->hj_HashTable;
int curbatch = hashtable->curbatch;
TupleTableSlot *slot;
- if (curbatch == 0)
- { /* if it is the first pass */
- slot = ExecProcNode(node);
+ if (curbatch == 0) /* if it is the first pass */
+ {
+ /*
+ * Check to see if first outer tuple was already fetched by
+ * ExecHashJoin() and not used yet.
+ */
+ slot = hjstate->hj_FirstOuterTupleSlot;
if (!TupIsNull(slot))
- return slot;
+ hjstate->hj_FirstOuterTupleSlot = NULL;
+ else
+ slot = ExecProcNode(outerNode);
+
+ while (!TupIsNull(slot))
+ {
+ /*
+ * We have to compute the tuple's hash value.
+ */
+ ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
+
+ econtext->ecxt_outertuple = slot;
+ if (ExecHashGetHashValue(hashtable, econtext,
+ hjstate->hj_OuterHashKeys,
+ true, /* outer tuple */
+ HJ_FILL_OUTER(hjstate),
+ hashvalue))
+ {
+ /* remember outer relation is not empty for possible rescan */
+ hjstate->hj_OuterNotEmpty = true;
+
+ return slot;
+ }
+
+ /*
+ * That tuple couldn't match because of a NULL, so discard it and
+ * continue with the next one.
+ */
+ slot = ExecProcNode(outerNode);
+ }
+ }
+ else if (curbatch < hashtable->nbatch)
+ {
+ BufFile *file = hashtable->outerBatchFile[curbatch];
/*
- * We have just reached the end of the first pass. Try to switch
- * to a saved batch.
+ * In outer-join cases, we could get here even though the batch file
+ * is empty.
*/
- curbatch = ExecHashJoinNewBatch(hjstate);
- }
+ if (file == NULL)
+ return NULL;
- /*
- * Try to read from a temp file. Loop allows us to advance to new
- * batch as needed.
- */
- while (curbatch <= hashtable->nbatch)
- {
slot = ExecHashJoinGetSavedTuple(hjstate,
- hashtable->outerBatchFile[curbatch - 1],
+ file,
+ hashvalue,
hjstate->hj_OuterTupleSlot);
if (!TupIsNull(slot))
return slot;
- curbatch = ExecHashJoinNewBatch(hjstate);
}
- /* Out of batches... */
+ /* End of this batch */
return NULL;
}
-/* ----------------------------------------------------------------
- * ExecHashJoinGetSavedTuple
- *
- * read the next tuple from a tmp file
- * ----------------------------------------------------------------
+/*
+ * ExecHashJoinOuterGetTuple variant for the parallel case.
*/
-
static TupleTableSlot *
-ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
- BufFile *file,
- TupleTableSlot *tupleSlot)
+ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
+ HashJoinState *hjstate,
+ uint32 *hashvalue)
{
- HeapTupleData htup;
- size_t nread;
- HeapTuple heapTuple;
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ int curbatch = hashtable->curbatch;
+ TupleTableSlot *slot;
- nread = BufFileRead(file, (void *) &htup, sizeof(HeapTupleData));
- if (nread == 0)
- return NULL; /* end of file */
- if (nread != sizeof(HeapTupleData))
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not read from hash-join temporary file: %m")));
- heapTuple = palloc(HEAPTUPLESIZE + htup.t_len);
- memcpy((char *) heapTuple, (char *) &htup, sizeof(HeapTupleData));
- heapTuple->t_datamcxt = CurrentMemoryContext;
- heapTuple->t_data = (HeapTupleHeader)
- ((char *) heapTuple + HEAPTUPLESIZE);
- nread = BufFileRead(file, (void *) heapTuple->t_data, htup.t_len);
- if (nread != (size_t) htup.t_len)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not read from hash-join temporary file: %m")));
- return ExecStoreTuple(heapTuple, tupleSlot, InvalidBuffer, true);
+ /*
+ * In the Parallel Hash case we only run the outer plan directly for
+ * single-batch hash joins. Otherwise we have to go to batch files, even
+ * for batch 0.
+ */
+ if (curbatch == 0 && hashtable->nbatch == 1)
+ {
+ slot = ExecProcNode(outerNode);
+
+ while (!TupIsNull(slot))
+ {
+ ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
+
+ econtext->ecxt_outertuple = slot;
+ if (ExecHashGetHashValue(hashtable, econtext,
+ hjstate->hj_OuterHashKeys,
+ true, /* outer tuple */
+ HJ_FILL_OUTER(hjstate),
+ hashvalue))
+ return slot;
+
+ /*
+ * That tuple couldn't match because of a NULL, so discard it and
+ * continue with the next one.
+ */
+ slot = ExecProcNode(outerNode);
+ }
+ }
+ else if (curbatch < hashtable->nbatch)
+ {
+ MinimalTuple tuple;
+
+ tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
+ hashvalue);
+ if (tuple != NULL)
+ {
+ ExecForceStoreMinimalTuple(tuple,
+ hjstate->hj_OuterTupleSlot,
+ false);
+ slot = hjstate->hj_OuterTupleSlot;
+ return slot;
+ }
+ else
+ ExecClearTuple(hjstate->hj_OuterTupleSlot);
+ }
+
+ /* End of this batch */
+ return NULL;
}
-/* ----------------------------------------------------------------
- * ExecHashJoinNewBatch
- *
+/*
+ * ExecHashJoinNewBatch
* switch to a new hashjoin batch
- * ----------------------------------------------------------------
+ *
+ * Returns true if successful, false if there are no more batches.
*/
-static int
+static bool
ExecHashJoinNewBatch(HashJoinState *hjstate)
{
HashJoinTable hashtable = hjstate->hj_HashTable;
- int nbatch = hashtable->nbatch;
- int newbatch = hashtable->curbatch + 1;
- long *innerBatchSize = hashtable->innerBatchSize;
- long *outerBatchSize = hashtable->outerBatchSize;
+ int nbatch;
+ int curbatch;
BufFile *innerFile;
TupleTableSlot *slot;
- ExprContext *econtext;
- List *innerhashkeys;
+ uint32 hashvalue;
+
+ nbatch = hashtable->nbatch;
+ curbatch = hashtable->curbatch;
- if (newbatch > 1)
+ if (curbatch > 0)
{
/*
* We no longer need the previous outer batch file; close it right
* away to free disk space.
*/
- BufFileClose(hashtable->outerBatchFile[newbatch - 2]);
- hashtable->outerBatchFile[newbatch - 2] = NULL;
+ if (hashtable->outerBatchFile[curbatch])
+ BufFileClose(hashtable->outerBatchFile[curbatch]);
+ hashtable->outerBatchFile[curbatch] = NULL;
+ }
+ else /* we just finished the first batch */
+ {
+ /*
+ * Reset some of the skew optimization state variables, since we no
+ * longer need to consider skew tuples after the first batch. The
+ * memory context reset we are about to do will release the skew
+ * hashtable itself.
+ */
+ hashtable->skewEnabled = false;
+ hashtable->skewBucket = NULL;
+ hashtable->skewBucketNums = NULL;
+ hashtable->nSkewBuckets = 0;
+ hashtable->spaceUsedSkew = 0;
}
/*
- * We can skip over any batches that are empty on either side. Release
- * associated temp files right away.
+ * We can always skip over any batches that are completely empty on both
+ * sides. We can sometimes skip over batches that are empty on only one
+ * side, but there are exceptions:
+ *
+ * 1. In a left/full outer join, we have to process outer batches even if
+ * the inner batch is empty. Similarly, in a right/full outer join, we
+ * have to process inner batches even if the outer batch is empty.
+ *
+ * 2. If we have increased nbatch since the initial estimate, we have to
+ * scan inner batches since they might contain tuples that need to be
+ * reassigned to later inner batches.
+ *
+ * 3. Similarly, if we have increased nbatch since starting the outer
+ * scan, we have to rescan outer batches in case they contain tuples that
+ * need to be reassigned.
*/
- while (newbatch <= nbatch &&
- (innerBatchSize[newbatch - 1] == 0L ||
- outerBatchSize[newbatch - 1] == 0L))
+ curbatch++;
+ while (curbatch < nbatch &&
+ (hashtable->outerBatchFile[curbatch] == NULL ||
+ hashtable->innerBatchFile[curbatch] == NULL))
{
- BufFileClose(hashtable->innerBatchFile[newbatch - 1]);
- hashtable->innerBatchFile[newbatch - 1] = NULL;
- BufFileClose(hashtable->outerBatchFile[newbatch - 1]);
- hashtable->outerBatchFile[newbatch - 1] = NULL;
- newbatch++;
+ if (hashtable->outerBatchFile[curbatch] &&
+ HJ_FILL_OUTER(hjstate))
+ break; /* must process due to rule 1 */
+ if (hashtable->innerBatchFile[curbatch] &&
+ HJ_FILL_INNER(hjstate))
+ break; /* must process due to rule 1 */
+ if (hashtable->innerBatchFile[curbatch] &&
+ nbatch != hashtable->nbatch_original)
+ break; /* must process due to rule 2 */
+ if (hashtable->outerBatchFile[curbatch] &&
+ nbatch != hashtable->nbatch_outstart)
+ break; /* must process due to rule 3 */
+ /* We can ignore this batch. */
+ /* Release associated temp files right away. */
+ if (hashtable->innerBatchFile[curbatch])
+ BufFileClose(hashtable->innerBatchFile[curbatch]);
+ hashtable->innerBatchFile[curbatch] = NULL;
+ if (hashtable->outerBatchFile[curbatch])
+ BufFileClose(hashtable->outerBatchFile[curbatch]);
+ hashtable->outerBatchFile[curbatch] = NULL;
+ curbatch++;
}
- if (newbatch > nbatch)
- return newbatch; /* no more batches */
+ if (curbatch >= nbatch)
+ return false; /* no more batches */
+
+ hashtable->curbatch = curbatch;
/*
- * Rewind inner and outer batch files for this batch, so that we can
- * start reading them.
+ * Reload the hash table with the new inner batch (which could be empty)
*/
- if (BufFileSeek(hashtable->outerBatchFile[newbatch - 1], 0, 0L, SEEK_SET))
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not rewind hash-join temporary file: %m")));
+ ExecHashTableReset(hashtable);
- innerFile = hashtable->innerBatchFile[newbatch - 1];
+ innerFile = hashtable->innerBatchFile[curbatch];
- if (BufFileSeek(innerFile, 0, 0L, SEEK_SET))
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not rewind hash-join temporary file: %m")));
+ if (innerFile != NULL)
+ {
+ if (BufFileSeek(innerFile, 0, 0L, SEEK_SET))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not rewind hash-join temporary file: %m")));
+
+ while ((slot = ExecHashJoinGetSavedTuple(hjstate,
+ innerFile,
+ &hashvalue,
+ hjstate->hj_HashTupleSlot)))
+ {
+ /*
+ * NOTE: some tuples may be sent to future batches. Also, it is
+ * possible for hashtable->nbatch to be increased here!
+ */
+ ExecHashTableInsert(hashtable, slot, hashvalue);
+ }
+
+ /*
+ * after we build the hash table, the inner batch file is no longer
+ * needed
+ */
+ BufFileClose(innerFile);
+ hashtable->innerBatchFile[curbatch] = NULL;
+ }
/*
- * Reload the hash table with the new inner batch
+ * Rewind outer batch file (if present), so that we can start reading it.
*/
- ExecHashTableReset(hashtable, innerBatchSize[newbatch - 1]);
+ if (hashtable->outerBatchFile[curbatch] != NULL)
+ {
+ if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not rewind hash-join temporary file: %m")));
+ }
- econtext = hjstate->js.ps.ps_ExprContext;
- innerhashkeys = hjstate->hj_InnerHashKeys;
+ return true;
+}
+
+/*
+ * Choose a batch to work on, and attach to it. Returns true if successful,
+ * false if there are no more batches.
+ */
+static bool
+ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
+{
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ int start_batchno;
+ int batchno;
+
+ /*
+ * If we started up so late that the batch tracking array has been freed
+ * already by ExecHashTableDetach(), then we are finished. See also
+ * ExecParallelHashEnsureBatchAccessors().
+ */
+ if (hashtable->batches == NULL)
+ return false;
- while ((slot = ExecHashJoinGetSavedTuple(hjstate,
- innerFile,
- hjstate->hj_HashTupleSlot))
- && !TupIsNull(slot))
+ /*
+ * If we were already attached to a batch, remember not to bother checking
+ * it again, and detach from it (possibly freeing the hash table if we are
+ * last to detach).
+ */
+ if (hashtable->curbatch >= 0)
{
- econtext->ecxt_innertuple = slot;
- ExecHashTableInsert(hashtable, econtext, innerhashkeys);
+ hashtable->batches[hashtable->curbatch].done = true;
+ ExecHashTableDetachBatch(hashtable);
}
/*
- * after we build the hash table, the inner batch file is no longer
- * needed
+ * Search for a batch that isn't done. We use an atomic counter to start
+ * our search at a different batch in every participant when there are
+ * more batches than participants.
*/
- BufFileClose(innerFile);
- hashtable->innerBatchFile[newbatch - 1] = NULL;
+ batchno = start_batchno =
+ pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) %
+ hashtable->nbatch;
+ do
+ {
+ uint32 hashvalue;
+ MinimalTuple tuple;
+ TupleTableSlot *slot;
+
+ if (!hashtable->batches[batchno].done)
+ {
+ SharedTuplestoreAccessor *inner_tuples;
+ Barrier *batch_barrier =
+ &hashtable->batches[batchno].shared->batch_barrier;
+
+ switch (BarrierAttach(batch_barrier))
+ {
+ case PHJ_BATCH_ELECTING:
+
+ /* One backend allocates the hash table. */
+ if (BarrierArriveAndWait(batch_barrier,
+ WAIT_EVENT_HASH_BATCH_ELECTING))
+ ExecParallelHashTableAlloc(hashtable, batchno);
+ /* Fall through. */
+
+ case PHJ_BATCH_ALLOCATING:
+ /* Wait for allocation to complete. */
+ BarrierArriveAndWait(batch_barrier,
+ WAIT_EVENT_HASH_BATCH_ALLOCATING);
+ /* Fall through. */
+
+ case PHJ_BATCH_LOADING:
+ /* Start (or join in) loading tuples. */
+ ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
+ inner_tuples = hashtable->batches[batchno].inner_tuples;
+ sts_begin_parallel_scan(inner_tuples);
+ while ((tuple = sts_parallel_scan_next(inner_tuples,
+ &hashvalue)))
+ {
+ ExecForceStoreMinimalTuple(tuple,
+ hjstate->hj_HashTupleSlot,
+ false);
+ slot = hjstate->hj_HashTupleSlot;
+ ExecParallelHashTableInsertCurrentBatch(hashtable, slot,
+ hashvalue);
+ }
+ sts_end_parallel_scan(inner_tuples);
+ BarrierArriveAndWait(batch_barrier,
+ WAIT_EVENT_HASH_BATCH_LOADING);
+ /* Fall through. */
+
+ case PHJ_BATCH_PROBING:
+
+ /*
+ * This batch is ready to probe. Return control to
+ * caller. We stay attached to batch_barrier so that the
+ * hash table stays alive until everyone's finished
+ * probing it, but no participant is allowed to wait at
+ * this barrier again (or else a deadlock could occur).
+ * All attached participants must eventually call
+ * BarrierArriveAndDetach() so that the final phase
+ * PHJ_BATCH_DONE can be reached.
+ */
+ ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
+ sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
+ return true;
- hashtable->curbatch = newbatch;
- return newbatch;
+ case PHJ_BATCH_DONE:
+
+ /*
+ * Already done. Detach and go around again (if any
+ * remain).
+ */
+ BarrierDetach(batch_barrier);
+ hashtable->batches[batchno].done = true;
+ hashtable->curbatch = -1;
+ break;
+
+ default:
+ elog(ERROR, "unexpected batch phase %d",
+ BarrierPhase(batch_barrier));
+ }
+ }
+ batchno = (batchno + 1) % hashtable->nbatch;
+ } while (batchno != start_batchno);
+
+ return false;
}
-/* ----------------------------------------------------------------
- * ExecHashJoinSaveTuple
+/*
+ * ExecHashJoinSaveTuple
+ * save a tuple to a batch file.
*
- * save a tuple to a tmp file.
+ * The data recorded in the file for each tuple is its hash value,
+ * then the tuple in MinimalTuple format.
*
- * The data recorded in the file for each tuple is an image of its
- * HeapTupleData (with meaningless t_data pointer) followed by the
- * HeapTupleHeader and tuple data.
- * ----------------------------------------------------------------
+ * Note: it is important always to call this in the regular executor
+ * context, not in a shorter-lived context; else the temp file buffers
+ * will get messed up.
*/
-
void
-ExecHashJoinSaveTuple(HeapTuple heapTuple,
- BufFile *file)
+ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
+ BufFile **fileptr)
{
+ BufFile *file = *fileptr;
size_t written;
- written = BufFileWrite(file, (void *) heapTuple, sizeof(HeapTupleData));
- if (written != sizeof(HeapTupleData))
+ if (file == NULL)
+ {
+ /* First write to this batch file, so open it. */
+ file = BufFileCreateTemp(false);
+ *fileptr = file;
+ }
+
+ written = BufFileWrite(file, (void *) &hashvalue, sizeof(uint32));
+ if (written != sizeof(uint32))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write to hash-join temporary file: %m")));
- written = BufFileWrite(file, (void *) heapTuple->t_data, heapTuple->t_len);
- if (written != (size_t) heapTuple->t_len)
+
+ written = BufFileWrite(file, (void *) tuple, tuple->t_len);
+ if (written != tuple->t_len)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write to hash-join temporary file: %m")));
}
-void
-ExecReScanHashJoin(HashJoinState *node, ExprContext *exprCtxt)
+/*
+ * ExecHashJoinGetSavedTuple
+ * read the next tuple from a batch file. Return NULL if no more.
+ *
+ * On success, *hashvalue is set to the tuple's hash value, and the tuple
+ * itself is stored in the given slot.
+ */
+static TupleTableSlot *
+ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
+ BufFile *file,
+ uint32 *hashvalue,
+ TupleTableSlot *tupleSlot)
{
+ uint32 header[2];
+ size_t nread;
+ MinimalTuple tuple;
+
/*
- * If we haven't yet built the hash table then we can just return;
- * nothing done yet, so nothing to undo.
+ * We check for interrupts here because this is typically taken as an
+ * alternative code path to an ExecProcNode() call, which would include
+ * such a check.
*/
- if (!node->hj_hashdone)
- return;
- Assert(node->hj_HashTable != NULL);
+ CHECK_FOR_INTERRUPTS();
/*
- * In a multi-batch join, we currently have to do rescans the hard
- * way, primarily because batch temp files may have already been
- * released. But if it's a single-batch join, and there is no
- * parameter change for the inner subnode, then we can just re-use the
- * existing hash table without rebuilding it.
+ * Since both the hash value and the MinimalTuple length word are uint32,
+ * we can read them both in one BufFileRead() call without any type
+ * cheating.
*/
- if (node->hj_HashTable->nbatch == 0 &&
- ((PlanState *) node)->righttree->chgParam == NULL)
+ nread = BufFileRead(file, (void *) header, sizeof(header));
+ if (nread == 0) /* end of file */
{
- /* okay to reuse the hash table; needn't rescan inner, either */
+ ExecClearTuple(tupleSlot);
+ return NULL;
}
- else
+ if (nread != sizeof(header))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from hash-join temporary file: %m")));
+ *hashvalue = header[0];
+ tuple = (MinimalTuple) palloc(header[1]);
+ tuple->t_len = header[1];
+ nread = BufFileRead(file,
+ (void *) ((char *) tuple + sizeof(uint32)),
+ header[1] - sizeof(uint32));
+ if (nread != header[1] - sizeof(uint32))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from hash-join temporary file: %m")));
+ ExecForceStoreMinimalTuple(tuple, tupleSlot, true);
+ return tupleSlot;
+}
+
+
+void
+ExecReScanHashJoin(HashJoinState *node)
+{
+ /*
+ * In a multi-batch join, we currently have to do rescans the hard way,
+ * primarily because batch temp files may have already been released. But
+ * if it's a single-batch join, and there is no parameter change for the
+ * inner subnode, then we can just re-use the existing hash table without
+ * rebuilding it.
+ */
+ if (node->hj_HashTable != NULL)
{
- /* must destroy and rebuild hash table */
- node->hj_hashdone = false;
- ExecHashTableDestroy(node->hj_HashTable);
- node->hj_HashTable = NULL;
+ if (node->hj_HashTable->nbatch == 1 &&
+ node->js.ps.righttree->chgParam == NULL)
+ {
+ /*
+ * Okay to reuse the hash table; needn't rescan inner, either.
+ *
+ * However, if it's a right/full join, we'd better reset the
+ * inner-tuple match flags contained in the table.
+ */
+ if (HJ_FILL_INNER(node))
+ ExecHashTableResetMatchFlags(node->hj_HashTable);
- /*
- * if chgParam of subnode is not null then plan will be re-scanned
- * by first ExecProcNode.
- */
- if (((PlanState *) node)->righttree->chgParam == NULL)
- ExecReScan(((PlanState *) node)->righttree, exprCtxt);
+ /*
+ * Also, we need to reset our state about the emptiness of the
+ * outer relation, so that the new scan of the outer will update
+ * it correctly if it turns out to be empty this time. (There's no
+ * harm in clearing it now because ExecHashJoin won't need the
+ * info. In the other cases, where the hash table doesn't exist
+ * or we are destroying it, we leave this state alone because
+ * ExecHashJoin will need it the first time through.)
+ */
+ node->hj_OuterNotEmpty = false;
+
+ /* ExecHashJoin can skip the BUILD_HASHTABLE step */
+ node->hj_JoinState = HJ_NEED_NEW_OUTER;
+ }
+ else
+ {
+ /* must destroy and rebuild hash table */
+ ExecHashTableDestroy(node->hj_HashTable);
+ node->hj_HashTable = NULL;
+ node->hj_JoinState = HJ_BUILD_HASHTABLE;
+
+ /*
+ * if chgParam of subnode is not null then plan will be re-scanned
+ * by first ExecProcNode.
+ */
+ if (node->js.ps.righttree->chgParam == NULL)
+ ExecReScan(node->js.ps.righttree);
+ }
}
/* Always reset intra-tuple state */
+ node->hj_CurHashValue = 0;
node->hj_CurBucketNo = 0;
+ node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
node->hj_CurTuple = NULL;
- node->js.ps.ps_OuterTupleSlot = NULL;
- node->js.ps.ps_TupFromTlist = false;
- node->hj_NeedNewOuter = true;
node->hj_MatchedOuter = false;
+ node->hj_FirstOuterTupleSlot = NULL;
/*
* if chgParam of subnode is not null then plan will be re-scanned by
* first ExecProcNode.
*/
- if (((PlanState *) node)->lefttree->chgParam == NULL)
- ExecReScan(((PlanState *) node)->lefttree, exprCtxt);
+ if (node->js.ps.lefttree->chgParam == NULL)
+ ExecReScan(node->js.ps.lefttree);
+}
+
+void
+ExecShutdownHashJoin(HashJoinState *node)
+{
+ if (node->hj_HashTable)
+ {
+ /*
+ * Detach from shared state before DSM memory goes away. This makes
+ * sure that we don't have any pointers into DSM memory by the time
+ * ExecEndHashJoin runs.
+ */
+ ExecHashTableDetachBatch(node->hj_HashTable);
+ ExecHashTableDetach(node->hj_HashTable);
+ }
+}
+
+static void
+ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate)
+{
+ PlanState *outerState = outerPlanState(hjstate);
+ ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ TupleTableSlot *slot;
+ uint32 hashvalue;
+ int i;
+
+ Assert(hjstate->hj_FirstOuterTupleSlot == NULL);
+
+ /* Execute outer plan, writing all tuples to shared tuplestores. */
+ for (;;)
+ {
+ slot = ExecProcNode(outerState);
+ if (TupIsNull(slot))
+ break;
+ econtext->ecxt_outertuple = slot;
+ if (ExecHashGetHashValue(hashtable, econtext,
+ hjstate->hj_OuterHashKeys,
+ true, /* outer tuple */
+ HJ_FILL_OUTER(hjstate),
+ &hashvalue))
+ {
+ int batchno;
+ int bucketno;
+ bool shouldFree;
+ MinimalTuple mintup = ExecFetchSlotMinimalTuple(slot, &shouldFree);
+
+ ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
+ &batchno);
+ sts_puttuple(hashtable->batches[batchno].outer_tuples,
+ &hashvalue, mintup);
+
+ if (shouldFree)
+ heap_free_minimal_tuple(mintup);
+ }
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ /* Make sure all outer partitions are readable by any backend. */
+ for (i = 0; i < hashtable->nbatch; ++i)
+ sts_end_write(hashtable->batches[i].outer_tuples);
+}
+
+void
+ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
+{
+ shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState));
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+void
+ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
+{
+ int plan_node_id = state->js.ps.plan->plan_node_id;
+ HashState *hashNode;
+ ParallelHashJoinState *pstate;
+
+ /*
+ * Disable shared hash table mode if we failed to create a real DSM
+ * segment, because that means that we don't have a DSA area to work with.
+ */
+ if (pcxt->seg == NULL)
+ return;
+
+ ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
+
+ /*
+ * Set up the state needed to coordinate access to the shared hash
+ * table(s), using the plan node ID as the toc key.
+ */
+ pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState));
+ shm_toc_insert(pcxt->toc, plan_node_id, pstate);
+
+ /*
+ * Set up the shared hash join state with no batches initially.
+ * ExecHashTableCreate() will prepare at least one later and set nbatch
+ * and space_allowed.
+ */
+ pstate->nbatch = 0;
+ pstate->space_allowed = 0;
+ pstate->batches = InvalidDsaPointer;
+ pstate->old_batches = InvalidDsaPointer;
+ pstate->nbuckets = 0;
+ pstate->growth = PHJ_GROWTH_OK;
+ pstate->chunk_work_queue = InvalidDsaPointer;
+ pg_atomic_init_u32(&pstate->distributor, 0);
+ pstate->nparticipants = pcxt->nworkers + 1;
+ pstate->total_tuples = 0;
+ LWLockInitialize(&pstate->lock,
+ LWTRANCHE_PARALLEL_HASH_JOIN);
+ BarrierInit(&pstate->build_barrier, 0);
+ BarrierInit(&pstate->grow_batches_barrier, 0);
+ BarrierInit(&pstate->grow_buckets_barrier, 0);
+
+ /* Set up the space we'll use for shared temporary files. */
+ SharedFileSetInit(&pstate->fileset, pcxt->seg);
+
+ /* Initialize the shared state in the hash node. */
+ hashNode = (HashState *) innerPlanState(state);
+ hashNode->parallel_state = pstate;
+}
+
+/* ----------------------------------------------------------------
+ * ExecHashJoinReInitializeDSM
+ *
+ * Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt)
+{
+ int plan_node_id = state->js.ps.plan->plan_node_id;
+ ParallelHashJoinState *pstate =
+ shm_toc_lookup(cxt->toc, plan_node_id, false);
+
+ /*
+ * It would be possible to reuse the shared hash table in single-batch
+ * cases by resetting and then fast-forwarding build_barrier to
+ * PHJ_BUILD_DONE and batch 0's batch_barrier to PHJ_BATCH_PROBING, but
+ * currently shared hash tables are already freed by now (by the last
+ * participant to detach from the batch). We could consider keeping it
+ * around for single-batch joins. We'd also need to adjust
+ * finalize_plan() so that it doesn't record a dummy dependency for
+ * Parallel Hash nodes, preventing the rescan optimization. For now we
+ * don't try.
+ */
+
+ /* Detach, freeing any remaining shared memory. */
+ if (state->hj_HashTable != NULL)
+ {
+ ExecHashTableDetachBatch(state->hj_HashTable);
+ ExecHashTableDetach(state->hj_HashTable);
+ }
+
+ /* Clear any shared batch files. */
+ SharedFileSetDeleteAll(&pstate->fileset);
+
+ /* Reset build_barrier to PHJ_BUILD_ELECTING so we can go around again. */
+ BarrierInit(&pstate->build_barrier, 0);
+}
+
+void
+ExecHashJoinInitializeWorker(HashJoinState *state,
+ ParallelWorkerContext *pwcxt)
+{
+ HashState *hashNode;
+ int plan_node_id = state->js.ps.plan->plan_node_id;
+ ParallelHashJoinState *pstate =
+ shm_toc_lookup(pwcxt->toc, plan_node_id, false);
+
+ /* Attach to the space for shared temporary files. */
+ SharedFileSetAttach(&pstate->fileset, pwcxt->seg);
+
+ /* Attach to the shared state in the hash node. */
+ hashNode = (HashState *) innerPlanState(state);
+ hashNode->parallel_state = pstate;
+
+ ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
}