]> granicus.if.org Git - postgresql/blobdiff - src/backend/executor/nodeHashjoin.c
Make some small planner API cleanups.
[postgresql] / src / backend / executor / nodeHashjoin.c
index 4749d353b99d6e2add568a3cde0891f299612e47..209870886400645312e74c33ae6dbecb5149a3bb 100644 (file)
  * nodeHashjoin.c
  *       Routines to handle hash join nodes
  *
- * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/executor/nodeHashjoin.c,v 1.103 2010/01/02 16:57:41 momjian Exp $
+ *       src/backend/executor/nodeHashjoin.c
+ *
+ * PARALLELISM
+ *
+ * Hash joins can participate in parallel query execution in several ways.  A
+ * parallel-oblivious hash join is one where the node is unaware that it is
+ * part of a parallel plan.  In this case, a copy of the inner plan is used to
+ * build a copy of the hash table in every backend, and the outer plan could
+ * either be built from a partial or complete path, so that the results of the
+ * hash join are correspondingly either partial or complete.  A parallel-aware
+ * hash join is one that behaves differently, coordinating work between
+ * backends, and appears as Parallel Hash Join in EXPLAIN output.  A Parallel
+ * Hash Join always appears with a Parallel Hash node.
+ *
+ * Parallel-aware hash joins use the same per-backend state machine to track
+ * progress through the hash join algorithm as parallel-oblivious hash joins.
+ * In a parallel-aware hash join, there is also a shared state machine that
+ * co-operating backends use to synchronize their local state machines and
+ * program counters.  The shared state machine is managed with a Barrier IPC
+ * primitive.  When all attached participants arrive at a barrier, the phase
+ * advances and all waiting participants are released.
+ *
+ * When a participant begins working on a parallel hash join, it must first
+ * figure out how much progress has already been made, because participants
+ * don't wait for each other to begin.  For this reason there are switch
+ * statements at key points in the code where we have to synchronize our local
+ * state machine with the phase, and then jump to the correct part of the
+ * algorithm so that we can get started.
+ *
+ * One barrier called build_barrier is used to coordinate the hashing phases.
+ * The phase is represented by an integer which begins at zero and increments
+ * one by one, but in the code it is referred to by symbolic names as follows:
+ *
+ *   PHJ_BUILD_ELECTING              -- initial state
+ *   PHJ_BUILD_ALLOCATING            -- one sets up the batches and table 0
+ *   PHJ_BUILD_HASHING_INNER         -- all hash the inner rel
+ *   PHJ_BUILD_HASHING_OUTER         -- (multi-batch only) all hash the outer
+ *   PHJ_BUILD_DONE                  -- building done, probing can begin
+ *
+ * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
+ * be used repeatedly as required to coordinate expansions in the number of
+ * batches or buckets.  Their phases are as follows:
+ *
+ *   PHJ_GROW_BATCHES_ELECTING       -- initial state
+ *   PHJ_GROW_BATCHES_ALLOCATING     -- one allocates new batches
+ *   PHJ_GROW_BATCHES_REPARTITIONING -- all repartition
+ *   PHJ_GROW_BATCHES_FINISHING      -- one cleans up, detects skew
+ *
+ *   PHJ_GROW_BUCKETS_ELECTING       -- initial state
+ *   PHJ_GROW_BUCKETS_ALLOCATING     -- one allocates new buckets
+ *   PHJ_GROW_BUCKETS_REINSERTING    -- all insert tuples
+ *
+ * If the planner got the number of batches and buckets right, those won't be
+ * necessary, but on the other hand we might finish up needing to expand the
+ * buckets or batches multiple times while hashing the inner relation to stay
+ * within our memory budget and load factor target.  For that reason it's a
+ * separate pair of barriers using circular phases.
+ *
+ * The PHJ_BUILD_HASHING_OUTER phase is required only for multi-batch joins,
+ * because we need to divide the outer relation into batches up front in order
+ * to be able to process batches entirely independently.  In contrast, the
+ * parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
+ * batches whenever it encounters them while scanning and probing, which it
+ * can do because it processes batches in serial order.
+ *
+ * Once PHJ_BUILD_DONE is reached, backends then split up and process
+ * different batches, or gang up and work together on probing batches if there
+ * aren't enough to go around.  For each batch there is a separate barrier
+ * with the following phases:
+ *
+ *  PHJ_BATCH_ELECTING       -- initial state
+ *  PHJ_BATCH_ALLOCATING     -- one allocates buckets
+ *  PHJ_BATCH_LOADING        -- all load the hash table from disk
+ *  PHJ_BATCH_PROBING        -- all probe
+ *  PHJ_BATCH_DONE           -- end
+ *
+ * Batch 0 is a special case, because it starts out in phase
+ * PHJ_BATCH_PROBING; populating batch 0's hash table is done during
+ * PHJ_BUILD_HASHING_INNER so we can skip loading.
+ *
+ * Initially we try to plan for a single-batch hash join using the combined
+ * work_mem of all participants to create a large shared hash table.  If that
+ * turns out either at planning or execution time to be impossible then we
+ * fall back to regular work_mem sized hash tables.
+ *
+ * To avoid deadlocks, we never wait for any barrier unless it is known that
+ * all other backends attached to it are actively executing the node or have
+ * already arrived.  Practically, that means that we never return a tuple
+ * while attached to a barrier, unless the barrier has reached its final
+ * state.  In the slightly special case of the per-batch barrier, we return
+ * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
+ * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
  *
  *-------------------------------------------------------------------------
  */
 
 #include "postgres.h"
 
+#include "access/htup_details.h"
+#include "access/parallel.h"
 #include "executor/executor.h"
 #include "executor/hashjoin.h"
 #include "executor/nodeHash.h"
 #include "executor/nodeHashjoin.h"
+#include "miscadmin.h"
+#include "pgstat.h"
 #include "utils/memutils.h"
+#include "utils/sharedtuplestore.h"
 
 
-/* Returns true for JOIN_LEFT and JOIN_ANTI jointypes */
-#define HASHJOIN_IS_OUTER(hjstate)     ((hjstate)->hj_NullInnerTupleSlot != NULL)
+/*
+ * States of the ExecHashJoin state machine
+ */
+#define HJ_BUILD_HASHTABLE             1
+#define HJ_NEED_NEW_OUTER              2
+#define HJ_SCAN_BUCKET                 3
+#define HJ_FILL_OUTER_TUPLE            4
+#define HJ_FILL_INNER_TUPLES   5
+#define HJ_NEED_NEW_BATCH              6
+
+/* Returns true if doing null-fill on outer relation */
+#define HJ_FILL_OUTER(hjstate) ((hjstate)->hj_NullInnerTupleSlot != NULL)
+/* Returns true if doing null-fill on inner relation */
+#define HJ_FILL_INNER(hjstate) ((hjstate)->hj_NullOuterTupleSlot != NULL)
 
 static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
                                                  HashJoinState *hjstate,
                                                  uint32 *hashvalue);
+static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
+                                                                 HashJoinState *hjstate,
+                                                                 uint32 *hashvalue);
 static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
                                                  BufFile *file,
                                                  uint32 *hashvalue,
                                                  TupleTableSlot *tupleSlot);
-static int     ExecHashJoinNewBatch(HashJoinState *hjstate);
+static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
+static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
+static void ExecParallelHashJoinPartitionOuter(HashJoinState *node);
 
 
 /* ----------------------------------------------------------------
- *             ExecHashJoin
+ *             ExecHashJoinImpl
  *
- *             This function implements the Hybrid Hashjoin algorithm.
+ *             This function implements the Hybrid Hashjoin algorithm.  It is marked
+ *             with an always-inline attribute so that ExecHashJoin() and
+ *             ExecParallelHashJoin() can inline it.  Compilers that respect the
+ *             attribute should create versions specialized for parallel == true and
+ *             parallel == false with unnecessary branches removed.
  *
  *             Note: the relation we build hash table on is the "inner"
  *                       the other one is "outer".
  * ----------------------------------------------------------------
  */
