From 7082e614c0dd504cdf49c4d5a692159f22e78f9d Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Thu, 16 Nov 2017 17:28:11 -0800 Subject: [PATCH] Provide DSM segment to ExecXXXInitializeWorker functions. Previously, executor nodes running in parallel worker processes didn't have access to the dsm_segment object used for parallel execution. In order to support resource management based on DSM segment lifetime, they need that. So create a ParallelWorkerContext object to hold it and pass it to all InitializeWorker functions. Author: Thomas Munro Reviewed-By: Andres Freund Discussion: https://postgr.es/m/CAEepm=2W=cOkiZxcg6qiFQP-dHUe09aqTrEMM7yJDrHMhDv_RA@mail.gmail.com --- src/backend/executor/execParallel.c | 27 ++++++++++++++--------- src/backend/executor/nodeBitmapHeapscan.c | 5 +++-- src/backend/executor/nodeCustom.c | 7 +++--- src/backend/executor/nodeForeignscan.c | 7 +++--- src/backend/executor/nodeIndexonlyscan.c | 5 +++-- src/backend/executor/nodeIndexscan.c | 5 +++-- src/backend/executor/nodeSeqscan.c | 5 +++-- src/backend/executor/nodeSort.c | 4 ++-- src/include/access/parallel.h | 6 +++++ src/include/executor/nodeBitmapHeapscan.h | 2 +- src/include/executor/nodeCustom.h | 2 +- src/include/executor/nodeForeignscan.h | 2 +- src/include/executor/nodeIndexonlyscan.h | 2 +- src/include/executor/nodeIndexscan.h | 3 ++- src/include/executor/nodeSeqscan.h | 3 ++- src/include/executor/nodeSort.h | 2 +- src/tools/pgindent/typedefs.list | 1 + 17 files changed, 55 insertions(+), 33 deletions(-) diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index c435550637..2ead32d5ad 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -1122,7 +1122,7 @@ ExecParallelReportInstrumentation(PlanState *planstate, * is allocated and initialized by executor; that is, after ExecutorStart(). */ static bool -ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc) +ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt) { if (planstate == NULL) return false; @@ -1131,40 +1131,44 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc) { case T_SeqScanState: if (planstate->plan->parallel_aware) - ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc); + ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt); break; case T_IndexScanState: if (planstate->plan->parallel_aware) - ExecIndexScanInitializeWorker((IndexScanState *) planstate, toc); + ExecIndexScanInitializeWorker((IndexScanState *) planstate, + pwcxt); break; case T_IndexOnlyScanState: if (planstate->plan->parallel_aware) - ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate, toc); + ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate, + pwcxt); break; case T_ForeignScanState: if (planstate->plan->parallel_aware) ExecForeignScanInitializeWorker((ForeignScanState *) planstate, - toc); + pwcxt); break; case T_CustomScanState: if (planstate->plan->parallel_aware) ExecCustomScanInitializeWorker((CustomScanState *) planstate, - toc); + pwcxt); break; case T_BitmapHeapScanState: if (planstate->plan->parallel_aware) - ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate, toc); + ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate, + pwcxt); break; case T_SortState: /* even when not parallel-aware */ - ExecSortInitializeWorker((SortState *) planstate, toc); + ExecSortInitializeWorker((SortState *) planstate, pwcxt); break; default: break; } - return planstate_tree_walker(planstate, ExecParallelInitializeWorker, toc); + return planstate_tree_walker(planstate, ExecParallelInitializeWorker, + pwcxt); } /* @@ -1194,6 +1198,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) int instrument_options = 0; void *area_space; dsa_area *area; + ParallelWorkerContext pwcxt; /* Get fixed-size state. */ fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false); @@ -1231,7 +1236,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) RestoreParamExecParams(paramexec_space, queryDesc->estate); } - ExecParallelInitializeWorker(queryDesc->planstate, toc); + pwcxt.toc = toc; + pwcxt.seg = seg; + ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt); /* Pass down any tuple bound */ ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate); diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c index b885f2a3a6..221391908c 100644 --- a/src/backend/executor/nodeBitmapHeapscan.c +++ b/src/backend/executor/nodeBitmapHeapscan.c @@ -1102,12 +1102,13 @@ ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node, * ---------------------------------------------------------------- */ void -ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, shm_toc *toc) +ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, + ParallelWorkerContext *pwcxt) { ParallelBitmapHeapState *pstate; Snapshot snapshot; - pstate = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false); + pstate = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false); node->pstate = pstate; snapshot = RestoreSnapshot(pstate->phs_snapshot_data); diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c index 07dcabef55..5f1732d6ac 100644 --- a/src/backend/executor/nodeCustom.c +++ b/src/backend/executor/nodeCustom.c @@ -210,7 +210,8 @@ ExecCustomScanReInitializeDSM(CustomScanState *node, ParallelContext *pcxt) } void -ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc) +ExecCustomScanInitializeWorker(CustomScanState *node, + ParallelWorkerContext *pwcxt) { const CustomExecMethods *methods = node->methods; @@ -219,8 +220,8 @@ ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc) int plan_node_id = node->ss.ps.plan->plan_node_id; void *coordinate; - coordinate = shm_toc_lookup(toc, plan_node_id, false); - methods->InitializeWorkerCustomScan(node, toc, coordinate); + coordinate = shm_toc_lookup(pwcxt->toc, plan_node_id, false); + methods->InitializeWorkerCustomScan(node, pwcxt->toc, coordinate); } } diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c index 20892d6d5f..dc6cfcfa66 100644 --- a/src/backend/executor/nodeForeignscan.c +++ b/src/backend/executor/nodeForeignscan.c @@ -359,7 +359,8 @@ ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt) * ---------------------------------------------------------------- */ void -ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc) +ExecForeignScanInitializeWorker(ForeignScanState *node, + ParallelWorkerContext *pwcxt) { FdwRoutine *fdwroutine = node->fdwroutine; @@ -368,8 +369,8 @@ ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc) int plan_node_id = node->ss.ps.plan->plan_node_id; void *coordinate; - coordinate = shm_toc_lookup(toc, plan_node_id, false); - fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate); + coordinate = shm_toc_lookup(pwcxt->toc, plan_node_id, false); + fdwroutine->InitializeWorkerForeignScan(node, pwcxt->toc, coordinate); } } diff --git a/src/backend/executor/nodeIndexonlyscan.c b/src/backend/executor/nodeIndexonlyscan.c index 9368ca04f8..c54c5aa659 100644 --- a/src/backend/executor/nodeIndexonlyscan.c +++ b/src/backend/executor/nodeIndexonlyscan.c @@ -678,11 +678,12 @@ ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node, * ---------------------------------------------------------------- */ void -ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node, shm_toc *toc) +ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node, + ParallelWorkerContext *pwcxt) { ParallelIndexScanDesc piscan; - piscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false); + piscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false); node->ioss_ScanDesc = index_beginscan_parallel(node->ss.ss_currentRelation, node->ioss_RelationDesc, diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c index 2d6da28fbd..2ffef23107 100644 --- a/src/backend/executor/nodeIndexscan.c +++ b/src/backend/executor/nodeIndexscan.c @@ -1716,11 +1716,12 @@ ExecIndexScanReInitializeDSM(IndexScanState *node, * ---------------------------------------------------------------- */ void -ExecIndexScanInitializeWorker(IndexScanState *node, shm_toc *toc) +ExecIndexScanInitializeWorker(IndexScanState *node, + ParallelWorkerContext *pwcxt) { ParallelIndexScanDesc piscan; - piscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false); + piscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false); node->iss_ScanDesc = index_beginscan_parallel(node->ss.ss_currentRelation, node->iss_RelationDesc, diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index 76bec780a8..a5bd60e579 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -348,11 +348,12 @@ ExecSeqScanReInitializeDSM(SeqScanState *node, * ---------------------------------------------------------------- */ void -ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc) +ExecSeqScanInitializeWorker(SeqScanState *node, + ParallelWorkerContext *pwcxt) { ParallelHeapScanDesc pscan; - pscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false); + pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false); node->ss.ss_currentScanDesc = heap_beginscan_parallel(node->ss.ss_currentRelation, pscan); } diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c index 98bcaeb66f..73aa3715e6 100644 --- a/src/backend/executor/nodeSort.c +++ b/src/backend/executor/nodeSort.c @@ -420,10 +420,10 @@ ExecSortReInitializeDSM(SortState *node, ParallelContext *pcxt) * ---------------------------------------------------------------- */ void -ExecSortInitializeWorker(SortState *node, shm_toc *toc) +ExecSortInitializeWorker(SortState *node, ParallelWorkerContext *pwcxt) { node->shared_info = - shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, true); + shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true); node->am_worker = true; } diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h index e3e0cecf1e..f4db88294a 100644 --- a/src/include/access/parallel.h +++ b/src/include/access/parallel.h @@ -45,6 +45,12 @@ typedef struct ParallelContext ParallelWorkerInfo *worker; } ParallelContext; +typedef struct ParallelWorkerContext +{ + dsm_segment *seg; + shm_toc *toc; +} ParallelWorkerContext; + extern volatile bool ParallelMessagePending; extern int ParallelWorkerNumber; extern bool InitializingParallelWorker; diff --git a/src/include/executor/nodeBitmapHeapscan.h b/src/include/executor/nodeBitmapHeapscan.h index 10844a405a..7907ecc3cb 100644 --- a/src/include/executor/nodeBitmapHeapscan.h +++ b/src/include/executor/nodeBitmapHeapscan.h @@ -27,6 +27,6 @@ extern void ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node, extern void ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node, ParallelContext *pcxt); extern void ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, - shm_toc *toc); + ParallelWorkerContext *pwcxt); #endif /* NODEBITMAPHEAPSCAN_H */ diff --git a/src/include/executor/nodeCustom.h b/src/include/executor/nodeCustom.h index 25767b6a4a..d7dcf3b8cb 100644 --- a/src/include/executor/nodeCustom.h +++ b/src/include/executor/nodeCustom.h @@ -37,7 +37,7 @@ extern void ExecCustomScanInitializeDSM(CustomScanState *node, extern void ExecCustomScanReInitializeDSM(CustomScanState *node, ParallelContext *pcxt); extern void ExecCustomScanInitializeWorker(CustomScanState *node, - shm_toc *toc); + ParallelWorkerContext *pwcxt); extern void ExecShutdownCustomScan(CustomScanState *node); #endif /* NODECUSTOM_H */ diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h index 0354c2c430..152abf022b 100644 --- a/src/include/executor/nodeForeignscan.h +++ b/src/include/executor/nodeForeignscan.h @@ -28,7 +28,7 @@ extern void ExecForeignScanInitializeDSM(ForeignScanState *node, extern void ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt); extern void ExecForeignScanInitializeWorker(ForeignScanState *node, - shm_toc *toc); + ParallelWorkerContext *pwcxt); extern void ExecShutdownForeignScan(ForeignScanState *node); #endif /* NODEFOREIGNSCAN_H */ diff --git a/src/include/executor/nodeIndexonlyscan.h b/src/include/executor/nodeIndexonlyscan.h index 690b5dbfe5..c5344a8d5d 100644 --- a/src/include/executor/nodeIndexonlyscan.h +++ b/src/include/executor/nodeIndexonlyscan.h @@ -31,6 +31,6 @@ extern void ExecIndexOnlyScanInitializeDSM(IndexOnlyScanState *node, extern void ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node, ParallelContext *pcxt); extern void ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node, - shm_toc *toc); + ParallelWorkerContext *pwcxt); #endif /* NODEINDEXONLYSCAN_H */ diff --git a/src/include/executor/nodeIndexscan.h b/src/include/executor/nodeIndexscan.h index 0670e87e39..ae0f44806a 100644 --- a/src/include/executor/nodeIndexscan.h +++ b/src/include/executor/nodeIndexscan.h @@ -25,7 +25,8 @@ extern void ExecReScanIndexScan(IndexScanState *node); extern void ExecIndexScanEstimate(IndexScanState *node, ParallelContext *pcxt); extern void ExecIndexScanInitializeDSM(IndexScanState *node, ParallelContext *pcxt); extern void ExecIndexScanReInitializeDSM(IndexScanState *node, ParallelContext *pcxt); -extern void ExecIndexScanInitializeWorker(IndexScanState *node, shm_toc *toc); +extern void ExecIndexScanInitializeWorker(IndexScanState *node, + ParallelWorkerContext *pwcxt); /* * These routines are exported to share code with nodeIndexonlyscan.c and diff --git a/src/include/executor/nodeSeqscan.h b/src/include/executor/nodeSeqscan.h index eb96799cad..ee3b1a0bb8 100644 --- a/src/include/executor/nodeSeqscan.h +++ b/src/include/executor/nodeSeqscan.h @@ -25,6 +25,7 @@ extern void ExecReScanSeqScan(SeqScanState *node); extern void ExecSeqScanEstimate(SeqScanState *node, ParallelContext *pcxt); extern void ExecSeqScanInitializeDSM(SeqScanState *node, ParallelContext *pcxt); extern void ExecSeqScanReInitializeDSM(SeqScanState *node, ParallelContext *pcxt); -extern void ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc); +extern void ExecSeqScanInitializeWorker(SeqScanState *node, + ParallelWorkerContext *pwcxt); #endif /* NODESEQSCAN_H */ diff --git a/src/include/executor/nodeSort.h b/src/include/executor/nodeSort.h index 1ab8f76721..cc61a9db69 100644 --- a/src/include/executor/nodeSort.h +++ b/src/include/executor/nodeSort.h @@ -27,7 +27,7 @@ extern void ExecReScanSort(SortState *node); extern void ExecSortEstimate(SortState *node, ParallelContext *pcxt); extern void ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt); extern void ExecSortReInitializeDSM(SortState *node, ParallelContext *pcxt); -extern void ExecSortInitializeWorker(SortState *node, shm_toc *toc); +extern void ExecSortInitializeWorker(SortState *node, ParallelWorkerContext *pwcxt); extern void ExecSortRetrieveInstrumentation(SortState *node); #endif /* NODESORT_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 61aeb51c29..b422050a92 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1534,6 +1534,7 @@ ParallelHeapScanDesc ParallelIndexScanDesc ParallelSlot ParallelState +ParallelWorkerContext ParallelWorkerInfo Param ParamExecData -- 2.40.0