granicus.if.org Git - postgresql/commitdiff
Allow for parallel execution whenever ExecutorRun() is done only once.
author: Robert Haas <rhaas@postgresql.org>
Thu, 23 Mar 2017 17:05:48 +0000 (13:05 -0400)
committer: Robert Haas <rhaas@postgresql.org>
Thu, 23 Mar 2017 17:14:36 +0000 (13:14 -0400)
Previously, it was unsafe to execute a plan in parallel if
ExecutorRun() might be called with a non-zero row count.  However,
it's quite easy to fix things up so that we can support that case,
provided that it is known that we will never call ExecutorRun() a
second time for the same QueryDesc.  Add infrastructure to signal
this, and cross-checks to make sure that a caller who claims this is
true doesn't later renege.

While that pattern never happens with queries received directly from a
client -- there's no way to know whether multiple Execute messages
will be sent unless the first one requests all the rows -- it's pretty
common for queries originating from procedural languages, which often
limit the result to a single tuple or to a user-specified number of
tuples.

This commit doesn't actually enable parallelism in any additional
cases, because currently none of the places that would be able to
benefit from this infrastructure pass CURSOR_OPT_PARALLEL_OK in the
first place, but it makes it much more palatable to pass
CURSOR_OPT_PARALLEL_OK in places where we currently don't, because it
eliminates some cases where we'd end up having to run the parallel
plan serially.

Patch by me, based on some ideas from Rafia Sabih and corrected by
Rafia Sabih based on feedback from Dilip Kumar and myself.

Discussion: http://postgr.es/m/CA+TgmobXEhvHbJtWDuPZM9bVSLiTj-kShxQJ2uM5GPDze9fRYA@mail.gmail.com

19 files changed:
contrib/auto_explain/auto_explain.c
contrib/pg_stat_statements/pg_stat_statements.c
src/backend/commands/copy.c
src/backend/commands/createas.c
src/backend/commands/explain.c
src/backend/commands/extension.c
src/backend/commands/matview.c
src/backend/commands/portalcmds.c
src/backend/commands/prepare.c
src/backend/executor/execMain.c
src/backend/executor/execParallel.c
src/backend/executor/functions.c
src/backend/executor/spi.c
src/backend/tcop/postgres.c
src/backend/tcop/pquery.c
src/include/executor/execdesc.h
src/include/executor/executor.h
src/include/tcop/pquery.h
src/include/utils/portal.h

index 34b9f1543ec0d2330aae6a2958671852a4c93030..9213ffb6a44a29371c3d88a1f016d12951ab6475 100644 (file)
@@ -61,7 +61,7 @@ void          _PG_fini(void);
 static void explain_ExecutorStart(QueryDesc *queryDesc, int eflags);
 static void explain_ExecutorRun(QueryDesc *queryDesc,
                                        ScanDirection direction,
-                                       uint64 count);
+                                       uint64 count, bool execute_once);
 static void explain_ExecutorFinish(QueryDesc *queryDesc);
 static void explain_ExecutorEnd(QueryDesc *queryDesc);
 
@@ -257,15 +257,16 @@ explain_ExecutorStart(QueryDesc *queryDesc, int eflags)
  * ExecutorRun hook: all we need do is track nesting depth
  */
 static void
-explain_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count)
+explain_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction,
+                                       uint64 count, bool execute_once)
 {
        nesting_level++;
        PG_TRY();
        {
                if (prev_ExecutorRun)
-                       prev_ExecutorRun(queryDesc, direction, count);
+                       prev_ExecutorRun(queryDesc, direction, count, execute_once);
                else
-                       standard_ExecutorRun(queryDesc, direction, count);
+                       standard_ExecutorRun(queryDesc, direction, count, execute_once);
                nesting_level--;
        }
        PG_CATCH();
index 221ac98d4aa6f4eea668a91aa1e2c4300dafc926..42f43233f8968eff3447f5d887c25910815e752d 100644 (file)
@@ -290,7 +290,7 @@ static void pgss_post_parse_analyze(ParseState *pstate, Query *query);
 static void pgss_ExecutorStart(QueryDesc *queryDesc, int eflags);
 static void pgss_ExecutorRun(QueryDesc *queryDesc,
                                 ScanDirection direction,
-                                uint64 count);
+                                uint64 count, bool execute_once);
 static void pgss_ExecutorFinish(QueryDesc *queryDesc);
 static void pgss_ExecutorEnd(QueryDesc *queryDesc);
 static void pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