-TupleTableSlot *                               /* return: a tuple or NULL */
-ExecHashJoin(HashJoinState *node)
+static pg_attribute_always_inline TupleTableSlot *
+ExecHashJoinImpl(PlanState *pstate, bool parallel)
 {
-       EState     *estate;
+       HashJoinState *node = castNode(HashJoinState, pstate);
        PlanState  *outerNode;
        HashState  *hashNode;
-       List       *joinqual;
-       List       *otherqual;
-       TupleTableSlot *inntuple;
+       ExprState  *joinqual;
+       ExprState  *otherqual;
        ExprContext *econtext;
-       ExprDoneCond isDone;
        HashJoinTable hashtable;
-       HashJoinTuple curtuple;
        TupleTableSlot *outerTupleSlot;
        uint32          hashvalue;
        int                     batchno;
+       ParallelHashJoinState *parallel_state;
 
        /*
         * get information from HashJoin node
         */
-       estate = node->js.ps.state;
        joinqual = node->js.joinqual;
        otherqual = node->js.ps.qual;
        hashNode = (HashState *) innerPlanState(node);
        outerNode = outerPlanState(node);
-
-       /*
-        * get information from HashJoin state
-        */
        hashtable = node->hj_HashTable;
        econtext = node->js.ps.ps_ExprContext;
-
-       /*
-        * Check to see if we're still projecting out tuples from a previous join
-        * tuple (because there is a function-returning-set in the projection
-        * expressions).  If so, try to project another one.
-        */
-       if (node->js.ps.ps_TupFromTlist)
-       {
-               TupleTableSlot *result;
-
-               result = ExecProject(node->js.ps.ps_ProjInfo, &isDone);
-               if (isDone == ExprMultipleResult)
-                       return result;
-               /* Done with that source tuple... */
-               node->js.ps.ps_TupFromTlist = false;
-       }
+       parallel_state = hashNode->parallel_state;
 
        /*
         * Reset per-tuple memory context to free any expression evaluation
-        * storage allocated in the previous tuple cycle.  Note this can't happen
-        * until we're done projecting out tuples from a join tuple.
+        * storage allocated in the previous tuple cycle.
         */
        ResetExprContext(econtext);
 
        /*
-        * if this is the first call, build the hash table for inner relation
+        * run the hash join state machine
         */
-       if (hashtable == NULL)
+       for (;;)
        {
                /*
-                * If the outer relation is completely empty, we can quit without
-                * building the hash table.  However, for an inner join it is only a
-                * win to check this when the outer relation's startup cost is less
-                * than the projected cost of building the hash table.  Otherwise it's
-                * best to build the hash table first and see if the inner relation is
-                * empty.  (When it's an outer join, we should always make this check,
-                * since we aren't going to be able to skip the join on the strength
-                * of an empty inner relation anyway.)
-                *
-                * If we are rescanning the join, we make use of information gained on
-                * the previous scan: don't bother to try the prefetch if the previous
-                * scan found the outer relation nonempty.      This is not 100% reliable
-                * since with new parameters the outer relation might yield different
-                * results, but it's a good heuristic.
-                *
-                * The only way to make the check is to try to fetch a tuple from the
-                * outer plan node.  If we succeed, we have to stash it away for later
-                * consumption by ExecHashJoinOuterGetTuple.
+                * It's possible to iterate this loop many times before returning a
+                * tuple, in some pathological cases such as needing to move much of
+                * the current batch to a later batch.  So let's check for interrupts
+                * each time through.
                 */
-               if (HASHJOIN_IS_OUTER(node) ||
-                       (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
-                        !node->hj_OuterNotEmpty))
+               CHECK_FOR_INTERRUPTS();
+
+               switch (node->hj_JoinState)
                {
-                       node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
-                       if (TupIsNull(node->hj_FirstOuterTupleSlot))
-                       {
+                       case HJ_BUILD_HASHTABLE:
+
+                               /*
+                                * First time through: build hash table for inner relation.
+                                */
+                               Assert(hashtable == NULL);
+
+                               /*
+                                * If the outer relation is completely empty, and it's not
+                                * right/full join, we can quit without building the hash
+                                * table.  However, for an inner join it is only a win to
+                                * check this when the outer relation's startup cost is less
+                                * than the projected cost of building the hash table.
+                                * Otherwise it's best to build the hash table first and see
+                                * if the inner relation is empty.  (When it's a left join, we
+                                * should always make this check, since we aren't going to be
+                                * able to skip the join on the strength of an empty inner
+                                * relation anyway.)
+                                *
+                                * If we are rescanning the join, we make use of information
+                                * gained on the previous scan: don't bother to try the
+                                * prefetch if the previous scan found the outer relation
+                                * nonempty. This is not 100% reliable since with new
+                                * parameters the outer relation might yield different
+                                * results, but it's a good heuristic.
+                                *
+                                * The only way to make the check is to try to fetch a tuple
+                                * from the outer plan node.  If we succeed, we have to stash
+                                * it away for later consumption by ExecHashJoinOuterGetTuple.
+                                */
+                               if (HJ_FILL_INNER(node))
+                               {
+                                       /* no chance to not build the hash table */
+                                       node->hj_FirstOuterTupleSlot = NULL;
+                               }
+                               else if (parallel)
+                               {
+                                       /*
+                                        * The empty-outer optimization is not implemented for
+                                        * shared hash tables, because no one participant can
+                                        * determine that there are no outer tuples, and it's not
+                                        * yet clear that it's worth the synchronization overhead
+                                        * of reaching consensus to figure that out.  So we have
+                                        * to build the hash table.
+                                        */
+                                       node->hj_FirstOuterTupleSlot = NULL;
+                               }
+                               else if (HJ_FILL_OUTER(node) ||
+                                                (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
+                                                 !node->hj_OuterNotEmpty))
+                               {
+                                       node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
+                                       if (TupIsNull(node->hj_FirstOuterTupleSlot))
+                                       {
+                                               node->hj_OuterNotEmpty = false;
+                                               return NULL;
+                                       }
+                                       else
+                                               node->hj_OuterNotEmpty = true;
+                               }
+                               else
+                                       node->hj_FirstOuterTupleSlot = NULL;
+
+                               /*
+                                * Create the hash table.  If using Parallel Hash, then
+                                * whoever gets here first will create the hash table and any
+                                * later arrivals will merely attach to it.
+                                */
+                               hashtable = ExecHashTableCreate(hashNode,
+                                                                                               node->hj_HashOperators,
+                                                                                               HJ_FILL_INNER(node));
+                               node->hj_HashTable = hashtable;
+
+                               /*
+                                * Execute the Hash node, to build the hash table.  If using
+                                * Parallel Hash, then we'll try to help hashing unless we
+                                * arrived too late.
+                                */
+                               hashNode->hashtable = hashtable;
+                               (void) MultiExecProcNode((PlanState *) hashNode);
+
+                               /*
+                                * If the inner relation is completely empty, and we're not
+                                * doing a left outer join, we can quit without scanning the
+                                * outer relation.
+                                */
+                               if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
+                                       return NULL;
+
+                               /*
+                                * need to remember whether nbatch has increased since we
+                                * began scanning the outer relation
+                                */
+                               hashtable->nbatch_outstart = hashtable->nbatch;
+
+                               /*
+                                * Reset OuterNotEmpty for scan.  (It's OK if we fetched a
+                                * tuple above, because ExecHashJoinOuterGetTuple will
+                                * immediately set it again.)
+                                */
                                node->hj_OuterNotEmpty = false;
-                               return NULL;
-                       }
-                       else
-                               node->hj_OuterNotEmpty = true;
-               }
-               else
-                       node->hj_FirstOuterTupleSlot = NULL;
 
-               /*
-                * create the hash table
-                */
-               hashtable = ExecHashTableCreate((Hash *) hashNode->ps.plan,
-                                                                               node->hj_HashOperators);
-               node->hj_HashTable = hashtable;
+                               if (parallel)
+                               {
+                                       Barrier    *build_barrier;
 
-               /*
-                * execute the Hash node, to build the hash table
-                */
-               hashNode->hashtable = hashtable;
-               (void) MultiExecProcNode((PlanState *) hashNode);
+                                       build_barrier = &parallel_state->build_barrier;
+                                       Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
+                                                  BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
+                                       if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
+                                       {
+                                               /*
+                                                * If multi-batch, we need to hash the outer relation
+                                                * up front.
+                                                */
+                                               if (hashtable->nbatch > 1)
+                                                       ExecParallelHashJoinPartitionOuter(node);
+                                               BarrierArriveAndWait(build_barrier,
+                                                                                        WAIT_EVENT_HASH_BUILD_HASHING_OUTER);
+                                       }
+                                       Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
 
-               /*
-                * If the inner relation is completely empty, and we're not doing an
-                * outer join, we can quit without scanning the outer relation.
-                */
-               if (hashtable->totalTuples == 0 && !HASHJOIN_IS_OUTER(node))
-                       return NULL;
+                                       /* Each backend should now select a batch to work on. */
+                                       hashtable->curbatch = -1;
+                                       node->hj_JoinState = HJ_NEED_NEW_BATCH;
 
-               /*
-                * need to remember whether nbatch has increased since we began
-                * scanning the outer relation
-                */
-               hashtable->nbatch_outstart = hashtable->nbatch;
+                                       continue;
+                               }
+                               else
+                                       node->hj_JoinState = HJ_NEED_NEW_OUTER;
 
-               /*
-                * Reset OuterNotEmpty for scan.  (It's OK if we fetched a tuple
-                * above, because ExecHashJoinOuterGetTuple will immediately set it
-                * again.)
-                */
-               node->hj_OuterNotEmpty = false;
-       }
+                               /* FALL THRU */
 
-       /*
-        * run the hash join process
-        */
-       for (;;)
-       {
-               /*
-                * If we don't have an outer tuple, get the next one
-                */
-               if (node->hj_NeedNewOuter)
-               {
-                       outerTupleSlot = ExecHashJoinOuterGetTuple(outerNode,
-                                                                                                          node,
-                                                                                                          &hashvalue);
-                       if (TupIsNull(outerTupleSlot))
-                       {
-                               /* end of join */
-                               return NULL;
-                       }
+                       case HJ_NEED_NEW_OUTER:
 
-                       econtext->ecxt_outertuple = outerTupleSlot;
-                       node->hj_NeedNewOuter = false;
-                       node->hj_MatchedOuter = false;
+                               /*
+                                * We don't have an outer tuple, try to get the next one
+                                */
+                               if (parallel)
+                                       outerTupleSlot =
+                                               ExecParallelHashJoinOuterGetTuple(outerNode, node,
+                                                                                                                 &hashvalue);
+                               else
+                                       outerTupleSlot =
+                                               ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);
+
+                               if (TupIsNull(outerTupleSlot))
+                               {
+                                       /* end of batch, or maybe whole join */
+                                       if (HJ_FILL_INNER(node))
+                                       {
+                                               /* set up to scan for unmatched inner tuples */
+                                               ExecPrepHashTableForUnmatched(node);
+                                               node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+                                       }
+                                       else
+                                               node->hj_JoinState = HJ_NEED_NEW_BATCH;
+                                       continue;
+                               }
 
-                       /*
-                        * Now we have an outer tuple; find the corresponding bucket for
-                        * this tuple in the main hash table or skew hash table.
-                        */
-                       node->hj_CurHashValue = hashvalue;
-                       ExecHashGetBucketAndBatch(hashtable, hashvalue,
-                                                                         &node->hj_CurBucketNo, &batchno);
-                       node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
-                                                                                                                        hashvalue);
-                       node->hj_CurTuple = NULL;
+                               econtext->ecxt_outertuple = outerTupleSlot;
+                               node->hj_MatchedOuter = false;
 
-                       /*
-                        * Now we've got an outer tuple and the corresponding hash bucket,
-                        * but it might not belong to the current batch, or it might match
-                        * a skew bucket.
-                        */
-                       if (batchno != hashtable->curbatch &&
-                               node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
-                       {
                                /*
-                                * Need to postpone this outer tuple to a later batch. Save it
-                                * in the corresponding outer-batch file.
+                                * Find the corresponding bucket for this tuple in the main
+                                * hash table or skew hash table.
                                 */
-                               Assert(batchno > hashtable->curbatch);
-                               ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot),
-                                                                         hashvalue,
-                                                                         &hashtable->outerBatchFile[batchno]);
-                               node->hj_NeedNewOuter = true;
-                               continue;               /* loop around for a new outer tuple */
-                       }
-               }
+                               node->hj_CurHashValue = hashvalue;
+                               ExecHashGetBucketAndBatch(hashtable, hashvalue,
+                                                                                 &node->hj_CurBucketNo, &batchno);
+                               node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
+                                                                                                                                hashvalue);
+                               node->hj_CurTuple = NULL;
 
