From d1b7c1ffe72e86932b5395f29e006c3f503bc53d Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Mon, 28 Sep 2015 21:55:57 -0400 Subject: [PATCH] Parallel executor support. This code provides infrastructure for a parallel leader to start up parallel workers to execute subtrees of the plan tree being executed in the master. User-supplied parameters from ParamListInfo are passed down, but PARAM_EXEC parameters are not. Various other constructs, such as initplans, subplans, and CTEs, are also not currently shared. Nevertheless, there's enough here to support a basic implementation of parallel query, and we can lift some of the current restrictions as needed. Amit Kapila and Robert Haas --- src/backend/executor/Makefile | 3 +- src/backend/executor/execParallel.c | 585 +++++++++++++++++++++++++++ src/backend/executor/instrument.c | 78 ++++ src/backend/executor/tqueue.c | 4 +- src/backend/nodes/copyfuncs.c | 1 + src/backend/nodes/outfuncs.c | 1 + src/backend/nodes/params.c | 155 +++++++ src/backend/nodes/readfuncs.c | 1 + src/backend/optimizer/plan/planner.c | 1 + src/backend/optimizer/plan/setrefs.c | 5 + src/backend/utils/adt/datum.c | 118 ++++++ src/include/executor/execParallel.h | 36 ++ src/include/executor/instrument.h | 5 + src/include/nodes/params.h | 3 + src/include/nodes/plannodes.h | 1 + src/include/nodes/relation.h | 2 + src/include/utils/datum.h | 10 + 17 files changed, 1007 insertions(+), 2 deletions(-) create mode 100644 src/backend/executor/execParallel.c create mode 100644 src/include/executor/execParallel.h diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile index 249534bb92..f5e1e1aefc 100644 --- a/src/backend/executor/Makefile +++ b/src/backend/executor/Makefile @@ -13,7 +13,8 @@ top_builddir = ../../.. include $(top_builddir)/src/Makefile.global OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \ - execMain.o execProcnode.o execQual.o execScan.o execTuples.o \ + execMain.o execParallel.o execProcnode.o execQual.o \ + execScan.o execTuples.o \ execUtils.o functions.o instrument.o nodeAppend.o nodeAgg.o \ nodeBitmapAnd.o nodeBitmapOr.o \ nodeBitmapHeapscan.o nodeBitmapIndexscan.o nodeCustom.o nodeHash.o \ diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c new file mode 100644 index 0000000000..a409a9a571 --- /dev/null +++ b/src/backend/executor/execParallel.c @@ -0,0 +1,585 @@ +/*------------------------------------------------------------------------- + * + * execParallel.c + * Support routines for parallel execution. + * + * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/executor/execParallel.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "executor/execParallel.h" +#include "executor/executor.h" +#include "executor/tqueue.h" +#include "nodes/nodeFuncs.h" +#include "optimizer/planmain.h" +#include "optimizer/planner.h" +#include "storage/spin.h" +#include "tcop/tcopprot.h" +#include "utils/memutils.h" +#include "utils/snapmgr.h" + +/* + * Magic numbers for parallel executor communication. We use constants + * greater than any 32-bit integer here so that values < 2^32 can be used + * by individual parallel nodes to store their own state. + */ +#define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000001) +#define PARALLEL_KEY_PARAMS UINT64CONST(0xE000000000000002) +#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000003) +#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000004) +#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000005) + +#define PARALLEL_TUPLE_QUEUE_SIZE 65536 + +/* DSM structure for accumulating per-PlanState instrumentation. */ +typedef struct SharedPlanStateInstrumentation +{ + int plan_node_id; + slock_t mutex; + Instrumentation instr; +} SharedPlanStateInstrumentation; + +/* DSM structure for accumulating per-PlanState instrumentation. */ +struct SharedExecutorInstrumentation +{ + int instrument_options; + int ps_ninstrument; /* # of ps_instrument structures following */ + SharedPlanStateInstrumentation ps_instrument[FLEXIBLE_ARRAY_MEMBER]; +}; + +/* Context object for ExecParallelEstimate. */ +typedef struct ExecParallelEstimateContext +{ + ParallelContext *pcxt; + int nnodes; +} ExecParallelEstimateContext; + +/* Context object for ExecParallelEstimate. */ +typedef struct ExecParallelInitializeDSMContext +{ + ParallelContext *pcxt; + SharedExecutorInstrumentation *instrumentation; + int nnodes; +} ExecParallelInitializeDSMContext; + +/* Helper functions that run in the parallel leader. */ +static char *ExecSerializePlan(Plan *plan, List *rangetable); +static bool ExecParallelEstimate(PlanState *node, + ExecParallelEstimateContext *e); +static bool ExecParallelInitializeDSM(PlanState *node, + ExecParallelInitializeDSMContext *d); +static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt); +static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, + SharedExecutorInstrumentation *instrumentation); + +/* Helper functions that run in the parallel worker. */ +static void ParallelQueryMain(dsm_segment *seg, shm_toc *toc); +static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc); + +/* + * Create a serialized representation of the plan to be sent to each worker. + */ +static char * +ExecSerializePlan(Plan *plan, List *rangetable) +{ + PlannedStmt *pstmt; + ListCell *tlist; + + /* We can't scribble on the original plan, so make a copy. */ + plan = copyObject(plan); + + /* + * The worker will start its own copy of the executor, and that copy will + * insert a junk filter if the toplevel node has any resjunk entries. We + * don't want that to happen, because while resjunk columns shouldn't be + * sent back to the user, here the tuples are coming back to another + * backend which may very well need them. So mutate the target list + * accordingly. This is sort of a hack; there might be better ways to do + * this... + */ + foreach(tlist, plan->targetlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(tlist); + + tle->resjunk = false; + } + + /* + * Create a dummy PlannedStmt. Most of the fields don't need to be valid + * for our purposes, but the worker will need at least a minimal + * PlannedStmt to start the executor. + */ + pstmt = makeNode(PlannedStmt); + pstmt->commandType = CMD_SELECT; + pstmt->queryId = 0; + pstmt->hasReturning = 0; + pstmt->hasModifyingCTE = 0; + pstmt->canSetTag = 1; + pstmt->transientPlan = 0; + pstmt->planTree = plan; + pstmt->rtable = rangetable; + pstmt->resultRelations = NIL; + pstmt->utilityStmt = NULL; + pstmt->subplans = NIL; + pstmt->rewindPlanIDs = NULL; + pstmt->rowMarks = NIL; + pstmt->nParamExec = 0; + pstmt->relationOids = NIL; + pstmt->invalItems = NIL; /* workers can't replan anyway... */ + pstmt->hasRowSecurity = false; + + /* Return serialized copy of our dummy PlannedStmt. */ + return nodeToString(pstmt); +} + +/* + * Ordinary plan nodes won't do anything here, but parallel-aware plan nodes + * may need some state which is shared across all parallel workers. Before + * we size the DSM, give them a chance to call shm_toc_estimate_chunk or + * shm_toc_estimate_keys on &pcxt->estimator. + * + * While we're at it, count the number of PlanState nodes in the tree, so + * we know how many SharedPlanStateInstrumentation structures we need. + */ +static bool +ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) +{ + if (planstate == NULL) + return false; + + /* Count this node. */ + e->nnodes++; + + /* + * XXX. Call estimators for parallel-aware nodes here, when we have + * some. + */ + + return planstate_tree_walker(planstate, ExecParallelEstimate, e); +} + +/* + * Ordinary plan nodes won't do anything here, but parallel-aware plan nodes + * may need to initialize shared state in the DSM before parallel workers + * are available. They can allocate the space they previous estimated using + * shm_toc_allocate, and add the keys they previously estimated using + * shm_toc_insert, in each case targeting pcxt->toc. + */ +static bool +ExecParallelInitializeDSM(PlanState *planstate, + ExecParallelInitializeDSMContext *d) +{ + if (planstate == NULL) + return false; + + /* If instrumentation is enabled, initialize array slot for this node. */ + if (d->instrumentation != NULL) + { + SharedPlanStateInstrumentation *instrumentation; + + instrumentation = &d->instrumentation->ps_instrument[d->nnodes]; + Assert(d->nnodes < d->instrumentation->ps_ninstrument); + instrumentation->plan_node_id = planstate->plan->plan_node_id; + SpinLockInit(&instrumentation->mutex); + InstrInit(&instrumentation->instr, + d->instrumentation->instrument_options); + } + + /* Count this node. */ + d->nnodes++; + + /* + * XXX. Call initializers for parallel-aware plan nodes, when we have + * some. + */ + + return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d); +} + +/* + * It sets up the response queues for backend workers to return tuples + * to the main backend and start the workers. + */ +static shm_mq_handle ** +ExecParallelSetupTupleQueues(ParallelContext *pcxt) +{ + shm_mq_handle **responseq; + char *tqueuespace; + int i; + + /* Skip this if no workers. */ + if (pcxt->nworkers == 0) + return NULL; + + /* Allocate memory for shared memory queue handles. */ + responseq = (shm_mq_handle **) + palloc(pcxt->nworkers * sizeof(shm_mq_handle *)); + + /* Allocate space from the DSM for the queues themselves. */ + tqueuespace = shm_toc_allocate(pcxt->toc, + PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers); + + /* Create the queues, and become the receiver for each. */ + for (i = 0; i < pcxt->nworkers; ++i) + { + shm_mq *mq; + + mq = shm_mq_create(tqueuespace + i * PARALLEL_TUPLE_QUEUE_SIZE, + (Size) PARALLEL_TUPLE_QUEUE_SIZE); + + shm_mq_set_receiver(mq, MyProc); + responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL); + } + + /* Add array of queues to shm_toc, so others can find it. */ + shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace); + + /* Return array of handles. */ + return responseq; +} + +/* + * Sets up the required infrastructure for backend workers to perform + * execution and return results to the main backend. + */ +ParallelExecutorInfo * +ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) +{ + ParallelExecutorInfo *pei; + ParallelContext *pcxt; + ExecParallelEstimateContext e; + ExecParallelInitializeDSMContext d; + char *pstmt_data; + char *pstmt_space; + char *param_space; + BufferUsage *bufusage_space; + SharedExecutorInstrumentation *instrumentation = NULL; + int pstmt_len; + int param_len; + int instrumentation_len = 0; + + /* Allocate object for return value. */ + pei = palloc0(sizeof(ParallelExecutorInfo)); + pei->planstate = planstate; + + /* Fix up and serialize plan to be sent to workers. */ + pstmt_data = ExecSerializePlan(planstate->plan, estate->es_range_table); + + /* Create a parallel context. */ + pcxt = CreateParallelContext(ParallelQueryMain, nworkers); + pei->pcxt = pcxt; + + /* + * Before telling the parallel context to create a dynamic shared memory + * segment, we need to figure out how big it should be. Estimate space + * for the various things we need to store. + */ + + /* Estimate space for serialized PlannedStmt. */ + pstmt_len = strlen(pstmt_data) + 1; + shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate space for serialized ParamListInfo. */ + param_len = EstimateParamListSpace(estate->es_param_list_info); + shm_toc_estimate_chunk(&pcxt->estimator, param_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* + * Estimate space for BufferUsage. + * + * If EXPLAIN is not in use and there are no extensions loaded that care, + * we could skip this. But we have no way of knowing whether anyone's + * looking at pgBufferUsage, so do it unconditionally. + */ + shm_toc_estimate_chunk(&pcxt->estimator, + sizeof(BufferUsage) * pcxt->nworkers); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate space for tuple queues. */ + shm_toc_estimate_chunk(&pcxt->estimator, + PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* + * Give parallel-aware nodes a chance to add to the estimates, and get + * a count of how many PlanState nodes there are. + */ + e.pcxt = pcxt; + e.nnodes = 0; + ExecParallelEstimate(planstate, &e); + + /* Estimate space for instrumentation, if required. */ + if (estate->es_instrument) + { + instrumentation_len = + offsetof(SharedExecutorInstrumentation, ps_instrument) + + sizeof(SharedPlanStateInstrumentation) * e.nnodes; + shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } + + /* Everyone's had a chance to ask for space, so now create the DSM. */ + InitializeParallelDSM(pcxt); + + /* + * OK, now we have a dynamic shared memory segment, and it should be big + * enough to store all of the data we estimated we would want to put into + * it, plus whatever general stuff (not specifically executor-related) the + * ParallelContext itself needs to store there. None of the space we + * asked for has been allocated or initialized yet, though, so do that. + */ + + /* Store serialized PlannedStmt. */ + pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len); + memcpy(pstmt_space, pstmt_data, pstmt_len); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space); + + /* Store serialized ParamListInfo. */ + param_space = shm_toc_allocate(pcxt->toc, param_len); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMS, param_space); + SerializeParamList(estate->es_param_list_info, ¶m_space); + + /* Allocate space for each worker's BufferUsage; no need to initialize. */ + bufusage_space = shm_toc_allocate(pcxt->toc, + sizeof(BufferUsage) * pcxt->nworkers); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space); + pei->buffer_usage = bufusage_space; + + /* Set up tuple queues. */ + pei->tqueue = ExecParallelSetupTupleQueues(pcxt); + + /* + * If instrumentation options were supplied, allocate space for the + * data. It only gets partially initialized here; the rest happens + * during ExecParallelInitializeDSM. + */ + if (estate->es_instrument) + { + instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len); + instrumentation->instrument_options = estate->es_instrument; + instrumentation->ps_ninstrument = e.nnodes; + shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, + instrumentation); + pei->instrumentation = instrumentation; + } + + /* + * Give parallel-aware nodes a chance to initialize their shared data. + * This also initializes the elements of instrumentation->ps_instrument, + * if it exists. + */ + d.pcxt = pcxt; + d.instrumentation = instrumentation; + d.nnodes = 0; + ExecParallelInitializeDSM(planstate, &d); + + /* + * Make sure that the world hasn't shifted under our feat. This could + * probably just be an Assert(), but let's be conservative for now. + */ + if (e.nnodes != d.nnodes) + elog(ERROR, "inconsistent count of PlanState nodes"); + + /* OK, we're ready to rock and roll. */ + return pei; +} + +/* + * Copy instrumentation information about this node and its descendents from + * dynamic shared memory. + */ +static bool +ExecParallelRetrieveInstrumentation(PlanState *planstate, + SharedExecutorInstrumentation *instrumentation) +{ + int i; + int plan_node_id = planstate->plan->plan_node_id; + SharedPlanStateInstrumentation *ps_instrument; + + /* Find the instumentation for this node. */ + for (i = 0; i < instrumentation->ps_ninstrument; ++i) + if (instrumentation->ps_instrument[i].plan_node_id == plan_node_id) + break; + if (i >= instrumentation->ps_ninstrument) + elog(ERROR, "plan node %d not found", plan_node_id); + + /* No need to acquire the spinlock here; workers have exited already. */ + ps_instrument = &instrumentation->ps_instrument[i]; + InstrAggNode(planstate->instrument, &ps_instrument->instr); + + return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation, + instrumentation); +} + +/* + * Finish parallel execution. We wait for parallel workers to finish, and + * accumulate their buffer usage and instrumentation. + */ +void +ExecParallelFinish(ParallelExecutorInfo *pei) +{ + int i; + + /* First, wait for the workers to finish. */ + WaitForParallelWorkersToFinish(pei->pcxt); + + /* Next, accumulate buffer usage. */ + for (i = 0; i < pei->pcxt->nworkers; ++i) + InstrAccumParallelQuery(&pei->buffer_usage[i]); + + /* Finally, accumulate instrumentation, if any. */ + if (pei->instrumentation) + ExecParallelRetrieveInstrumentation(pei->planstate, + pei->instrumentation); +} + +/* + * Create a DestReceiver to write tuples we produce to the shm_mq designated + * for that purpose. + */ +static DestReceiver * +ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc) +{ + char *mqspace; + shm_mq *mq; + + mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE); + mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE; + mq = (shm_mq *) mqspace; + shm_mq_set_sender(mq, MyProc); + return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL)); +} + +/* + * Create a QueryDesc for the PlannedStmt we are to execute, and return it. + */ +static QueryDesc * +ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver, + int instrument_options) +{ + char *pstmtspace; + char *paramspace; + PlannedStmt *pstmt; + ParamListInfo paramLI; + + /* Reconstruct leader-supplied PlannedStmt. */ + pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT); + pstmt = (PlannedStmt *) stringToNode(pstmtspace); + + /* Reconstruct ParamListInfo. */ + paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMS); + paramLI = RestoreParamList(¶mspace); + + /* + * Create a QueryDesc for the query. + * + * It's not obvious how to obtain the query string from here; and even if + * we could copying it would take more cycles than not copying it. But + * it's a bit unsatisfying to just use a dummy string here, so consider + * revising this someday. + */ + return CreateQueryDesc(pstmt, + "", + GetActiveSnapshot(), InvalidSnapshot, + receiver, paramLI, instrument_options); +} + +/* + * Copy instrumentation information from this node and its descendents into + * dynamic shared memory, so that the parallel leader can retrieve it. + */ +static bool +ExecParallelReportInstrumentation(PlanState *planstate, + SharedExecutorInstrumentation *instrumentation) +{ + int i; + int plan_node_id = planstate->plan->plan_node_id; + SharedPlanStateInstrumentation *ps_instrument; + + /* + * If we shuffled the plan_node_id values in ps_instrument into sorted + * order, we could use binary search here. This might matter someday + * if we're pushing down sufficiently large plan trees. For now, do it + * the slow, dumb way. + */ + for (i = 0; i < instrumentation->ps_ninstrument; ++i) + if (instrumentation->ps_instrument[i].plan_node_id == plan_node_id) + break; + if (i >= instrumentation->ps_ninstrument) + elog(ERROR, "plan node %d not found", plan_node_id); + + /* + * There's one SharedPlanStateInstrumentation per plan_node_id, so we + * must use a spinlock in case multiple workers report at the same time. + */ + ps_instrument = &instrumentation->ps_instrument[i]; + SpinLockAcquire(&ps_instrument->mutex); + InstrAggNode(&ps_instrument->instr, planstate->instrument); + SpinLockRelease(&ps_instrument->mutex); + + return planstate_tree_walker(planstate, ExecParallelReportInstrumentation, + instrumentation); +} + +/* + * Main entrypoint for parallel query worker processes. + * + * We reach this function from ParallelMain, so the setup necessary to create + * a sensible parallel environment has already been done; ParallelMain worries + * about stuff like the transaction state, combo CID mappings, and GUC values, + * so we don't need to deal with any of that here. + * + * Our job is to deal with concerns specific to the executor. The parallel + * group leader will have stored a serialized PlannedStmt, and it's our job + * to execute that plan and write the resulting tuples to the appropriate + * tuple queue. Various bits of supporting information that we need in order + * to do this are also stored in the dsm_segment and can be accessed through + * the shm_toc. + */ +static void +ParallelQueryMain(dsm_segment *seg, shm_toc *toc) +{ + BufferUsage *buffer_usage; + DestReceiver *receiver; + QueryDesc *queryDesc; + SharedExecutorInstrumentation *instrumentation; + int instrument_options = 0; + + /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */ + receiver = ExecParallelGetReceiver(seg, toc); + instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION); + if (instrumentation != NULL) + instrument_options = instrumentation->instrument_options; + queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options); + + /* Prepare to track buffer usage during query execution. */ + InstrStartParallelQuery(); + + /* Start up the executor, have it run the plan, and then shut it down. */ + ExecutorStart(queryDesc, 0); + ExecutorRun(queryDesc, ForwardScanDirection, 0L); + ExecutorFinish(queryDesc); + ExecutorEnd(queryDesc); + + /* Report buffer usage during parallel execution. */ + buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE); + InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]); + + /* Report instrumentation data if any instrumentation options are set. */ + if (instrumentation != NULL) + ExecParallelReportInstrumentation(queryDesc->planstate, + instrumentation); + + /* Cleanup. */ + FreeQueryDesc(queryDesc); + (*receiver->rDestroy) (receiver); +} diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c index f5351eb397..bf509b1e75 100644 --- a/src/backend/executor/instrument.c +++ b/src/backend/executor/instrument.c @@ -18,7 +18,9 @@ #include "executor/instrument.h" BufferUsage pgBufferUsage; +static BufferUsage save_pgBufferUsage; +static void BufferUsageAdd(BufferUsage *dst, const BufferUsage *add); static void BufferUsageAccumDiff(BufferUsage *dst, const BufferUsage *add, const BufferUsage *sub); @@ -47,6 +49,15 @@ InstrAlloc(int n, int instrument_options) return instr; } +/* Initialize an pre-allocated instrumentation structure. */ +void +InstrInit(Instrumentation *instr, int instrument_options) +{ + memset(instr, 0, sizeof(Instrumentation)); + instr->need_bufusage = (instrument_options & INSTRUMENT_BUFFERS) != 0; + instr->need_timer = (instrument_options & INSTRUMENT_TIMER) != 0; +} + /* Entry to a plan node */ void InstrStartNode(Instrumentation *instr) @@ -127,6 +138,73 @@ InstrEndLoop(Instrumentation *instr) instr->tuplecount = 0; } +/* aggregate instrumentation information */ +void +InstrAggNode(Instrumentation *dst, Instrumentation *add) +{ + if (!dst->running && add->running) + { + dst->running = true; + dst->firsttuple = add->firsttuple; + } + else if (dst->running && add->running && dst->firsttuple > add->firsttuple) + dst->firsttuple = add->firsttuple; + + INSTR_TIME_ADD(dst->counter, add->counter); + + dst->tuplecount += add->tuplecount; + dst->startup += add->startup; + dst->total += add->total; + dst->ntuples += add->ntuples; + dst->nloops += add->nloops; + dst->nfiltered1 += add->nfiltered1; + dst->nfiltered2 += add->nfiltered2; + + /* Add delta of buffer usage since entry to node's totals */ + if (dst->need_bufusage) + BufferUsageAdd(&dst->bufusage, &add->bufusage); +} + +/* note current values during parallel executor startup */ +void +InstrStartParallelQuery(void) +{ + save_pgBufferUsage = pgBufferUsage; +} + +/* report usage after parallel executor shutdown */ +void +InstrEndParallelQuery(BufferUsage *result) +{ + memset(result, 0, sizeof(BufferUsage)); + BufferUsageAccumDiff(result, &pgBufferUsage, &save_pgBufferUsage); +} + +/* accumulate work done by workers in leader's stats */ +void +InstrAccumParallelQuery(BufferUsage *result) +{ + BufferUsageAdd(&pgBufferUsage, result); +} + +/* dst += add */ +static void +BufferUsageAdd(BufferUsage *dst, const BufferUsage *add) +{ + dst->shared_blks_hit += add->shared_blks_hit; + dst->shared_blks_read += add->shared_blks_read; + dst->shared_blks_dirtied += add->shared_blks_dirtied; + dst->shared_blks_written += add->shared_blks_written; + dst->local_blks_hit += add->local_blks_hit; + dst->local_blks_read += add->local_blks_read; + dst->local_blks_dirtied += add->local_blks_dirtied; + dst->local_blks_written += add->local_blks_written; + dst->temp_blks_read += add->temp_blks_read; + dst->temp_blks_written += add->temp_blks_written; + INSTR_TIME_ADD(dst->blk_read_time, add->blk_read_time); + INSTR_TIME_ADD(dst->blk_write_time, add->blk_write_time); +} + /* dst += add - sub */ static void BufferUsageAccumDiff(BufferUsage *dst, diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c index d0edf4e559..67143d33da 100644 --- a/src/backend/executor/tqueue.c +++ b/src/backend/executor/tqueue.c @@ -66,7 +66,9 @@ tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo) static void tqueueShutdownReceiver(DestReceiver *self) { - /* do nothing */ + TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; + + shm_mq_detach(shm_mq_get_queue(tqueue->handle)); } /* diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 62355aae51..4b4ddec4c2 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -112,6 +112,7 @@ CopyPlanFields(const Plan *from, Plan *newnode) COPY_SCALAR_FIELD(total_cost); COPY_SCALAR_FIELD(plan_rows); COPY_SCALAR_FIELD(plan_width); + COPY_SCALAR_FIELD(plan_node_id); COPY_NODE_FIELD(targetlist); COPY_NODE_FIELD(qual); COPY_NODE_FIELD(lefttree); diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index c91273cf23..ee9c360345 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -271,6 +271,7 @@ _outPlanInfo(StringInfo str, const Plan *node) WRITE_FLOAT_FIELD(total_cost, "%.2f"); WRITE_FLOAT_FIELD(plan_rows, "%.0f"); WRITE_INT_FIELD(plan_width); + WRITE_INT_FIELD(plan_node_id); WRITE_NODE_FIELD(targetlist); WRITE_NODE_FIELD(qual); WRITE_NODE_FIELD(lefttree); diff --git a/src/backend/nodes/params.c b/src/backend/nodes/params.c index fb803f8ee8..d093263589 100644 --- a/src/backend/nodes/params.c +++ b/src/backend/nodes/params.c @@ -16,6 +16,7 @@ #include "postgres.h" #include "nodes/params.h" +#include "storage/shmem.h" #include "utils/datum.h" #include "utils/lsyscache.h" @@ -73,3 +74,157 @@ copyParamList(ParamListInfo from) return retval; } + +/* + * Estimate the amount of space required to serialize a ParamListInfo. + */ +Size +EstimateParamListSpace(ParamListInfo paramLI) +{ + int i; + Size sz = sizeof(int); + + if (paramLI == NULL || paramLI->numParams <= 0) + return sz; + + for (i = 0; i < paramLI->numParams; i++) + { + ParamExternData *prm = ¶mLI->params[i]; + int16 typLen; + bool typByVal; + + /* give hook a chance in case parameter is dynamic */ + if (!OidIsValid(prm->ptype) && paramLI->paramFetch != NULL) + (*paramLI->paramFetch) (paramLI, i + 1); + + sz = add_size(sz, sizeof(Oid)); /* space for type OID */ + sz = add_size(sz, sizeof(uint16)); /* space for pflags */ + + /* space for datum/isnull */ + if (OidIsValid(prm->ptype)) + get_typlenbyval(prm->ptype, &typLen, &typByVal); + else + { + /* If no type OID, assume by-value, like copyParamList does. */ + typLen = sizeof(Datum); + typByVal = true; + } + sz = add_size(sz, + datumEstimateSpace(prm->value, prm->isnull, typByVal, typLen)); + } + + return sz; +} + +/* + * Serialize a paramListInfo structure into caller-provided storage. + * + * We write the number of parameters first, as a 4-byte integer, and then + * write details for each parameter in turn. The details for each parameter + * consist of a 4-byte type OID, 2 bytes of flags, and then the datum as + * serialized by datumSerialize(). The caller is responsible for ensuring + * that there is enough storage to store the number of bytes that will be + * written; use EstimateParamListSpace to find out how many will be needed. + * *start_address is updated to point to the byte immediately following those + * written. + * + * RestoreParamList can be used to recreate a ParamListInfo based on the + * serialized representation; this will be a static, self-contained copy + * just as copyParamList would create. + */ +void +SerializeParamList(ParamListInfo paramLI, char **start_address) +{ + int nparams; + int i; + + /* Write number of parameters. */ + if (paramLI == NULL || paramLI->numParams <= 0) + nparams = 0; + else + nparams = paramLI->numParams; + memcpy(*start_address, &nparams, sizeof(int)); + *start_address += sizeof(int); + + /* Write each parameter in turn. */ + for (i = 0; i < nparams; i++) + { + ParamExternData *prm = ¶mLI->params[i]; + int16 typLen; + bool typByVal; + + /* give hook a chance in case parameter is dynamic */ + if (!OidIsValid(prm->ptype) && paramLI->paramFetch != NULL) + (*paramLI->paramFetch) (paramLI, i + 1); + + /* Write type OID. */ + memcpy(*start_address, &prm->ptype, sizeof(Oid)); + *start_address += sizeof(Oid); + + /* Write flags. */ + memcpy(*start_address, &prm->pflags, sizeof(uint16)); + *start_address += sizeof(uint16); + + /* Write datum/isnull. */ + if (OidIsValid(prm->ptype)) + get_typlenbyval(prm->ptype, &typLen, &typByVal); + else + { + /* If no type OID, assume by-value, like copyParamList does. */ + typLen = sizeof(Datum); + typByVal = true; + } + datumSerialize(prm->value, prm->isnull, typByVal, typLen, + start_address); + } +} + +/* + * Copy a ParamListInfo structure. + * + * The result is allocated in CurrentMemoryContext. + * + * Note: the intent of this function is to make a static, self-contained + * set of parameter values. If dynamic parameter hooks are present, we + * intentionally do not copy them into the result. Rather, we forcibly + * instantiate all available parameter values and copy the datum values. + */ +ParamListInfo +RestoreParamList(char **start_address) +{ + ParamListInfo paramLI; + Size size; + int i; + int nparams; + + memcpy(&nparams, *start_address, sizeof(int)); + *start_address += sizeof(int); + + size = offsetof(ParamListInfoData, params) + + nparams * sizeof(ParamExternData); + + paramLI = (ParamListInfo) palloc(size); + paramLI->paramFetch = NULL; + paramLI->paramFetchArg = NULL; + paramLI->parserSetup = NULL; + paramLI->parserSetupArg = NULL; + paramLI->numParams = nparams; + + for (i = 0; i < nparams; i++) + { + ParamExternData *prm = ¶mLI->params[i]; + + /* Read type OID. */ + memcpy(&prm->ptype, *start_address, sizeof(Oid)); + *start_address += sizeof(Oid); + + /* Read flags. */ + memcpy(&prm->pflags, *start_address, sizeof(uint16)); + *start_address += sizeof(uint16); + + /* Read datum/isnull. */ + prm->value = datumRestore(start_address, &prm->isnull); + } + + return paramLI; +} diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 08519ed298..72368ab981 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -1413,6 +1413,7 @@ ReadCommonPlan(Plan *local_node) READ_FLOAT_FIELD(total_cost); READ_FLOAT_FIELD(plan_rows); READ_INT_FIELD(plan_width); + READ_INT_FIELD(plan_node_id); READ_NODE_FIELD(targetlist); READ_NODE_FIELD(qual); READ_NODE_FIELD(lefttree); diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 06be922929..e1ee67cd60 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -196,6 +196,7 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) glob->nParamExec = 0; glob->lastPHId = 0; glob->lastRowMarkId = 0; + glob->lastPlanNodeId = 0; glob->transientPlan = false; glob->hasRowSecurity = false; diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index daeb5842d0..3c8169725a 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -174,6 +174,8 @@ static bool extract_query_dependencies_walker(Node *node, * Currently, relations and user-defined functions are the only types of * objects that are explicitly tracked this way. * + * 7. We assign every plan node in the tree a unique ID. + * * We also perform one final optimization step, which is to delete * SubqueryScan plan nodes that aren't doing anything useful (ie, have * no qual and a no-op targetlist). The reason for doing this last is that @@ -436,6 +438,9 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) if (plan == NULL) return NULL; + /* Assign this node a unique ID. */ + plan->plan_node_id = root->glob->lastPlanNodeId++; + /* * Plan-type-specific fixes */ diff --git a/src/backend/utils/adt/datum.c b/src/backend/utils/adt/datum.c index e8af0304c0..3d9e35442d 100644 --- a/src/backend/utils/adt/datum.c +++ b/src/backend/utils/adt/datum.c @@ -246,3 +246,121 @@ datumIsEqual(Datum value1, Datum value2, bool typByVal, int typLen) } return res; } + +/*------------------------------------------------------------------------- + * datumEstimateSpace + * + * Compute the amount of space that datumSerialize will require for a + * particular Datum. + *------------------------------------------------------------------------- + */ +Size +datumEstimateSpace(Datum value, bool isnull, bool typByVal, int typLen) +{ + Size sz = sizeof(int); + + if (!isnull) + { + /* no need to use add_size, can't overflow */ + if (typByVal) + sz += sizeof(Datum); + else + sz += datumGetSize(value, typByVal, typLen); + } + + return sz; +} + +/*------------------------------------------------------------------------- + * datumSerialize + * + * Serialize a possibly-NULL datum into caller-provided storage. + * + * The format is as follows: first, we write a 4-byte header word, which + * is either the length of a pass-by-reference datum, -1 for a + * pass-by-value datum, or -2 for a NULL. If the value is NULL, nothing + * further is written. If it is pass-by-value, sizeof(Datum) bytes + * follow. Otherwise, the number of bytes indicated by the header word + * follow. The caller is responsible for ensuring that there is enough + * storage to store the number of bytes that will be written; use + * datumEstimateSpace() to find out how many will be needed. + * *start_address is updated to point to the byte immediately following + * those written. + *------------------------------------------------------------------------- + */ +void +datumSerialize(Datum value, bool isnull, bool typByVal, int typLen, + char **start_address) +{ + int header; + + /* Write header word. */ + if (isnull) + header = -2; + else if (typByVal) + header = -1; + else + header = datumGetSize(value, typByVal, typLen); + memcpy(*start_address, &header, sizeof(int)); + *start_address += sizeof(int); + + /* If not null, write payload bytes. */ + if (!isnull) + { + if (typByVal) + { + memcpy(*start_address, &value, sizeof(Datum)); + *start_address += sizeof(Datum); + } + else + { + memcpy(*start_address, DatumGetPointer(value), header); + *start_address += header; + } + } +} + +/*------------------------------------------------------------------------- + * datumRestore + * + * Restore a possibly-NULL datum previously serialized by datumSerialize. + * *start_address is updated according to the number of bytes consumed. + *------------------------------------------------------------------------- + */ +Datum +datumRestore(char **start_address, bool *isnull) +{ + int header; + void *d; + + /* Read header word. */ + memcpy(&header, *start_address, sizeof(int)); + *start_address += sizeof(int); + + /* If this datum is NULL, we can stop here. */ + if (header == -2) + { + *isnull = true; + return (Datum) 0; + } + + /* OK, datum is not null. */ + *isnull = false; + + /* If this datum is pass-by-value, sizeof(Datum) bytes follow. */ + if (header == -1) + { + Datum val; + + memcpy(&val, *start_address, sizeof(Datum)); + *start_address += sizeof(Datum); + return val; + } + + /* Pass-by-reference case; copy indicated number of bytes. */ + Assert(header > 0); + d = palloc(header); + memcpy(d, *start_address, header); + *start_address += header; + return PointerGetDatum(d); +} diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h new file mode 100644 index 0000000000..4fc797ad98 --- /dev/null +++ b/src/include/executor/execParallel.h @@ -0,0 +1,36 @@ +/*-------------------------------------------------------------------- + * execParallel.h + * POSTGRES parallel execution interface + * + * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/include/executor/execParallel.h + *-------------------------------------------------------------------- + */ + +#ifndef EXECPARALLEL_H +#define EXECPARALLEL_H + +#include "access/parallel.h" +#include "nodes/execnodes.h" +#include "nodes/parsenodes.h" +#include "nodes/plannodes.h" + +typedef struct SharedExecutorInstrumentation SharedExecutorInstrumentation; + +typedef struct ParallelExecutorInfo +{ + PlanState *planstate; + ParallelContext *pcxt; + BufferUsage *buffer_usage; + SharedExecutorInstrumentation *instrumentation; + shm_mq_handle **tqueue; +} ParallelExecutorInfo; + +extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, + EState *estate, int nworkers); +extern void ExecParallelFinish(ParallelExecutorInfo *pei); + +#endif /* EXECPARALLEL_H */ diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h index c9a2129c7a..f28e56ce48 100644 --- a/src/include/executor/instrument.h +++ b/src/include/executor/instrument.h @@ -66,8 +66,13 @@ typedef struct Instrumentation extern PGDLLIMPORT BufferUsage pgBufferUsage; extern Instrumentation *InstrAlloc(int n, int instrument_options); +extern void InstrInit(Instrumentation *instr, int instrument_options); extern void InstrStartNode(Instrumentation *instr); extern void InstrStopNode(Instrumentation *instr, double nTuples); extern void InstrEndLoop(Instrumentation *instr); +extern void InstrAggNode(Instrumentation *dst, Instrumentation *add); +extern void InstrStartParallelQuery(void); +extern void InstrEndParallelQuery(BufferUsage *result); +extern void InstrAccumParallelQuery(BufferUsage *result); #endif /* INSTRUMENT_H */ diff --git a/src/include/nodes/params.h b/src/include/nodes/params.h index a0f7dd0c55..83bebde69d 100644 --- a/src/include/nodes/params.h +++ b/src/include/nodes/params.h @@ -102,5 +102,8 @@ typedef struct ParamExecData /* Functions found in src/backend/nodes/params.c */ extern ParamListInfo copyParamList(ParamListInfo from); +extern Size EstimateParamListSpace(ParamListInfo paramLI); +extern void SerializeParamList(ParamListInfo paramLI, char **start_address); +extern ParamListInfo RestoreParamList(char **start_address); #endif /* PARAMS_H */ diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index cc259f1f67..1e2d2bbaa1 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -111,6 +111,7 @@ typedef struct Plan /* * Common structural data for all Plan types. */ + int plan_node_id; /* unique across entire final plan tree */ List *targetlist; /* target list to be computed at this node */ List *qual; /* implicitly-ANDed qual conditions */ struct Plan *lefttree; /* input plan tree(s) */ diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index 79bed3316b..961b5d17cf 100644 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -99,6 +99,8 @@ typedef struct PlannerGlobal Index lastRowMarkId; /* highest PlanRowMark ID assigned */ + int lastPlanNodeId; /* highest plan node ID assigned */ + bool transientPlan; /* redo plan when TransactionXmin changes? */ bool hasRowSecurity; /* row security applied? */ diff --git a/src/include/utils/datum.h b/src/include/utils/datum.h index c572f790a5..e9d4be5c4b 100644 --- a/src/include/utils/datum.h +++ b/src/include/utils/datum.h @@ -46,4 +46,14 @@ extern Datum datumTransfer(Datum value, bool typByVal, int typLen); extern bool datumIsEqual(Datum value1, Datum value2, bool typByVal, int typLen); +/* + * Serialize and restore datums so that we can transfer them to parallel + * workers. + */ +extern Size datumEstimateSpace(Datum value, bool isnull, bool typByVal, + int typLen); +extern void datumSerialize(Datum value, bool isnull, bool typByVal, + int typLen, char **start_address); +extern Datum datumRestore(char **start_address, bool *isnull); + #endif /* DATUM_H */ -- 2.40.0