PlanState *planstate, ExplainState *es);
static void show_foreignscan_info(ForeignScanState *fsstate, ExplainState *es);
static const char *explain_get_index_name(Oid indexId);
+static void show_buffer_usage(ExplainState *es, const BufferUsage *usage);
static void ExplainIndexScanDetails(Oid indexid, ScanDirection indexorderdir,
ExplainState *es);
static void ExplainScanTarget(Scan *plan, ExplainState *es);
/* Show buffer usage */
if (es->buffers && planstate->instrument)
+ show_buffer_usage(es, &planstate->instrument->bufusage);
+
+ /* Show worker detail */
+ if (es->analyze && es->verbose && planstate->worker_instrument)
{
- const BufferUsage *usage = &planstate->instrument->bufusage;
+ WorkerInstrumentation *w = planstate->worker_instrument;
+ bool opened_group = false;
+ int n;
- if (es->format == EXPLAIN_FORMAT_TEXT)
+ for (n = 0; n < w->num_workers; ++n)
{
- bool has_shared = (usage->shared_blks_hit > 0 ||
- usage->shared_blks_read > 0 ||
- usage->shared_blks_dirtied > 0 ||
- usage->shared_blks_written > 0);
- bool has_local = (usage->local_blks_hit > 0 ||
- usage->local_blks_read > 0 ||
- usage->local_blks_dirtied > 0 ||
- usage->local_blks_written > 0);
- bool has_temp = (usage->temp_blks_read > 0 ||
- usage->temp_blks_written > 0);
- bool has_timing = (!INSTR_TIME_IS_ZERO(usage->blk_read_time) ||
- !INSTR_TIME_IS_ZERO(usage->blk_write_time));
+ Instrumentation *instrument = &w->instrument[n];
+ double nloops = instrument->nloops;
+ double startup_sec;
+ double total_sec;
+ double rows;
+
+ if (nloops <= 0)
+ continue;
+ startup_sec = 1000.0 * instrument->startup / nloops;
+ total_sec = 1000.0 * instrument->total / nloops;
+ rows = instrument->ntuples / nloops;
- /* Show only positive counter values. */
- if (has_shared || has_local || has_temp)
+ if (es->format == EXPLAIN_FORMAT_TEXT)
{
appendStringInfoSpaces(es->str, es->indent * 2);
- appendStringInfoString(es->str, "Buffers:");
-
- if (has_shared)
- {
- appendStringInfoString(es->str, " shared");
- if (usage->shared_blks_hit > 0)
- appendStringInfo(es->str, " hit=%ld",
- usage->shared_blks_hit);
- if (usage->shared_blks_read > 0)
- appendStringInfo(es->str, " read=%ld",
- usage->shared_blks_read);
- if (usage->shared_blks_dirtied > 0)
- appendStringInfo(es->str, " dirtied=%ld",
- usage->shared_blks_dirtied);
- if (usage->shared_blks_written > 0)
- appendStringInfo(es->str, " written=%ld",
- usage->shared_blks_written);
- if (has_local || has_temp)
- appendStringInfoChar(es->str, ',');
- }
- if (has_local)
+ appendStringInfo(es->str, "Worker %d: ", n);
+ if (es->timing)
+ appendStringInfo(es->str,
+ "actual time=%.3f..%.3f rows=%.0f loops=%.0f\n",
+ startup_sec, total_sec, rows, nloops);
+ else
+ appendStringInfo(es->str,
+ "actual rows=%.0f loops=%.0f\n",
+ rows, nloops);
+ es->indent++;
+ if (es->buffers)
+ show_buffer_usage(es, &instrument->bufusage);
+ es->indent--;
+ }
+ else
+ {
+ if (!opened_group)
{
- appendStringInfoString(es->str, " local");
- if (usage->local_blks_hit > 0)
- appendStringInfo(es->str, " hit=%ld",
- usage->local_blks_hit);
- if (usage->local_blks_read > 0)
- appendStringInfo(es->str, " read=%ld",
- usage->local_blks_read);
- if (usage->local_blks_dirtied > 0)
- appendStringInfo(es->str, " dirtied=%ld",
- usage->local_blks_dirtied);
- if (usage->local_blks_written > 0)
- appendStringInfo(es->str, " written=%ld",
- usage->local_blks_written);
- if (has_temp)
- appendStringInfoChar(es->str, ',');
+ ExplainOpenGroup("Workers", "Workers", false, es);
+ opened_group = true;
}
- if (has_temp)
+ ExplainOpenGroup("Worker", NULL, true, es);
+ ExplainPropertyInteger("Worker Number", n, es);
+
+ if (es->timing)
{
- appendStringInfoString(es->str, " temp");
- if (usage->temp_blks_read > 0)
- appendStringInfo(es->str, " read=%ld",
- usage->temp_blks_read);
- if (usage->temp_blks_written > 0)
- appendStringInfo(es->str, " written=%ld",
- usage->temp_blks_written);
+ ExplainPropertyFloat("Actual Startup Time", startup_sec, 3, es);
+ ExplainPropertyFloat("Actual Total Time", total_sec, 3, es);
}
- appendStringInfoChar(es->str, '\n');
- }
+ ExplainPropertyFloat("Actual Rows", rows, 0, es);
+ ExplainPropertyFloat("Actual Loops", nloops, 0, es);
- /* As above, show only positive counter values. */
- if (has_timing)
- {
- appendStringInfoSpaces(es->str, es->indent * 2);
- appendStringInfoString(es->str, "I/O Timings:");
- if (!INSTR_TIME_IS_ZERO(usage->blk_read_time))
- appendStringInfo(es->str, " read=%0.3f",
- INSTR_TIME_GET_MILLISEC(usage->blk_read_time));
- if (!INSTR_TIME_IS_ZERO(usage->blk_write_time))
- appendStringInfo(es->str, " write=%0.3f",
- INSTR_TIME_GET_MILLISEC(usage->blk_write_time));
- appendStringInfoChar(es->str, '\n');
+ if (es->buffers)
+ show_buffer_usage(es, &instrument->bufusage);
+
+ ExplainCloseGroup("Worker", NULL, true, es);
}
}
- else
- {
- ExplainPropertyLong("Shared Hit Blocks", usage->shared_blks_hit, es);
- ExplainPropertyLong("Shared Read Blocks", usage->shared_blks_read, es);
- ExplainPropertyLong("Shared Dirtied Blocks", usage->shared_blks_dirtied, es);
- ExplainPropertyLong("Shared Written Blocks", usage->shared_blks_written, es);
- ExplainPropertyLong("Local Hit Blocks", usage->local_blks_hit, es);
- ExplainPropertyLong("Local Read Blocks", usage->local_blks_read, es);
- ExplainPropertyLong("Local Dirtied Blocks", usage->local_blks_dirtied, es);
- ExplainPropertyLong("Local Written Blocks", usage->local_blks_written, es);
- ExplainPropertyLong("Temp Read Blocks", usage->temp_blks_read, es);
- ExplainPropertyLong("Temp Written Blocks", usage->temp_blks_written, es);
- ExplainPropertyFloat("I/O Read Time", INSTR_TIME_GET_MILLISEC(usage->blk_read_time), 3, es);
- ExplainPropertyFloat("I/O Write Time", INSTR_TIME_GET_MILLISEC(usage->blk_write_time), 3, es);
- }
+
+ if (opened_group)
+ ExplainCloseGroup("Workers", "Workers", false, es);
}
/* Get ready to display the child plans */
return result;
}
+/*
+ * Show buffer usage details.
+ */
+static void
+show_buffer_usage(ExplainState *es, const BufferUsage *usage)
+{
+ if (es->format == EXPLAIN_FORMAT_TEXT)
+ {
+ bool has_shared = (usage->shared_blks_hit > 0 ||
+ usage->shared_blks_read > 0 ||
+ usage->shared_blks_dirtied > 0 ||
+ usage->shared_blks_written > 0);
+ bool has_local = (usage->local_blks_hit > 0 ||
+ usage->local_blks_read > 0 ||
+ usage->local_blks_dirtied > 0 ||
+ usage->local_blks_written > 0);
+ bool has_temp = (usage->temp_blks_read > 0 ||
+ usage->temp_blks_written > 0);
+ bool has_timing = (!INSTR_TIME_IS_ZERO(usage->blk_read_time) ||
+ !INSTR_TIME_IS_ZERO(usage->blk_write_time));
+
+ /* Show only positive counter values. */
+ if (has_shared || has_local || has_temp)
+ {
+ appendStringInfoSpaces(es->str, es->indent * 2);
+ appendStringInfoString(es->str, "Buffers:");
+
+ if (has_shared)
+ {
+ appendStringInfoString(es->str, " shared");
+ if (usage->shared_blks_hit > 0)
+ appendStringInfo(es->str, " hit=%ld",
+ usage->shared_blks_hit);
+ if (usage->shared_blks_read > 0)
+ appendStringInfo(es->str, " read=%ld",
+ usage->shared_blks_read);
+ if (usage->shared_blks_dirtied > 0)
+ appendStringInfo(es->str, " dirtied=%ld",
+ usage->shared_blks_dirtied);
+ if (usage->shared_blks_written > 0)
+ appendStringInfo(es->str, " written=%ld",
+ usage->shared_blks_written);
+ if (has_local || has_temp)
+ appendStringInfoChar(es->str, ',');
+ }
+ if (has_local)
+ {
+ appendStringInfoString(es->str, " local");
+ if (usage->local_blks_hit > 0)
+ appendStringInfo(es->str, " hit=%ld",
+ usage->local_blks_hit);
+ if (usage->local_blks_read > 0)
+ appendStringInfo(es->str, " read=%ld",
+ usage->local_blks_read);
+ if (usage->local_blks_dirtied > 0)
+ appendStringInfo(es->str, " dirtied=%ld",
+ usage->local_blks_dirtied);
+ if (usage->local_blks_written > 0)
+ appendStringInfo(es->str, " written=%ld",
+ usage->local_blks_written);
+ if (has_temp)
+ appendStringInfoChar(es->str, ',');
+ }
+ if (has_temp)
+ {
+ appendStringInfoString(es->str, " temp");
+ if (usage->temp_blks_read > 0)
+ appendStringInfo(es->str, " read=%ld",
+ usage->temp_blks_read);
+ if (usage->temp_blks_written > 0)
+ appendStringInfo(es->str, " written=%ld",
+ usage->temp_blks_written);
+ }
+ appendStringInfoChar(es->str, '\n');
+ }
+
+ /* As above, show only positive counter values. */
+ if (has_timing)
+ {
+ appendStringInfoSpaces(es->str, es->indent * 2);
+ appendStringInfoString(es->str, "I/O Timings:");
+ if (!INSTR_TIME_IS_ZERO(usage->blk_read_time))
+ appendStringInfo(es->str, " read=%0.3f",
+ INSTR_TIME_GET_MILLISEC(usage->blk_read_time));
+ if (!INSTR_TIME_IS_ZERO(usage->blk_write_time))
+ appendStringInfo(es->str, " write=%0.3f",
+ INSTR_TIME_GET_MILLISEC(usage->blk_write_time));
+ appendStringInfoChar(es->str, '\n');
+ }
+ }
+ else
+ {
+ ExplainPropertyLong("Shared Hit Blocks", usage->shared_blks_hit, es);
+ ExplainPropertyLong("Shared Read Blocks", usage->shared_blks_read, es);
+ ExplainPropertyLong("Shared Dirtied Blocks", usage->shared_blks_dirtied, es);
+ ExplainPropertyLong("Shared Written Blocks", usage->shared_blks_written, es);
+ ExplainPropertyLong("Local Hit Blocks", usage->local_blks_hit, es);
+ ExplainPropertyLong("Local Read Blocks", usage->local_blks_read, es);
+ ExplainPropertyLong("Local Dirtied Blocks", usage->local_blks_dirtied, es);
+ ExplainPropertyLong("Local Written Blocks", usage->local_blks_written, es);
+ ExplainPropertyLong("Temp Read Blocks", usage->temp_blks_read, es);
+ ExplainPropertyLong("Temp Written Blocks", usage->temp_blks_written, es);
+ ExplainPropertyFloat("I/O Read Time", INSTR_TIME_GET_MILLISEC(usage->blk_read_time), 3, es);
+ ExplainPropertyFloat("I/O Write Time", INSTR_TIME_GET_MILLISEC(usage->blk_write_time), 3, es);
+ }
+}
+
/*
* Add some additional details about an IndexScan or IndexOnlyScan
*/
#define PARALLEL_TUPLE_QUEUE_SIZE 65536
-/* DSM structure for accumulating per-PlanState instrumentation. */
-typedef struct SharedPlanStateInstrumentation
-{
- int plan_node_id;
- slock_t mutex;
- Instrumentation instr;
-} SharedPlanStateInstrumentation;
-
/* DSM structure for accumulating per-PlanState instrumentation. */
struct SharedExecutorInstrumentation
{
int instrument_options;
- int ps_ninstrument; /* # of ps_instrument structures following */
- SharedPlanStateInstrumentation ps_instrument[FLEXIBLE_ARRAY_MEMBER];
+ int instrument_offset; /* offset of first Instrumentation struct */
+ int num_workers; /* # of workers */
+ int num_plan_nodes; /* # of plan nodes */
+ int plan_node_id[FLEXIBLE_ARRAY_MEMBER]; /* array of plan node IDs */
+ /* array of num_plan_nodes * num_workers Instrumentation objects follows */
};
+#define GetInstrumentationArray(sei) \
+ (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
+ (Instrumentation *) (((char *) sei) + sei->instrument_offset))
/* Context object for ExecParallelEstimate. */
typedef struct ExecParallelEstimateContext
if (planstate == NULL)
return false;
- /* If instrumentation is enabled, initialize array slot for this node. */
+ /* If instrumentation is enabled, initialize slot for this node. */
if (d->instrumentation != NULL)
- {
- SharedPlanStateInstrumentation *instrumentation;
-
- instrumentation = &d->instrumentation->ps_instrument[d->nnodes];
- Assert(d->nnodes < d->instrumentation->ps_ninstrument);
- instrumentation->plan_node_id = planstate->plan->plan_node_id;
- SpinLockInit(&instrumentation->mutex);
- InstrInit(&instrumentation->instr,
- d->instrumentation->instrument_options);
- }
+ d->instrumentation->plan_node_id[d->nnodes] =
+ planstate->plan->plan_node_id;
/* Count this node. */
d->nnodes++;
int pstmt_len;
int param_len;
int instrumentation_len = 0;
+ int instrument_offset = 0;
/* Allocate object for return value. */
pei = palloc0(sizeof(ParallelExecutorInfo));
if (estate->es_instrument)
{
instrumentation_len =
- offsetof(SharedExecutorInstrumentation, ps_instrument)
- + sizeof(SharedPlanStateInstrumentation) * e.nnodes;
+ offsetof(SharedExecutorInstrumentation, plan_node_id)
+ + sizeof(int) * e.nnodes;
+ instrumentation_len = MAXALIGN(instrumentation_len);
+ instrument_offset = instrumentation_len;
+ instrumentation_len += sizeof(Instrumentation) * e.nnodes * nworkers;
shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
}
*/
if (estate->es_instrument)
{
+ Instrumentation *instrument;
+ int i;
+
instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
instrumentation->instrument_options = estate->es_instrument;
- instrumentation->ps_ninstrument = e.nnodes;
+ instrumentation->instrument_offset = instrument_offset;
+ instrumentation->num_workers = nworkers;
+ instrumentation->num_plan_nodes = e.nnodes;
+ instrument = GetInstrumentationArray(instrumentation);
+ for (i = 0; i < nworkers * e.nnodes; ++i)
+ InstrInit(&instrument[i], estate->es_instrument);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
instrumentation);
pei->instrumentation = instrumentation;
ExecParallelRetrieveInstrumentation(PlanState *planstate,
SharedExecutorInstrumentation *instrumentation)
{
+ Instrumentation *instrument;
int i;
+ int n;
+ int ibytes;
int plan_node_id = planstate->plan->plan_node_id;
- SharedPlanStateInstrumentation *ps_instrument;
/* Find the instumentation for this node. */
- for (i = 0; i < instrumentation->ps_ninstrument; ++i)
- if (instrumentation->ps_instrument[i].plan_node_id == plan_node_id)
+ for (i = 0; i < instrumentation->num_plan_nodes; ++i)
+ if (instrumentation->plan_node_id[i] == plan_node_id)
break;
- if (i >= instrumentation->ps_ninstrument)
+ if (i >= instrumentation->num_plan_nodes)
elog(ERROR, "plan node %d not found", plan_node_id);
- /* No need to acquire the spinlock here; workers have exited already. */
- ps_instrument = &instrumentation->ps_instrument[i];
- InstrAggNode(planstate->instrument, &ps_instrument->instr);
+ /* Accumulate the statistics from all workers. */
+ instrument = GetInstrumentationArray(instrumentation);
+ instrument += i * instrumentation->num_workers;
+ for (n = 0; n < instrumentation->num_workers; ++n)
+ InstrAggNode(planstate->instrument, &instrument[n]);
+
+ /* Also store the per-worker detail. */
+ ibytes = instrumentation->num_workers * sizeof(Instrumentation);
+ planstate->worker_instrument =
+ palloc(offsetof(WorkerInstrumentation, instrument) + ibytes);
+ planstate->worker_instrument->num_workers = instrumentation->num_workers;
+ memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
instrumentation);
{
int i;
int plan_node_id = planstate->plan->plan_node_id;
- SharedPlanStateInstrumentation *ps_instrument;
+ Instrumentation *instrument;
+
+ InstrEndLoop(planstate->instrument);
/*
* If we shuffled the plan_node_id values in ps_instrument into sorted
* if we're pushing down sufficiently large plan trees. For now, do it
* the slow, dumb way.
*/
- for (i = 0; i < instrumentation->ps_ninstrument; ++i)
- if (instrumentation->ps_instrument[i].plan_node_id == plan_node_id)
+ for (i = 0; i < instrumentation->num_plan_nodes; ++i)
+ if (instrumentation->plan_node_id[i] == plan_node_id)
break;
- if (i >= instrumentation->ps_ninstrument)
+ if (i >= instrumentation->num_plan_nodes)
elog(ERROR, "plan node %d not found", plan_node_id);
/*
- * There's one SharedPlanStateInstrumentation per plan_node_id, so we
- * must use a spinlock in case multiple workers report at the same time.
+ * Add our statistics to the per-node, per-worker totals. It's possible
+ * that this could happen more than once if we relaunched workers.
*/
- ps_instrument = &instrumentation->ps_instrument[i];
- SpinLockAcquire(&ps_instrument->mutex);
- InstrAggNode(&ps_instrument->instr, planstate->instrument);
- SpinLockRelease(&ps_instrument->mutex);
+ instrument = GetInstrumentationArray(instrumentation);
+ instrument += i * instrumentation->num_workers;
+ Assert(IsParallelWorker());
+ Assert(ParallelWorkerNumber < instrumentation->num_workers);
+ InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
instrumentation);