-               /*
-                * OK, scan the selected hash bucket for matches
-                */
-               for (;;)
-               {
-                       curtuple = ExecScanHashBucket(node, econtext);
-                       if (curtuple == NULL)
-                               break;                  /* out of matches */
+                               /*
+                                * The tuple might not belong to the current batch (where
+                                * "current batch" includes the skew buckets if any).
+                                */
+                               if (batchno != hashtable->curbatch &&
+                                       node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
+                               {
+                                       bool            shouldFree;
+                                       MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
+                                                                                                                                         &shouldFree);
+
+                                       /*
+                                        * Need to postpone this outer tuple to a later batch.
+                                        * Save it in the corresponding outer-batch file.
+                                        */
+                                       Assert(parallel_state == NULL);
+                                       Assert(batchno > hashtable->curbatch);
+                                       ExecHashJoinSaveTuple(mintuple, hashvalue,
+                                                                                 &hashtable->outerBatchFile[batchno]);
+
+                                       if (shouldFree)
+                                               heap_free_minimal_tuple(mintuple);
+
+                                       /* Loop around, staying in HJ_NEED_NEW_OUTER state */
+                                       continue;
+                               }
 
-                       /*
-                        * we've got a match, but still need to test non-hashed quals
-                        */
-                       inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(curtuple),
-                                                                                        node->hj_HashTupleSlot,
-                                                                                        false);        /* don't pfree */
-                       econtext->ecxt_innertuple = inntuple;
+                               /* OK, let's scan the bucket for matches */
+                               node->hj_JoinState = HJ_SCAN_BUCKET;
 
-                       /* reset temp memory each time to avoid leaks from qual expr */
-                       ResetExprContext(econtext);
+                               /* FALL THRU */
 
