]> granicus.if.org Git - postgresql/commitdiff
Allow parallel custom and foreign scans.
authorRobert Haas <rhaas@postgresql.org>
Wed, 3 Feb 2016 17:46:18 +0000 (12:46 -0500)
committerRobert Haas <rhaas@postgresql.org>
Wed, 3 Feb 2016 17:49:46 +0000 (12:49 -0500)
This patch doesn't put the new infrastructure to use anywhere, and
indeed it's not clear how it could ever be used for something like
postgres_fdw which has to send an SQL query and wait for a reply,
but there might be FDWs or custom scan providers that are CPU-bound,
so let's give them a way to join club parallel.

KaiGai Kohei, reviewed by me.

doc/src/sgml/custom-scan.sgml
doc/src/sgml/fdwhandler.sgml
src/backend/executor/execParallel.c
src/backend/executor/nodeCustom.c
src/backend/executor/nodeForeignscan.c
src/include/executor/nodeCustom.h
src/include/executor/nodeForeignscan.h
src/include/foreign/fdwapi.h
src/include/nodes/execnodes.h

index 9a8d092decc9dcbb8d20f4b7d5b2c2f1a672c4d8..1ca9247124583f5957d2af2623f0892b414de826 100644 (file)
@@ -303,6 +303,43 @@ void (*RestrPosCustomScan) (CustomScanState *node);
 
    <para>
 <programlisting>
+Size (*EstimateDSMCustomScan) (CustomScanState *node,
+                               ParallelContext *pcxt);
+</programlisting>
+    Estimate the amount of dynamic shared memory that will be required
+    for parallel operation.  This may be higher than the amount that will
+    actually be used, but it must not be lower.  The return value is in bytes.
+    This callback is optional, and need only be supplied if this custom
+    scan provider supports parallel execution.
+   </para>
+
+   <para>
+<programlisting>
+void (*InitializeDSMCustomScan) (CustomScanState *node,
+                                 ParallelContext *pcxt,
+                                 void *coordinate);
+</programlisting>
+    Initialize the dynamic shared memory that will be required for parallel
+    operation; <literal>coordinate</> points to an amount of allocated space
+    equal to the return value of <function>EstimateDSMCustomScan</>.
+    This callback is optional, and need only be supplied if this custom
+    scan provider supports parallel execution.
+   </para>
+
+   <para>
+<programlisting>
+void (*InitializeWorkerCustomScan) (CustomScanState *node,
+                                    shm_toc *toc,
+                                    void *coordinate);
+</programlisting>
+    Initialize a parallel worker's custom state based on the shared state
+    set up in the leader by <literal>InitializeDSMCustomScan</>.
+    This callback is optional, and needs only be supplied if this
+    custom path supports parallel execution.
+   </para>
+
+   <para>
+<programlisting>
 void (*ExplainCustomScan) (CustomScanState *node,
                            List *ancestors,
                            ExplainState *es);
index dc2d8909751f3e7cb7d9d0110d6d6599df28f9bf..c6b60fa57982eb126c3b3ca948addd12b7a3b22a 100644 (file)
@@ -955,6 +955,53 @@ ImportForeignSchema (ImportForeignSchemaStmt *stmt, Oid serverOid);
 
    </sect2>
 
+   <sect2 id="fdw-callbacks-parallel">
+    <title>FDW Routines for Parallel Execution</title>
+    <para>
+     A <structname>ForeignScan</> node can, optionally, support parallel
+     execution.  A parallel <structname>ForeignScan</> will be executed
+     in multiple processes and should return each row only once across
+     all cooperating processes.  To do this, processes can coordinate through
+     fixed size chunks of dynamic shared memory.  This shared memory is not
+     guaranteed to be mapped at the same address in every process, so pointers
+     may not be used.  The following callbacks are all optional in general,
+     but required if parallel execution is to be supported.
+    </para>
+
+    <para>
+<programlisting>
+Size
+EstimateDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt);
+</programlisting>
+    Estimate the amount of dynamic shared memory that will be required
+    for parallel operation.  This may be higher than the amount that will
+    actually be used, but it must not be lower.  The return value is in bytes.
+    </para>
+
+    <para>
+<programlisting>
+void
+InitializeDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt,
+                         void *coordinate);
+</programlisting>
+    Initialize the dynamic shared memory that will be required for parallel
+    operation; <literal>coordinate</> points to an amount of allocated space
+    equal to the return value of <function>EstimateDSMForeignScan</>.
+   </para>
+
+   <para>
+<programlisting>
+void
+InitializeWorkerForeignScan(ForeignScanState *node, shm_toc *toc,
+                            void *coordinate);
+</programlisting>
+    Initialize a parallel worker's custom state based on the shared state
+    set up in the leader by <literal>InitializeDSMForeignScan</>.
+    This callback is optional, and needs only be supplied if this
+    custom path supports parallel execution.
+   </para>
+   </sect2>
+
    </sect1>
 
    <sect1 id="fdw-helpers">
