<para>
<programlisting>
+Size (*EstimateDSMCustomScan) (CustomScanState *node,
+ ParallelContext *pcxt);
+</programlisting>
+ Estimate the amount of dynamic shared memory that will be required
+ for parallel operation. This may be higher than the amount that will
+ actually be used, but it must not be lower. The return value is in bytes.
+ This callback is optional, and need only be supplied if this custom
+ scan provider supports parallel execution.
+ </para>
+
+ <para>
+<programlisting>
+void (*InitializeDSMCustomScan) (CustomScanState *node,
+ ParallelContext *pcxt,
+ void *coordinate);
+</programlisting>
+ Initialize the dynamic shared memory that will be required for parallel
+ operation; <literal>coordinate</> points to an amount of allocated space
+ equal to the return value of <function>EstimateDSMCustomScan</>.
+ This callback is optional, and need only be supplied if this custom
+ scan provider supports parallel execution.
+ </para>
+
+ <para>
+<programlisting>
+void (*InitializeWorkerCustomScan) (CustomScanState *node,
+ shm_toc *toc,
+ void *coordinate);
+</programlisting>
+ Initialize a parallel worker's custom state based on the shared state
+ set up in the leader by <literal>InitializeDSMCustomScan</>.
+ This callback is optional, and needs only be supplied if this
+ custom path supports parallel execution.
+ </para>
+
+ <para>
+<programlisting>
void (*ExplainCustomScan) (CustomScanState *node,
List *ancestors,
ExplainState *es);
</sect2>
+ <sect2 id="fdw-callbacks-parallel">
+ <title>FDW Routines for Parallel Execution</title>
+ <para>
+ A <structname>ForeignScan</> node can, optionally, support parallel
+ execution. A parallel <structname>ForeignScan</> will be executed
+ in multiple processes and should return each row only once across
+ all cooperating processes. To do this, processes can coordinate through
+ fixed size chunks of dynamic shared memory. This shared memory is not
+ guaranteed to be mapped at the same address in every process, so pointers
+ may not be used. The following callbacks are all optional in general,
+ but required if parallel execution is to be supported.
+ </para>
+
+ <para>
+<programlisting>
+Size
+EstimateDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt);
+</programlisting>
+ Estimate the amount of dynamic shared memory that will be required
+ for parallel operation. This may be higher than the amount that will
+ actually be used, but it must not be lower. The return value is in bytes.
+ </para>
+
+ <para>
+<programlisting>
+void
+InitializeDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt,
+ void *coordinate);
+</programlisting>
+ Initialize the dynamic shared memory that will be required for parallel
+ operation; <literal>coordinate</> points to an amount of allocated space
+ equal to the return value of <function>EstimateDSMForeignScan</>.
+ </para>
+
+ <para>
+<programlisting>
+void
+InitializeWorkerForeignScan(ForeignScanState *node, shm_toc *toc,
+ void *coordinate);
+</programlisting>
+ Initialize a parallel worker's custom state based on the shared state
+ set up in the leader by <literal>InitializeDSMForeignScan</>.
+ This callback is optional, and needs only be supplied if this
+ custom path supports parallel execution.
+ </para>
+ </sect2>
+
</sect1>
<sect1 id="fdw-helpers">
#include "executor/execParallel.h"
#include "executor/executor.h"
+#include "executor/nodeCustom.h"
+#include "executor/nodeForeignscan.h"
#include "executor/nodeSeqscan.h"
#include "executor/tqueue.h"
#include "nodes/nodeFuncs.h"
ExecSeqScanEstimate((SeqScanState *) planstate,
e->pcxt);
break;
+ case T_ForeignScanState:
+ ExecForeignScanEstimate((ForeignScanState *) planstate,
+ e->pcxt);
+ break;
+ case T_CustomScanState:
+ ExecCustomScanEstimate((CustomScanState *) planstate,
+ e->pcxt);
+ break;
default:
break;
}
ExecSeqScanInitializeDSM((SeqScanState *) planstate,
d->pcxt);
break;
+ case T_ForeignScanState:
+ ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
+ d->pcxt);
+ break;
+ case T_CustomScanState:
+ ExecCustomScanInitializeDSM((CustomScanState *) planstate,
+ d->pcxt);
+ break;
default:
break;
}
case T_SeqScanState:
ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc);
break;
+ case T_ForeignScanState:
+ ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
+ toc);
+ break;
+ case T_CustomScanState:
+ ExecCustomScanInitializeWorker((CustomScanState *) planstate,
+ toc);
+ break;
default:
break;
}
*/
#include "postgres.h"
+#include "access/parallel.h"
#include "executor/executor.h"
#include "executor/nodeCustom.h"
#include "nodes/execnodes.h"
node->methods->CustomName)));
node->methods->RestrPosCustomScan(node);
}
+
+void
+ExecCustomScanEstimate(CustomScanState *node, ParallelContext *pcxt)
+{
+ const CustomExecMethods *methods = node->methods;
+
+ if (methods->EstimateDSMCustomScan)
+ {
+ node->pscan_len = methods->EstimateDSMCustomScan(node, pcxt);
+ shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+ }
+}
+
+void
+ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
+{
+ const CustomExecMethods *methods = node->methods;
+
+ if (methods->InitializeDSMCustomScan)
+ {
+ int plan_node_id = node->ss.ps.plan->plan_node_id;
+ void *coordinate;
+
+ coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
+ methods->InitializeDSMCustomScan(node, pcxt, coordinate);
+ shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
+ }
+}
+
+void
+ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc)
+{
+ const CustomExecMethods *methods = node->methods;
+
+ if (methods->InitializeWorkerCustomScan)
+ {
+ int plan_node_id = node->ss.ps.plan->plan_node_id;
+ void *coordinate;
+
+ coordinate = shm_toc_lookup(toc, plan_node_id);
+ methods->InitializeWorkerCustomScan(node, toc, coordinate);
+ }
+}
ExecScanReScan(&node->ss);
}
+
+/* ----------------------------------------------------------------
+ * ExecForeignScanEstimate
+ *
+ * Informs size of the parallel coordination information, if any
+ * ----------------------------------------------------------------
+ */
+void
+ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt)
+{
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ if (fdwroutine->EstimateDSMForeignScan)
+ {
+ node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt);
+ shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecForeignScanInitializeDSM
+ *
+ * Initialize the parallel coordination information
+ * ----------------------------------------------------------------
+ */
+void
+ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
+{
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ if (fdwroutine->InitializeDSMForeignScan)
+ {
+ int plan_node_id = node->ss.ps.plan->plan_node_id;
+ void *coordinate;
+
+ coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
+ fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate);
+ shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecForeignScanInitializeDSM
+ *
+ * Initialization according to the parallel coordination information
+ * ----------------------------------------------------------------
+ */
+void
+ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc)
+{
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ if (fdwroutine->InitializeWorkerForeignScan)
+ {
+ int plan_node_id = node->ss.ps.plan->plan_node_id;
+ void *coordinate;
+
+ coordinate = shm_toc_lookup(toc, plan_node_id);
+ fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate);
+ }
+}
#ifndef NODECUSTOM_H
#define NODECUSTOM_H
+#include "access/parallel.h"
#include "nodes/execnodes.h"
/*
extern void ExecCustomMarkPos(CustomScanState *node);
extern void ExecCustomRestrPos(CustomScanState *node);
+/*
+ * Parallel execution support
+ */
+extern void ExecCustomScanEstimate(CustomScanState *node,
+ ParallelContext *pcxt);
+extern void ExecCustomScanInitializeDSM(CustomScanState *node,
+ ParallelContext *pcxt);
+extern void ExecCustomScanInitializeWorker(CustomScanState *node,
+ shm_toc *toc);
+
#endif /* NODECUSTOM_H */
#ifndef NODEFOREIGNSCAN_H
#define NODEFOREIGNSCAN_H
+#include "access/parallel.h"
#include "nodes/execnodes.h"
extern ForeignScanState *ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags);
extern void ExecEndForeignScan(ForeignScanState *node);
extern void ExecReScanForeignScan(ForeignScanState *node);
+extern void ExecForeignScanEstimate(ForeignScanState *node,
+ ParallelContext *pcxt);
+extern void ExecForeignScanInitializeDSM(ForeignScanState *node,
+ ParallelContext *pcxt);
+extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
+ shm_toc *toc);
+
#endif /* NODEFOREIGNSCAN_H */
#ifndef FDWAPI_H
#define FDWAPI_H
+#include "access/parallel.h"
#include "nodes/execnodes.h"
#include "nodes/relation.h"
typedef List *(*ImportForeignSchema_function) (ImportForeignSchemaStmt *stmt,
Oid serverOid);
+typedef Size (*EstimateDSMForeignScan_function) (ForeignScanState *node,
+ ParallelContext *pcxt);
+typedef void (*InitializeDSMForeignScan_function) (ForeignScanState *node,
+ ParallelContext *pcxt,
+ void *coordinate);
+typedef void (*InitializeWorkerForeignScan_function) (ForeignScanState *node,
+ shm_toc *toc,
+ void *coordinate);
/*
* FdwRoutine is the struct returned by a foreign-data wrapper's handler
* function. It provides pointers to the callback functions needed by the
/* Support functions for IMPORT FOREIGN SCHEMA */
ImportForeignSchema_function ImportForeignSchema;
+
+ /* Support functions for parallelism under Gather node */
+ EstimateDSMForeignScan_function EstimateDSMForeignScan;
+ InitializeDSMForeignScan_function InitializeDSMForeignScan;
+ InitializeWorkerForeignScan_function InitializeWorkerForeignScan;
} FdwRoutine;
{
ScanState ss; /* its first field is NodeTag */
List *fdw_recheck_quals; /* original quals not in ss.ps.qual */
+ Size pscan_len; /* size of parallel coordination information */
/* use struct pointer to avoid including fdwapi.h here */
struct FdwRoutine *fdwroutine;
void *fdw_state; /* foreign-data wrapper can keep state here */
* the BeginCustomScan method.
* ----------------
*/
+struct ParallelContext; /* avoid including parallel.h here */
+struct shm_toc; /* avoid including shm_toc.h here */
struct ExplainState; /* avoid including explain.h here */
struct CustomScanState;
void (*ReScanCustomScan) (struct CustomScanState *node);
void (*MarkPosCustomScan) (struct CustomScanState *node);
void (*RestrPosCustomScan) (struct CustomScanState *node);
-
+ /* Optional: parallel execution support */
+ Size (*EstimateDSMCustomScan) (struct CustomScanState *node,
+ struct ParallelContext *pcxt);
+ void (*InitializeDSMCustomScan) (struct CustomScanState *node,
+ struct ParallelContext *pcxt,
+ void *coordinate);
+ void (*InitializeWorkerCustomScan) (struct CustomScanState *node,
+ struct shm_toc *toc,
+ void *coordinate);
/* Optional: print additional information in EXPLAIN */
void (*ExplainCustomScan) (struct CustomScanState *node,
List *ancestors,
ScanState ss;
uint32 flags; /* mask of CUSTOMPATH_* flags, see relation.h */
List *custom_ps; /* list of child PlanState nodes, if any */
+ Size pscan_len; /* size of parallel coordination information */
const CustomExecMethods *methods;
} CustomScanState;