-                       /*
-                        * if we pass the qual, then save state for next call and have
-                        * ExecProject form the projection, store it in the tuple table,
-                        * and return the slot.
-                        *
-                        * Only the joinquals determine MatchedOuter status, but all quals
-                        * must pass to actually return the tuple.
-                        */
-                       if (joinqual == NIL || ExecQual(joinqual, econtext, false))
-                       {
-                               node->hj_MatchedOuter = true;
+                       case HJ_SCAN_BUCKET:
 
-                               /* In an antijoin, we never return a matched tuple */
-                               if (node->js.jointype == JOIN_ANTI)
+                               /*
+                                * Scan the selected hash bucket for matches to current outer
+                                */
+                               if (parallel)
                                {
-                                       node->hj_NeedNewOuter = true;
-                                       break;          /* out of loop over hash bucket */
+                                       if (!ExecParallelScanHashBucket(node, econtext))
+                                       {
+                                               /* out of matches; check for possible outer-join fill */
+                                               node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
+                                               continue;
+                                       }
+                               }
+                               else
+                               {
+                                       if (!ExecScanHashBucket(node, econtext))
+                                       {
+                                               /* out of matches; check for possible outer-join fill */
+                                               node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
+                                               continue;
+                                       }
                                }
 
                                /*
-                                * In a semijoin, we'll consider returning the first match,
-                                * but after that we're done with this outer tuple.
+                                * We've got a match, but still need to test non-hashed quals.
+                                * ExecScanHashBucket already set up all the state needed to
+                                * call ExecQual.
+                                *
+                                * If we pass the qual, then save state for next call and have
+                                * ExecProject form the projection, store it in the tuple
+                                * table, and return the slot.
+                                *
+                                * Only the joinquals determine tuple match status, but all
+                                * quals must pass to actually return the tuple.
                                 */
-                               if (node->js.jointype == JOIN_SEMI)
-                                       node->hj_NeedNewOuter = true;
-
-                               if (otherqual == NIL || ExecQual(otherqual, econtext, false))
+                               if (joinqual == NULL || ExecQual(joinqual, econtext))
                                {
-                                       TupleTableSlot *result;
-
-                                       result = ExecProject(node->js.ps.ps_ProjInfo, &isDone);
+                                       node->hj_MatchedOuter = true;
+                                       HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
 
-                                       if (isDone != ExprEndResult)
+                                       /* In an antijoin, we never return a matched tuple */
+                                       if (node->js.jointype == JOIN_ANTI)
                                        {
-                                               node->js.ps.ps_TupFromTlist =
-                                                       (isDone == ExprMultipleResult);
-                                               return result;
+                                               node->hj_JoinState = HJ_NEED_NEW_OUTER;
+                                               continue;
                                        }
+
+                                       /*
+                                        * If we only need to join to the first matching inner
+                                        * tuple, then consider returning this one, but after that
+                                        * continue with next outer tuple.
+                                        */
+                                       if (node->js.single_match)
+                                               node->hj_JoinState = HJ_NEED_NEW_OUTER;
+
+                                       if (otherqual == NULL || ExecQual(otherqual, econtext))
+                                               return ExecProject(node->js.ps.ps_ProjInfo);
+                                       else
+                                               InstrCountFiltered2(node, 1);
                                }
+                               else
+                                       InstrCountFiltered1(node, 1);
+                               break;
+
+                       case HJ_FILL_OUTER_TUPLE:
 
                                /*
-                                * If semijoin and we didn't return the tuple, we're still
-                                * done with this outer tuple.
+                                * The current outer tuple has run out of matches, so check
+                                * whether to emit a dummy outer-join tuple.  Whether we emit
+                                * one or not, the next state is NEED_NEW_OUTER.
                                 */
-                               if (node->js.jointype == JOIN_SEMI)
-                                       break;          /* out of loop over hash bucket */
-                       }
-               }
+                               node->hj_JoinState = HJ_NEED_NEW_OUTER;
 
-               /*
-                * Now the current outer tuple has run out of matches, so check
-                * whether to emit a dummy outer-join tuple. If not, loop around to
-                * get a new outer tuple.
-                */
-               node->hj_NeedNewOuter = true;
+                               if (!node->hj_MatchedOuter &&
+                                       HJ_FILL_OUTER(node))
+                               {
+                                       /*
+                                        * Generate a fake join tuple with nulls for the inner
+                                        * tuple, and return it if it passes the non-join quals.
+                                        */
+                                       econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
+
+                                       if (otherqual == NULL || ExecQual(otherqual, econtext))
+                                               return ExecProject(node->js.ps.ps_ProjInfo);
+                                       else
+                                               InstrCountFiltered2(node, 1);
+                               }
+                               break;
 
-               if (!node->hj_MatchedOuter &&
-                       HASHJOIN_IS_OUTER(node))
-               {
-                       /*
-                        * We are doing an outer join and there were no join matches for
-                        * this outer tuple.  Generate a fake join tuple with nulls for
-                        * the inner tuple, and return it if it passes the non-join quals.
-                        */
-                       econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
+                       case HJ_FILL_INNER_TUPLES:
+
+                               /*
+                                * We have finished a batch, but we are doing right/full join,
+                                * so any unmatched inner tuples in the hashtable have to be
+                                * emitted before we continue to the next batch.
+                                */
+                               if (!ExecScanHashTableForUnmatched(node, econtext))
+                               {
+                                       /* no more unmatched tuples */
+                                       node->hj_JoinState = HJ_NEED_NEW_BATCH;
+                                       continue;
+                               }
 
-                       if (otherqual == NIL || ExecQual(otherqual, econtext, false))
-                       {
                                /*
-                                * qualification was satisfied so we project and return the
-                                * slot containing the result tuple using ExecProject().
+                                * Generate a fake join tuple with nulls for the outer tuple,
+                                * and return it if it passes the non-join quals.
                                 */
-                               TupleTableSlot *result;
+                               econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;
+
+                               if (otherqual == NULL || ExecQual(otherqual, econtext))
+                                       return ExecProject(node->js.ps.ps_ProjInfo);
+                               else
+                                       InstrCountFiltered2(node, 1);
+                               break;
 
-                               result = ExecProject(node->js.ps.ps_ProjInfo, &isDone);
+                       case HJ_NEED_NEW_BATCH:
 
-                               if (isDone != ExprEndResult)
+                               /*
+                                * Try to advance to next batch.  Done if there are no more.
+                                */
+                               if (parallel)
                                {
-                                       node->js.ps.ps_TupFromTlist =
-                                               (isDone == ExprMultipleResult);
-                                       return result;
+                                       if (!ExecParallelHashJoinNewBatch(node))
+                                               return NULL;    /* end of parallel-aware join */
                                }
-                       }
+                               else
+                               {
+                                       if (!ExecHashJoinNewBatch(node))
+                                               return NULL;    /* end of parallel-oblivious join */
+                               }
+                               node->hj_JoinState = HJ_NEED_NEW_OUTER;
+                               break;
+
+                       default:
+                               elog(ERROR, "unrecognized hashjoin state: %d",
+                                        (int) node->hj_JoinState);
                }
        }
 }
 
+/* ----------------------------------------------------------------
+ *             ExecHashJoin
+ *
+ *             Parallel-oblivious version.
+ * ----------------------------------------------------------------
+ */
+static TupleTableSlot *                        /* return: a tuple or NULL */
+ExecHashJoin(PlanState *pstate)
+{
+       /*
+        * On sufficiently smart compilers this should be inlined with the
+        * parallel-aware branches removed.
+        */
+       return ExecHashJoinImpl(pstate, false);
+}
+
+/* ----------------------------------------------------------------
+ *             ExecParallelHashJoin
+ *
+ *             Parallel-aware version.
+ * ----------------------------------------------------------------
+ */
+static TupleTableSlot *                        /* return: a tuple or NULL */
+ExecParallelHashJoin(PlanState *pstate)
+{
+       /*
+        * On sufficiently smart compilers this should be inlined with the
+        * parallel-oblivious branches removed.
+        */
+       return ExecHashJoinImpl(pstate, true);
+}
+
 /* ----------------------------------------------------------------
  *             ExecInitHashJoin
  *
@@ -350,8 +601,12 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
        Hash       *hashNode;
        List       *lclauses;
        List       *rclauses;
+       List       *rhclauses;
        List       *hoperators;
+       TupleDesc       outerDesc,
+                               innerDesc;
        ListCell   *l;
+       const TupleTableSlotOps *ops;
 
        /* check for unsupported flags */
        Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