index 29e450a571c649326a6ec6c773adc911cea156ad..95e8e41d2bbc86f8a744f2722fbf0921da322761 100644 (file)
@@ -25,6 +25,8 @@
 
 #include "executor/execParallel.h"
 #include "executor/executor.h"
+#include "executor/nodeCustom.h"
+#include "executor/nodeForeignscan.h"
 #include "executor/nodeSeqscan.h"
 #include "executor/tqueue.h"
 #include "nodes/nodeFuncs.h"
@@ -176,6 +178,14 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
                                ExecSeqScanEstimate((SeqScanState *) planstate,
                                                                        e->pcxt);
                                break;
+                       case T_ForeignScanState:
+                               ExecForeignScanEstimate((ForeignScanState *) planstate,
+                                                                               e->pcxt);
+                               break;
+                       case T_CustomScanState:
+                               ExecCustomScanEstimate((CustomScanState *) planstate,
+                                                                          e->pcxt);
+                               break;
                        default:
                                break;
                }
@@ -220,6 +230,14 @@ ExecParallelInitializeDSM(PlanState *planstate,
                                ExecSeqScanInitializeDSM((SeqScanState *) planstate,
                                                                                 d->pcxt);
                                break;
+                       case T_ForeignScanState:
+                               ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
+                                                                                        d->pcxt);
+                               break;
+                       case T_CustomScanState:
+                               ExecCustomScanInitializeDSM((CustomScanState *) planstate,
+                                                                                       d->pcxt);
+                               break;
                        default:
                                break;
                }
@@ -642,6 +660,14 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
                        case T_SeqScanState:
                                ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc);
                                break;
+                       case T_ForeignScanState:
+                               ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
+                                                                                               toc);
+                               break;
+                       case T_CustomScanState:
+                               ExecCustomScanInitializeWorker((CustomScanState *) planstate,
+                                                                                          toc);
+                               break;
                        default:
                                break;
                }
index 640289e2773c4d4a5082d2767e104aa98c937922..322abca282acfad4af0fafcdc1744aa3600746da 100644 (file)
@@ -10,6 +10,7 @@
  */
 #include "postgres.h"
 
+#include "access/parallel.h"
 #include "executor/executor.h"
 #include "executor/nodeCustom.h"
 #include "nodes/execnodes.h"
@@ -159,3 +160,47 @@ ExecCustomRestrPos(CustomScanState *node)
                                                node->methods->CustomName)));
        node->methods->RestrPosCustomScan(node);
 }
+
+void
+ExecCustomScanEstimate(CustomScanState *node, ParallelContext *pcxt)
+{
+       const CustomExecMethods *methods = node->methods;
+
+       if (methods->EstimateDSMCustomScan)
+       {
+               node->pscan_len = methods->EstimateDSMCustomScan(node, pcxt);
+               shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+               shm_toc_estimate_keys(&pcxt->estimator, 1);
+       }
+}
+
+void
+ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
+{
+       const CustomExecMethods *methods = node->methods;
+
+       if (methods->InitializeDSMCustomScan)
+       {
+               int                     plan_node_id = node->ss.ps.plan->plan_node_id;
+               void       *coordinate;
+
+               coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
+               methods->InitializeDSMCustomScan(node, pcxt, coordinate);
+               shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
+       }
+}
+
+void
+ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc)
+{
+       const CustomExecMethods *methods = node->methods;
+
+       if (methods->InitializeWorkerCustomScan)
+       {
+               int                     plan_node_id = node->ss.ps.plan->plan_node_id;
+               void       *coordinate;
+
+               coordinate = shm_toc_lookup(toc, plan_node_id);
+               methods->InitializeWorkerCustomScan(node, toc, coordinate);
+       }
+}
index 64a07bcc7713773b1d7cfca9527ccbad75a909ce..388c92274982a5f68434c54b3e3bea7055f70763 100644 (file)
@@ -282,3 +282,65 @@ ExecReScanForeignScan(ForeignScanState *node)
 
        ExecScanReScan(&node->ss);
 }
