]> granicus.if.org Git - postgresql/commitdiff
Provide DSM segment to ExecXXXInitializeWorker functions.
authorAndres Freund <andres@anarazel.de>
Fri, 17 Nov 2017 01:28:11 +0000 (17:28 -0800)
committerAndres Freund <andres@anarazel.de>
Fri, 17 Nov 2017 01:39:18 +0000 (17:39 -0800)
Previously, executor nodes running in parallel worker processes didn't
have access to the dsm_segment object used for parallel execution.  In
order to support resource management based on DSM segment lifetime,
they need that.  So create a ParallelWorkerContext object to hold it
and pass it to all InitializeWorker functions.

Author: Thomas Munro
Reviewed-By: Andres Freund
Discussion: https://postgr.es/m/CAEepm=2W=cOkiZxcg6qiFQP-dHUe09aqTrEMM7yJDrHMhDv_RA@mail.gmail.com

17 files changed:
src/backend/executor/execParallel.c
src/backend/executor/nodeBitmapHeapscan.c
src/backend/executor/nodeCustom.c
src/backend/executor/nodeForeignscan.c
src/backend/executor/nodeIndexonlyscan.c
src/backend/executor/nodeIndexscan.c
src/backend/executor/nodeSeqscan.c
src/backend/executor/nodeSort.c
src/include/access/parallel.h
src/include/executor/nodeBitmapHeapscan.h
src/include/executor/nodeCustom.h
src/include/executor/nodeForeignscan.h
src/include/executor/nodeIndexonlyscan.h
src/include/executor/nodeIndexscan.h
src/include/executor/nodeSeqscan.h
src/include/executor/nodeSort.h
src/tools/pgindent/typedefs.list

index c4355506378e9f9840ef6f56d8c4041615b53126..2ead32d5ad5d0f28199accbd9342156cc35edf87 100644 (file)
@@ -1122,7 +1122,7 @@ ExecParallelReportInstrumentation(PlanState *planstate,
  * is allocated and initialized by executor; that is, after ExecutorStart().
  */
 static bool
-ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
+ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
 {
        if (planstate == NULL)
                return false;
@@ -1131,40 +1131,44 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
        {
                case T_SeqScanState:
                        if (planstate->plan->parallel_aware)
-                               ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc);
+                               ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
                        break;
                case T_IndexScanState:
                        if (planstate->plan->parallel_aware)
-                               ExecIndexScanInitializeWorker((IndexScanState *) planstate, toc);
+                               ExecIndexScanInitializeWorker((IndexScanState *) planstate,
+                                                                                         pwcxt);
                        break;
                case T_IndexOnlyScanState:
                        if (planstate->plan->parallel_aware)
-                               ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate, toc);
+                               ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
+                                                                                                 pwcxt);
                        break;
                case T_ForeignScanState:
                        if (planstate->plan->parallel_aware)
                                ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
-                                                                                               toc);
+                                                                                               pwcxt);
                        break;
                case T_CustomScanState:
                        if (planstate->plan->parallel_aware)
                                ExecCustomScanInitializeWorker((CustomScanState *) planstate,
-                                                                                          toc);
+                                                                                          pwcxt);
                        break;
                case T_BitmapHeapScanState:
                        if (planstate->plan->parallel_aware)
-                               ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate, toc);
+                               ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
+                                                                                          pwcxt);
                        break;
                case T_SortState:
                        /* even when not parallel-aware */
-                       ExecSortInitializeWorker((SortState *) planstate, toc);
+                       ExecSortInitializeWorker((SortState *) planstate, pwcxt);
                        break;
 
                default:
                        break;
        }
 
-       return planstate_tree_walker(planstate, ExecParallelInitializeWorker, toc);
+       return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
+                                                                pwcxt);
 }
 
 /*
@@ -1194,6 +1198,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
        int                     instrument_options = 0;
        void       *area_space;
        dsa_area   *area;
+       ParallelWorkerContext pwcxt;
 
        /* Get fixed-size state. */
        fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
@@ -1231,7 +1236,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
                RestoreParamExecParams(paramexec_space, queryDesc->estate);
 
        }
-       ExecParallelInitializeWorker(queryDesc->planstate, toc);
+       pwcxt.toc = toc;
+       pwcxt.seg = seg;
+       ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
 
        /* Pass down any tuple bound */
        ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
index b885f2a3a65f1231b66768ba7715381273c9d784..221391908c718d09574a7aeff9c2d9df65125f2b 100644 (file)
@@ -1102,12 +1102,13 @@ ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node,
  * ----------------------------------------------------------------
  */
 void
-ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, shm_toc *toc)
+ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node,
+                                                          ParallelWorkerContext *pwcxt)
 {
        ParallelBitmapHeapState *pstate;
        Snapshot        snapshot;
 
-       pstate = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false);
+       pstate = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
        node->pstate = pstate;
 
        snapshot = RestoreSnapshot(pstate->phs_snapshot_data);
index 07dcabef5514633e43b93d629a1d352303ca7551..5f1732d6ac02661c30add10c1d8b9cf9b44df7d9 100644 (file)
@@ -210,7 +210,8 @@ ExecCustomScanReInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
 }
 
 void
-ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc)
+ExecCustomScanInitializeWorker(CustomScanState *node,
+                                                          ParallelWorkerContext *pwcxt)
 {
        const CustomExecMethods *methods = node->methods;
 
@@ -219,8 +220,8 @@ ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc)
                int                     plan_node_id = node->ss.ps.plan->plan_node_id;
                void       *coordinate;
 
-               coordinate = shm_toc_lookup(toc, plan_node_id, false);
-               methods->InitializeWorkerCustomScan(node, toc, coordinate);
+               coordinate = shm_toc_lookup(pwcxt->toc, plan_node_id, false);
+               methods->InitializeWorkerCustomScan(node, pwcxt->toc, coordinate);
        }
 }
 
index 20892d6d5fb1952a233b66b5f14c840163131187..dc6cfcfa66be3add85d62e38d513bac18ca2e934 100644 (file)
@@ -359,7 +359,8 @@ ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
  * ----------------------------------------------------------------
  */
 void
-ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc)
+ExecForeignScanInitializeWorker(ForeignScanState *node,
+                                                               ParallelWorkerContext *pwcxt)
 {
        FdwRoutine *fdwroutine = node->fdwroutine;
 
@@ -368,8 +369,8 @@ ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc)
                int                     plan_node_id = node->ss.ps.plan->plan_node_id;
                void       *coordinate;
 
-               coordinate = shm_toc_lookup(toc, plan_node_id, false);
-               fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate);
+               coordinate = shm_toc_lookup(pwcxt->toc, plan_node_id, false);
+               fdwroutine->InitializeWorkerForeignScan(node, pwcxt->toc, coordinate);
        }
 }
 
index 9368ca04f847c565de7d6fd65f47d858f6d019ed..c54c5aa6591f68ac8d46d12fa92dc1328eb7b506 100644 (file)
@@ -678,11 +678,12 @@ ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node,
  * ----------------------------------------------------------------
  */
 void
-ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node, shm_toc *toc)
+ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node,
+                                                                 ParallelWorkerContext *pwcxt)
 {
        ParallelIndexScanDesc piscan;
 
-       piscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false);
+       piscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
        node->ioss_ScanDesc =
                index_beginscan_parallel(node->ss.ss_currentRelation,
                                                                 node->ioss_RelationDesc,
index 2d6da28fbd97bf866d162ef5d3a0601335c2a412..2ffef231077586c901638f558f2cabab8457c97b 100644 (file)
@@ -1716,11 +1716,12 @@ ExecIndexScanReInitializeDSM(IndexScanState *node,
  * ----------------------------------------------------------------
  */
 void
-ExecIndexScanInitializeWorker(IndexScanState *node, shm_toc *toc)
+ExecIndexScanInitializeWorker(IndexScanState *node,
+                                                         ParallelWorkerContext *pwcxt)
 {
        ParallelIndexScanDesc piscan;
 
-       piscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false);
+       piscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
        node->iss_ScanDesc =
                index_beginscan_parallel(node->ss.ss_currentRelation,
                                                                 node->iss_RelationDesc,
index 76bec780a8d25d635fe77b35f8ab398187089591..a5bd60e5795f35ab5c9f7d08f5c66c034e220684 100644 (file)
@@ -348,11 +348,12 @@ ExecSeqScanReInitializeDSM(SeqScanState *node,
  * ----------------------------------------------------------------
  */
 void
-ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc)
+ExecSeqScanInitializeWorker(SeqScanState *node,
+                                                       ParallelWorkerContext *pwcxt)
 {
        ParallelHeapScanDesc pscan;
 
-       pscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false);
+       pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
        node->ss.ss_currentScanDesc =
                heap_beginscan_parallel(node->ss.ss_currentRelation, pscan);
 }
index 98bcaeb66f5f824ff252f7e6ae2e229d5a94ad1e..73aa3715e6d56778a94ed1e5475dacd58c1e9f46 100644 (file)
@@ -420,10 +420,10 @@ ExecSortReInitializeDSM(SortState *node, ParallelContext *pcxt)
  * ----------------------------------------------------------------
  */
 void
-ExecSortInitializeWorker(SortState *node, shm_toc *toc)
+ExecSortInitializeWorker(SortState *node, ParallelWorkerContext *pwcxt)
 {
        node->shared_info =
-               shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, true);
+               shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
        node->am_worker = true;
 }
 
index e3e0cecf1ea3b34d273c806b20b954ffee6e8eaf..f4db88294aa00a6c0f5732273a335a4edbe4316f 100644 (file)
@@ -45,6 +45,12 @@ typedef struct ParallelContext
        ParallelWorkerInfo *worker;
 } ParallelContext;
 
+typedef struct ParallelWorkerContext
+{
+       dsm_segment *seg;
+       shm_toc    *toc;
+} ParallelWorkerContext;
+
 extern volatile bool ParallelMessagePending;
 extern int     ParallelWorkerNumber;
 extern bool InitializingParallelWorker;
