Separate reinitialization of shared parallel-scan state from ExecReScan.
author     Tom Lane <tgl@sss.pgh.pa.us>
           Wed, 30 Aug 2017 17:18:16 +0000 (13:18 -0400)
committer  Tom Lane <tgl@sss.pgh.pa.us>
           Wed, 30 Aug 2017 17:18:16 +0000 (13:18 -0400)
Previously, the parallel executor logic did reinitialization of shared
state within the ExecReScan code for parallel-aware scan nodes.  This is
problematic, because it means that the ExecReScan call has to occur
synchronously (ie, during the parent Gather node's ReScan call).  That is
swimming very much against the tide so far as the ExecReScan machinery is
concerned; the fact that it works at all today depends on a lot of fragile
assumptions, such as that no plan node between Gather and a parallel-aware
scan node is parameterized.  Another objection is that because ExecReScan
might be called in workers as well as the leader, hacky extra tests are
needed in some places to prevent unwanted shared-state resets.

Hence, let's separate this code into two functions, a ReInitializeDSM
call and the ReScan call proper.  ReInitializeDSM is called only in
the leader and is guaranteed to run before we start new workers.
ReScan is returned to its traditional function of resetting only local
state, which means that ExecReScan's usual habits of delaying or
eliminating child rescan calls are safe again.

As with the preceding commit 7df2c1f8d, it doesn't seem to be necessary
to make these changes in 9.6, which is a good thing because the FDW and
CustomScan APIs are impacted.

Discussion: https://postgr.es/m/CAA4eK1JkByysFJNh9M349u_nNjqETuEnY_y1VUc_kJiU0bxtaQ@mail.gmail.com
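
Concretely, the leader-side rescan sequence established by this commit runs
in three steps.  The following is a condensed sketch, distilled from the
nodeGather.c and execParallel.c changes below; it is not a literal function
in the tree, and error handling is omitted:

    /* Sketch: how a Gather rescan plays out in the leader after this commit. */
    static void
    gather_rescan_sequence(GatherState *node)
    {
        /* ExecReScanGather: shut down old workers, reset local flags only. */
        ExecShutdownGatherWorkers(node);
        node->initialized = false;

        /* Next ExecGather call: the leader alone resets shared DSM state... */
        ExecParallelReinitialize(node->ps.lefttree, node->pei);

        /* ...and only then launches a fresh batch of workers. */
        LaunchParallelWorkers(node->pei->pcxt);
    }

Because ReInitializeDSM runs only in the leader, and strictly before any new
worker exists, the per-node reinitializers need none of the defensive tests
that the old in-ReScan resets required.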

22 files changed:
doc/src/sgml/custom-scan.sgml
doc/src/sgml/fdwhandler.sgml
src/backend/access/heap/heapam.c
src/backend/executor/execParallel.c
src/backend/executor/nodeBitmapHeapscan.c
src/backend/executor/nodeCustom.c
src/backend/executor/nodeForeignscan.c
src/backend/executor/nodeGather.c
src/backend/executor/nodeGatherMerge.c
src/backend/executor/nodeIndexonlyscan.c
src/backend/executor/nodeIndexscan.c
src/backend/executor/nodeSeqscan.c
src/include/access/heapam.h
src/include/executor/execParallel.h
src/include/executor/nodeBitmapHeapscan.h
src/include/executor/nodeCustom.h
src/include/executor/nodeForeignscan.h
src/include/executor/nodeIndexonlyscan.h
src/include/executor/nodeIndexscan.h
src/include/executor/nodeSeqscan.h
src/include/foreign/fdwapi.h
src/include/nodes/extensible.h

index 6159c3a24ebf09cfe0e96bf9d56534d459628020..9d1ca7bfe162b1160f7b4dc6ab31b7141d51ad89 100644 (file)
@@ -320,22 +320,39 @@ void (*InitializeDSMCustomScan) (CustomScanState *node,
                                  void *coordinate);
 </programlisting>
     Initialize the dynamic shared memory that will be required for parallel
-    operation; <literal>coordinate</> points to an amount of allocated space
-    equal to the return value of <function>EstimateDSMCustomScan</>.
+    operation.  <literal>coordinate</> points to a shared memory area of
+    size equal to the return value of <function>EstimateDSMCustomScan</>.
     This callback is optional, and need only be supplied if this custom
     scan provider supports parallel execution.
    </para>
 
    <para>
 <programlisting>
+void (*ReInitializeDSMCustomScan) (CustomScanState *node,
+                                   ParallelContext *pcxt,
+                                   void *coordinate);
+</programlisting>
+    Re-initialize the dynamic shared memory required for parallel operation
+    when the custom-scan plan node is about to be re-scanned.
+    This callback is optional, and need only be supplied if this custom
+    scan provider supports parallel execution.
+    Recommended practice is that this callback reset only shared state,
+    while the <function>ReScanCustomScan</> callback resets only local
+    state.  Currently, this callback will be called
+    before <function>ReScanCustomScan</>, but it's best not to rely on
+    that ordering.
+   </para>
+
+   <para>
+<programlisting>
 void (*InitializeWorkerCustomScan) (CustomScanState *node,
                                     shm_toc *toc,
                                     void *coordinate);
 </programlisting>
-    Initialize a parallel worker's custom state based on the shared state
-    set up in the leader by <literal>InitializeDSMCustomScan</>.
-    This callback is optional, and needs only be supplied if this
-    custom path supports parallel execution.
+    Initialize a parallel worker's local state based on the shared state
+    set up by the leader during <function>InitializeDSMCustomScan</>.
+    This callback is optional, and need only be supplied if this custom
+    scan provider supports parallel execution.
    </para>
 
    <para>
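
To make the new hook concrete, here is a minimal sketch of a provider-side
implementation.  The shared-state layout (a spinlock-protected block counter)
and the my_* names are illustrative assumptions, not part of the API:

    #include "postgres.h"
    #include "nodes/extensible.h"
    #include "storage/block.h"
    #include "storage/spin.h"

    /* Hypothetical shared state for a custom parallel scan. */
    typedef struct MyCustomShared
    {
        slock_t     mutex;
        BlockNumber next_block;     /* next block a worker should claim */
    } MyCustomShared;

    static void
    my_reinitialize_dsm_custom_scan(CustomScanState *node,
                                    ParallelContext *pcxt,
                                    void *coordinate)
    {
        MyCustomShared *shared = (MyCustomShared *) coordinate;

        /* Reset only shared state here; local state belongs in ReScan. */
        SpinLockAcquire(&shared->mutex);
        shared->next_block = 0;
        SpinLockRelease(&shared->mutex);
    }

The provider would register this in its CustomExecMethods as
ReInitializeDSMCustomScan; leaving the field NULL is fine, since the
callback is optional.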
index dbeaab555db6775624d05a297c4174fcc62292e8..cfa68084179102e43d5d5bfd155a1b12ea2116a4 100644 (file)
@@ -1191,12 +1191,12 @@ ImportForeignSchema (ImportForeignSchemaStmt *stmt, Oid serverOid);
     <para>
      A <structname>ForeignScan</> node can, optionally, support parallel
      execution.  A parallel <structname>ForeignScan</> will be executed
-     in multiple processes and should return each row only once across
+     in multiple processes and must return each row exactly once across
      all cooperating processes.  To do this, processes can coordinate through
-     fixed size chunks of dynamic shared memory.  This shared memory is not
-     guaranteed to be mapped at the same address in every process, so pointers
-     may not be used.  The following callbacks are all optional in general,
-     but required if parallel execution is to be supported.
+     fixed-size chunks of dynamic shared memory.  This shared memory is not
+     guaranteed to be mapped at the same address in every process, so it
+     must not contain pointers.  The following functions are all optional,
+     but most are required if parallel execution is to be supported.
     </para>
 
     <para>
@@ -1215,7 +1215,7 @@ IsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
     </para>
 
     <para>
-    If this callback is not defined, it is assumed that the scan must take
+    If this function is not defined, it is assumed that the scan must take
     place within the parallel leader.  Note that returning true does not mean
     that the scan itself can be done in parallel, only that the scan can be
     performed within a parallel worker.  Therefore, it can be useful to define
@@ -1230,6 +1230,9 @@ EstimateDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt);
     Estimate the amount of dynamic shared memory that will be required
     for parallel operation.  This may be higher than the amount that will
     actually be used, but it must not be lower.  The return value is in bytes.
+    This function is optional, and can be omitted if not needed; but if it
+    is omitted, the next three functions must be omitted as well, because
+    no shared memory will be allocated for the FDW's use.
     </para>
 
     <para>
@@ -1239,8 +1242,25 @@ InitializeDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt,
                          void *coordinate);
 </programlisting>
     Initialize the dynamic shared memory that will be required for parallel
-    operation; <literal>coordinate</> points to an amount of allocated space
-    equal to the return value of <function>EstimateDSMForeignScan</>.
+    operation.  <literal>coordinate</> points to a shared memory area of
+    size equal to the return value of <function>EstimateDSMForeignScan</>.
+    This function is optional, and can be omitted if not needed.
+   </para>
+
+    <para>
+<programlisting>
+void
+ReInitializeDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt,
+                           void *coordinate);
+</programlisting>
+    Re-initialize the dynamic shared memory required for parallel operation
+    when the foreign-scan plan node is about to be re-scanned.
+    This function is optional, and can be omitted if not needed.
+    Recommended practice is that this function reset only shared state,
+    while the <function>ReScanForeignScan</> function resets only local
+    state.  Currently, this function will be called
+    before <function>ReScanForeignScan</>, but it's best not to rely on
+    that ordering.
    </para>
 
    <para>
@@ -1249,10 +1269,9 @@ void
 InitializeWorkerForeignScan(ForeignScanState *node, shm_toc *toc,
                             void *coordinate);
 </programlisting>
-    Initialize a parallel worker's custom state based on the shared state
-    set up in the leader by <literal>InitializeDSMForeignScan</>.
-    This callback is optional, and needs only be supplied if this
-    custom path supports parallel execution.
+    Initialize a parallel worker's local state based on the shared state
+    set up by the leader during <function>InitializeDSMForeignScan</>.
+    This function is optional, and can be omitted if not needed.
    </para>
 
    <para>
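
For an FDW the shape is the same; the new hook just plugs into FdwRoutine
next to the existing DSM callbacks.  A hedged sketch follows (the my_fdw_*
names and the shared-cursor layout are assumptions, not from any real FDW):

    #include "postgres.h"
    #include "foreign/fdwapi.h"
    #include "port/atomics.h"

    /* Hypothetical shared state: an atomic cursor over the foreign data. */
    typedef struct MyFdwShared
    {
        pg_atomic_uint32 next_row;
    } MyFdwShared;

    static void
    my_fdw_reinitialize_dsm(ForeignScanState *node, ParallelContext *pcxt,
                            void *coordinate)
    {
        MyFdwShared *shared = (MyFdwShared *) coordinate;

        /* Shared state only; per-backend state is reset in ReScan. */
        pg_atomic_write_u32(&shared->next_row, 0);
    }

    /* In the FDW's handler function, alongside the other parallel hooks: */
    /*     fdwroutine->ReInitializeDSMForeignScan = my_fdw_reinitialize_dsm; */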
index e283fe5b1f51ac8dbc2712b16f6774efda96ee87..e61dd771d0cff3d215bff2cd1ca144daca16269e 100644 (file)
@@ -1525,25 +1525,6 @@ heap_rescan(HeapScanDesc scan,
         * reinitialize scan descriptor
         */
        initscan(scan, key, true);
-
-       /*
-        * reset parallel scan, if present
-        */
-       if (scan->rs_parallel != NULL)
-       {
-               ParallelHeapScanDesc parallel_scan;
-
-               /*
-                * Caller is responsible for making sure that all workers have
-                * finished the scan before calling this, so it really shouldn't be
-                * necessary to acquire the mutex at all.  We acquire it anyway, just
-                * to be tidy.
-                */
-               parallel_scan = scan->rs_parallel;
-               SpinLockAcquire(&parallel_scan->phs_mutex);
-               parallel_scan->phs_cblock = parallel_scan->phs_startblock;
-               SpinLockRelease(&parallel_scan->phs_mutex);
-       }
 }
 
 /* ----------------
@@ -1640,6 +1621,25 @@ heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
        SerializeSnapshot(snapshot, target->phs_snapshot_data);
 }
 
+/* ----------------
+ *             heap_parallelscan_reinitialize - reset a parallel scan
+ *
+ *             Call this in the leader process.  Caller is responsible for
+ *             making sure that all workers have finished the scan beforehand.
+ * ----------------
+ */
+void
+heap_parallelscan_reinitialize(ParallelHeapScanDesc parallel_scan)
+{
+       /*
+        * It shouldn't be necessary to acquire the mutex here, but we do it
+        * anyway, just to be tidy.
+        */
+       SpinLockAcquire(&parallel_scan->phs_mutex);
+       parallel_scan->phs_cblock = parallel_scan->phs_startblock;
+       SpinLockRelease(&parallel_scan->phs_mutex);
+}
+
 /* ----------------
  *             heap_beginscan_parallel - join a parallel scan
  *
index ce47f1d4a8b7eacd1fc9091582501f85bfaede3b..2313b4c45cbd27538b7a6a615faa8730eeaa3a16 100644 (file)
@@ -109,6 +109,8 @@ static bool ExecParallelInitializeDSM(PlanState *node,
                                                  ExecParallelInitializeDSMContext *d);
 static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
                                                         bool reinitialize);
+static bool ExecParallelReInitializeDSM(PlanState *planstate,
+                                                       ParallelContext *pcxt);
 static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
                                                                        SharedExecutorInstrumentation *instrumentation);
 
@@ -364,18 +366,6 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
        return responseq;
 }
 
-/*
- * Re-initialize the parallel executor info such that it can be reused by
- * workers.
- */
-void
-ExecParallelReinitialize(ParallelExecutorInfo *pei)
-{
-       ReinitializeParallelDSM(pei->pcxt);
-       pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
-       pei->finished = false;
-}
-
 /*
  * Sets up the required infrastructure for backend workers to perform
  * execution and return results to the main backend.
@@ -567,7 +557,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
        ExecParallelInitializeDSM(planstate, &d);
 
        /*
-        * Make sure that the world hasn't shifted under our feat.  This could
+        * Make sure that the world hasn't shifted under our feet.  This could
         * probably just be an Assert(), but let's be conservative for now.
         */
        if (e.nnodes != d.nnodes)
@@ -577,6 +567,75 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
        return pei;
 }
 
+/*
+ * Re-initialize the parallel executor shared memory state before launching
+ * a fresh batch of workers.
+ */
+void
+ExecParallelReinitialize(PlanState *planstate,
+                                                ParallelExecutorInfo *pei)
+{
+       /* Old workers must already be shut down */
+       Assert(pei->finished);
+
+       ReinitializeParallelDSM(pei->pcxt);
+       pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
+       pei->finished = false;
+
+       /* Traverse plan tree and let each child node reset associated state. */
+       ExecParallelReInitializeDSM(planstate, pei->pcxt);
+}
+
+/*
+ * Traverse plan tree to reinitialize per-node dynamic shared memory state
+ */
+static bool
+ExecParallelReInitializeDSM(PlanState *planstate,
+                                                       ParallelContext *pcxt)
+{
+       if (planstate == NULL)
+               return false;
+
+       /*
+        * Call reinitializers for DSM-using plan nodes.
+        */
+       if (planstate->plan->parallel_aware)
+       {
+               switch (nodeTag(planstate))
+               {
+                       case T_SeqScanState:
+                               ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
+                                                                                  pcxt);
+                               break;
+                       case T_IndexScanState:
+                               ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
+                                                                                        pcxt);
+                               break;
+                       case T_IndexOnlyScanState:
+                               ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
+                                                                                                pcxt);
+                               break;
+                       case T_ForeignScanState:
+                               ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
+                                                                                          pcxt);
+                               break;
+                       case T_CustomScanState:
+                               ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
+                                                                                         pcxt);
+                               break;
+                       case T_BitmapHeapScanState:
+                               ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
+                                                                                         pcxt);
+                               break;
+
+                       default:
+                               break;
+               }
+       }
+
+       return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
+}
+
 /*
  * Copy instrumentation information about this node and its descendants from
  * dynamic shared memory.
index 79f534e4e92b7be2ccc19127700fe0462406953e..f7e55e0b45bd54e4f2fcd571147c53b012e53873 100644 (file)
@@ -705,23 +705,6 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node)
        node->shared_tbmiterator = NULL;
        node->shared_prefetch_iterator = NULL;
 
-       /* Reset parallel bitmap state, if present */
-       if (node->pstate)
-       {
-               dsa_area   *dsa = node->ss.ps.state->es_query_dsa;
-
-               node->pstate->state = BM_INITIAL;
-
-               if (DsaPointerIsValid(node->pstate->tbmiterator))
-                       tbm_free_shared_area(dsa, node->pstate->tbmiterator);
-
-               if (DsaPointerIsValid(node->pstate->prefetch_iterator))
-                       tbm_free_shared_area(dsa, node->pstate->prefetch_iterator);
-
-               node->pstate->tbmiterator = InvalidDsaPointer;
-               node->pstate->prefetch_iterator = InvalidDsaPointer;
-       }
-
        ExecScanReScan(&node->ss);
 
        /*
@@ -999,6 +982,31 @@ ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
        node->pstate = pstate;
 }
 
+/* ----------------------------------------------------------------
+ *             ExecBitmapHeapReInitializeDSM
+ *
+ *             Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node,
+                                                         ParallelContext *pcxt)
+{
+       ParallelBitmapHeapState *pstate = node->pstate;
+       dsa_area   *dsa = node->ss.ps.state->es_query_dsa;
+
+       pstate->state = BM_INITIAL;
+
+       if (DsaPointerIsValid(pstate->tbmiterator))
+               tbm_free_shared_area(dsa, pstate->tbmiterator);
+
+       if (DsaPointerIsValid(pstate->prefetch_iterator))
+               tbm_free_shared_area(dsa, pstate->prefetch_iterator);
+
+       pstate->tbmiterator = InvalidDsaPointer;
+       pstate->prefetch_iterator = InvalidDsaPointer;
+}
+
 /* ----------------------------------------------------------------
  *             ExecBitmapHeapInitializeWorker
  *
index fb7645b1f46da351c574708686be07e1965aeca1..07dcabef5514633e43b93d629a1d352303ca7551 100644 (file)
@@ -194,6 +194,21 @@ ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
        }
 }
 
+void
+ExecCustomScanReInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
+{
+       const CustomExecMethods *methods = node->methods;
+
+       if (methods->ReInitializeDSMCustomScan)
+       {
+               int                     plan_node_id = node->ss.ps.plan->plan_node_id;
+               void       *coordinate;
+
+               coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false);
+               methods->ReInitializeDSMCustomScan(node, pcxt, coordinate);
+       }
+}
+
 void
 ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc)
 {
index 140e82ef5e4d071381fb716b34a72f912e243363..20892d6d5fb1952a233b66b5f14c840163131187 100644 (file)
@@ -332,7 +332,28 @@ ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
 }
 
 /* ----------------------------------------------------------------
- *             ExecForeignScanInitializeDSM
+ *             ExecForeignScanReInitializeDSM
+ *
+ *             Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
+{
+       FdwRoutine *fdwroutine = node->fdwroutine;
+
+       if (fdwroutine->ReInitializeDSMForeignScan)
+       {
+               int                     plan_node_id = node->ss.ps.plan->plan_node_id;
+               void       *coordinate;
+
+               coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false);
+               fdwroutine->ReInitializeDSMForeignScan(node, pcxt, coordinate);
+       }
+}
+
+/* ----------------------------------------------------------------
+ *             ExecForeignScanInitializeWorker
  *
  *             Initialization according to the parallel coordination information
  * ----------------------------------------------------------------
index 3aa819f450aeb876d3472741d697ab015cb8ff94..55fce231ce1e54b123ba930ba824339f000d1493 100644 (file)
@@ -152,11 +152,14 @@ ExecGather(PlanState *pstate)
                {
                        ParallelContext *pcxt;
 
-                       /* Initialize the workers required to execute Gather node. */
+                       /* Initialize, or re-initialize, shared state needed by workers. */
                        if (!node->pei)
                                node->pei = ExecInitParallelPlan(node->ps.lefttree,
                                                                                                 estate,
                                                                                                 gather->num_workers);
+                       else
+                               ExecParallelReinitialize(node->ps.lefttree,
+                                                                                node->pei);
 
                        /*
                         * Register backend workers. We might not get as many as we
@@ -424,7 +427,7 @@ ExecShutdownGather(GatherState *node)
 /* ----------------------------------------------------------------
  *             ExecReScanGather
  *
- *             Re-initialize the workers and rescans a relation via them.
+ *             Prepare to re-scan the result of a Gather.
  * ----------------------------------------------------------------
  */
 void
@@ -433,19 +436,12 @@ ExecReScanGather(GatherState *node)
        Gather     *gather = (Gather *) node->ps.plan;
        PlanState  *outerPlan = outerPlanState(node);
 
-       /*
-        * Re-initialize the parallel workers to perform rescan of relation. We
-        * want to gracefully shutdown all the workers so that they should be able
-        * to propagate any error or other information to master backend before
-        * dying.  Parallel context will be reused for rescan.
-        */
+       /* Make sure any existing workers are gracefully shut down */
        ExecShutdownGatherWorkers(node);
 
+       /* Mark node so that shared state will be rebuilt at next call */
        node->initialized = false;
 
-       if (node->pei)
-               ExecParallelReinitialize(node->pei);
-
        /*
         * Set child node's chgParam to tell it that the next scan might deliver a
         * different set of rows within the leader process.  (The overall rowset
@@ -457,10 +453,15 @@ ExecReScanGather(GatherState *node)
                outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
                                                                                         gather->rescan_param);
 
-
        /*
-        * if chgParam of subnode is not null then plan will be re-scanned by
-        * first ExecProcNode.
+        * If chgParam of subnode is not null then plan will be re-scanned by
+        * first ExecProcNode.  Note: because this does nothing if we have a
+        * rescan_param, it's currently guaranteed that parallel-aware child nodes
+        * will not see a ReScan call until after they get a ReInitializeDSM call.
+        * That ordering might not be something to rely on, though.  A good rule
+        * of thumb is that ReInitializeDSM should reset only shared state, ReScan
+        * should reset only local state, and anything that depends on both of
+        * those steps being finished must wait until the first ExecProcNode call.
         */
        if (outerPlan->chgParam == NULL)
                ExecReScan(outerPlan);
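
As an illustration of that rule of thumb, a parallel-aware node would split
its reset logic roughly as below.  "Foo" is a made-up node; every type and
field name here is illustrative only:

    typedef struct FooShared
    {
        pg_atomic_uint64 next_chunk;    /* lives in DSM */
    } FooShared;

    typedef struct FooScanState
    {
        ScanState   ss;
        FooShared  *foo_shared;         /* pointer into DSM */
        uint64      foo_local_pos;      /* backend-local cursor */
    } FooScanState;

    /* Leader only, before any new workers are launched: shared state. */
    static void
    ExecFooReInitializeDSM(FooScanState *node, ParallelContext *pcxt)
    {
        pg_atomic_write_u64(&node->foo_shared->next_chunk, 0);
    }

    /* Leader or workers: local state only. */
    static void
    ExecReScanFoo(FooScanState *node)
    {
        node->foo_local_pos = 0;
        ExecScanReScan(&node->ss);
    }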
index e8c70dfc60ae0ea2769334c65128e4f81d59cf19..90c1be96ef40c4165a252abc230726a6c7c5b0da 100644 (file)
@@ -186,11 +186,14 @@ ExecGatherMerge(PlanState *pstate)
                {
                        ParallelContext *pcxt;
 
-                       /* Initialize data structures for workers. */
+                       /* Initialize, or re-initialize, shared state needed by workers. */
                        if (!node->pei)
                                node->pei = ExecInitParallelPlan(node->ps.lefttree,
                                                                                                 estate,
                                                                                                 gm->num_workers);
+                       else
+                               ExecParallelReinitialize(node->ps.lefttree,
+                                                                                node->pei);
 
                        /* Try to launch workers. */
                        pcxt = node->pei->pcxt;
@@ -319,7 +322,7 @@ ExecShutdownGatherMergeWorkers(GatherMergeState *node)
 /* ----------------------------------------------------------------
  *             ExecReScanGatherMerge
  *
- *             Re-initialize the workers and rescans a relation via them.
+ *             Prepare to re-scan the result of a GatherMerge.
  * ----------------------------------------------------------------
  */
 void
@@ -328,20 +331,13 @@ ExecReScanGatherMerge(GatherMergeState *node)
        GatherMerge *gm = (GatherMerge *) node->ps.plan;
        PlanState  *outerPlan = outerPlanState(node);
 
-       /*
-        * Re-initialize the parallel workers to perform rescan of relation. We
-        * want to gracefully shutdown all the workers so that they should be able
-        * to propagate any error or other information to master backend before
-        * dying.  Parallel context will be reused for rescan.
-        */
+       /* Make sure any existing workers are gracefully shut down */
        ExecShutdownGatherMergeWorkers(node);
 
+       /* Mark node so that shared state will be rebuilt at next call */
        node->initialized = false;
        node->gm_initialized = false;
 
-       if (node->pei)
-               ExecParallelReinitialize(node->pei);
-
        /*
         * Set child node's chgParam to tell it that the next scan might deliver a
         * different set of rows within the leader process.  (The overall rowset
@@ -353,10 +349,15 @@ ExecReScanGatherMerge(GatherMergeState *node)
                outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
                                                                                         gm->rescan_param);
 
-
        /*
-        * if chgParam of subnode is not null then plan will be re-scanned by
-        * first ExecProcNode.
+        * If chgParam of subnode is not null then plan will be re-scanned by
+        * first ExecProcNode.  Note: because this does nothing if we have a
+        * rescan_param, it's currently guaranteed that parallel-aware child nodes
+        * will not see a ReScan call until after they get a ReInitializeDSM call.
+        * That ordering might not be something to rely on, though.  A good rule
+        * of thumb is that ReInitializeDSM should reset only shared state, ReScan
+        * should reset only local state, and anything that depends on both of
+        * those steps being finished must wait until the first ExecProcNode call.
         */
        if (outerPlan->chgParam == NULL)
                ExecReScan(outerPlan);
index fe7ba3f1a4f155de068b9c2d934c99e64cc8c6ea..5351cb8981e11293a63532710dd2404433030857 100644 (file)
@@ -25,6 +25,7 @@
  *                                             parallel index-only scan
  *             ExecIndexOnlyScanInitializeDSM  initialize DSM for parallel
  *                                             index-only scan
+ *             ExecIndexOnlyScanReInitializeDSM        reinitialize DSM for fresh scan
  *             ExecIndexOnlyScanInitializeWorker attach to DSM info in parallel worker
  */
 #include "postgres.h"
@@ -336,16 +337,6 @@ ExecIndexOnlyScan(PlanState *pstate)
 void
 ExecReScanIndexOnlyScan(IndexOnlyScanState *node)
 {
-       bool            reset_parallel_scan = true;
-
-       /*
-        * If we are here to just update the scan keys, then don't reset parallel
-        * scan. For detailed reason behind this look in the comments for
-        * ExecReScanIndexScan.
-        */
-       if (node->ioss_NumRuntimeKeys != 0 && !node->ioss_RuntimeKeysReady)
-               reset_parallel_scan = false;
-
        /*
         * If we are doing runtime key calculations (ie, any of the index key
         * values weren't simple Consts), compute the new key values.  But first,
@@ -366,15 +357,10 @@ ExecReScanIndexOnlyScan(IndexOnlyScanState *node)
 
        /* reset index scan */
        if (node->ioss_ScanDesc)
-       {
-
                index_rescan(node->ioss_ScanDesc,
                                         node->ioss_ScanKeys, node->ioss_NumScanKeys,
                                         node->ioss_OrderByKeys, node->ioss_NumOrderByKeys);
 
-               if (reset_parallel_scan && node->ioss_ScanDesc->parallel_scan)
-                       index_parallelrescan(node->ioss_ScanDesc);
-       }
        ExecScanReScan(&node->ss);
 }
 
@@ -671,6 +657,19 @@ ExecIndexOnlyScanInitializeDSM(IndexOnlyScanState *node,
                                         node->ioss_OrderByKeys, node->ioss_NumOrderByKeys);
 }
 
+/* ----------------------------------------------------------------
+ *             ExecIndexOnlyScanReInitializeDSM
+ *
+ *             Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node,
+                                                                ParallelContext *pcxt)
+{
+       index_parallelrescan(node->ioss_ScanDesc);
+}
+
 /* ----------------------------------------------------------------
  *             ExecIndexOnlyScanInitializeWorker
  *
index 404076d5930d6e729acdf6f28e9a52955bc88d10..638b17b07cbd9e7e6f5768e6ea691cdba30f22fb 100644 (file)
@@ -24,6 +24,7 @@
  *             ExecIndexRestrPos               restores scan position.
  *             ExecIndexScanEstimate   estimates DSM space needed for parallel index scan
  *             ExecIndexScanInitializeDSM initialize DSM for parallel indexscan
+ *             ExecIndexScanReInitializeDSM reinitialize DSM for fresh scan
  *             ExecIndexScanInitializeWorker attach to DSM info in parallel worker
  */
 #include "postgres.h"
@@ -577,18 +578,6 @@ ExecIndexScan(PlanState *pstate)
 void
 ExecReScanIndexScan(IndexScanState *node)
 {
-       bool            reset_parallel_scan = true;
-
-       /*
-        * If we are here to just update the scan keys, then don't reset parallel
-        * scan.  We don't want each of the participating process in the parallel
-        * scan to update the shared parallel scan state at the start of the scan.
-        * It is quite possible that one of the participants has already begun
-        * scanning the index when another has yet to start it.
-        */
-       if (node->iss_NumRuntimeKeys != 0 && !node->iss_RuntimeKeysReady)
-               reset_parallel_scan = false;
-
        /*
         * If we are doing runtime key calculations (ie, any of the index key
         * values weren't simple Consts), compute the new key values.  But first,
@@ -614,21 +603,11 @@ ExecReScanIndexScan(IndexScanState *node)
                        reorderqueue_pop(node);
        }
 
-       /*
-        * Reset (parallel) index scan.  For parallel-aware nodes, the scan
-        * descriptor is initialized during actual execution of node and we can
-        * reach here before that (ex. during execution of nest loop join).  So,
-        * avoid updating the scan descriptor at that time.
-        */
+       /* reset index scan */
        if (node->iss_ScanDesc)
-       {
                index_rescan(node->iss_ScanDesc,
                                         node->iss_ScanKeys, node->iss_NumScanKeys,
                                         node->iss_OrderByKeys, node->iss_NumOrderByKeys);
-
-               if (reset_parallel_scan && node->iss_ScanDesc->parallel_scan)
-                       index_parallelrescan(node->iss_ScanDesc);
-       }
        node->iss_ReachedEnd = false;
 
        ExecScanReScan(&node->ss);
@@ -1716,6 +1695,19 @@ ExecIndexScanInitializeDSM(IndexScanState *node,
                                         node->iss_OrderByKeys, node->iss_NumOrderByKeys);
 }
 
+/* ----------------------------------------------------------------
+ *             ExecIndexScanReInitializeDSM
+ *
+ *             Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIndexScanReInitializeDSM(IndexScanState *node,
+                                                        ParallelContext *pcxt)
+{
+       index_parallelrescan(node->iss_ScanDesc);
+}
+
 /* ----------------------------------------------------------------
  *             ExecIndexScanInitializeWorker
  *
index 5c49d4ca8a9f75fc0d09aa69b903479be7f63c55..d4ac939c9b068785a8fe4ac7990af8fec1aeb587 100644 (file)
@@ -22,6 +22,7 @@
  *
  *             ExecSeqScanEstimate             estimates DSM space needed for parallel scan
  *             ExecSeqScanInitializeDSM initialize DSM for parallel scan
+ *             ExecSeqScanReInitializeDSM reinitialize DSM for fresh parallel scan
  *             ExecSeqScanInitializeWorker attach to DSM info in parallel worker
  */
 #include "postgres.h"
@@ -324,6 +325,21 @@ ExecSeqScanInitializeDSM(SeqScanState *node,
                heap_beginscan_parallel(node->ss.ss_currentRelation, pscan);
 }
 
+/* ----------------------------------------------------------------
+ *             ExecSeqScanReInitializeDSM
+ *
+ *             Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecSeqScanReInitializeDSM(SeqScanState *node,
+                                                  ParallelContext *pcxt)
+{
+       HeapScanDesc scan = node->ss.ss_currentScanDesc;
+
+       heap_parallelscan_reinitialize(scan->rs_parallel);
+}
+
 /* ----------------------------------------------------------------
  *             ExecSeqScanInitializeWorker
  *
index b2132e723edbf0b093553756a2b1ce4e2f1df67a..4e41024e9260701445cdd1daa0608ccb2e4962a2 100644 (file)
@@ -130,6 +130,7 @@ extern HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction);
 extern Size heap_parallelscan_estimate(Snapshot snapshot);
 extern void heap_parallelscan_initialize(ParallelHeapScanDesc target,
                                                         Relation relation, Snapshot snapshot);
+extern void heap_parallelscan_reinitialize(ParallelHeapScanDesc parallel_scan);
 extern HeapScanDesc heap_beginscan_parallel(Relation, ParallelHeapScanDesc);
 
 extern bool heap_fetch(Relation relation, Snapshot snapshot,
index bd0a87fa0416a7ef2224ebd7f3e7b14593fdf098..a6512245b15d5104a2ad2ff0b2e95778ee2fc776 100644 (file)
@@ -36,7 +36,8 @@ extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
                                         EState *estate, int nworkers);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
-extern void ExecParallelReinitialize(ParallelExecutorInfo *pei);
+extern void ExecParallelReinitialize(PlanState *planstate,
+                                                ParallelExecutorInfo *pei);
 
 extern void ParallelQueryMain(dsm_segment *seg, shm_toc *toc);
 
index c77694cf22ffd1f4b84adb29d6f855f5d3494809..10844a405a522b000ccd4aa0730a7eefcb81b232 100644 (file)
@@ -24,6 +24,8 @@ extern void ExecBitmapHeapEstimate(BitmapHeapScanState *node,
                                           ParallelContext *pcxt);
 extern void ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
                                                        ParallelContext *pcxt);
+extern void ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node,
+                                                         ParallelContext *pcxt);
 extern void ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node,
                                                           shm_toc *toc);
 
index a1cc63ae1f35ff89fb9b9f9b66547395cef97cd2..25767b6a4a52d39eb1a065892cca077dff6d300b 100644 (file)
@@ -34,6 +34,8 @@ extern void ExecCustomScanEstimate(CustomScanState *node,
                                           ParallelContext *pcxt);
 extern void ExecCustomScanInitializeDSM(CustomScanState *node,
                                                        ParallelContext *pcxt);
+extern void ExecCustomScanReInitializeDSM(CustomScanState *node,
+                                                         ParallelContext *pcxt);
 extern void ExecCustomScanInitializeWorker(CustomScanState *node,
                                                           shm_toc *toc);
 extern void ExecShutdownCustomScan(CustomScanState *node);
index 0b662597d8f449b3b345dc496f3fc88c9d232a45..0354c2c43085410b0aae8607d4ae0cb5e5dddef7 100644 (file)
@@ -25,6 +25,8 @@ extern void ExecForeignScanEstimate(ForeignScanState *node,
                                                ParallelContext *pcxt);
 extern void ExecForeignScanInitializeDSM(ForeignScanState *node,
                                                         ParallelContext *pcxt);
+extern void ExecForeignScanReInitializeDSM(ForeignScanState *node,
+                                                          ParallelContext *pcxt);
 extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
                                                                shm_toc *toc);
 extern void ExecShutdownForeignScan(ForeignScanState *node);
index c8a709c26ed472e4ed9be93ace32317ae6a2c5c4..690b5dbfe5964cc1070669c0d46670acafbe4016 100644 (file)
@@ -28,6 +28,8 @@ extern void ExecIndexOnlyScanEstimate(IndexOnlyScanState *node,
                                                  ParallelContext *pcxt);
 extern void ExecIndexOnlyScanInitializeDSM(IndexOnlyScanState *node,
                                                           ParallelContext *pcxt);
+extern void ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node,
+                                                                ParallelContext *pcxt);
 extern void ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node,
                                                                  shm_toc *toc);
 
index 1668e347eef58c85b24a93618e1bcb51886e5d80..0670e87e395506ae4ce03e552ea5aa353d1833b9 100644 (file)
@@ -24,6 +24,7 @@ extern void ExecIndexRestrPos(IndexScanState *node);
 extern void ExecReScanIndexScan(IndexScanState *node);
 extern void ExecIndexScanEstimate(IndexScanState *node, ParallelContext *pcxt);
 extern void ExecIndexScanInitializeDSM(IndexScanState *node, ParallelContext *pcxt);
+extern void ExecIndexScanReInitializeDSM(IndexScanState *node, ParallelContext *pcxt);
 extern void ExecIndexScanInitializeWorker(IndexScanState *node, shm_toc *toc);
 
 /*
index 0fba79f8de6a164f6d11f16f5e9b997d7d481ee1..eb96799cade5ddd2bc23101c480899b91da3666a 100644 (file)
@@ -24,6 +24,7 @@ extern void ExecReScanSeqScan(SeqScanState *node);
 /* parallel scan support */
 extern void ExecSeqScanEstimate(SeqScanState *node, ParallelContext *pcxt);
 extern void ExecSeqScanInitializeDSM(SeqScanState *node, ParallelContext *pcxt);
+extern void ExecSeqScanReInitializeDSM(SeqScanState *node, ParallelContext *pcxt);
 extern void ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc);
 
 #endif                                                 /* NODESEQSCAN_H */
index e391f20fb86b9aed47489135039779d219742d79..ef0fbe6f9c635a0871d34d3c49a27ec72b1f0246 100644 (file)
@@ -148,6 +148,9 @@ typedef Size (*EstimateDSMForeignScan_function) (ForeignScanState *node,
 typedef void (*InitializeDSMForeignScan_function) (ForeignScanState *node,
                                                                                                   ParallelContext *pcxt,
                                                                                                   void *coordinate);
+typedef void (*ReInitializeDSMForeignScan_function) (ForeignScanState *node,
+                                                                                                        ParallelContext *pcxt,
+                                                                                                        void *coordinate);
 typedef void (*InitializeWorkerForeignScan_function) (ForeignScanState *node,
                                                                                                          shm_toc *toc,
                                                                                                          void *coordinate);
@@ -224,6 +227,7 @@ typedef struct FdwRoutine
        IsForeignScanParallelSafe_function IsForeignScanParallelSafe;
        EstimateDSMForeignScan_function EstimateDSMForeignScan;
        InitializeDSMForeignScan_function InitializeDSMForeignScan;
+       ReInitializeDSMForeignScan_function ReInitializeDSMForeignScan;
        InitializeWorkerForeignScan_function InitializeWorkerForeignScan;
        ShutdownForeignScan_function ShutdownForeignScan;
 } FdwRoutine;
index 7325bf536afb8e9c5a5ee677bdfdffdd5ca246ef..0654e79c7bafacbe26fff2714762e27539740f12 100644 (file)
@@ -136,6 +136,9 @@ typedef struct CustomExecMethods
        void            (*InitializeDSMCustomScan) (CustomScanState *node,
                                                                                        ParallelContext *pcxt,
                                                                                        void *coordinate);
+       void            (*ReInitializeDSMCustomScan) (CustomScanState *node,
+                                                                                         ParallelContext *pcxt,
+                                                                                         void *coordinate);
        void            (*InitializeWorkerCustomScan) (CustomScanState *node,
                                                                                           shm_toc *toc,
                                                                                           void *coordinate);