+
+/* ----------------------------------------------------------------
+ *             ExecForeignScanEstimate
+ *
+ *             Informs size of the parallel coordination information, if any
+ * ----------------------------------------------------------------
+ */
+void
+ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt)
+{
+       FdwRoutine *fdwroutine = node->fdwroutine;
+
+       if (fdwroutine->EstimateDSMForeignScan)
+       {
+               node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt);
+               shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+               shm_toc_estimate_keys(&pcxt->estimator, 1);
+       }
+}
+
+/* ----------------------------------------------------------------
+ *             ExecForeignScanInitializeDSM
+ *
+ *             Initialize the parallel coordination information
+ * ----------------------------------------------------------------
+ */
+void
+ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
+{
+       FdwRoutine *fdwroutine = node->fdwroutine;
+
+       if (fdwroutine->InitializeDSMForeignScan)
+       {
+               int                     plan_node_id = node->ss.ps.plan->plan_node_id;
+               void       *coordinate;
+
+               coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
+               fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate);
+               shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
+       }
+}
+
+/* ----------------------------------------------------------------
+ *             ExecForeignScanInitializeDSM
+ *
+ *             Initialization according to the parallel coordination information
+ * ----------------------------------------------------------------
+ */
+void
+ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc)
+{
+       FdwRoutine *fdwroutine = node->fdwroutine;
+
+       if (fdwroutine->InitializeWorkerForeignScan)
+       {
+               int                     plan_node_id = node->ss.ps.plan->plan_node_id;
+               void       *coordinate;
+
+               coordinate = shm_toc_lookup(toc, plan_node_id);
+               fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate);
+       }
+}
index e244942d79a0d3b3d93969b4743890fa8eef6ad0..410a3ad14db8e86cdc46b4126d99abf9938aacf4 100644 (file)
@@ -12,6 +12,7 @@
 #ifndef NODECUSTOM_H
 #define NODECUSTOM_H
 
+#include "access/parallel.h"
 #include "nodes/execnodes.h"
 
 /*
@@ -26,4 +27,14 @@ extern void ExecReScanCustomScan(CustomScanState *node);
 extern void ExecCustomMarkPos(CustomScanState *node);
 extern void ExecCustomRestrPos(CustomScanState *node);
 
+/*
+ * Parallel execution support
+ */
+extern void ExecCustomScanEstimate(CustomScanState *node,
+                                                                  ParallelContext *pcxt);
+extern void ExecCustomScanInitializeDSM(CustomScanState *node,
+                                                                               ParallelContext *pcxt);
+extern void ExecCustomScanInitializeWorker(CustomScanState *node,
+                                                                                  shm_toc *toc);
+
 #endif   /* NODECUSTOM_H */
index a92ce5c22a36deaf55c46eb45c61e086681f17a3..c2553295fab79c0b49a002b3d6abeefaea152cb0 100644 (file)
@@ -14,6 +14,7 @@
 #ifndef NODEFOREIGNSCAN_H
 #define NODEFOREIGNSCAN_H
 
+#include "access/parallel.h"
 #include "nodes/execnodes.h"
 
 extern ForeignScanState *ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags);
@@ -21,4 +22,11 @@ extern TupleTableSlot *ExecForeignScan(ForeignScanState *node);
 extern void ExecEndForeignScan(ForeignScanState *node);
 extern void ExecReScanForeignScan(ForeignScanState *node);
 
+extern void ExecForeignScanEstimate(ForeignScanState *node,
+                                                                       ParallelContext *pcxt);
+extern void ExecForeignScanInitializeDSM(ForeignScanState *node,
+                                                                                ParallelContext *pcxt);
+extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
+                                                                                       shm_toc *toc);
+
 #endif   /* NODEFOREIGNSCAN_H */