index 10844a405a522b000ccd4aa0730a7eefcb81b232..7907ecc3cb5cf4f89a291f45ae20f27479ef4977 100644 (file)
@@ -27,6 +27,6 @@ extern void ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
 extern void ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node,
                                                          ParallelContext *pcxt);
 extern void ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node,
-                                                          shm_toc *toc);
+                                                          ParallelWorkerContext *pwcxt);
 
 #endif                                                 /* NODEBITMAPHEAPSCAN_H */
index 25767b6a4a52d39eb1a065892cca077dff6d300b..d7dcf3b8cb18dcd1bc39d5adefe1441f485ae4c5 100644 (file)
@@ -37,7 +37,7 @@ extern void ExecCustomScanInitializeDSM(CustomScanState *node,
 extern void ExecCustomScanReInitializeDSM(CustomScanState *node,
                                                          ParallelContext *pcxt);
 extern void ExecCustomScanInitializeWorker(CustomScanState *node,
-                                                          shm_toc *toc);
+                                                          ParallelWorkerContext *pwcxt);
 extern void ExecShutdownCustomScan(CustomScanState *node);
 
 #endif                                                 /* NODECUSTOM_H */
index 0354c2c43085410b0aae8607d4ae0cb5e5dddef7..152abf022bea8e30f5a0b96b761ca685ed590531 100644 (file)
@@ -28,7 +28,7 @@ extern void ExecForeignScanInitializeDSM(ForeignScanState *node,
 extern void ExecForeignScanReInitializeDSM(ForeignScanState *node,
                                                           ParallelContext *pcxt);
 extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
-                                                               shm_toc *toc);
+                                                               ParallelWorkerContext *pwcxt);
 extern void ExecShutdownForeignScan(ForeignScanState *node);
 
 #endif                                                 /* NODEFOREIGNSCAN_H */
index 690b5dbfe5964cc1070669c0d46670acafbe4016..c5344a8d5d2184ec76435c82b66097a2f292f322 100644 (file)
@@ -31,6 +31,6 @@ extern void ExecIndexOnlyScanInitializeDSM(IndexOnlyScanState *node,
 extern void ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node,
                                                                 ParallelContext *pcxt);
 extern void ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node,
-                                                                 shm_toc *toc);
+                                                                 ParallelWorkerContext *pwcxt);
 
 #endif                                                 /* NODEINDEXONLYSCAN_H */
index 0670e87e395506ae4ce03e552ea5aa353d1833b9..ae0f44806a52fa6bbbc6bb2e187815f82626ceae 100644 (file)
@@ -25,7 +25,8 @@ extern void ExecReScanIndexScan(IndexScanState *node);
 extern void ExecIndexScanEstimate(IndexScanState *node, ParallelContext *pcxt);
 extern void ExecIndexScanInitializeDSM(IndexScanState *node, ParallelContext *pcxt);
 extern void ExecIndexScanReInitializeDSM(IndexScanState *node, ParallelContext *pcxt);
-extern void ExecIndexScanInitializeWorker(IndexScanState *node, shm_toc *toc);
+extern void ExecIndexScanInitializeWorker(IndexScanState *node,
+                                                         ParallelWorkerContext *pwcxt);
 
 /*
  * These routines are exported to share code with nodeIndexonlyscan.c and
index eb96799cade5ddd2bc23101c480899b91da3666a..ee3b1a0bb84e4e52c11a2e3eb24ef1c68285e0e2 100644 (file)
@@ -25,6 +25,7 @@ extern void ExecReScanSeqScan(SeqScanState *node);
 extern void ExecSeqScanEstimate(SeqScanState *node, ParallelContext *pcxt);
 extern void ExecSeqScanInitializeDSM(SeqScanState *node, ParallelContext *pcxt);
 extern void ExecSeqScanReInitializeDSM(SeqScanState *node, ParallelContext *pcxt);
-extern void ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc);
+extern void ExecSeqScanInitializeWorker(SeqScanState *node,
+                                                       ParallelWorkerContext *pwcxt);
 
 #endif                                                 /* NODESEQSCAN_H */
index 1ab8f7672103fd50cb7de5663514dfbf5aebe76b..cc61a9db6977fa67f2c817f0415e310eccdd673d 100644 (file)
@@ -27,7 +27,7 @@ extern void ExecReScanSort(SortState *node);
 extern void ExecSortEstimate(SortState *node, ParallelContext *pcxt);
 extern void ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt);
 extern void ExecSortReInitializeDSM(SortState *node, ParallelContext *pcxt);
-extern void ExecSortInitializeWorker(SortState *node, shm_toc *toc);
+extern void ExecSortInitializeWorker(SortState *node, ParallelWorkerContext *pwcxt);
 extern void ExecSortRetrieveInstrumentation(SortState *node);
 
 #endif                                                 /* NODESORT_H */
index 61aeb51c29d875fc69c32cc2c2a60c05a0d2a404..b422050a9246adfc99308bfaf3a0a56b265294f1 100644 (file)
@@ -1534,6 +1534,7 @@ ParallelHeapScanDesc
 ParallelIndexScanDesc
 ParallelSlot
 ParallelState
+ParallelWorkerContext
 ParallelWorkerInfo
 Param
 ParamExecData