@@ -363,6 +618,14 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
        hjstate->js.ps.plan = (Plan *) node;
        hjstate->js.ps.state = estate;
 
+       /*
+        * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker()
+        * where this function may be replaced with a parallel version, if we
+        * managed to launch a parallel query.
+        */
+       hjstate->js.ps.ExecProcNode = ExecHashJoin;
+       hjstate->js.jointype = node->join.jointype;
+
        /*
         * Miscellaneous initialization
         *
@@ -370,23 +633,6 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
         */
        ExecAssignExprContext(estate, &hjstate->js.ps);
 
-       /*
-        * initialize child expressions
-        */
-       hjstate->js.ps.targetlist = (List *)
-               ExecInitExpr((Expr *) node->join.plan.targetlist,
-                                        (PlanState *) hjstate);
-       hjstate->js.ps.qual = (List *)
-               ExecInitExpr((Expr *) node->join.plan.qual,
-                                        (PlanState *) hjstate);
-       hjstate->js.jointype = node->join.jointype;
-       hjstate->js.joinqual = (List *)
-               ExecInitExpr((Expr *) node->join.joinqual,
-                                        (PlanState *) hjstate);
-       hjstate->hashclauses = (List *)
-               ExecInitExpr((Expr *) node->hashclauses,
-                                        (PlanState *) hjstate);
-
        /*
         * initialize child nodes
         *
@@ -398,15 +644,30 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
        hashNode = (Hash *) innerPlan(node);
 
        outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags);
+       outerDesc = ExecGetResultType(outerPlanState(hjstate));
        innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags);
+       innerDesc = ExecGetResultType(innerPlanState(hjstate));
+
+       /*
+        * Initialize result slot, type and projection.
+        */
+       ExecInitResultTupleSlotTL(&hjstate->js.ps, &TTSOpsVirtual);
+       ExecAssignProjectionInfo(&hjstate->js.ps, NULL);
 
        /*
         * tuple table initialization
         */
-       ExecInitResultTupleSlot(estate, &hjstate->js.ps);
-       hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate);
+       ops = ExecGetResultSlotOps(outerPlanState(hjstate), NULL);
+       hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate, outerDesc,
+                                                                                                               ops);
+
+       /*
+        * detect whether we need only consider the first matching inner tuple
+        */
+       hjstate->js.single_match = (node->join.inner_unique ||
+                                                               node->join.jointype == JOIN_SEMI);
 
-       /* note: HASHJOIN_IS_OUTER macro depends on this initialization */
+       /* set up null tuples for outer joins, if needed */
        switch (node->join.jointype)
        {
                case JOIN_INNER:
@@ -415,8 +676,17 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
                case JOIN_LEFT:
                case JOIN_ANTI:
                        hjstate->hj_NullInnerTupleSlot =
-                               ExecInitNullTupleSlot(estate,
-                                                                ExecGetResultType(innerPlanState(hjstate)));
+                               ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
+                       break;
+               case JOIN_RIGHT:
+                       hjstate->hj_NullOuterTupleSlot =
+                               ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
+                       break;
+               case JOIN_FULL:
+                       hjstate->hj_NullOuterTupleSlot =
+                               ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
+                       hjstate->hj_NullInnerTupleSlot =
+                               ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
                        break;
                default:
                        elog(ERROR, "unrecognized join type: %d",
@@ -425,10 +695,10 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
 
        /*
         * now for some voodoo.  our temporary tuple slot is actually the result
-        * tuple slot of the Hash node (which is our inner plan).  we do this
+        * tuple slot of the Hash node (which is our inner plan).  we can do this
         * because Hash nodes don't return tuples via ExecProcNode() -- instead
         * the hash join node uses ExecScanHashBucket() to get at the contents of
-        * the hash table.      -cim 6/9/91
+        * the hash table.  -cim 6/9/91
         */
        {
                HashState  *hashstate = (HashState *) innerPlanState(hjstate);
@@ -438,13 +708,14 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
        }
 
        /*
-        * initialize tuple type and projection info
+        * initialize child expressions
         */
-       ExecAssignResultTypeFromTL(&hjstate->js.ps);
-       ExecAssignProjectionInfo(&hjstate->js.ps, NULL);
-
-       ExecSetSlotDescriptor(hjstate->hj_OuterTupleSlot,
-                                                 ExecGetResultType(outerPlanState(hjstate)));
+       hjstate->js.ps.qual =
+               ExecInitQual(node->join.plan.qual, (PlanState *) hjstate);
+       hjstate->js.joinqual =
+               ExecInitQual(node->join.joinqual, (PlanState *) hjstate);
+       hjstate->hashclauses =
+               ExecInitQual(node->hashclauses, (PlanState *) hjstate);
 
        /*
         * initialize hash-specific info
@@ -465,27 +736,27 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
         */
        lclauses = NIL;
        rclauses = NIL;
+       rhclauses = NIL;
        hoperators = NIL;
-       foreach(l, hjstate->hashclauses)
+       foreach(l, node->hashclauses)
        {
-               FuncExprState *fstate = (FuncExprState *) lfirst(l);
-               OpExpr     *hclause;
-
-               Assert(IsA(fstate, FuncExprState));
-               hclause = (OpExpr *) fstate->xprstate.expr;
-               Assert(IsA(hclause, OpExpr));
-               lclauses = lappend(lclauses, linitial(fstate->args));
-               rclauses = lappend(rclauses, lsecond(fstate->args));
+               OpExpr     *hclause = lfirst_node(OpExpr, l);
+
+               lclauses = lappend(lclauses, ExecInitExpr(linitial(hclause->args),
+                                                                                                 (PlanState *) hjstate));
+               rclauses = lappend(rclauses, ExecInitExpr(lsecond(hclause->args),
+                                                                                                 (PlanState *) hjstate));
+               rhclauses = lappend(rhclauses, ExecInitExpr(lsecond(hclause->args),
+                                                                                                  innerPlanState(hjstate)));
                hoperators = lappend_oid(hoperators, hclause->opno);
        }
        hjstate->hj_OuterHashKeys = lclauses;
        hjstate->hj_InnerHashKeys = rclauses;
        hjstate->hj_HashOperators = hoperators;
        /* child Hash node needs to evaluate inner hash keys, too */
-       ((HashState *) innerPlanState(hjstate))->hashkeys = rclauses;
+       ((HashState *) innerPlanState(hjstate))->hashkeys = rhclauses;
 
-       hjstate->js.ps.ps_TupFromTlist = false;
-       hjstate->hj_NeedNewOuter = true;
+       hjstate->hj_JoinState = HJ_BUILD_HASHTABLE;
        hjstate->hj_MatchedOuter = false;
        hjstate->hj_OuterNotEmpty = false;
 
@@ -532,13 +803,14 @@ ExecEndHashJoin(HashJoinState *node)
 /*
  * ExecHashJoinOuterGetTuple
  *
- *             get the next outer tuple for hashjoin: either by
- *             executing a plan node in the first pass, or from
- *             the temp files for the hashjoin batches.
+ *             get the next outer tuple for a parallel oblivious hashjoin: either by
+ *             executing the outer plan node in the first pass, or from the temp
+ *             files for the hashjoin batches.
  *
- * Returns a null slot if no more outer tuples.  On success, the tuple's
- * hash value is stored at *hashvalue --- this is either originally computed,
- * or re-read from the temp file.
+ * Returns a null slot if no more outer tuples (within the current batch).
+ *
+ * On success, the tuple's hash value is stored at *hashvalue --- this is
+ * either originally computed, or re-read from the temp file.
  */
 static TupleTableSlot *
 ExecHashJoinOuterGetTuple(PlanState *outerNode,
@@ -571,8 +843,8 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
                        econtext->ecxt_outertuple = slot;
                        if (ExecHashGetHashValue(hashtable, econtext,
                                                                         hjstate->hj_OuterHashKeys,
-                                                                        true,          /* outer tuple */
-                                                                        HASHJOIN_IS_OUTER(hjstate),
+                                                                        true,  /* outer tuple */
+                                                                        HJ_FILL_OUTER(hjstate),
                                                                         hashvalue))
                        {
                                /* remember outer relation is not empty for possible rescan */
@@ -587,31 +859,89 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
                         */
                        slot = ExecProcNode(outerNode);
                }
+       }
+       else if (curbatch < hashtable->nbatch)
+       {
+               BufFile    *file = hashtable->outerBatchFile[curbatch];
 
                /*
-                * We have just reached the end of the first pass. Try to switch to a
-                * saved batch.
+                * In outer-join cases, we could get here even though the batch file
+                * is empty.
                 */
-               curbatch = ExecHashJoinNewBatch(hjstate);
-       }
+               if (file == NULL)
+                       return NULL;
 
-       /*
-        * Try to read from a temp file. Loop allows us to advance to new batches
-        * as needed.  NOTE: nbatch could increase inside ExecHashJoinNewBatch, so
-        * don't try to optimize this loop.
-        */
-       while (curbatch < hashtable->nbatch)
-       {
                slot = ExecHashJoinGetSavedTuple(hjstate,
-                                                                                hashtable->outerBatchFile[curbatch],
+                                                                                file,
                                                                                 hashvalue,
                                                                                 hjstate->hj_OuterTupleSlot);
                if (!TupIsNull(slot))
                        return slot;
-               curbatch = ExecHashJoinNewBatch(hjstate);
        }
 
-       /* Out of batches... */
+       /* End of this batch */
+       return NULL;
+}
+
+/*
+ * ExecHashJoinOuterGetTuple variant for the parallel case.
+ */
+static TupleTableSlot *
+ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
+                                                                 HashJoinState *hjstate,
+                                                                 uint32 *hashvalue)
+{
+       HashJoinTable hashtable = hjstate->hj_HashTable;
+       int                     curbatch = hashtable->curbatch;
+       TupleTableSlot *slot;
+
+       /*
+        * In the Parallel Hash case we only run the outer plan directly for
+        * single-batch hash joins.  Otherwise we have to go to batch files, even
+        * for batch 0.
+        */
+       if (curbatch == 0 && hashtable->nbatch == 1)
+       {
+               slot = ExecProcNode(outerNode);
+
+               while (!TupIsNull(slot))
+               {
+                       ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
+
+                       econtext->ecxt_outertuple = slot;
+                       if (ExecHashGetHashValue(hashtable, econtext,
+                                                                        hjstate->hj_OuterHashKeys,
+                                                                        true,  /* outer tuple */
+                                                                        HJ_FILL_OUTER(hjstate),
+                                                                        hashvalue))
+                               return slot;
+
+                       /*
+                        * That tuple couldn't match because of a NULL, so discard it and
+                        * continue with the next one.
+                        */
+                       slot = ExecProcNode(outerNode);
+               }
+       }
+       else if (curbatch < hashtable->nbatch)
+       {
+               MinimalTuple tuple;
+
+               tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
+                                                                          hashvalue);
+               if (tuple != NULL)
+               {
+                       ExecForceStoreMinimalTuple(tuple,
+                                                                          hjstate->hj_OuterTupleSlot,
+                                                                          false);
+                       slot = hjstate->hj_OuterTupleSlot;
+                       return slot;
+               }
+               else
+                       ExecClearTuple(hjstate->hj_OuterTupleSlot);
+       }
+
+       /* End of this batch */
        return NULL;
 }
 
