Provide a DSA area for all parallel queries.
author     Robert Haas <rhaas@postgresql.org>
           Mon, 19 Dec 2016 21:47:15 +0000 (16:47 -0500)
committer  Robert Haas <rhaas@postgresql.org>
           Mon, 19 Dec 2016 22:11:46 +0000 (17:11 -0500)
This will allow future parallel query code to dynamically allocate
storage shared by all participants.

Thomas Munro, with assorted changes by me.
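
As a rough illustration (not part of the commit itself), a parallel-aware executor
node could use the new per-query area along the following lines in the leader.
The struct and helper names here are hypothetical; dsa_allocate() and
dsa_get_address() are the existing dsa.h API, and es_query_dsa is the EState
field added below.

    #include "postgres.h"
    #include "nodes/execnodes.h"
    #include "utils/dsa.h"

    /* Hypothetical per-node shared state; purely illustrative. */
    typedef struct MySharedNodeState
    {
        dsa_pointer self;       /* its own dsa_pointer, handed to workers */
        int         next_chunk; /* example payload */
    } MySharedNodeState;

    /*
     * Sketch of leader-side allocation from the per-query DSA area created in
     * ExecInitParallelPlan below.  es_query_dsa is NULL when the plan runs
     * without a real DSM segment, so callers need a backend-local fallback.
     */
    static MySharedNodeState *
    allocate_shared_node_state(EState *estate)
    {
        dsa_pointer dp;
        MySharedNodeState *shared;

        if (estate->es_query_dsa == NULL)
            return NULL;        /* caller falls back to palloc'd local state */

        dp = dsa_allocate(estate->es_query_dsa, sizeof(MySharedNodeState));
        shared = (MySharedNodeState *) dsa_get_address(estate->es_query_dsa, dp);
        shared->self = dp;
        shared->next_chunk = 0;
        return shared;
    }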

doc/src/sgml/monitoring.sgml
src/backend/executor/execParallel.c
src/include/executor/execParallel.h
src/include/nodes/execnodes.h
src/include/storage/lwlock.h

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 02bc8feca7740da121b7399e0eaab9286a7931c7..1545f03656c94f76ec2cb5c4b703300f8da88666 100644
@@ -818,7 +818,7 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
 
       <tbody>
        <row>
-        <entry morerows="57"><literal>LWLock</></entry>
+        <entry morerows="58"><literal>LWLock</></entry>
         <entry><literal>ShmemIndexLock</></entry>
         <entry>Waiting to find or allocate space in shared memory.</entry>
        </row>
@@ -1069,6 +1069,10 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
          <entry><literal>predicate_lock_manager</></entry>
          <entry>Waiting to add or examine predicate lock information.</entry>
         </row>
+        <row>
+         <entry><literal>parallel_query_dsa</></entry>
+         <entry>Waiting for parallel query dynamic shared memory allocation lock.</entry>
+        </row>
         <row>
          <entry morerows="9"><literal>Lock</></entry>
          <entry><literal>relation</></entry>
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index f9c85989d82b285e50d3282fdd676f41161e2ec4..8a6f844e352b52c3b2023b418719c4d0b469f9ff 100644
@@ -34,6 +34,7 @@
 #include "optimizer/planner.h"
 #include "storage/spin.h"
 #include "tcop/tcopprot.h"
+#include "utils/dsa.h"
 #include "utils/memutils.h"
 #include "utils/snapmgr.h"
 
@@ -47,6 +48,7 @@
 #define PARALLEL_KEY_BUFFER_USAGE              UINT64CONST(0xE000000000000003)
 #define PARALLEL_KEY_TUPLE_QUEUE               UINT64CONST(0xE000000000000004)
 #define PARALLEL_KEY_INSTRUMENTATION   UINT64CONST(0xE000000000000005)
+#define PARALLEL_KEY_DSA                               UINT64CONST(0xE000000000000006)
 
 #define PARALLEL_TUPLE_QUEUE_SIZE              65536
 
@@ -345,6 +347,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
        int                     param_len;
        int                     instrumentation_len = 0;
        int                     instrument_offset = 0;
+       Size            dsa_minsize = dsa_minimum_size();
 
        /* Allocate object for return value. */
        pei = palloc0(sizeof(ParallelExecutorInfo));
@@ -413,6 +416,10 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
                shm_toc_estimate_keys(&pcxt->estimator, 1);
        }
 
+       /* Estimate space for DSA area. */
+       shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
+       shm_toc_estimate_keys(&pcxt->estimator, 1);
+
        /* Everyone's had a chance to ask for space, so now create the DSM. */
        InitializeParallelDSM(pcxt);
 
@@ -466,6 +473,29 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
                pei->instrumentation = instrumentation;
        }
 
+       /*
+        * Create a DSA area that can be used by the leader and all workers.
+        * (However, if we failed to create a DSM and are using private memory
+        * instead, then skip this.)
+        */
+       if (pcxt->seg != NULL)
+       {
+               char       *area_space;
+
+               area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
+               shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
+               pei->area = dsa_create_in_place(area_space, dsa_minsize,
+                                                                               LWTRANCHE_PARALLEL_QUERY_DSA,
+                                                                               "parallel_query_dsa",
+                                                                               pcxt->seg);
+       }
+
+       /*
+        * Make the area available to executor nodes running in the leader.  See
+        * also ParallelQueryMain which makes it available to workers.
+        */
+       estate->es_query_dsa = pei->area;
+
        /*
         * Give parallel-aware nodes a chance to initialize their shared data.
         * This also initializes the elements of instrumentation->ps_instrument,
@@ -571,6 +601,11 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
 void
 ExecParallelCleanup(ParallelExecutorInfo *pei)
 {
+       if (pei->area != NULL)
+       {
+               dsa_detach(pei->area);
+               pei->area = NULL;
+       }
        if (pei->pcxt != NULL)
        {
                DestroyParallelContext(pei->pcxt);
@@ -728,6 +763,8 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
        QueryDesc  *queryDesc;
        SharedExecutorInstrumentation *instrumentation;
        int                     instrument_options = 0;
+       void       *area_space;
+       dsa_area   *area;
 
        /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
        receiver = ExecParallelGetReceiver(seg, toc);
@@ -739,10 +776,21 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
        /* Prepare to track buffer usage during query execution. */
        InstrStartParallelQuery();
 
-       /* Start up the executor, have it run the plan, and then shut it down. */
+       /* Attach to the dynamic shared memory area. */
+       area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA);
+       area = dsa_attach_in_place(area_space, seg);
+
+       /* Start up the executor */
        ExecutorStart(queryDesc, 0);
+
+       /* Special executor initialization steps for parallel workers */
+       queryDesc->planstate->state->es_query_dsa = area;
        ExecParallelInitializeWorker(queryDesc->planstate, toc);
+
+       /* Run the plan */
        ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+
+       /* Shut down the executor */
        ExecutorFinish(queryDesc);
 
        /* Report buffer usage during parallel execution. */
@@ -758,6 +806,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
        ExecutorEnd(queryDesc);
 
        /* Cleanup. */
+       dsa_detach(area);
        FreeQueryDesc(queryDesc);
        (*receiver->rDestroy) (receiver);
 }
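
A worker-side counterpart to the sketch above (again hypothetical, not part of
this commit): because a dsa_pointer is meaningful in any backend attached to the
area, a node can publish one through its own shm_toc entry, and a worker can
translate it with dsa_get_address() against the es_query_dsa that
ParallelQueryMain now sets up.

    #include "postgres.h"
    #include "storage/shm_toc.h"
    #include "utils/dsa.h"

    /* Hypothetical fixed-size chunk the leader stores under the node's key. */
    typedef struct MyNodeTocEntry
    {
        dsa_pointer shared_state;   /* points into the per-query DSA area */
    } MyNodeTocEntry;

    #define MY_NODE_KEY   UINT64CONST(0xE000000000000100)  /* illustrative key */

    /*
     * Sketch of a worker-side step (cf. ExecParallelInitializeWorker): look up
     * the node's chunk and map its dsa_pointer to a local address.
     */
    static void *
    attach_shared_node_state(shm_toc *toc, dsa_area *query_dsa)
    {
        MyNodeTocEntry *entry = shm_toc_lookup(toc, MY_NODE_KEY);

        return dsa_get_address(query_dsa, entry->shared_state);
    }
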
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index f4c6d37a119289ee832d49fd93579fb2e5913bb4..4bbee691a7f33f64c3c8d69fea8d186b1ca2521f 100644
@@ -17,6 +17,7 @@
 #include "nodes/execnodes.h"
 #include "nodes/parsenodes.h"
 #include "nodes/plannodes.h"
+#include "utils/dsa.h"
 
 typedef struct SharedExecutorInstrumentation SharedExecutorInstrumentation;
 
@@ -27,6 +28,7 @@ typedef struct ParallelExecutorInfo
        BufferUsage *buffer_usage;
        SharedExecutorInstrumentation *instrumentation;
        shm_mq_handle **tqueue;
+       dsa_area   *area;
        bool            finished;
 } ParallelExecutorInfo;
 
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 703604ab9d746f4f1b2010c3187bcc6d052fe5e8..5c3b8683f5b6dd123fec24e042b6e41f3d17fb6c 100644
@@ -427,6 +427,9 @@ typedef struct EState
        HeapTuple  *es_epqTuple;        /* array of EPQ substitute tuples */
        bool       *es_epqTupleSet; /* true if EPQ tuple is provided */
        bool       *es_epqScanDone; /* true if EPQ tuple has been fetched */
+
+       /* The per-query shared memory area to use for parallel execution. */
+       struct dsa_area   *es_query_dsa;
 } EState;
 
 
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index db1c687e21e8398bb201666b833c5eb6991d82e5..3ca4db0a7231e0333a5762a22e1d7916599aebd7 100644
@@ -210,6 +210,7 @@ typedef enum BuiltinTrancheIds
        LWTRANCHE_BUFFER_MAPPING,
        LWTRANCHE_LOCK_MANAGER,
        LWTRANCHE_PREDICATE_LOCK_MANAGER,
+       LWTRANCHE_PARALLEL_QUERY_DSA,
        LWTRANCHE_FIRST_USER_DEFINED
 }      BuiltinTrancheIds;