@@ -871,15 +871,16 @@ pgss_ExecutorStart(QueryDesc *queryDesc, int eflags)
  * ExecutorRun hook: all we need do is track nesting depth
  */
 static void
-pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count)
+pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count,
+                                bool execute_once)
 {
        nested_level++;
        PG_TRY();
        {
                if (prev_ExecutorRun)
-                       prev_ExecutorRun(queryDesc, direction, count);
+                       prev_ExecutorRun(queryDesc, direction, count, execute_once);
                else
-                       standard_ExecutorRun(queryDesc, direction, count);
+                       standard_ExecutorRun(queryDesc, direction, count, execute_once);
                nested_level--;
        }
        PG_CATCH();
index b0fd09f458a95159454b8d58f3291e0414dc6d4f..ab59be8455261d85dcbd5af751b0976965fa1cac 100644 (file)
@@ -2074,7 +2074,7 @@ CopyTo(CopyState cstate)
        else
        {
                /* run the plan --- the dest receiver will send tuples */
-               ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
+               ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
                processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
        }
 
index 646a88409f2c374cb4b0357e72d6d7dac6b594fb..3daffc894a1669db37830a2dd39d819cc4b029e2 100644 (file)
@@ -347,7 +347,7 @@ ExecCreateTableAs(CreateTableAsStmt *stmt, const char *queryString,
                ExecutorStart(queryDesc, GetIntoRelEFlags(into));
 
                /* run the plan to completion */
-               ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+               ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
 
                /* save the rowcount if we're given a completionTag to fill */
                if (completionTag)
index c9b55ead3dc35d8a4113b4a416a20e66be795193..b4c7466666b1d529fb338ab5d19f5d0904617d00 100644 (file)
@@ -530,7 +530,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
                        dir = ForwardScanDirection;
 
                /* run the plan */
-               ExecutorRun(queryDesc, dir, 0L);
+               ExecutorRun(queryDesc, dir, 0L, true);
 
                /* run cleanup too */
                ExecutorFinish(queryDesc);
index 86a84ee23463511beda8af956f2247d91337480e..5a84bedf467de67231a498f7ddf51fea2658785e 100644 (file)
@@ -742,7 +742,7 @@ execute_sql_string(const char *sql, const char *filename)
                                                                                dest, NULL, 0);
 
                                ExecutorStart(qdesc, 0);
-                               ExecutorRun(qdesc, ForwardScanDirection, 0);
+                               ExecutorRun(qdesc, ForwardScanDirection, 0, true);
                                ExecutorFinish(qdesc);
                                ExecutorEnd(qdesc);
 
index 8df3d1d81dd751e09ce3eaa35bd26a6482374786..9d41ad8fad262d52201008e3d38fda421153085f 100644 (file)
@@ -424,7 +424,7 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,
        ExecutorStart(queryDesc, EXEC_FLAG_WITHOUT_OIDS);
 
        /* run the plan */
-       ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+       ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
 
        processed = queryDesc->estate->es_processed;
 
index 29d0430dd876ae4c7ad5c7de9433350697ea3775..f57cf87e8c3921380bea731fab50918a47f96217 100644 (file)
@@ -395,7 +395,7 @@ PersistHoldablePortal(Portal portal)
                                                                                true);
 
                /* Fetch the result set into the tuplestore */
-               ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+               ExecutorRun(queryDesc, ForwardScanDirection, 0L, false);
 
                (*queryDesc->dest->rDestroy) (queryDesc->dest);
                queryDesc->dest = NULL;