@@ -619,10 +949,9 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
  * ExecHashJoinNewBatch
  *             switch to a new hashjoin batch
  *
- * Returns the number of the new batch (1..nbatch-1), or nbatch if no more.
- * We will never return a batch number that has an empty outer batch file.
+ * Returns true if successful, false if there are no more batches.
  */
-static int
+static bool
 ExecHashJoinNewBatch(HashJoinState *hjstate)
 {
        HashJoinTable hashtable = hjstate->hj_HashTable;
@@ -632,7 +961,6 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
        TupleTableSlot *slot;
        uint32          hashvalue;
 
-start_over:
        nbatch = hashtable->nbatch;
        curbatch = hashtable->curbatch;
 
@@ -646,7 +974,7 @@ start_over:
                        BufFileClose(hashtable->outerBatchFile[curbatch]);
                hashtable->outerBatchFile[curbatch] = NULL;
        }
-       else    /* we just finished the first batch */
+       else                                            /* we just finished the first batch */
        {
                /*
                 * Reset some of the skew optimization state variables, since we no
@@ -657,6 +985,7 @@ start_over:
                hashtable->skewEnabled = false;
                hashtable->skewBucket = NULL;
                hashtable->skewBucketNums = NULL;
+               hashtable->nSkewBuckets = 0;
                hashtable->spaceUsedSkew = 0;
        }
 
@@ -665,8 +994,9 @@ start_over:
         * sides.  We can sometimes skip over batches that are empty on only one
         * side, but there are exceptions:
         *
-        * 1. In an outer join, we have to process outer batches even if the inner
-        * batch is empty.
+        * 1. In a left/full outer join, we have to process outer batches even if
+        * the inner batch is empty.  Similarly, in a right/full outer join, we
+        * have to process inner batches even if the outer batch is empty.
         *
         * 2. If we have increased nbatch since the initial estimate, we have to
         * scan inner batches since they might contain tuples that need to be
@@ -682,7 +1012,10 @@ start_over:
                        hashtable->innerBatchFile[curbatch] == NULL))
        {
                if (hashtable->outerBatchFile[curbatch] &&
-                       HASHJOIN_IS_OUTER(hjstate))
+                       HJ_FILL_OUTER(hjstate))
+                       break;                          /* must process due to rule 1 */
+               if (hashtable->innerBatchFile[curbatch] &&
+                       HJ_FILL_INNER(hjstate))
                        break;                          /* must process due to rule 1 */
                if (hashtable->innerBatchFile[curbatch] &&
                        nbatch != hashtable->nbatch_original)
@@ -702,7 +1035,7 @@ start_over:
        }
 
        if (curbatch >= nbatch)
-               return curbatch;                /* no more batches */
+               return false;                   /* no more batches */
 
        hashtable->curbatch = curbatch;
 