index db73233f65b942829c001b8d85884ddedfc4c836..e16fbf34ec858354df07e48d133c1337f471e0a9 100644 (file)
@@ -12,6 +12,7 @@
 #ifndef FDWAPI_H
 #define FDWAPI_H
 
+#include "access/parallel.h"
 #include "nodes/execnodes.h"
 #include "nodes/relation.h"
 
@@ -122,6 +123,14 @@ typedef bool (*AnalyzeForeignTable_function) (Relation relation,
 typedef List *(*ImportForeignSchema_function) (ImportForeignSchemaStmt *stmt,
                                                                                                                   Oid serverOid);
 
+typedef Size (*EstimateDSMForeignScan_function) (ForeignScanState *node,
+                                                                                               ParallelContext *pcxt);
+typedef void (*InitializeDSMForeignScan_function) (ForeignScanState *node,
+                                                                                                  ParallelContext *pcxt,
+                                                                                                  void *coordinate);
+typedef void (*InitializeWorkerForeignScan_function) (ForeignScanState *node,
+                                                                                                         shm_toc *toc,
+                                                                                                         void *coordinate);
 /*
  * FdwRoutine is the struct returned by a foreign-data wrapper's handler
  * function.  It provides pointers to the callback functions needed by the
@@ -177,6 +186,11 @@ typedef struct FdwRoutine
 
        /* Support functions for IMPORT FOREIGN SCHEMA */
        ImportForeignSchema_function ImportForeignSchema;
+
+       /* Support functions for parallelism under Gather node */
+       EstimateDSMForeignScan_function EstimateDSMForeignScan;
+       InitializeDSMForeignScan_function InitializeDSMForeignScan;
+       InitializeWorkerForeignScan_function InitializeWorkerForeignScan;
 } FdwRoutine;
 
 
index 07cd20ac504b27f3fc74cf683ae16b7744fc83af..064a0509c4d236924e3f6fb73891d9530776109a 100644 (file)
@@ -1585,6 +1585,7 @@ typedef struct ForeignScanState
 {
        ScanState       ss;                             /* its first field is NodeTag */
        List       *fdw_recheck_quals;  /* original quals not in ss.ps.qual */
+       Size            pscan_len;              /* size of parallel coordination information */
        /* use struct pointer to avoid including fdwapi.h here */
        struct FdwRoutine *fdwroutine;
        void       *fdw_state;          /* foreign-data wrapper can keep state here */
@@ -1603,6 +1604,8 @@ typedef struct ForeignScanState
  * the BeginCustomScan method.
  * ----------------
  */
+struct ParallelContext;                        /* avoid including parallel.h here */
+struct shm_toc;                                        /* avoid including shm_toc.h here */
 struct ExplainState;                   /* avoid including explain.h here */
 struct CustomScanState;
 
@@ -1619,7 +1622,15 @@ typedef struct CustomExecMethods
        void            (*ReScanCustomScan) (struct CustomScanState *node);
        void            (*MarkPosCustomScan) (struct CustomScanState *node);
        void            (*RestrPosCustomScan) (struct CustomScanState *node);
-
+       /* Optional: parallel execution support */
+       Size            (*EstimateDSMCustomScan) (struct CustomScanState *node,
+                                                                                          struct ParallelContext *pcxt);
+       void            (*InitializeDSMCustomScan) (struct CustomScanState *node,
+                                                                                               struct ParallelContext *pcxt,
+                                                                                                               void *coordinate);
+       void            (*InitializeWorkerCustomScan) (struct CustomScanState *node,
+                                                                                                                struct shm_toc *toc,
+                                                                                                                  void *coordinate);
        /* Optional: print additional information in EXPLAIN */
        void            (*ExplainCustomScan) (struct CustomScanState *node,
                                                                                                  List *ancestors,
@@ -1631,6 +1642,7 @@ typedef struct CustomScanState
        ScanState       ss;
        uint32          flags;                  /* mask of CUSTOMPATH_* flags, see relation.h */
        List       *custom_ps;          /* list of child PlanState nodes, if any */
+       Size            pscan_len;              /* size of parallel coordination information */
        const CustomExecMethods *methods;
 } CustomScanState;