index 1cf0d2b971a370f97d4959dcc417e49f5a03cac9..992ba1c9a2e2f95ae56e76c6844414dc2f449979 100644 (file)
@@ -301,7 +301,7 @@ ExecuteQuery(ExecuteStmt *stmt, IntoClause *intoClause,
         */
        PortalStart(portal, paramLI, eflags, GetActiveSnapshot());
 
-       (void) PortalRun(portal, count, false, dest, dest, completionTag);
+       (void) PortalRun(portal, count, false, true, dest, dest, completionTag);
 
        PortalDrop(portal, false);
 
index 023ea0081a0c9841fe53efce68009ebfacced0e7..c28cf9c8eab7f2b678918f62a29429c40648e4d3 100644 (file)
@@ -85,7 +85,8 @@ static void ExecutePlan(EState *estate, PlanState *planstate,
                        bool sendTuples,
                        uint64 numberTuples,
                        ScanDirection direction,
-                       DestReceiver *dest);
+                       DestReceiver *dest,
+                       bool execute_once);
 static bool ExecCheckRTEPerms(RangeTblEntry *rte);
 static bool ExecCheckRTEPermsModified(Oid relOid, Oid userid,
                                                  Bitmapset *modifiedCols,
@@ -288,17 +289,18 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
  */
 void
 ExecutorRun(QueryDesc *queryDesc,
-                       ScanDirection direction, uint64 count)
+                       ScanDirection direction, uint64 count,
+                       bool execute_once)
 {
        if (ExecutorRun_hook)
-               (*ExecutorRun_hook) (queryDesc, direction, count);
+               (*ExecutorRun_hook) (queryDesc, direction, count, execute_once);
        else
-               standard_ExecutorRun(queryDesc, direction, count);
+               standard_ExecutorRun(queryDesc, direction, count, execute_once);
 }
 
 void
 standard_ExecutorRun(QueryDesc *queryDesc,
-                                        ScanDirection direction, uint64 count)
+                                        ScanDirection direction, uint64 count, bool execute_once)
 {
        EState     *estate;
        CmdType         operation;
@@ -345,6 +347,11 @@ standard_ExecutorRun(QueryDesc *queryDesc,
         * run plan
         */
        if (!ScanDirectionIsNoMovement(direction))
+       {
+               if (execute_once && queryDesc->already_executed)
+                       elog(ERROR, "can't re-execute query flagged for single execution");
+               queryDesc->already_executed = true;
+
                ExecutePlan(estate,
                                        queryDesc->planstate,
                                        queryDesc->plannedstmt->parallelModeNeeded,
@@ -352,7 +359,9 @@ standard_ExecutorRun(QueryDesc *queryDesc,
                                        sendTuples,
                                        count,
                                        direction,
-                                       dest);
+                                       dest,
+                                       execute_once);
+       }
 
        /*
         * shutdown tuple receiver, if we started it
@@ -1595,7 +1604,8 @@ ExecutePlan(EState *estate,
                        bool sendTuples,
                        uint64 numberTuples,
                        ScanDirection direction,
-                       DestReceiver *dest)
+                       DestReceiver *dest,
+                       bool execute_once)
 {
        TupleTableSlot *slot;
        uint64          current_tuple_count;
@@ -1611,12 +1621,12 @@ ExecutePlan(EState *estate,
        estate->es_direction = direction;
 
        /*
-        * If a tuple count was supplied, we must force the plan to run without
-        * parallelism, because we might exit early.  Also disable parallelism
-        * when writing into a relation, because no database changes are allowed
-        * in parallel mode.
+        * If the plan might potentially be executed multiple times, we must force
+        * it to run without parallelism, because we might exit early.  Also
+        * disable parallelism when writing into a relation, because no database
+        * changes are allowed in parallel mode.
         */
-       if (numberTuples || dest->mydest == DestIntoRel)
+       if (!execute_once || dest->mydest == DestIntoRel)
                use_parallel_mode = false;
 
        if (use_parallel_mode)
@@ -1687,7 +1697,11 @@ ExecutePlan(EState *estate,
                 */
                current_tuple_count++;
                if (numberTuples && numberTuples == current_tuple_count)
+               {
+                       /* Allow nodes to release or shut down resources. */
+                       (void) ExecShutdownNode(planstate);
                        break;
+               }
        }
 
        if (use_parallel_mode)
index 86db73be431ae04a6658597c1f0381dd23f8afc1..b91b663c46f6b7495ca357d710376ea997a22292 100644 (file)
@@ -853,7 +853,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
        ExecParallelInitializeWorker(queryDesc->planstate, toc);
 
        /* Run the plan */
-       ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+       ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
 
        /* Shut down the executor */
        ExecutorFinish(queryDesc);
index 2d49a65650279b921f119b7545c38d0ead5b30f3..12214f8a15042e0482e6c706f2e19212d2445053 100644 (file)
@@ -855,7 +855,7 @@ postquel_getnext(execution_state *es, SQLFunctionCachePtr fcache)
                /* Run regular commands to completion unless lazyEval */
                uint64          count = (es->lazyEval) ? 1 : 0;
 
-               ExecutorRun(es->qd, ForwardScanDirection, count);
+               ExecutorRun(es->qd, ForwardScanDirection, count, !fcache->returnsSet || !es->lazyEval);
 
                /*
                 * If we requested run to completion OR there was no tuple returned,
index 55f97b14e6e546279ff67152834c4944e88b0cde..72c7b4d068934e27d061a248420e3feb2438e893 100644 (file)
@@ -2305,7 +2305,7 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
 
        ExecutorStart(queryDesc, eflags);
 
-       ExecutorRun(queryDesc, ForwardScanDirection, tcount);
+       ExecutorRun(queryDesc, ForwardScanDirection, tcount, true);
 
        _SPI_current->processed = queryDesc->estate->es_processed;
        _SPI_current->lastoid = queryDesc->estate->es_lastoid;
index ba41f90712689313793a89939b4d19d4be7cfbca..6258a14c3907ba1ae2ebf8641814952a34a17fc0 100644 (file)
@@ -1101,6 +1101,7 @@ exec_simple_query(const char *query_string)
                (void) PortalRun(portal,
                                                 FETCH_ALL,
                                                 isTopLevel,
+                                                true,
                                                 receiver,
                                                 receiver,
                                                 completionTag);
@@ -1985,6 +1986,7 @@ exec_execute_message(const char *portal_name, long max_rows)
        completed = PortalRun(portal,
                                                  max_rows,
                                                  true, /* always top level */
+                                                 !execute_is_fetch && max_rows == FETCH_ALL,
                                                  receiver,
                                                  receiver,
                                                  completionTag);
index 371d7350b7a39543285f3e1e15b5e3382ad4893e..f538b7787c380b9670b0b9743dbca2f485633134 100644 (file)
@@ -90,6 +90,9 @@ CreateQueryDesc(PlannedStmt *plannedstmt,
        qd->planstate = NULL;
        qd->totaltime = NULL;
 
+       /* not yet executed */
+       qd->already_executed = false;
+
        return qd;
 }
 
@@ -152,7 +155,7 @@ ProcessQuery(PlannedStmt *plan,
        /*
         * Run the plan to completion.
         */
-       ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+       ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
 
        /*
         * Build command completion status string, if caller wants one.
@@ -679,7 +682,7 @@ PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
  * suspended due to exhaustion of the count parameter.
  */
 bool
-PortalRun(Portal portal, long count, bool isTopLevel,
+PortalRun(Portal portal, long count, bool isTopLevel, bool run_once,
                  DestReceiver *dest, DestReceiver *altdest,
                  char *completionTag)
 {
@@ -712,6 +715,10 @@ PortalRun(Portal portal, long count, bool isTopLevel,
         */
        MarkPortalActive(portal);
 
+       /* Set run_once flag.  Shouldn't be clear if previously set. */
+       Assert(!portal->run_once || run_once);
+       portal->run_once = run_once;
+
        /*
         * Set up global portal context pointers.
         *
@@ -918,7 +925,8 @@ PortalRunSelect(Portal portal,
                else
                {
                        PushActiveSnapshot(queryDesc->snapshot);
-                       ExecutorRun(queryDesc, direction, (uint64) count);
+                       ExecutorRun(queryDesc, direction, (uint64) count,
+                                               portal->run_once);
                        nprocessed = queryDesc->estate->es_processed;
                        PopActiveSnapshot();
                }
@@ -957,7 +965,8 @@ PortalRunSelect(Portal portal,
                else
                {
                        PushActiveSnapshot(queryDesc->snapshot);
-                       ExecutorRun(queryDesc, direction, (uint64) count);
+                       ExecutorRun(queryDesc, direction, (uint64) count,
+                                               portal->run_once);
                        nprocessed = queryDesc->estate->es_processed;
                        PopActiveSnapshot();
                }
@@ -1394,6 +1403,9 @@ PortalRunFetch(Portal portal,
         */
        MarkPortalActive(portal);
 
+       /* If supporting FETCH, portal can't be run-once. */
+       Assert(!portal->run_once);
+
        /*
         * Set up global portal context pointers.
         */
index c99ea818158c9dafb9d8b8ac9d1d5044851b54d1..87e7ca8508234660fbd6debaade9a22a29e2cba7 100644 (file)
@@ -47,6 +47,9 @@ typedef struct QueryDesc
        EState     *estate;                     /* executor's query-wide state */
        PlanState  *planstate;          /* tree of per-plan-node state */
 
+       /* This field is set by ExecutorRun */
+       bool            already_executed;               /* true if previously executed */
+
        /* This is always set NULL by the core system, but plugins can change it */
        struct Instrumentation *totaltime;      /* total time spent in ExecutorRun */
 } QueryDesc;
index e64d6fb93fe081829e218b49a40d63b0e395c903..a5c75e771f0374e1c585491e745dcb96aed3a326 100644 (file)
@@ -81,7 +81,8 @@ extern PGDLLIMPORT ExecutorStart_hook_type ExecutorStart_hook;
 /* Hook for plugins to get control in ExecutorRun() */
 typedef void (*ExecutorRun_hook_type) (QueryDesc *queryDesc,
                                                                                                   ScanDirection direction,
-                                                                                                  uint64 count);
+                                                                                                  uint64 count,
+                                                                                                  bool execute_once);
 extern PGDLLIMPORT ExecutorRun_hook_type ExecutorRun_hook;
 
 /* Hook for plugins to get control in ExecutorFinish() */
@@ -176,9 +177,9 @@ extern TupleTableSlot *ExecFilterJunk(JunkFilter *junkfilter,
 extern void ExecutorStart(QueryDesc *queryDesc, int eflags);
 extern void standard_ExecutorStart(QueryDesc *queryDesc, int eflags);
 extern void ExecutorRun(QueryDesc *queryDesc,
-                       ScanDirection direction, uint64 count);
+                       ScanDirection direction, uint64 count, bool execute_once);
 extern void standard_ExecutorRun(QueryDesc *queryDesc,
-                                        ScanDirection direction, uint64 count);
+                                        ScanDirection direction, uint64 count, bool execute_once);
 extern void ExecutorFinish(QueryDesc *queryDesc);
 extern void standard_ExecutorFinish(QueryDesc *queryDesc);
 extern void ExecutorEnd(QueryDesc *queryDesc);
index 61c0b3447e58af19d8c314fef2af34fbfb549f76..12ff4588c61abff139c8f6e4683d615d9c89e48c 100644 (file)
@@ -34,7 +34,7 @@ extern void PortalSetResultFormat(Portal portal, int nFormats,
                                          int16 *formats);
 
 extern bool PortalRun(Portal portal, long count, bool isTopLevel,
-                 DestReceiver *dest, DestReceiver *altdest,
+                 bool run_once, DestReceiver *dest, DestReceiver *altdest,
                  char *completionTag);
 
 extern uint64 PortalRunFetch(Portal portal,
index dc76acd0a42d6f6b9e405e2024fae4ba1deae523..e7c5a8bd091c051cc8054cc2db32cdd83587aaae 100644 (file)
@@ -141,6 +141,7 @@ typedef struct PortalData
        /* Features/options */
        PortalStrategy strategy;        /* see above */
        int                     cursorOptions;  /* DECLARE CURSOR option bits */
+       bool            run_once;               /* portal will only be run once */
 
        /* Status data */
        PortalStatus status;            /* see above */