@@ -718,7 +1051,7 @@ start_over:
                if (BufFileSeek(innerFile, 0, 0L, SEEK_SET))
                        ereport(ERROR,
                                        (errcode_for_file_access(),
-                                  errmsg("could not rewind hash-join temporary file: %m")));
+                                        errmsg("could not rewind hash-join temporary file: %m")));
 
                while ((slot = ExecHashJoinGetSavedTuple(hjstate,
                                                                                                 innerFile,
@@ -741,20 +1074,141 @@ start_over:
        }
 
        /*
-        * If there's no outer batch file, advance to next batch.
+        * Rewind outer batch file (if present), so that we can start reading it.
         */
-       if (hashtable->outerBatchFile[curbatch] == NULL)
-               goto start_over;
+       if (hashtable->outerBatchFile[curbatch] != NULL)
+       {
+               if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not rewind hash-join temporary file: %m")));
+       }
+
+       return true;
+}
+
+/*
+ * Choose a batch to work on, and attach to it.  Returns true if successful,
+ * false if there are no more batches.
+ */
+static bool
+ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
+{
+       HashJoinTable hashtable = hjstate->hj_HashTable;
+       int                     start_batchno;
+       int                     batchno;
 
        /*
-        * Rewind outer batch file, so that we can start reading it.
+        * If we started up so late that the batch tracking array has been freed
+        * already by ExecHashTableDetach(), then we are finished.  See also
+        * ExecParallelHashEnsureBatchAccessors().
         */
-       if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
-               ereport(ERROR,
-                               (errcode_for_file_access(),
-                                errmsg("could not rewind hash-join temporary file: %m")));
+       if (hashtable->batches == NULL)
+               return false;
+
+       /*
+        * If we were already attached to a batch, remember not to bother checking
+        * it again, and detach from it (possibly freeing the hash table if we are
+        * last to detach).
+        */
+       if (hashtable->curbatch >= 0)
+       {
+               hashtable->batches[hashtable->curbatch].done = true;
+               ExecHashTableDetachBatch(hashtable);
+       }
 
-       return curbatch;
+       /*
+        * Search for a batch that isn't done.  We use an atomic counter to start
+        * our search at a different batch in every participant when there are
+        * more batches than participants.
+        */
+       batchno = start_batchno =
+               pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) %
+               hashtable->nbatch;
+       do
+       {
+               uint32          hashvalue;
+               MinimalTuple tuple;
+               TupleTableSlot *slot;
+
+               if (!hashtable->batches[batchno].done)
+               {
+                       SharedTuplestoreAccessor *inner_tuples;
+                       Barrier    *batch_barrier =
+                       &hashtable->batches[batchno].shared->batch_barrier;
+
+                       switch (BarrierAttach(batch_barrier))
+                       {
+                               case PHJ_BATCH_ELECTING:
+
+                                       /* One backend allocates the hash table. */
+                                       if (BarrierArriveAndWait(batch_barrier,
+                                                                                        WAIT_EVENT_HASH_BATCH_ELECTING))
+                                               ExecParallelHashTableAlloc(hashtable, batchno);
+                                       /* Fall through. */
+
+                               case PHJ_BATCH_ALLOCATING:
+                                       /* Wait for allocation to complete. */
+                                       BarrierArriveAndWait(batch_barrier,
+                                                                                WAIT_EVENT_HASH_BATCH_ALLOCATING);
+                                       /* Fall through. */
+
+                               case PHJ_BATCH_LOADING:
+                                       /* Start (or join in) loading tuples. */
+                                       ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
+                                       inner_tuples = hashtable->batches[batchno].inner_tuples;
+                                       sts_begin_parallel_scan(inner_tuples);
+                                       while ((tuple = sts_parallel_scan_next(inner_tuples,
+                                                                                                                  &hashvalue)))
+                                       {
+                                               ExecForceStoreMinimalTuple(tuple,
+                                                                                                  hjstate->hj_HashTupleSlot,
+                                                                                                  false);
+                                               slot = hjstate->hj_HashTupleSlot;
+                                               ExecParallelHashTableInsertCurrentBatch(hashtable, slot,
+                                                                                                                               hashvalue);
+                                       }
+                                       sts_end_parallel_scan(inner_tuples);
+                                       BarrierArriveAndWait(batch_barrier,
+                                                                                WAIT_EVENT_HASH_BATCH_LOADING);
+                                       /* Fall through. */
+
+                               case PHJ_BATCH_PROBING:
+
+                                       /*
+                                        * This batch is ready to probe.  Return control to
+                                        * caller. We stay attached to batch_barrier so that the
+                                        * hash table stays alive until everyone's finished
+                                        * probing it, but no participant is allowed to wait at
+                                        * this barrier again (or else a deadlock could occur).
+                                        * All attached participants must eventually call
+                                        * BarrierArriveAndDetach() so that the final phase
+                                        * PHJ_BATCH_DONE can be reached.
+                                        */
+                                       ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
+                                       sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
+                                       return true;
+
+                               case PHJ_BATCH_DONE:
+
+                                       /*
+                                        * Already done.  Detach and go around again (if any
+                                        * remain).
+                                        */
+                                       BarrierDetach(batch_barrier);
+                                       hashtable->batches[batchno].done = true;
+                                       hashtable->curbatch = -1;
+                                       break;
+
+                               default:
+                                       elog(ERROR, "unexpected batch phase %d",
+                                                BarrierPhase(batch_barrier));
+                       }
+               }
+               batchno = (batchno + 1) % hashtable->nbatch;
+       } while (batchno != start_batchno);
+
+       return false;
 }
 
 /*
@@ -797,7 +1251,7 @@ ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
 
 /*
  * ExecHashJoinGetSavedTuple
- *             read the next tuple from a batch file.  Return NULL if no more.
+ *             read the next tuple from a batch file.  Return NULL if no more.
  *
  * On success, *hashvalue is set to the tuple's hash value, and the tuple
  * itself is stored in the given slot.
@@ -812,6 +1266,13 @@ ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
        size_t          nread;
        MinimalTuple tuple;
 
+       /*
+        * We check for interrupts here because this is typically taken as an
+        * alternative code path to an ExecProcNode() call, which would include
+        * such a check.
+        */
+       CHECK_FOR_INTERRUPTS();
+
        /*
         * Since both the hash value and the MinimalTuple length word are uint32,
         * we can read them both in one BufFileRead() call without any type
@@ -837,12 +1298,13 @@ ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not read from hash-join temporary file: %m")));
-       return ExecStoreMinimalTuple(tuple, tupleSlot, true);
+       ExecForceStoreMinimalTuple(tuple, tupleSlot, true);
+       return tupleSlot;
 }
 
 
 void
-ExecReScanHashJoin(HashJoinState *node, ExprContext *exprCtxt)
+ExecReScanHashJoin(HashJoinState *node)
 {
        /*
         * In a multi-batch join, we currently have to do rescans the hard way,
@@ -854,34 +1316,44 @@ ExecReScanHashJoin(HashJoinState *node, ExprContext *exprCtxt)
        if (node->hj_HashTable != NULL)
        {
                if (node->hj_HashTable->nbatch == 1 &&
-                       ((PlanState *) node)->righttree->chgParam == NULL)
+                       node->js.ps.righttree->chgParam == NULL)
                {
                        /*
-                        * okay to reuse the hash table; needn't rescan inner, either.
+                        * Okay to reuse the hash table; needn't rescan inner, either.
                         *
-                        * What we do need to do is reset our state about the emptiness of
-                        * the outer relation, so that the new scan of the outer will
-                        * update it correctly if it turns out to be empty this time.
-                        * (There's no harm in clearing it now because ExecHashJoin won't
-                        * need the info.  In the other cases, where the hash table
-                        * doesn't exist or we are destroying it, we leave this state
-                        * alone because ExecHashJoin will need it the first time
-                        * through.)
+                        * However, if it's a right/full join, we'd better reset the
+                        * inner-tuple match flags contained in the table.
+                        */
+                       if (HJ_FILL_INNER(node))
+                               ExecHashTableResetMatchFlags(node->hj_HashTable);
+
+                       /*
+                        * Also, we need to reset our state about the emptiness of the
+                        * outer relation, so that the new scan of the outer will update
+                        * it correctly if it turns out to be empty this time. (There's no
+                        * harm in clearing it now because ExecHashJoin won't need the
+                        * info.  In the other cases, where the hash table doesn't exist
+                        * or we are destroying it, we leave this state alone because
+                        * ExecHashJoin will need it the first time through.)
                         */
                        node->hj_OuterNotEmpty = false;
