diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 07063af84f838d3003da7383dd0601c2cb896003..209870886400645312e74c33ae6dbecb5149a3bb 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
  * nodeHashjoin.c
  *       Routines to handle hash join nodes
  *
- * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/executor/nodeHashjoin.c,v 1.67 2004/12/31 21:59:45 pgsql Exp $
+ *       src/backend/executor/nodeHashjoin.c
+ *
+ * PARALLELISM
+ *
+ * Hash joins can participate in parallel query execution in several ways.  A
+ * parallel-oblivious hash join is one where the node is unaware that it is
+ * part of a parallel plan.  In this case, a copy of the inner plan is used to
+ * build a copy of the hash table in every backend, and the outer plan could
+ * either be built from a partial or complete path, so that the results of the
+ * hash join are correspondingly either partial or complete.  A parallel-aware
+ * hash join is one that behaves differently, coordinating work between
+ * backends, and appears as Parallel Hash Join in EXPLAIN output.  A Parallel
+ * Hash Join always appears with a Parallel Hash node.
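+ *
+ * For example, a parallel-aware hash join typically appears in a plan
+ * shaped like this (an illustrative sketch only, not output captured from
+ * a real query; the relation names are invented):
+ *
+ *   Gather
+ *     ->  Parallel Hash Join
+ *           ->  Parallel Seq Scan on big_outer
+ *           ->  Parallel Hash
+ *                 ->  Parallel Seq Scan on big_inner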
+ *
+ * Parallel-aware hash joins use the same per-backend state machine to track
+ * progress through the hash join algorithm as parallel-oblivious hash joins.
+ * In a parallel-aware hash join, there is also a shared state machine that
+ * co-operating backends use to synchronize their local state machines and
+ * program counters.  The shared state machine is managed with a Barrier IPC
+ * primitive.  When all attached participants arrive at a barrier, the phase
+ * advances and all waiting participants are released.
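+ *
+ * In outline, a participant drives a barrier like this (a simplified
+ * sketch of the Barrier interface, not code taken from this file):
+ *
+ *   int phase = BarrierAttach(barrier);    -- join, observing current phase
+ *   ... do the work that belongs to that phase ...
+ *   BarrierArriveAndWait(barrier, wait_event_info);   -- phase advances
+ *   BarrierDetach(barrier);                -- stop participating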
+ *
+ * When a participant begins working on a parallel hash join, it must first
+ * figure out how much progress has already been made, because participants
+ * don't wait for each other to begin.  For this reason there are switch
+ * statements at key points in the code where we have to synchronize our local
+ * state machine with the phase, and then jump to the correct part of the
+ * algorithm so that we can get started.
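+ *
+ * Such a synchronizing switch has roughly this shape (a condensed sketch
+ * with hypothetical phase names; the real cases appear later in this file
+ * and in nodeHash.c):
+ *
+ *   switch (BarrierAttach(barrier))
+ *   {
+ *       case PHJ_PHASE_ONE:
+ *           ... help with phase one's work ...
+ *           -- fall through, or wait for the phase to advance --
+ *       case PHJ_PHASE_TWO:
+ *           ... help with phase two's work ...
+ *   }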
+ *
+ * One barrier called build_barrier is used to coordinate the hashing phases.
+ * The phase is represented by an integer which begins at zero and increments
+ * one by one, but in the code it is referred to by symbolic names as follows:
+ *
+ *   PHJ_BUILD_ELECTING              -- initial state
+ *   PHJ_BUILD_ALLOCATING            -- one sets up the batches and table 0
+ *   PHJ_BUILD_HASHING_INNER         -- all hash the inner rel
+ *   PHJ_BUILD_HASHING_OUTER         -- (multi-batch only) all hash the outer
+ *   PHJ_BUILD_DONE                  -- building done, probing can begin
+ *
+ * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
+ * be used repeatedly as required to coordinate expansions in the number of
+ * batches or buckets.  Their phases are as follows:
+ *
+ *   PHJ_GROW_BATCHES_ELECTING       -- initial state
+ *   PHJ_GROW_BATCHES_ALLOCATING     -- one allocates new batches
+ *   PHJ_GROW_BATCHES_REPARTITIONING -- all repartition
+ *   PHJ_GROW_BATCHES_FINISHING      -- one cleans up, detects skew
+ *
+ *   PHJ_GROW_BUCKETS_ELECTING       -- initial state
+ *   PHJ_GROW_BUCKETS_ALLOCATING     -- one allocates new buckets
+ *   PHJ_GROW_BUCKETS_REINSERTING    -- all insert tuples
+ *
+ * If the planner got the number of batches and buckets right, those won't be
+ * necessary, but on the other hand we might end up needing to expand the
+ * buckets or batches multiple times while hashing the inner relation to stay
+ * within our memory budget and load factor target.  For that reason it's a
+ * separate pair of barriers using circular phases.
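+ *
+ * Since BarrierPhase() keeps increasing across cycles, code consulting it
+ * can recover the position within the current cycle by modular arithmetic,
+ * conceptually like this (the macro name here is hypothetical):
+ *
+ *   current_step = BarrierPhase(barrier) % PHJ_GROW_PHASE_COUNT;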
+ *
+ * The PHJ_BUILD_HASHING_OUTER phase is required only for multi-batch joins,
+ * because we need to divide the outer relation into batches up front in order
+ * to be able to process batches entirely independently.  In contrast, the
+ * parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
+ * batches whenever it encounters them while scanning and probing, which it
+ * can do because it processes batches in serial order.
+ *
+ * Once PHJ_BUILD_DONE is reached, backends then split up and process
+ * different batches, or gang up and work together on probing batches if there
+ * aren't enough to go around.  For each batch there is a separate barrier
+ * with the following phases:
+ *
+ *  PHJ_BATCH_ELECTING       -- initial state
+ *  PHJ_BATCH_ALLOCATING     -- one allocates buckets
+ *  PHJ_BATCH_LOADING        -- all load the hash table from disk
+ *  PHJ_BATCH_PROBING        -- all probe
+ *  PHJ_BATCH_DONE           -- end
+ *
+ * Batch 0 is a special case, because it starts out in phase
+ * PHJ_BATCH_PROBING; populating batch 0's hash table is done during
+ * PHJ_BUILD_HASHING_INNER so we can skip loading.
+ *
+ * Initially we try to plan for a single-batch hash join using the combined
+ * work_mem of all participants to create a large shared hash table.  If that
+ * turns out either at planning or execution time to be impossible then we
+ * fall back to regular work_mem sized hash tables.
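+ *
+ * For example (figures invented for illustration): with a leader and two
+ * workers, each with work_mem = 64MB, we can aim for one shared hash table
+ * of up to 3 * 64MB = 192MB, rather than three private 64MB hash tables.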
+ *
+ * To avoid deadlocks, we never wait for any barrier unless it is known that
+ * all other backends attached to it are actively executing the node or have
+ * already arrived.  Practically, that means that we never return a tuple
+ * while attached to a barrier, unless the barrier has reached its final
+ * state.  In the slightly special case of the per-batch barrier, we return
+ * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
+ * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
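+ *
+ * In sketch form, finishing the probe phase of a batch therefore looks
+ * like this (simplified):
+ *
+ *   ... return tuples while the batch barrier is in PHJ_BATCH_PROBING ...
+ *   BarrierArriveAndDetach(batch_barrier);   -- PHJ_BATCH_DONE, no waiting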
  *
  *-------------------------------------------------------------------------
  */
 
 #include "postgres.h"
 
+#include "access/htup_details.h"
+#include "access/parallel.h"
 #include "executor/executor.h"
+#include "executor/hashjoin.h"
 #include "executor/nodeHash.h"
 #include "executor/nodeHashjoin.h"
-#include "optimizer/clauses.h"
+#include "miscadmin.h"
+#include "pgstat.h"
 #include "utils/memutils.h"
+#include "utils/sharedtuplestore.h"
 
 
-static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *node,
-                                                 HashJoinState *hjstate);
+/*
+ * States of the ExecHashJoin state machine
+ */
+#define HJ_BUILD_HASHTABLE             1
+#define HJ_NEED_NEW_OUTER              2
+#define HJ_SCAN_BUCKET                 3
+#define HJ_FILL_OUTER_TUPLE            4
+#define HJ_FILL_INNER_TUPLES   5
+#define HJ_NEED_NEW_BATCH              6
+
+/* Returns true if doing null-fill on outer relation */
+#define HJ_FILL_OUTER(hjstate) ((hjstate)->hj_NullInnerTupleSlot != NULL)
+/* Returns true if doing null-fill on inner relation */
+#define HJ_FILL_INNER(hjstate) ((hjstate)->hj_NullOuterTupleSlot != NULL)
+
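+/*
+ * Typical transitions between these states, summarizing ExecHashJoinImpl
+ * below: HJ_BUILD_HASHTABLE -> HJ_NEED_NEW_OUTER -> HJ_SCAN_BUCKET ->
+ * HJ_FILL_OUTER_TUPLE -> HJ_NEED_NEW_OUTER again; once a batch's outer
+ * tuples are exhausted, HJ_NEED_NEW_OUTER -> HJ_FILL_INNER_TUPLES (for
+ * right/full joins) or HJ_NEED_NEW_BATCH, and a successful HJ_NEED_NEW_BATCH
+ * leads back to HJ_NEED_NEW_OUTER.
+ */
+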
+static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
+                                                 HashJoinState *hjstate,
+                                                 uint32 *hashvalue);
+static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
+                                                                 HashJoinState *hjstate,
+                                                                 uint32 *hashvalue);
 static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
                                                  BufFile *file,
+                                                 uint32 *hashvalue,
                                                  TupleTableSlot *tupleSlot);
-static int     ExecHashJoinNewBatch(HashJoinState *hjstate);
+static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
+static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
+static void ExecParallelHashJoinPartitionOuter(HashJoinState *node);
 
 
 /* ----------------------------------------------------------------
- *             ExecHashJoin
+ *             ExecHashJoinImpl
  *
- *             This function implements the Hybrid Hashjoin algorithm.
- *             recursive partitioning remains to be added.
- *             Note: the relation we build hash table on is the inner
- *                       the other one is outer.
+ *             This function implements the Hybrid Hashjoin algorithm.  It is marked
+ *             with an always-inline attribute so that ExecHashJoin() and
+ *             ExecParallelHashJoin() can inline it.  Compilers that respect the
+ *             attribute should create versions specialized for parallel == true and
+ *             parallel == false with unnecessary branches removed.
+ *
+ *             Note: the relation we build hash table on is the "inner"
+ *                       the other one is "outer".
  * ----------------------------------------------------------------
  */
-TupleTableSlot *                               /* return: a tuple or NULL */
-ExecHashJoin(HashJoinState *node)
+static pg_attribute_always_inline TupleTableSlot *
+ExecHashJoinImpl(PlanState *pstate, bool parallel)
 {
-       EState     *estate;
+       HashJoinState *node = castNode(HashJoinState, pstate);
        PlanState  *outerNode;
        HashState  *hashNode;
-       List       *hjclauses;
-       List       *outerkeys;
-       List       *joinqual;
-       List       *otherqual;
-       ScanDirection dir;
-       TupleTableSlot *inntuple;
+       ExprState  *joinqual;
+       ExprState  *otherqual;
        ExprContext *econtext;
-       ExprDoneCond isDone;
        HashJoinTable hashtable;
-       HeapTuple       curtuple;
        TupleTableSlot *outerTupleSlot;
-       int                     i;
+       uint32          hashvalue;
+       int                     batchno;
+       ParallelHashJoinState *parallel_state;
 
        /*
         * get information from HashJoin node
         */
-       hjclauses = node->hashclauses;
-       estate = node->js.ps.state;
        joinqual = node->js.joinqual;
        otherqual = node->js.ps.qual;
        hashNode = (HashState *) innerPlanState(node);
        outerNode = outerPlanState(node);
-       dir = estate->es_direction;
-
-       /*
-        * get information from HashJoin state
-        */
        hashtable = node->hj_HashTable;
-       outerkeys = node->hj_OuterHashKeys;
        econtext = node->js.ps.ps_ExprContext;
-
-       /*
-        * Check to see if we're still projecting out tuples from a previous
-        * join tuple (because there is a function-returning-set in the
-        * projection expressions).  If so, try to project another one.
-        */
-       if (node->js.ps.ps_TupFromTlist)
-       {
-               TupleTableSlot *result;
-
-               result = ExecProject(node->js.ps.ps_ProjInfo, &isDone);
-               if (isDone == ExprMultipleResult)
-                       return result;
-               /* Done with that source tuple... */
-               node->js.ps.ps_TupFromTlist = false;
-       }
-
-       /*
-        * If we're doing an IN join, we want to return at most one row per
-        * outer tuple; so we can stop scanning the inner scan if we matched
-        * on the previous try.
-        */
-       if (node->js.jointype == JOIN_IN &&
-               node->hj_MatchedOuter)
-               node->hj_NeedNewOuter = true;
+       parallel_state = hashNode->parallel_state;
 
        /*
         * Reset per-tuple memory context to free any expression evaluation
-        * storage allocated in the previous tuple cycle.  Note this can't
-        * happen until we're done projecting out tuples from a join tuple.
+        * storage allocated in the previous tuple cycle.
         */
        ResetExprContext(econtext);
 
        /*
-        * if this is the first call, build the hash table for inner relation
+        * run the hash join state machine
         */
-       if (!node->hj_hashdone)
+       for (;;)
        {
                /*
-                * create the hash table
-                */
-               Assert(hashtable == NULL);
-               hashtable = ExecHashTableCreate((Hash *) hashNode->ps.plan,
-                                                                               node->hj_HashOperators);
-               node->hj_HashTable = hashtable;
-
-               /*
-                * execute the Hash node, to build the hash table
+                * It's possible to iterate this loop many times before returning a
+                * tuple, in some pathological cases such as needing to move much of
+                * the current batch to a later batch.  So let's check for interrupts
+                * each time through.
                 */
-               hashNode->hashtable = hashtable;
-               (void) ExecProcNode((PlanState *) hashNode);
+               CHECK_FOR_INTERRUPTS();
 
-               /*
-                * If the inner relation is completely empty, and we're not doing
-                * an outer join, we can quit without scanning the outer relation.
-                */
-               if (!hashtable->hashNonEmpty && node->js.jointype != JOIN_LEFT)
+               switch (node->hj_JoinState)
                {
-                       ExecHashTableDestroy(hashtable);
-                       node->hj_HashTable = NULL;
-                       return NULL;
-               }
+                       case HJ_BUILD_HASHTABLE:
 
-               /*
-                * Open temp files for outer batches, if needed. Note that file
-                * buffers are palloc'd in regular executor context.
-                */
-               for (i = 0; i < hashtable->nbatch; i++)
-                       hashtable->outerBatchFile[i] = BufFileCreateTemp(false);
+                               /*
+                                * First time through: build hash table for inner relation.
+                                */
+                               Assert(hashtable == NULL);
 
-               node->hj_hashdone = true;
-       }
+                               /*
+                                * If the outer relation is completely empty, and it's not a
+                                * right/full join, we can quit without building the hash
+                                * table.  However, for an inner join it is only a win to
+                                * check this when the outer relation's startup cost is less
+                                * than the projected cost of building the hash table.
+                                * Otherwise it's best to build the hash table first and see
+                                * if the inner relation is empty.  (When it's a left join, we
+                                * should always make this check, since we aren't going to be
+                                * able to skip the join on the strength of an empty inner
+                                * relation anyway.)
+                                *
+                                * If we are rescanning the join, we make use of information
+                                * gained on the previous scan: don't bother to try the
+                                * prefetch if the previous scan found the outer relation
+                                * nonempty. This is not 100% reliable since with new
+                                * parameters the outer relation might yield different
+                                * results, but it's a good heuristic.
+                                *
+                                * The only way to make the check is to try to fetch a tuple
+                                * from the outer plan node.  If we succeed, we have to stash
+                                * it away for later consumption by ExecHashJoinOuterGetTuple.
+                                */
+                               if (HJ_FILL_INNER(node))
+                               {
+                                       /* no chance to not build the hash table */
+                                       node->hj_FirstOuterTupleSlot = NULL;
+                               }
+                               else if (parallel)
+                               {
+                                       /*
+                                        * The empty-outer optimization is not implemented for
+                                        * shared hash tables, because no one participant can
+                                        * determine that there are no outer tuples, and it's not
+                                        * yet clear that it's worth the synchronization overhead
+                                        * of reaching consensus to figure that out.  So we have
+                                        * to build the hash table.
+                                        */
+                                       node->hj_FirstOuterTupleSlot = NULL;
+                               }
+                               else if (HJ_FILL_OUTER(node) ||
+                                                (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
+                                                 !node->hj_OuterNotEmpty))
+                               {
+                                       node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
+                                       if (TupIsNull(node->hj_FirstOuterTupleSlot))
+                                       {
+                                               node->hj_OuterNotEmpty = false;
+                                               return NULL;
+                                       }
+                                       else
+                                               node->hj_OuterNotEmpty = true;
+                               }
+                               else
+                                       node->hj_FirstOuterTupleSlot = NULL;
 
-       /*
-        * run the hash join process
-        */
-       for (;;)
-       {
-               /*
-                * If we don't have an outer tuple, get the next one
-                */
-               if (node->hj_NeedNewOuter)
-               {
-                       outerTupleSlot = ExecHashJoinOuterGetTuple(outerNode,
-                                                                                                          node);
-                       if (TupIsNull(outerTupleSlot))
-                       {
-                               /* end of join */
-                               return NULL;
-                       }
+                               /*
+                                * Create the hash table.  If using Parallel Hash, then
+                                * whoever gets here first will create the hash table and any
+                                * later arrivals will merely attach to it.
+                                */
+                               hashtable = ExecHashTableCreate(hashNode,
+                                                                                               node->hj_HashOperators,
+                                                                                               HJ_FILL_INNER(node));
+                               node->hj_HashTable = hashtable;
 
-                       node->js.ps.ps_OuterTupleSlot = outerTupleSlot;
-                       econtext->ecxt_outertuple = outerTupleSlot;
-                       node->hj_NeedNewOuter = false;
-                       node->hj_MatchedOuter = false;
+                               /*
+                                * Execute the Hash node, to build the hash table.  If using
+                                * Parallel Hash, then we'll try to help hashing unless we
+                                * arrived too late.
+                                */
+                               hashNode->hashtable = hashtable;
+                               (void) MultiExecProcNode((PlanState *) hashNode);
 
-                       /*
-                        * now we have an outer tuple, find the corresponding bucket
-                        * for this tuple from the hash table
-                        */
-                       node->hj_CurBucketNo = ExecHashGetBucket(hashtable, econtext,
-                                                                                                        outerkeys);
-                       node->hj_CurTuple = NULL;
+                               /*
+                                * If the inner relation is completely empty, and we're not
+                                * doing a left outer join, we can quit without scanning the
+                                * outer relation.
+                                */
+                               if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
+                                       return NULL;
 
-                       /*
-                        * Now we've got an outer tuple and the corresponding hash
-                        * bucket, but this tuple may not belong to the current batch.
-                        * This need only be checked in the first pass.
-                        */
-                       if (hashtable->curbatch == 0)
-                       {
-                               int                     batchno = ExecHashGetBatch(node->hj_CurBucketNo,
-                                                                                                          hashtable);
+                               /*
+                                * need to remember whether nbatch has increased since we
+                                * began scanning the outer relation
+                                */
+                               hashtable->nbatch_outstart = hashtable->nbatch;
+
+                               /*
+                                * Reset OuterNotEmpty for scan.  (It's OK if we fetched a
+                                * tuple above, because ExecHashJoinOuterGetTuple will
+                                * immediately set it again.)
+                                */
+                               node->hj_OuterNotEmpty = false;
+
+                               if (parallel)
+                               {
+                                       Barrier    *build_barrier;
+
+                                       build_barrier = &parallel_state->build_barrier;
+                                       Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
+                                                  BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
+                                       if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
+                                       {
+                                               /*
+                                                * If multi-batch, we need to hash the outer relation
+                                                * up front.
+                                                */
+                                               if (hashtable->nbatch > 1)
+                                                       ExecParallelHashJoinPartitionOuter(node);
+                                               BarrierArriveAndWait(build_barrier,
+                                                                                        WAIT_EVENT_HASH_BUILD_HASHING_OUTER);
+                                       }
+                                       Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
+
+                                       /* Each backend should now select a batch to work on. */
+                                       hashtable->curbatch = -1;
+                                       node->hj_JoinState = HJ_NEED_NEW_BATCH;
 
-                               if (batchno >= 0)
+                                       continue;
+                               }
+                               else
+                                       node->hj_JoinState = HJ_NEED_NEW_OUTER;
+
+                               /* FALL THRU */
+
+                       case HJ_NEED_NEW_OUTER:
+
+                               /*
+                                * We don't have an outer tuple; try to get the next one
+                                */
+                               if (parallel)
+                                       outerTupleSlot =
+                                               ExecParallelHashJoinOuterGetTuple(outerNode, node,
+                                                                                                                 &hashvalue);
+                               else
+                                       outerTupleSlot =
+                                               ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);
+
+                               if (TupIsNull(outerTupleSlot))
                                {
+                                       /* end of batch, or maybe whole join */
+                                       if (HJ_FILL_INNER(node))
+                                       {
+                                               /* set up to scan for unmatched inner tuples */
+                                               ExecPrepHashTableForUnmatched(node);
+                                               node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+                                       }
+                                       else
+                                               node->hj_JoinState = HJ_NEED_NEW_BATCH;
+                                       continue;
+                               }
+
+                               econtext->ecxt_outertuple = outerTupleSlot;
+                               node->hj_MatchedOuter = false;
+
+                               /*
+                                * Find the corresponding bucket for this tuple in the main
+                                * hash table or skew hash table.
+                                */
+                               node->hj_CurHashValue = hashvalue;
+                               ExecHashGetBucketAndBatch(hashtable, hashvalue,
+                                                                                 &node->hj_CurBucketNo, &batchno);
+                               node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
+                                                                                                                                hashvalue);
+                               node->hj_CurTuple = NULL;
+
+                               /*
+                                * The tuple might not belong to the current batch (where
+                                * "current batch" includes the skew buckets if any).
+                                */
+                               if (batchno != hashtable->curbatch &&
+                                       node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
+                               {
+                                       bool            shouldFree;
+                                       MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
+                                                                                                                                         &shouldFree);
+
                                        /*
                                         * Need to postpone this outer tuple to a later batch.
                                         * Save it in the corresponding outer-batch file.
                                         */
-                                       hashtable->outerBatchSize[batchno]++;
-                                       ExecHashJoinSaveTuple(outerTupleSlot->val,
-                                                                        hashtable->outerBatchFile[batchno]);
-                                       node->hj_NeedNewOuter = true;
-                                       continue;       /* loop around for a new outer tuple */
-                               }
-                       }
-               }
+                                       Assert(parallel_state == NULL);
+                                       Assert(batchno > hashtable->curbatch);
+                                       ExecHashJoinSaveTuple(mintuple, hashvalue,
+                                                                                 &hashtable->outerBatchFile[batchno]);
 
-               /*
-                * OK, scan the selected hash bucket for matches
-                */
-               for (;;)
-               {
-                       curtuple = ExecScanHashBucket(node,
-                                                                                 hjclauses,
-                                                                                 econtext);
-                       if (curtuple == NULL)
-                               break;                  /* out of matches */
+                                       if (shouldFree)
+                                               heap_free_minimal_tuple(mintuple);
 
-                       /*
-                        * we've got a match, but still need to test non-hashed quals
-                        */
-                       inntuple = ExecStoreTuple(curtuple,
-                                                                         node->hj_HashTupleSlot,
-                                                                         InvalidBuffer,
-                                                                         false);       /* don't pfree this tuple */
-                       econtext->ecxt_innertuple = inntuple;
+                                       /* Loop around, staying in HJ_NEED_NEW_OUTER state */
+                                       continue;
+                               }
 
-                       /* reset temp memory each time to avoid leaks from qual expr */
-                       ResetExprContext(econtext);
+                               /* OK, let's scan the bucket for matches */
+                               node->hj_JoinState = HJ_SCAN_BUCKET;
 
-                       /*
-                        * if we pass the qual, then save state for next call and have
-                        * ExecProject form the projection, store it in the tuple
-                        * table, and return the slot.
-                        *
-                        * Only the joinquals determine MatchedOuter status, but all
-                        * quals must pass to actually return the tuple.
-                        */
-                       if (joinqual == NIL || ExecQual(joinqual, econtext, false))
-                       {
-                               node->hj_MatchedOuter = true;
+                               /* FALL THRU */
 
-                               if (otherqual == NIL || ExecQual(otherqual, econtext, false))
+                       case HJ_SCAN_BUCKET:
+
+                               /*
+                                * Scan the selected hash bucket for matches to current outer
+                                */
+                               if (parallel)
                                {
-                                       TupleTableSlot *result;
+                                       if (!ExecParallelScanHashBucket(node, econtext))
+                                       {
+                                               /* out of matches; check for possible outer-join fill */
+                                               node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
+                                               continue;
+                                       }
+                               }
+                               else
+                               {
+                                       if (!ExecScanHashBucket(node, econtext))
+                                       {
+                                               /* out of matches; check for possible outer-join fill */
+                                               node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
+                                               continue;
+                                       }
+                               }
 
-                                       result = ExecProject(node->js.ps.ps_ProjInfo, &isDone);
+                               /*
+                                * We've got a match, but still need to test non-hashed quals.
+                                * ExecScanHashBucket already set up all the state needed to
+                                * call ExecQual.
+                                *
+                                * If we pass the qual, then save state for next call and have
+                                * ExecProject form the projection, store it in the tuple
+                                * table, and return the slot.
+                                *
+                                * Only the joinquals determine tuple match status, but all
+                                * quals must pass to actually return the tuple.
+                                */
+                               if (joinqual == NULL || ExecQual(joinqual, econtext))
+                               {
+                                       node->hj_MatchedOuter = true;
+                                       HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
 
-                                       if (isDone != ExprEndResult)
+                                       /* In an antijoin, we never return a matched tuple */
+                                       if (node->js.jointype == JOIN_ANTI)
                                        {
-                                               node->js.ps.ps_TupFromTlist =
-                                                       (isDone == ExprMultipleResult);
-                                               return result;
+                                               node->hj_JoinState = HJ_NEED_NEW_OUTER;
+                                               continue;
                                        }
+
+                                       /*
+                                        * If we only need to join to the first matching inner
+                                        * tuple, then consider returning this one, but after that
+                                        * continue with next outer tuple.
+                                        */
+                                       if (node->js.single_match)
+                                               node->hj_JoinState = HJ_NEED_NEW_OUTER;
+
+                                       if (otherqual == NULL || ExecQual(otherqual, econtext))
+                                               return ExecProject(node->js.ps.ps_ProjInfo);
+                                       else
+                                               InstrCountFiltered2(node, 1);
                                }
+                               else
+                                       InstrCountFiltered1(node, 1);
+                               break;
+
+                       case HJ_FILL_OUTER_TUPLE:
 
                                /*
-                                * If we didn't return a tuple, may need to set
-                                * NeedNewOuter
+                                * The current outer tuple has run out of matches, so check
+                                * whether to emit a dummy outer-join tuple.  Whether we emit
+                                * one or not, the next state is NEED_NEW_OUTER.
                                 */
-                               if (node->js.jointype == JOIN_IN)
+                               node->hj_JoinState = HJ_NEED_NEW_OUTER;
+
+                               if (!node->hj_MatchedOuter &&
+                                       HJ_FILL_OUTER(node))
                                {
-                                       node->hj_NeedNewOuter = true;
-                                       break;          /* out of loop over hash bucket */
+                                       /*
+                                        * Generate a fake join tuple with nulls for the inner
+                                        * tuple, and return it if it passes the non-join quals.
+                                        */
+                                       econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
+
+                                       if (otherqual == NULL || ExecQual(otherqual, econtext))
+                                               return ExecProject(node->js.ps.ps_ProjInfo);
+                                       else
+                                               InstrCountFiltered2(node, 1);
                                }
-                       }
-               }
+                               break;
 
-               /*
-                * Now the current outer tuple has run out of matches, so check
-                * whether to emit a dummy outer-join tuple. If not, loop around
-                * to get a new outer tuple.
-                */
-               node->hj_NeedNewOuter = true;
+                       case HJ_FILL_INNER_TUPLES:
 
-               if (!node->hj_MatchedOuter &&
-                       node->js.jointype == JOIN_LEFT)
-               {
-                       /*
-                        * We are doing an outer join and there were no join matches
-                        * for this outer tuple.  Generate a fake join tuple with
-                        * nulls for the inner tuple, and return it if it passes the
-                        * non-join quals.
-                        */
-                       econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
+                               /*
+                                * We have finished a batch, but we are doing right/full join,
+                                * so any unmatched inner tuples in the hashtable have to be
+                                * emitted before we continue to the next batch.
+                                */
+                               if (!ExecScanHashTableForUnmatched(node, econtext))
+                               {
+                                       /* no more unmatched tuples */
+                                       node->hj_JoinState = HJ_NEED_NEW_BATCH;
+                                       continue;
+                               }
 
-                       if (ExecQual(otherqual, econtext, false))
-                       {
                                /*
-                                * qualification was satisfied so we project and return
-                                * the slot containing the result tuple using
-                                * ExecProject().
+                                * Generate a fake join tuple with nulls for the outer tuple,
+                                * and return it if it passes the non-join quals.
                                 */
-                               TupleTableSlot *result;
+                               econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;
+
+                               if (otherqual == NULL || ExecQual(otherqual, econtext))
+                                       return ExecProject(node->js.ps.ps_ProjInfo);
+                               else
+                                       InstrCountFiltered2(node, 1);
+                               break;
 
-                               result = ExecProject(node->js.ps.ps_ProjInfo, &isDone);
+                       case HJ_NEED_NEW_BATCH:
 
-                               if (isDone != ExprEndResult)
+                               /*
+                                * Try to advance to next batch.  Done if there are no more.
+                                */
+                               if (parallel)
                                {
-                                       node->js.ps.ps_TupFromTlist =
-                                               (isDone == ExprMultipleResult);
-                                       return result;
+                                       if (!ExecParallelHashJoinNewBatch(node))
+                                               return NULL;    /* end of parallel-aware join */
                                }
-                       }
+                               else
+                               {
+                                       if (!ExecHashJoinNewBatch(node))
+                                               return NULL;    /* end of parallel-oblivious join */
+                               }
+                               node->hj_JoinState = HJ_NEED_NEW_OUTER;
+                               break;
+
+                       default:
+                               elog(ERROR, "unrecognized hashjoin state: %d",
+                                        (int) node->hj_JoinState);
                }
        }
 }
 
+/* ----------------------------------------------------------------
+ *             ExecHashJoin
+ *
+ *             Parallel-oblivious version.
+ * ----------------------------------------------------------------
+ */
+static TupleTableSlot *                        /* return: a tuple or NULL */
+ExecHashJoin(PlanState *pstate)
+{
+       /*
+        * On sufficiently smart compilers this should be inlined with the
+        * parallel-aware branches removed.
+        */
+       return ExecHashJoinImpl(pstate, false);
+}
+
+/* ----------------------------------------------------------------
+ *             ExecParallelHashJoin
+ *
+ *             Parallel-aware version.
+ * ----------------------------------------------------------------
+ */
+static TupleTableSlot *                        /* return: a tuple or NULL */
+ExecParallelHashJoin(PlanState *pstate)
+{
+       /*
+        * On sufficiently smart compilers this should be inlined with the
+        * parallel-oblivious branches removed.
+        */
+       return ExecHashJoinImpl(pstate, true);
+}
+
 /* ----------------------------------------------------------------
  *             ExecInitHashJoin
  *
@@ -312,15 +594,22 @@ ExecHashJoin(HashJoinState *node)
  * ----------------------------------------------------------------
  */
 HashJoinState *
-ExecInitHashJoin(HashJoin *node, EState *estate)
+ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
 {
        HashJoinState *hjstate;
        Plan       *outerNode;
        Hash       *hashNode;
        List       *lclauses;
        List       *rclauses;
+       List       *rhclauses;
        List       *hoperators;
+       TupleDesc       outerDesc,
+                               innerDesc;
        ListCell   *l;
+       const TupleTableSlotOps *ops;
+
+       /* check for unsupported flags */
+       Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
 
        /*
         * create state structure
@@ -329,6 +618,14 @@ ExecInitHashJoin(HashJoin *node, EState *estate)
        hjstate->js.ps.plan = (Plan *) node;
        hjstate->js.ps.state = estate;
 
+       /*
+        * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker()
+        * where this function may be replaced with a parallel version, if we
+        * managed to launch a parallel query.
+        */
+       hjstate->js.ps.ExecProcNode = ExecHashJoin;
+       hjstate->js.jointype = node->join.jointype;
+
        /*
         * Miscellaneous initialization
         *
@@ -336,49 +633,60 @@ ExecInitHashJoin(HashJoin *node, EState *estate)
         */
        ExecAssignExprContext(estate, &hjstate->js.ps);
 
-       /*
-        * initialize child expressions
-        */
-       hjstate->js.ps.targetlist = (List *)
-               ExecInitExpr((Expr *) node->join.plan.targetlist,
-                                        (PlanState *) hjstate);
-       hjstate->js.ps.qual = (List *)
-               ExecInitExpr((Expr *) node->join.plan.qual,
-                                        (PlanState *) hjstate);
-       hjstate->js.jointype = node->join.jointype;
-       hjstate->js.joinqual = (List *)
-               ExecInitExpr((Expr *) node->join.joinqual,
-                                        (PlanState *) hjstate);
-       hjstate->hashclauses = (List *)
-               ExecInitExpr((Expr *) node->hashclauses,
-                                        (PlanState *) hjstate);
-
        /*
         * initialize child nodes
+        *
+        * Note: we could suppress the REWIND flag for the inner input, which
+        * would amount to betting that the hash will be a single batch.  Not
+        * clear if this would be a win or not.
         */
        outerNode = outerPlan(node);
        hashNode = (Hash *) innerPlan(node);
 
-       outerPlanState(hjstate) = ExecInitNode(outerNode, estate);
-       innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate);
+       outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags);
+       outerDesc = ExecGetResultType(outerPlanState(hjstate));
+       innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags);
+       innerDesc = ExecGetResultType(innerPlanState(hjstate));
 
-#define HASHJOIN_NSLOTS 3
+       /*
+        * Initialize result slot, type and projection.
+        */
+       ExecInitResultTupleSlotTL(&hjstate->js.ps, &TTSOpsVirtual);
+       ExecAssignProjectionInfo(&hjstate->js.ps, NULL);
 
        /*
         * tuple table initialization
         */
-       ExecInitResultTupleSlot(estate, &hjstate->js.ps);
-       hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate);
+       ops = ExecGetResultSlotOps(outerPlanState(hjstate), NULL);
+       hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate, outerDesc,
+                                                                                                               ops);
 
+       /*
+        * detect whether we need only consider the first matching inner tuple
+        */
+       hjstate->js.single_match = (node->join.inner_unique ||
+                                                               node->join.jointype == JOIN_SEMI);
+
+       /* set up null tuples for outer joins, if needed */
        switch (node->join.jointype)
        {
                case JOIN_INNER:
-               case JOIN_IN:
+               case JOIN_SEMI:
                        break;
                case JOIN_LEFT:
+               case JOIN_ANTI:
+                       hjstate->hj_NullInnerTupleSlot =
+                               ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
+                       break;
+               case JOIN_RIGHT:
+                       hjstate->hj_NullOuterTupleSlot =
+                               ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
+                       break;
+               case JOIN_FULL:
+                       hjstate->hj_NullOuterTupleSlot =
+                               ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
                        hjstate->hj_NullInnerTupleSlot =
-                               ExecInitNullTupleSlot(estate,
-                                                        ExecGetResultType(innerPlanState(hjstate)));
+                               ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
                        break;
                default:
                        elog(ERROR, "unrecognized join type: %d",
@@ -386,11 +694,11 @@ ExecInitHashJoin(HashJoin *node, EState *estate)
        }
 
        /*
-        * now for some voodoo.  our temporary tuple slot is actually the
-        * result tuple slot of the Hash node (which is our inner plan).  we
-        * do this because Hash nodes don't return tuples via ExecProcNode()
-        * -- instead the hash join node uses ExecScanHashBucket() to get at
-        * the contents of the hash table.      -cim 6/9/91
+        * now for some voodoo.  our temporary tuple slot is actually the result
+        * tuple slot of the Hash node (which is our inner plan).  we can do this
+        * because Hash nodes don't return tuples via ExecProcNode() -- instead
+        * the hash join node uses ExecScanHashBucket() to get at the contents of
+        * the hash table.  -cim 6/9/91
         */
        {
                HashState  *hashstate = (HashState *) innerPlanState(hjstate);
@@ -400,68 +708,61 @@ ExecInitHashJoin(HashJoin *node, EState *estate)
        }
 
        /*
-        * initialize tuple type and projection info
+        * initialize child expressions
         */
-       ExecAssignResultTypeFromTL(&hjstate->js.ps);
-       ExecAssignProjectionInfo(&hjstate->js.ps);
-
-       ExecSetSlotDescriptor(hjstate->hj_OuterTupleSlot,
-                                                 ExecGetResultType(outerPlanState(hjstate)),
-                                                 false);
+       hjstate->js.ps.qual =
+               ExecInitQual(node->join.plan.qual, (PlanState *) hjstate);
+       hjstate->js.joinqual =
+               ExecInitQual(node->join.joinqual, (PlanState *) hjstate);
+       hjstate->hashclauses =
+               ExecInitQual(node->hashclauses, (PlanState *) hjstate);
 
        /*
         * initialize hash-specific info
         */
-
-       hjstate->hj_hashdone = false;
        hjstate->hj_HashTable = NULL;
+       hjstate->hj_FirstOuterTupleSlot = NULL;
 
+       hjstate->hj_CurHashValue = 0;
        hjstate->hj_CurBucketNo = 0;
+       hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
        hjstate->hj_CurTuple = NULL;
 
        /*
-        * Deconstruct the hash clauses into outer and inner argument values,
-        * so that we can evaluate those subexpressions separately.  Also make
-        * a list of the hash operator OIDs, in preparation for looking up the
-        * hash functions to use.
+        * Deconstruct the hash clauses into outer and inner argument values, so
+        * that we can evaluate those subexpressions separately.  Also make a list
+        * of the hash operator OIDs, in preparation for looking up the hash
+        * functions to use.
         */
        lclauses = NIL;
        rclauses = NIL;
+       rhclauses = NIL;
        hoperators = NIL;
-       foreach(l, hjstate->hashclauses)
+       foreach(l, node->hashclauses)
        {
-               FuncExprState *fstate = (FuncExprState *) lfirst(l);
-               OpExpr     *hclause;
-
-               Assert(IsA(fstate, FuncExprState));
-               hclause = (OpExpr *) fstate->xprstate.expr;
-               Assert(IsA(hclause, OpExpr));
-               lclauses = lappend(lclauses, linitial(fstate->args));
-               rclauses = lappend(rclauses, lsecond(fstate->args));
+               OpExpr     *hclause = lfirst_node(OpExpr, l);
+
+               lclauses = lappend(lclauses, ExecInitExpr(linitial(hclause->args),
+                                                                                                 (PlanState *) hjstate));
+               rclauses = lappend(rclauses, ExecInitExpr(lsecond(hclause->args),
+                                                                                                 (PlanState *) hjstate));
+               rhclauses = lappend(rhclauses, ExecInitExpr(lsecond(hclause->args),
+                                                                                                  innerPlanState(hjstate)));
                hoperators = lappend_oid(hoperators, hclause->opno);
        }
        hjstate->hj_OuterHashKeys = lclauses;
        hjstate->hj_InnerHashKeys = rclauses;
        hjstate->hj_HashOperators = hoperators;
        /* child Hash node needs to evaluate inner hash keys, too */
-       ((HashState *) innerPlanState(hjstate))->hashkeys = rclauses;
+       ((HashState *) innerPlanState(hjstate))->hashkeys = rhclauses;
 
-       hjstate->js.ps.ps_OuterTupleSlot = NULL;
-       hjstate->js.ps.ps_TupFromTlist = false;
-       hjstate->hj_NeedNewOuter = true;
+       hjstate->hj_JoinState = HJ_BUILD_HASHTABLE;
        hjstate->hj_MatchedOuter = false;
+       hjstate->hj_OuterNotEmpty = false;
 
        return hjstate;
 }
 
-int
-ExecCountSlotsHashJoin(HashJoin *node)
-{
-       return ExecCountSlotsNode(outerPlan(node)) +
-               ExecCountSlotsNode(innerPlan(node)) +
-               HASHJOIN_NSLOTS;
-}
-
 /* ----------------------------------------------------------------
  *             ExecEndHashJoin
  *
@@ -499,262 +800,754 @@ ExecEndHashJoin(HashJoinState *node)
        ExecEndNode(innerPlanState(node));
 }
 
-/* ----------------------------------------------------------------
- *             ExecHashJoinOuterGetTuple
+/*
+ * ExecHashJoinOuterGetTuple
  *
- *             get the next outer tuple for hashjoin: either by
- *             executing a plan node as in the first pass, or from
- *             the tmp files for the hashjoin batches.
- * ----------------------------------------------------------------
+ *             get the next outer tuple for a parallel-oblivious hashjoin: either by
+ *             executing the outer plan node in the first pass, or from the temp
+ *             files for the hashjoin batches.
+ *
+ * Returns a null slot if no more outer tuples (within the current batch).
+ *
+ * On success, the tuple's hash value is stored at *hashvalue --- this is
+ * either originally computed, or re-read from the temp file.
  */
-
 static TupleTableSlot *
-ExecHashJoinOuterGetTuple(PlanState *node, HashJoinState *hjstate)
+ExecHashJoinOuterGetTuple(PlanState *outerNode,
+                                                 HashJoinState *hjstate,
+                                                 uint32 *hashvalue)
 {
        HashJoinTable hashtable = hjstate->hj_HashTable;
        int                     curbatch = hashtable->curbatch;
        TupleTableSlot *slot;
 
-       if (curbatch == 0)
-       {                                                       /* if it is the first pass */
-               slot = ExecProcNode(node);
+       if (curbatch == 0)                      /* if it is the first pass */
+       {
+               /*
+                * Check to see if first outer tuple was already fetched by
+                * ExecHashJoin() and not used yet.
+                */
+               slot = hjstate->hj_FirstOuterTupleSlot;
                if (!TupIsNull(slot))
-                       return slot;
+                       hjstate->hj_FirstOuterTupleSlot = NULL;
+               else
+                       slot = ExecProcNode(outerNode);
+
+               while (!TupIsNull(slot))
+               {
+                       /*
+                        * We have to compute the tuple's hash value.
+                        */
+                       ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
+
+                       econtext->ecxt_outertuple = slot;
+                       if (ExecHashGetHashValue(hashtable, econtext,
+                                                                        hjstate->hj_OuterHashKeys,
+                                                                        true,  /* outer tuple */
+                                                                        HJ_FILL_OUTER(hjstate),
+                                                                        hashvalue))
+                       {
+                               /* remember outer relation is not empty for possible rescan */
+                               hjstate->hj_OuterNotEmpty = true;
+
+                               return slot;
+                       }
+
+                       /*
+                        * That tuple couldn't match because of a NULL, so discard it and
+                        * continue with the next one.
+                        */
+                       slot = ExecProcNode(outerNode);
+               }
+       }
+       else if (curbatch < hashtable->nbatch)
+       {
+               BufFile    *file = hashtable->outerBatchFile[curbatch];
 
                /*
-                * We have just reached the end of the first pass. Try to switch
-                * to a saved batch.
+                * In outer-join cases, we could get here even though the batch file
+                * is empty.
                 */
-               curbatch = ExecHashJoinNewBatch(hjstate);
-       }
+               if (file == NULL)
+                       return NULL;
 
-       /*
-        * Try to read from a temp file. Loop allows us to advance to new
-        * batch as needed.
-        */
-       while (curbatch <= hashtable->nbatch)
-       {
                slot = ExecHashJoinGetSavedTuple(hjstate,
-                                                                hashtable->outerBatchFile[curbatch - 1],
+                                                                                file,
+                                                                                hashvalue,
                                                                                 hjstate->hj_OuterTupleSlot);
                if (!TupIsNull(slot))
                        return slot;
-               curbatch = ExecHashJoinNewBatch(hjstate);
        }
 
-       /* Out of batches... */
+       /* End of this batch */
        return NULL;
 }
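
A detail worth isolating from the loop above: ExecHashGetHashValue() returns false when a join key is NULL and NULLs need not be kept, and such a tuple is discarded without ever being returned. A minimal standalone sketch of that shape, using a hypothetical Row type and hash helper in place of executor slots:

    #include <stdbool.h>
    #include <stddef.h>

    typedef struct Row
    {
        bool        key_is_null;
        int         key;
    } Row;

    /*
     * Stand-in for ExecHashGetHashValue(): returns false when the key is
     * NULL and such tuples cannot match, so the caller discards the row.
     */
    static bool
    hash_row(const Row *row, unsigned *hashvalue)
    {
        if (row->key_is_null)
            return false;
        *hashvalue = (unsigned) row->key * 2654435761u; /* multiplicative hash */
        return true;
    }

    /*
     * Pull rows until one hashes successfully; NULL means end of input,
     * mirroring the "null slot" return above.
     */
    static const Row *
    next_hashable_row(const Row *rows, size_t nrows, size_t *pos,
                      unsigned *hashvalue)
    {
        while (*pos < nrows)
        {
            const Row *row = &rows[(*pos)++];

            if (hash_row(row, hashvalue))
                return row;
        }
        return NULL;
    }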
 
-/* ----------------------------------------------------------------
- *             ExecHashJoinGetSavedTuple
- *
- *             read the next tuple from a tmp file
- * ----------------------------------------------------------------
+/*
+ * ExecHashJoinOuterGetTuple variant for the parallel case.
  */
-
 static TupleTableSlot *
-ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
-                                                 BufFile *file,
-                                                 TupleTableSlot *tupleSlot)
+ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
+                                                                 HashJoinState *hjstate,
+                                                                 uint32 *hashvalue)
 {
-       HeapTupleData htup;
-       size_t          nread;
-       HeapTuple       heapTuple;
+       HashJoinTable hashtable = hjstate->hj_HashTable;
+       int                     curbatch = hashtable->curbatch;
+       TupleTableSlot *slot;
 
-       nread = BufFileRead(file, (void *) &htup, sizeof(HeapTupleData));
-       if (nread == 0)
-               return NULL;                    /* end of file */
-       if (nread != sizeof(HeapTupleData))
-               ereport(ERROR,
-                               (errcode_for_file_access(),
-                       errmsg("could not read from hash-join temporary file: %m")));
-       heapTuple = palloc(HEAPTUPLESIZE + htup.t_len);
-       memcpy((char *) heapTuple, (char *) &htup, sizeof(HeapTupleData));
-       heapTuple->t_datamcxt = CurrentMemoryContext;
-       heapTuple->t_data = (HeapTupleHeader)
-               ((char *) heapTuple + HEAPTUPLESIZE);
-       nread = BufFileRead(file, (void *) heapTuple->t_data, htup.t_len);
-       if (nread != (size_t) htup.t_len)
-               ereport(ERROR,
-                               (errcode_for_file_access(),
-                       errmsg("could not read from hash-join temporary file: %m")));
-       return ExecStoreTuple(heapTuple, tupleSlot, InvalidBuffer, true);
+       /*
+        * In the Parallel Hash case we only run the outer plan directly for
+        * single-batch hash joins.  Otherwise we have to go to batch files, even
+        * for batch 0.
+        */
+       if (curbatch == 0 && hashtable->nbatch == 1)
+       {
+               slot = ExecProcNode(outerNode);
+
+               while (!TupIsNull(slot))
+               {
+                       ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
+
+                       econtext->ecxt_outertuple = slot;
+                       if (ExecHashGetHashValue(hashtable, econtext,
+                                                                        hjstate->hj_OuterHashKeys,
+                                                                        true,  /* outer tuple */
+                                                                        HJ_FILL_OUTER(hjstate),
+                                                                        hashvalue))
+                               return slot;
+
+                       /*
+                        * That tuple couldn't match because of a NULL, so discard it and
+                        * continue with the next one.
+                        */
+                       slot = ExecProcNode(outerNode);
+               }
+       }
+       else if (curbatch < hashtable->nbatch)
+       {
+               MinimalTuple tuple;
+
+               tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
+                                                                          hashvalue);
+               if (tuple != NULL)
+               {
+                       ExecForceStoreMinimalTuple(tuple,
+                                                                          hjstate->hj_OuterTupleSlot,
+                                                                          false);
+                       slot = hjstate->hj_OuterTupleSlot;
+                       return slot;
+               }
+               else
+                       ExecClearTuple(hjstate->hj_OuterTupleSlot);
+       }
+
+       /* End of this batch */
+       return NULL;
 }
 
-/* ----------------------------------------------------------------
- *             ExecHashJoinNewBatch
- *
+/*
+ * ExecHashJoinNewBatch
  *             switch to a new hashjoin batch
- * ----------------------------------------------------------------
+ *
+ * Returns true if successful, false if there are no more batches.
  */
-static int
+static bool
 ExecHashJoinNewBatch(HashJoinState *hjstate)
 {
        HashJoinTable hashtable = hjstate->hj_HashTable;
-       int                     nbatch = hashtable->nbatch;
-       int                     newbatch = hashtable->curbatch + 1;
-       long       *innerBatchSize = hashtable->innerBatchSize;
-       long       *outerBatchSize = hashtable->outerBatchSize;
+       int                     nbatch;
+       int                     curbatch;
        BufFile    *innerFile;
        TupleTableSlot *slot;
-       ExprContext *econtext;
-       List       *innerhashkeys;
+       uint32          hashvalue;
+
+       nbatch = hashtable->nbatch;
+       curbatch = hashtable->curbatch;
 
-       if (newbatch > 1)
+       if (curbatch > 0)
        {
                /*
                 * We no longer need the previous outer batch file; close it right
                 * away to free disk space.
                 */
-               BufFileClose(hashtable->outerBatchFile[newbatch - 2]);
-               hashtable->outerBatchFile[newbatch - 2] = NULL;
+               if (hashtable->outerBatchFile[curbatch])
+                       BufFileClose(hashtable->outerBatchFile[curbatch]);
+               hashtable->outerBatchFile[curbatch] = NULL;
+       }
+       else                                            /* we just finished the first batch */
+       {
+               /*
+                * Reset some of the skew optimization state variables, since we no
+                * longer need to consider skew tuples after the first batch. The
+                * memory context reset we are about to do will release the skew
+                * hashtable itself.
+                */
+               hashtable->skewEnabled = false;
+               hashtable->skewBucket = NULL;
+               hashtable->skewBucketNums = NULL;
+               hashtable->nSkewBuckets = 0;
+               hashtable->spaceUsedSkew = 0;
        }
 
        /*
-        * Normally we can skip over any batches that are empty on either side
-        * --- but for JOIN_LEFT, can only skip when left side is empty.
-        * Release associated temp files right away.
+        * We can always skip over any batches that are completely empty on both
+        * sides.  We can sometimes skip over batches that are empty on only one
+        * side, but there are exceptions:
+        *
+        * 1. In a left/full outer join, we have to process outer batches even if
+        * the inner batch is empty.  Similarly, in a right/full outer join, we
+        * have to process inner batches even if the outer batch is empty.
+        *
+        * 2. If we have increased nbatch since the initial estimate, we have to
+        * scan inner batches since they might contain tuples that need to be
+        * reassigned to later inner batches.
+        *
+        * 3. Similarly, if we have increased nbatch since starting the outer
+        * scan, we have to rescan outer batches in case they contain tuples that
+        * need to be reassigned.
         */
-       while (newbatch <= nbatch &&
-                  (outerBatchSize[newbatch - 1] == 0L ||
-                       (innerBatchSize[newbatch - 1] == 0L &&
-                        hjstate->js.jointype != JOIN_LEFT)))
+       curbatch++;
+       while (curbatch < nbatch &&
+                  (hashtable->outerBatchFile[curbatch] == NULL ||
+                       hashtable->innerBatchFile[curbatch] == NULL))
        {
-               BufFileClose(hashtable->innerBatchFile[newbatch - 1]);
-               hashtable->innerBatchFile[newbatch - 1] = NULL;
-               BufFileClose(hashtable->outerBatchFile[newbatch - 1]);
-               hashtable->outerBatchFile[newbatch - 1] = NULL;
-               newbatch++;
+               if (hashtable->outerBatchFile[curbatch] &&
+                       HJ_FILL_OUTER(hjstate))
+                       break;                          /* must process due to rule 1 */
+               if (hashtable->innerBatchFile[curbatch] &&
+                       HJ_FILL_INNER(hjstate))
+                       break;                          /* must process due to rule 1 */
+               if (hashtable->innerBatchFile[curbatch] &&
+                       nbatch != hashtable->nbatch_original)
+                       break;                          /* must process due to rule 2 */
+               if (hashtable->outerBatchFile[curbatch] &&
+                       nbatch != hashtable->nbatch_outstart)
+                       break;                          /* must process due to rule 3 */
+               /* We can ignore this batch. */
+               /* Release associated temp files right away. */
+               if (hashtable->innerBatchFile[curbatch])
+                       BufFileClose(hashtable->innerBatchFile[curbatch]);
+               hashtable->innerBatchFile[curbatch] = NULL;
+               if (hashtable->outerBatchFile[curbatch])
+                       BufFileClose(hashtable->outerBatchFile[curbatch]);
+               hashtable->outerBatchFile[curbatch] = NULL;
+               curbatch++;
        }
 
-       if (newbatch > nbatch)
-               return newbatch;                /* no more batches */
+       if (curbatch >= nbatch)
+               return false;                   /* no more batches */
+
+       hashtable->curbatch = curbatch;
 
        /*
-        * Rewind inner and outer batch files for this batch, so that we can
-        * start reading them.
+        * Reload the hash table with the new inner batch (which could be empty)
         */
-       if (BufFileSeek(hashtable->outerBatchFile[newbatch - 1], 0, 0L, SEEK_SET))
-               ereport(ERROR,
-                               (errcode_for_file_access(),
-                          errmsg("could not rewind hash-join temporary file: %m")));
+       ExecHashTableReset(hashtable);
 
-       innerFile = hashtable->innerBatchFile[newbatch - 1];
+       innerFile = hashtable->innerBatchFile[curbatch];
 
-       if (BufFileSeek(innerFile, 0, 0L, SEEK_SET))
-               ereport(ERROR,
-                               (errcode_for_file_access(),
-                          errmsg("could not rewind hash-join temporary file: %m")));
+       if (innerFile != NULL)
+       {
+               if (BufFileSeek(innerFile, 0, 0L, SEEK_SET))
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not rewind hash-join temporary file: %m")));
+
+               while ((slot = ExecHashJoinGetSavedTuple(hjstate,
+                                                                                                innerFile,
+                                                                                                &hashvalue,
+                                                                                                hjstate->hj_HashTupleSlot)))
+               {
+                       /*
+                        * NOTE: some tuples may be sent to future batches.  Also, it is
+                        * possible for hashtable->nbatch to be increased here!
+                        */
+                       ExecHashTableInsert(hashtable, slot, hashvalue);
+               }
+
+               /*
+                * after we build the hash table, the inner batch file is no longer
+                * needed
+                */
+               BufFileClose(innerFile);
+               hashtable->innerBatchFile[curbatch] = NULL;
+       }
 
        /*
-        * Reload the hash table with the new inner batch
+        * Rewind outer batch file (if present), so that we can start reading it.
         */
-       ExecHashTableReset(hashtable, innerBatchSize[newbatch - 1]);
+       if (hashtable->outerBatchFile[curbatch] != NULL)
+       {
+               if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not rewind hash-join temporary file: %m")));
+       }
+
+       return true;
+}
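
The chain of break tests above implements rules 1-3 from the preceding comment. Folded into a single predicate, the decision reads as follows (a sketch with hypothetical names, not an actual PostgreSQL function):

    #include <stdbool.h>
    #include <stddef.h>

    typedef struct BatchFiles
    {
        void       *inner;          /* inner-side batch file, or NULL if empty */
        void       *outer;          /* outer-side batch file, or NULL if empty */
    } BatchFiles;

    static bool
    can_skip_batch(const BatchFiles *b, bool fill_outer, bool fill_inner,
                   bool nbatch_grew_since_build,
                   bool nbatch_grew_since_outer_scan)
    {
        if (b->outer && fill_outer)
            return false;           /* rule 1, outer side */
        if (b->inner && fill_inner)
            return false;           /* rule 1, inner side */
        if (b->inner && nbatch_grew_since_build)
            return false;           /* rule 2 */
        if (b->outer && nbatch_grew_since_outer_scan)
            return false;           /* rule 3 */
        /* skippable only if at least one side is empty */
        return b->inner == NULL || b->outer == NULL;
    }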
+
+/*
+ * Choose a batch to work on, and attach to it.  Returns true if successful,
+ * false if there are no more batches.
+ */
+static bool
+ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
+{
+       HashJoinTable hashtable = hjstate->hj_HashTable;
+       int                     start_batchno;
+       int                     batchno;
 
-       econtext = hjstate->js.ps.ps_ExprContext;
-       innerhashkeys = hjstate->hj_InnerHashKeys;
+       /*
+        * If we started up so late that the batch tracking array has been freed
+        * already by ExecHashTableDetach(), then we are finished.  See also
+        * ExecParallelHashEnsureBatchAccessors().
+        */
+       if (hashtable->batches == NULL)
+               return false;
 
-       while ((slot = ExecHashJoinGetSavedTuple(hjstate,
-                                                                                        innerFile,
-                                                                                        hjstate->hj_HashTupleSlot))
-                  && !TupIsNull(slot))
+       /*
+        * If we were already attached to a batch, remember not to bother checking
+        * it again, and detach from it (possibly freeing the hash table if we are
+        * last to detach).
+        */
+       if (hashtable->curbatch >= 0)
        {
-               econtext->ecxt_innertuple = slot;
-               ExecHashTableInsert(hashtable, econtext, innerhashkeys);
+               hashtable->batches[hashtable->curbatch].done = true;
+               ExecHashTableDetachBatch(hashtable);
        }
 
        /*
-        * after we build the hash table, the inner batch file is no longer
-        * needed
+        * Search for a batch that isn't done.  We use an atomic counter to start
+        * our search at a different batch in every participant when there are
+        * more batches than participants.
         */
-       BufFileClose(innerFile);
-       hashtable->innerBatchFile[newbatch - 1] = NULL;
+       batchno = start_batchno =
+               pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) %
+               hashtable->nbatch;
+       do
+       {
+               uint32          hashvalue;
+               MinimalTuple tuple;
+               TupleTableSlot *slot;
+
+               if (!hashtable->batches[batchno].done)
+               {
+                       SharedTuplestoreAccessor *inner_tuples;
+                       Barrier    *batch_barrier =
+                       &hashtable->batches[batchno].shared->batch_barrier;
 
-       hashtable->curbatch = newbatch;
-       return newbatch;
+                       switch (BarrierAttach(batch_barrier))
+                       {
+                               case PHJ_BATCH_ELECTING:
+
+                                       /* One backend allocates the hash table. */
+                                       if (BarrierArriveAndWait(batch_barrier,
+                                                                                        WAIT_EVENT_HASH_BATCH_ELECTING))
+                                               ExecParallelHashTableAlloc(hashtable, batchno);
+                                       /* Fall through. */
+
+                               case PHJ_BATCH_ALLOCATING:
+                                       /* Wait for allocation to complete. */
+                                       BarrierArriveAndWait(batch_barrier,
+                                                                                WAIT_EVENT_HASH_BATCH_ALLOCATING);
+                                       /* Fall through. */
+
+                               case PHJ_BATCH_LOADING:
+                                       /* Start (or join in) loading tuples. */
+                                       ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
+                                       inner_tuples = hashtable->batches[batchno].inner_tuples;
+                                       sts_begin_parallel_scan(inner_tuples);
+                                       while ((tuple = sts_parallel_scan_next(inner_tuples,
+                                                                                                                  &hashvalue)))
+                                       {
+                                               ExecForceStoreMinimalTuple(tuple,
+                                                                                                  hjstate->hj_HashTupleSlot,
+                                                                                                  false);
+                                               slot = hjstate->hj_HashTupleSlot;
+                                               ExecParallelHashTableInsertCurrentBatch(hashtable, slot,
+                                                                                                                               hashvalue);
+                                       }
+                                       sts_end_parallel_scan(inner_tuples);
+                                       BarrierArriveAndWait(batch_barrier,
+                                                                                WAIT_EVENT_HASH_BATCH_LOADING);
+                                       /* Fall through. */
+
+                               case PHJ_BATCH_PROBING:
+
+                                       /*
+                                        * This batch is ready to probe.  Return control to
+                                        * caller. We stay attached to batch_barrier so that the
+                                        * hash table stays alive until everyone's finished
+                                        * probing it, but no participant is allowed to wait at
+                                        * this barrier again (or else a deadlock could occur).
+                                        * All attached participants must eventually call
+                                        * BarrierArriveAndDetach() so that the final phase
+                                        * PHJ_BATCH_DONE can be reached.
+                                        */
+                                       ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
+                                       sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
+                                       return true;
+
+                               case PHJ_BATCH_DONE:
+
+                                       /*
+                                        * Already done.  Detach and go around again (if any
+                                        * remain).
+                                        */
+                                       BarrierDetach(batch_barrier);
+                                       hashtable->batches[batchno].done = true;
+                                       hashtable->curbatch = -1;
+                                       break;
+
+                               default:
+                                       elog(ERROR, "unexpected batch phase %d",
+                                                BarrierPhase(batch_barrier));
+                       }
+               }
+               batchno = (batchno + 1) % hashtable->nbatch;
+       } while (batchno != start_batchno);
+
+       return false;
 }
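
PostgreSQL's Barrier supports dynamic attach and detach, which POSIX barriers do not, but the election idiom above, where BarrierArriveAndWait() returns true in exactly one backend, has a direct analogue: pthread_barrier_wait() returns PTHREAD_BARRIER_SERIAL_THREAD in exactly one thread. A runnable sketch of the elect-then-proceed pattern (compile with -pthread):

    #include <pthread.h>
    #include <stdio.h>

    #define NWORKERS 4

    static pthread_barrier_t barrier;

    static void *
    worker(void *arg)
    {
        long        id = (long) arg;

        /* Phase 1: everyone arrives; exactly one thread is "elected". */
        if (pthread_barrier_wait(&barrier) == PTHREAD_BARRIER_SERIAL_THREAD)
            printf("worker %ld: elected, doing one-time setup\n", id);

        /* Phase 2: nobody proceeds until the setup step is complete. */
        pthread_barrier_wait(&barrier);
        printf("worker %ld: proceeding to load/probe\n", id);
        return NULL;
    }

    int
    main(void)
    {
        pthread_t   threads[NWORKERS];

        pthread_barrier_init(&barrier, NULL, NWORKERS);
        for (long i = 0; i < NWORKERS; i++)
            pthread_create(&threads[i], NULL, worker, (void *) i);
        for (int i = 0; i < NWORKERS; i++)
            pthread_join(threads[i], NULL);
        pthread_barrier_destroy(&barrier);
        return 0;
    }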
 
-/* ----------------------------------------------------------------
- *             ExecHashJoinSaveTuple
+/*
+ * ExecHashJoinSaveTuple
+ *             save a tuple to a batch file.
  *
- *             save a tuple to a tmp file.
+ * The data recorded in the file for each tuple is its hash value,
+ * then the tuple in MinimalTuple format.
  *
- * The data recorded in the file for each tuple is an image of its
- * HeapTupleData (with meaningless t_data pointer) followed by the
- * HeapTupleHeader and tuple data.
- * ----------------------------------------------------------------
+ * Note: it is important always to call this in the regular executor
+ * context, not in a shorter-lived context; else the temp file buffers
+ * will get messed up.
  */
-
 void
-ExecHashJoinSaveTuple(HeapTuple heapTuple,
-                                         BufFile *file)
+ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
+                                         BufFile **fileptr)
 {
+       BufFile    *file = *fileptr;
        size_t          written;
 
-       written = BufFileWrite(file, (void *) heapTuple, sizeof(HeapTupleData));
-       if (written != sizeof(HeapTupleData))
+       if (file == NULL)
+       {
+               /* First write to this batch file, so open it. */
+               file = BufFileCreateTemp(false);
+               *fileptr = file;
+       }
+
+       written = BufFileWrite(file, (void *) &hashvalue, sizeof(uint32));
+       if (written != sizeof(uint32))
                ereport(ERROR,
                                (errcode_for_file_access(),
-                        errmsg("could not write to hash-join temporary file: %m")));
-       written = BufFileWrite(file, (void *) heapTuple->t_data, heapTuple->t_len);
-       if (written != (size_t) heapTuple->t_len)
+                                errmsg("could not write to hash-join temporary file: %m")));
+
+       written = BufFileWrite(file, (void *) tuple, tuple->t_len);
+       if (written != tuple->t_len)
                ereport(ERROR,
                                (errcode_for_file_access(),
-                        errmsg("could not write to hash-join temporary file: %m")));
+                                errmsg("could not write to hash-join temporary file: %m")));
 }
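
Note the lazy open above: *fileptr starts out NULL and the temporary file is created only when the first tuple for that batch arrives, so batches that never receive a tuple never create a file. The same idiom in miniature, with plain stdio standing in for BufFile (a sketch, not PostgreSQL's API):

    #include <stdio.h>

    /* Open the batch's temporary file on first use, like BufFileCreateTemp(). */
    static FILE *
    get_batch_file(FILE **fileptr)
    {
        if (*fileptr == NULL)
            *fileptr = tmpfile();
        return *fileptr;
    }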
 
-void
-ExecReScanHashJoin(HashJoinState *node, ExprContext *exprCtxt)
+/*
+ * ExecHashJoinGetSavedTuple
+ *             read the next tuple from a batch file.  Return NULL if no more.
+ *
+ * On success, *hashvalue is set to the tuple's hash value, and the tuple
+ * itself is stored in the given slot.
+ */
+static TupleTableSlot *
+ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
+                                                 BufFile *file,
+                                                 uint32 *hashvalue,
+                                                 TupleTableSlot *tupleSlot)
 {
+       uint32          header[2];
+       size_t          nread;
+       MinimalTuple tuple;
+
        /*
-        * If we haven't yet built the hash table then we can just return;
-        * nothing done yet, so nothing to undo.
+        * We check for interrupts here because this is typically taken as an
+        * alternative code path to an ExecProcNode() call, which would include
+        * such a check.
         */
-       if (!node->hj_hashdone)
-               return;
-       Assert(node->hj_HashTable != NULL);
+       CHECK_FOR_INTERRUPTS();
 
        /*
-        * In a multi-batch join, we currently have to do rescans the hard
-        * way, primarily because batch temp files may have already been
-        * released. But if it's a single-batch join, and there is no
-        * parameter change for the inner subnode, then we can just re-use the
-        * existing hash table without rebuilding it.
+        * Since both the hash value and the MinimalTuple length word are uint32,
+        * we can read them both in one BufFileRead() call without any type
+        * cheating.
         */
-       if (node->hj_HashTable->nbatch == 0 &&
-               ((PlanState *) node)->righttree->chgParam == NULL)
+       nread = BufFileRead(file, (void *) header, sizeof(header));
+       if (nread == 0)                         /* end of file */
        {
-               /* okay to reuse the hash table; needn't rescan inner, either */
+               ExecClearTuple(tupleSlot);
+               return NULL;
        }
-       else
+       if (nread != sizeof(header))
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not read from hash-join temporary file: %m")));
+       *hashvalue = header[0];
+       tuple = (MinimalTuple) palloc(header[1]);
+       tuple->t_len = header[1];
+       nread = BufFileRead(file,
+                                               (void *) ((char *) tuple + sizeof(uint32)),
+                                               header[1] - sizeof(uint32));
+       if (nread != header[1] - sizeof(uint32))
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not read from hash-join temporary file: %m")));
+       ExecForceStoreMinimalTuple(tuple, tupleSlot, true);
+       return tupleSlot;
+}
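
ExecHashJoinSaveTuple() and ExecHashJoinGetSavedTuple() agree on a simple record layout: a uint32 hash value followed by the tuple bytes, whose own leading uint32 is the total length. That is what lets the reader fetch the hash value and the length word in one call. A self-contained stdio sketch of the same layout (hypothetical helpers, error handling mostly elided):

    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    /* Record layout: [uint32 hash][tuple bytes, first uint32 of which is t_len] */
    static void
    save_record(FILE *f, uint32_t hashvalue, const void *tuple, uint32_t t_len)
    {
        fwrite(&hashvalue, sizeof(uint32_t), 1, f);
        fwrite(tuple, t_len, 1, f);
    }

    static void *
    load_record(FILE *f, uint32_t *hashvalue)
    {
        uint32_t    header[2];      /* hash value and length word, read together */
        char       *tuple;

        if (fread(header, sizeof(header), 1, f) != 1)
            return NULL;            /* end of file */
        *hashvalue = header[0];
        tuple = malloc(header[1]);
        memcpy(tuple, &header[1], sizeof(uint32_t));    /* restore length word */
        fread(tuple + sizeof(uint32_t), header[1] - sizeof(uint32_t), 1, f);
        return tuple;
    }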
+
+
+void
+ExecReScanHashJoin(HashJoinState *node)
+{
+       /*
+        * In a multi-batch join, we currently have to do rescans the hard way,
+        * primarily because batch temp files may have already been released. But
+        * if it's a single-batch join, and there is no parameter change for the
+        * inner subnode, then we can just re-use the existing hash table without
+        * rebuilding it.
+        */
+       if (node->hj_HashTable != NULL)
        {
-               /* must destroy and rebuild hash table */
-               node->hj_hashdone = false;
-               ExecHashTableDestroy(node->hj_HashTable);
-               node->hj_HashTable = NULL;
+               if (node->hj_HashTable->nbatch == 1 &&
+                       node->js.ps.righttree->chgParam == NULL)
+               {
+                       /*
+                        * Okay to reuse the hash table; needn't rescan inner, either.
+                        *
+                        * However, if it's a right/full join, we'd better reset the
+                        * inner-tuple match flags contained in the table.
+                        */
+                       if (HJ_FILL_INNER(node))
+                               ExecHashTableResetMatchFlags(node->hj_HashTable);
 
-               /*
-                * if chgParam of subnode is not null then plan will be re-scanned
-                * by first ExecProcNode.
-                */
-               if (((PlanState *) node)->righttree->chgParam == NULL)
-                       ExecReScan(((PlanState *) node)->righttree, exprCtxt);
+                       /*
+                        * Also, we need to reset our state about the emptiness of the
+                        * outer relation, so that the new scan of the outer will update
+                        * it correctly if it turns out to be empty this time. (There's no
+                        * harm in clearing it now because ExecHashJoin won't need the
+                        * info.  In the other cases, where the hash table doesn't exist
+                        * or we are destroying it, we leave this state alone because
+                        * ExecHashJoin will need it the first time through.)
+                        */
+                       node->hj_OuterNotEmpty = false;
+
+                       /* ExecHashJoin can skip the BUILD_HASHTABLE step */
+                       node->hj_JoinState = HJ_NEED_NEW_OUTER;
+               }
+               else
+               {
+                       /* must destroy and rebuild hash table */
+                       ExecHashTableDestroy(node->hj_HashTable);
+                       node->hj_HashTable = NULL;
+                       node->hj_JoinState = HJ_BUILD_HASHTABLE;
+
+                       /*
+                        * If chgParam of the subnode is not null, then the plan will be
+                        * re-scanned by the first ExecProcNode() call.
+                        */
+                       if (node->js.ps.righttree->chgParam == NULL)
+                               ExecReScan(node->js.ps.righttree);
+               }
        }
 
        /* Always reset intra-tuple state */
+       node->hj_CurHashValue = 0;
        node->hj_CurBucketNo = 0;
+       node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
        node->hj_CurTuple = NULL;
 
-       node->js.ps.ps_OuterTupleSlot = NULL;
-       node->js.ps.ps_TupFromTlist = false;
-       node->hj_NeedNewOuter = true;
        node->hj_MatchedOuter = false;
+       node->hj_FirstOuterTupleSlot = NULL;
 
        /*
         * If chgParam of the subnode is not null, then the plan will be
         * re-scanned by the first ExecProcNode() call.
         */
-       if (((PlanState *) node)->lefttree->chgParam == NULL)
-               ExecReScan(((PlanState *) node)->lefttree, exprCtxt);
+       if (node->js.ps.lefttree->chgParam == NULL)
+               ExecReScan(node->js.ps.lefttree);
+}
+
+void
+ExecShutdownHashJoin(HashJoinState *node)
+{
+       if (node->hj_HashTable)
+       {
+               /*
+                * Detach from shared state before DSM memory goes away.  This makes
+                * sure that we don't have any pointers into DSM memory by the time
+                * ExecEndHashJoin runs.
+                */
+               ExecHashTableDetachBatch(node->hj_HashTable);
+               ExecHashTableDetach(node->hj_HashTable);
+       }
+}
+
+static void
+ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate)
+{
+       PlanState  *outerState = outerPlanState(hjstate);
+       ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
+       HashJoinTable hashtable = hjstate->hj_HashTable;
+       TupleTableSlot *slot;
+       uint32          hashvalue;
+       int                     i;
+
+       Assert(hjstate->hj_FirstOuterTupleSlot == NULL);
+
+       /* Execute outer plan, writing all tuples to shared tuplestores. */
+       for (;;)
+       {
+               slot = ExecProcNode(outerState);
+               if (TupIsNull(slot))
+                       break;
+               econtext->ecxt_outertuple = slot;
+               if (ExecHashGetHashValue(hashtable, econtext,
+                                                                hjstate->hj_OuterHashKeys,
+                                                                true,  /* outer tuple */
+                                                                HJ_FILL_OUTER(hjstate),
+                                                                &hashvalue))
+               {
+                       int                     batchno;
+                       int                     bucketno;
+                       bool            shouldFree;
+                       MinimalTuple mintup = ExecFetchSlotMinimalTuple(slot, &shouldFree);
+
+                       ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
+                                                                         &batchno);
+                       sts_puttuple(hashtable->batches[batchno].outer_tuples,
+                                                &hashvalue, mintup);
+
+                       if (shouldFree)
+                               heap_free_minimal_tuple(mintup);
+               }
+               CHECK_FOR_INTERRUPTS();
+       }
+
+       /* Make sure all outer partitions are readable by any backend. */
+       for (i = 0; i < hashtable->nbatch; ++i)
+               sts_end_write(hashtable->batches[i].outer_tuples);
+}
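
Each outer tuple is routed by ExecHashGetBucketAndBatch(), which carves the hash value into disjoint bit ranges. A sketch of that split, assuming (as the executor arranges) that nbuckets and nbatch are powers of two: the low bits choose the bucket and the next bits choose the batch, which is why repartitioning after nbatch grows can only send a tuple to the same or a later batch.

    #include <stdint.h>

    static void
    get_bucket_and_batch(uint32_t hashvalue,
                         uint32_t nbuckets, int log2_nbuckets, uint32_t nbatch,
                         uint32_t *bucketno, uint32_t *batchno)
    {
        /* mask does MOD, shift does DIV, since both counts are powers of two */
        *bucketno = hashvalue & (nbuckets - 1);
        *batchno = (hashvalue >> log2_nbuckets) & (nbatch - 1);
    }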
+
+void
+ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
+{
+       shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState));
+       shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+void
+ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
+{
+       int                     plan_node_id = state->js.ps.plan->plan_node_id;
+       HashState  *hashNode;
+       ParallelHashJoinState *pstate;
+
+       /*
+        * Disable shared hash table mode if we failed to create a real DSM
+        * segment, because that means that we don't have a DSA area to work with.
+        */
+       if (pcxt->seg == NULL)
+               return;
+
+       ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
+
+       /*
+        * Set up the state needed to coordinate access to the shared hash
+        * table(s), using the plan node ID as the toc key.
+        */
+       pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState));
+       shm_toc_insert(pcxt->toc, plan_node_id, pstate);
+
+       /*
+        * Set up the shared hash join state with no batches initially.
+        * ExecHashTableCreate() will prepare at least one later and set nbatch
+        * and space_allowed.
+        */
+       pstate->nbatch = 0;
+       pstate->space_allowed = 0;
+       pstate->batches = InvalidDsaPointer;
+       pstate->old_batches = InvalidDsaPointer;
+       pstate->nbuckets = 0;
+       pstate->growth = PHJ_GROWTH_OK;
+       pstate->chunk_work_queue = InvalidDsaPointer;
+       pg_atomic_init_u32(&pstate->distributor, 0);
+       pstate->nparticipants = pcxt->nworkers + 1;
+       pstate->total_tuples = 0;
+       LWLockInitialize(&pstate->lock,
+                                        LWTRANCHE_PARALLEL_HASH_JOIN);
+       BarrierInit(&pstate->build_barrier, 0);
+       BarrierInit(&pstate->grow_batches_barrier, 0);
+       BarrierInit(&pstate->grow_buckets_barrier, 0);
+
+       /* Set up the space we'll use for shared temporary files. */
+       SharedFileSetInit(&pstate->fileset, pcxt->seg);
+
+       /* Initialize the shared state in the hash node. */
+       hashNode = (HashState *) innerPlanState(state);
+       hashNode->parallel_state = pstate;
+}
+
+/* ----------------------------------------------------------------
+ *             ExecHashJoinReInitializeDSM
+ *
+ *             Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt)
+{
+       int                     plan_node_id = state->js.ps.plan->plan_node_id;
+       ParallelHashJoinState *pstate =
+       shm_toc_lookup(cxt->toc, plan_node_id, false);
+
+       /*
+        * It would be possible to reuse the shared hash table in single-batch
+        * cases by resetting and then fast-forwarding build_barrier to
+        * PHJ_BUILD_DONE and batch 0's batch_barrier to PHJ_BATCH_PROBING, but
+        * currently shared hash tables are already freed by now (by the last
+        * participant to detach from the batch).  We could consider keeping it
+        * around for single-batch joins.  We'd also need to adjust
+        * finalize_plan() so that it doesn't record a dummy dependency for
+        * Parallel Hash nodes, preventing the rescan optimization.  For now we
+        * don't try.
+        */
+
+       /* Detach, freeing any remaining shared memory. */
+       if (state->hj_HashTable != NULL)
+       {
+               ExecHashTableDetachBatch(state->hj_HashTable);
+               ExecHashTableDetach(state->hj_HashTable);
+       }
+
+       /* Clear any shared batch files. */
+       SharedFileSetDeleteAll(&pstate->fileset);
+
+       /* Reset build_barrier to PHJ_BUILD_ELECTING so we can go around again. */
+       BarrierInit(&pstate->build_barrier, 0);
+}
+
+void
+ExecHashJoinInitializeWorker(HashJoinState *state,
+                                                        ParallelWorkerContext *pwcxt)
+{
+       HashState  *hashNode;
+       int                     plan_node_id = state->js.ps.plan->plan_node_id;
+       ParallelHashJoinState *pstate =
+       shm_toc_lookup(pwcxt->toc, plan_node_id, false);
+
+       /* Attach to the space for shared temporary files. */
+       SharedFileSetAttach(&pstate->fileset, pwcxt->seg);
+
+       /* Attach to the shared state in the hash node. */
+       hashNode = (HashState *) innerPlanState(state);
+       hashNode->parallel_state = pstate;
+
+       ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
 }
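
The leader and the workers rendezvous through the shared memory table of contents: ExecHashJoinInitializeDSM() publishes the state under the plan node ID via shm_toc_insert(), and ExecHashJoinInitializeWorker() fetches it with shm_toc_lookup() using the same key. A toy single-process illustration of that keyed handoff (hypothetical toc_insert/toc_lookup, not the shm_toc API):

    #include <stdio.h>

    typedef struct TocEntry
    {
        int         key;            /* stands in for the plan node ID */
        void       *ptr;            /* stands in for the shared state */
    } TocEntry;

    static TocEntry toc[8];
    static int  ntoc;

    static void
    toc_insert(int key, void *ptr)
    {
        toc[ntoc].key = key;
        toc[ntoc].ptr = ptr;
        ntoc++;
    }

    static void *
    toc_lookup(int key)
    {
        for (int i = 0; i < ntoc; i++)
            if (toc[i].key == key)
                return toc[i].ptr;
        return NULL;
    }

    int
    main(void)
    {
        int         shared_state = 42;  /* stands in for ParallelHashJoinState */
        int         plan_node_id = 3;

        toc_insert(plan_node_id, &shared_state);        /* leader: InitializeDSM */
        int        *found = toc_lookup(plan_node_id);   /* worker: InitializeWorker */

        printf("worker sees %d\n", *found);
        return 0;
    }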