+
+                       /* ExecHashJoin can skip the BUILD_HASHTABLE step */
+                       node->hj_JoinState = HJ_NEED_NEW_OUTER;
                }
                else
                {
                        /* must destroy and rebuild hash table */
                        ExecHashTableDestroy(node->hj_HashTable);
                        node->hj_HashTable = NULL;
+                       node->hj_JoinState = HJ_BUILD_HASHTABLE;
 
                        /*
                         * if chgParam of subnode is not null then plan will be re-scanned
                         * by first ExecProcNode.
                         */
-                       if (((PlanState *) node)->righttree->chgParam == NULL)
-                               ExecReScan(((PlanState *) node)->righttree, exprCtxt);
+                       if (node->js.ps.righttree->chgParam == NULL)
+                               ExecReScan(node->js.ps.righttree);
                }
        }
 
@@ -891,8 +1363,6 @@ ExecReScanHashJoin(HashJoinState *node, ExprContext *exprCtxt)
        node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
        node->hj_CurTuple = NULL;
 
-       node->js.ps.ps_TupFromTlist = false;
-       node->hj_NeedNewOuter = true;
        node->hj_MatchedOuter = false;
        node->hj_FirstOuterTupleSlot = NULL;
 
@@ -900,6 +1370,184 @@ ExecReScanHashJoin(HashJoinState *node, ExprContext *exprCtxt)
         * if chgParam of subnode is not null then plan will be re-scanned by
         * first ExecProcNode.
         */
-       if (((PlanState *) node)->lefttree->chgParam == NULL)
-               ExecReScan(((PlanState *) node)->lefttree, exprCtxt);
+       if (node->js.ps.lefttree->chgParam == NULL)
+               ExecReScan(node->js.ps.lefttree);
+}
+
+void
+ExecShutdownHashJoin(HashJoinState *node)
+{
+       if (node->hj_HashTable)
+       {
+               /*
+                * Detach from shared state before DSM memory goes away.  This makes
+                * sure that we don't have any pointers into DSM memory by the time
+                * ExecEndHashJoin runs.
+                */
+               ExecHashTableDetachBatch(node->hj_HashTable);
+               ExecHashTableDetach(node->hj_HashTable);
+       }
+}
+
+static void
+ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate)
+{
+       PlanState  *outerState = outerPlanState(hjstate);
+       ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
+       HashJoinTable hashtable = hjstate->hj_HashTable;
+       TupleTableSlot *slot;
+       uint32          hashvalue;
+       int                     i;
+
+       Assert(hjstate->hj_FirstOuterTupleSlot == NULL);
+
+       /* Execute outer plan, writing all tuples to shared tuplestores. */
+       for (;;)
+       {
+               slot = ExecProcNode(outerState);
+               if (TupIsNull(slot))
+                       break;
+               econtext->ecxt_outertuple = slot;
+               if (ExecHashGetHashValue(hashtable, econtext,
+                                                                hjstate->hj_OuterHashKeys,
+                                                                true,  /* outer tuple */
+                                                                HJ_FILL_OUTER(hjstate),
+                                                                &hashvalue))
+               {
+                       int                     batchno;
+                       int                     bucketno;
+                       bool            shouldFree;
+                       MinimalTuple mintup = ExecFetchSlotMinimalTuple(slot, &shouldFree);
+
+                       ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
+                                                                         &batchno);
+                       sts_puttuple(hashtable->batches[batchno].outer_tuples,
+                                                &hashvalue, mintup);
+
+                       if (shouldFree)
+                               heap_free_minimal_tuple(mintup);
+               }
+               CHECK_FOR_INTERRUPTS();
+       }
+
+       /* Make sure all outer partitions are readable by any backend. */
+       for (i = 0; i < hashtable->nbatch; ++i)
+               sts_end_write(hashtable->batches[i].outer_tuples);
+}
+
+void
+ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
+{
+       shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState));
+       shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+void
+ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
+{
+       int                     plan_node_id = state->js.ps.plan->plan_node_id;
+       HashState  *hashNode;
+       ParallelHashJoinState *pstate;
+
+       /*
+        * Disable shared hash table mode if we failed to create a real DSM
+        * segment, because that means that we don't have a DSA area to work with.
+        */
+       if (pcxt->seg == NULL)
+               return;
+
+       ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
+
+       /*
+        * Set up the state needed to coordinate access to the shared hash
+        * table(s), using the plan node ID as the toc key.
+        */
+       pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState));
+       shm_toc_insert(pcxt->toc, plan_node_id, pstate);
+
+       /*
+        * Set up the shared hash join state with no batches initially.
+        * ExecHashTableCreate() will prepare at least one later and set nbatch
+        * and space_allowed.
+        */
+       pstate->nbatch = 0;
+       pstate->space_allowed = 0;
+       pstate->batches = InvalidDsaPointer;
+       pstate->old_batches = InvalidDsaPointer;
+       pstate->nbuckets = 0;
+       pstate->growth = PHJ_GROWTH_OK;
+       pstate->chunk_work_queue = InvalidDsaPointer;
+       pg_atomic_init_u32(&pstate->distributor, 0);
+       pstate->nparticipants = pcxt->nworkers + 1;
+       pstate->total_tuples = 0;
+       LWLockInitialize(&pstate->lock,
+                                        LWTRANCHE_PARALLEL_HASH_JOIN);
+       BarrierInit(&pstate->build_barrier, 0);
+       BarrierInit(&pstate->grow_batches_barrier, 0);
+       BarrierInit(&pstate->grow_buckets_barrier, 0);
+
+       /* Set up the space we'll use for shared temporary files. */
+       SharedFileSetInit(&pstate->fileset, pcxt->seg);
+
+       /* Initialize the shared state in the hash node. */
+       hashNode = (HashState *) innerPlanState(state);
+       hashNode->parallel_state = pstate;
+}
+
+/* ----------------------------------------------------------------
+ *             ExecHashJoinReInitializeDSM
+ *
+ *             Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt)
+{
+       int                     plan_node_id = state->js.ps.plan->plan_node_id;
+       ParallelHashJoinState *pstate =
+       shm_toc_lookup(cxt->toc, plan_node_id, false);
+
+       /*
+        * It would be possible to reuse the shared hash table in single-batch
+        * cases by resetting and then fast-forwarding build_barrier to
+        * PHJ_BUILD_DONE and batch 0's batch_barrier to PHJ_BATCH_PROBING, but
+        * currently shared hash tables are already freed by now (by the last
+        * participant to detach from the batch).  We could consider keeping it
+        * around for single-batch joins.  We'd also need to adjust
+        * finalize_plan() so that it doesn't record a dummy dependency for
+        * Parallel Hash nodes, preventing the rescan optimization.  For now we
+        * don't try.
+        */
+
+       /* Detach, freeing any remaining shared memory. */
+       if (state->hj_HashTable != NULL)
+       {
+               ExecHashTableDetachBatch(state->hj_HashTable);
+               ExecHashTableDetach(state->hj_HashTable);
+       }
+
+       /* Clear any shared batch files. */
+       SharedFileSetDeleteAll(&pstate->fileset);
+
+       /* Reset build_barrier to PHJ_BUILD_ELECTING so we can go around again. */
+       BarrierInit(&pstate->build_barrier, 0);
+}
+
+void
+ExecHashJoinInitializeWorker(HashJoinState *state,
+                                                        ParallelWorkerContext *pwcxt)
+{
+       HashState  *hashNode;
+       int                     plan_node_id = state->js.ps.plan->plan_node_id;
+       ParallelHashJoinState *pstate =
+       shm_toc_lookup(pwcxt->toc, plan_node_id, false);
+
+       /* Attach to the space for shared temporary files. */
+       SharedFileSetAttach(&pstate->fileset, pwcxt->seg);
+
+       /* Attach to the shared state in the hash node. */
+       hashNode = (HashState *) innerPlanState(state);
+       hashNode->parallel_state = pstate;
+
+       ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
 }