+/*
+ * Show hash table statistics (buckets, batches, peak memory usage) for a
+ * hash node in EXPLAIN output.
+ *
+ * The numbers come from this process's own hash table, if it has one, merged
+ * with any per-worker instrumentation found in hashstate->shared_info.
+ * Nothing is emitted when no participant reports nbatch > 0.
+ */
static void
show_hash_info(HashState *hashstate, ExplainState *es)
{
- HashInstrumentation *hinstrument = NULL;
+ HashInstrumentation hinstrument = {0};
/*
* In a parallel query, the leader process may or may not have run the
* hash join, and even if it did it may not have built a hash table due to
* timing (if it started late it might have seen no tuples in the outer
* relation and skipped building the hash table). Therefore we have to be
- * prepared to get instrumentation data from a worker if there is no hash
- * table.
+ * prepared to get instrumentation data from all participants.
*/
if (hashstate->hashtable)
- {
- hinstrument = (HashInstrumentation *)
- palloc(sizeof(HashInstrumentation));
- ExecHashGetInstrumentation(hinstrument, hashstate->hashtable);
- }
- else if (hashstate->shared_info)
+ ExecHashGetInstrumentation(&hinstrument, hashstate->hashtable);
+
+ /*
+ * Merge results from workers. In the parallel-oblivious case, the
+ * results from all participants should be identical, except where
+ * participants didn't run the join at all so have no data. In the
+ * parallel-aware case, we need to consider all the results. Each worker
+ * may have seen a different subset of batches and we want to find the
+ * highest memory usage for any one batch across all batches.
+ */
+ if (hashstate->shared_info)
{
SharedHashInfo *shared_info = hashstate->shared_info;
- int i;
+ int i;
- /* Find the first worker that built a hash table. */
for (i = 0; i < shared_info->num_workers; ++i)
{
- if (shared_info->hinstrument[i].nbatch > 0)
+ HashInstrumentation *worker_hi = &shared_info->hinstrument[i];
+
+ if (worker_hi->nbatch > 0)
{
- hinstrument = &shared_info->hinstrument[i];
- break;
+ /*
+ * Every participant should agree on the buckets, so to be
+ * sure we have a value we'll just overwrite each time.
+ */
+ hinstrument.nbuckets = worker_hi->nbuckets;
+ hinstrument.nbuckets_original = worker_hi->nbuckets_original;
+
+ /*
+ * Normally every participant should agree on the number of
+ * batches too, but it's possible for a backend that started
+ * late and missed the whole join not to have the final nbatch
+ * number. So we'll take the largest number.
+ */
+ hinstrument.nbatch = Max(hinstrument.nbatch, worker_hi->nbatch);
+ hinstrument.nbatch_original = worker_hi->nbatch_original;
+
+ /*
+ * In a parallel-aware hash join, for now we report the
+ * maximum peak memory reported by any worker.
+ */
+ hinstrument.space_peak =
+ Max(hinstrument.space_peak, worker_hi->space_peak);
}
}
}
- if (hinstrument)
+ if (hinstrument.nbatch > 0)
{
- long spacePeakKb = (hinstrument->space_peak + 1023) / 1024;
+ long spacePeakKb = (hinstrument.space_peak + 1023) / 1024;
if (es->format != EXPLAIN_FORMAT_TEXT)
{
- ExplainPropertyLong("Hash Buckets", hinstrument->nbuckets, es);
+ ExplainPropertyLong("Hash Buckets", hinstrument.nbuckets, es);
ExplainPropertyLong("Original Hash Buckets",
- hinstrument->nbuckets_original, es);
- ExplainPropertyLong("Hash Batches", hinstrument->nbatch, es);
+ hinstrument.nbuckets_original, es);
+ ExplainPropertyLong("Hash Batches", hinstrument.nbatch, es);
ExplainPropertyLong("Original Hash Batches",
- hinstrument->nbatch_original, es);
+ hinstrument.nbatch_original, es);
ExplainPropertyLong("Peak Memory Usage", spacePeakKb, es);
}
- else if (hinstrument->nbatch_original != hinstrument->nbatch ||
- hinstrument->nbuckets_original != hinstrument->nbuckets)
+ else if (hinstrument.nbatch_original != hinstrument.nbatch ||
+ hinstrument.nbuckets_original != hinstrument.nbuckets)
{
appendStringInfoSpaces(es->str, es->indent * 2);
appendStringInfo(es->str,
"Buckets: %d (originally %d) Batches: %d (originally %d) Memory Usage: %ldkB\n",
- hinstrument->nbuckets,
- hinstrument->nbuckets_original,
- hinstrument->nbatch,
- hinstrument->nbatch_original,
+ hinstrument.nbuckets,
+ hinstrument.nbuckets_original,
+ hinstrument.nbatch,
+ hinstrument.nbatch_original,
spacePeakKb);
}
else
+ {
+ /*
+ * Text format with bucket and batch counts equal to their original
+ * values: print the short form. The braces keep both calls inside
+ * this branch; without them the summary line below would be
+ * appended for every format.
+ */
appendStringInfoSpaces(es->str, es->indent * 2);
appendStringInfo(es->str,
"Buckets: %d Batches: %d Memory Usage: %ldkB\n",
- hinstrument->nbuckets, hinstrument->nbatch,
+ hinstrument.nbuckets, hinstrument.nbatch,
spacePeakKb);
+ }
}
}
batch->buckets = InvalidDsaPointer;
}
}
- ExecParallelHashUpdateSpacePeak(hashtable, curbatch);
+
+ /*
+ * Track the largest batch we've been attached to. Though each
+ * backend might see a different subset of batches, explain.c will
+ * scan the results from all backends to find the largest value.
+ */
+ hashtable->spacePeak =
+ Max(hashtable->spacePeak,
+ batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
+
/* Remember that we are not attached to a batch. */
hashtable->curbatch = -1;
}
return true;
}
-
-/*
- * Update this backend's copy of hashtable->spacePeak to account for a given
- * batch. This is called at the end of hashing for batch 0, and then for each
- * batch when it is done or discovered to be already done. The result is used
- * for EXPLAIN output.
- */
-void
-ExecParallelHashUpdateSpacePeak(HashJoinTable hashtable, int batchno)
-{
- size_t size;
-
- size = hashtable->batches[batchno].shared->size;
- size += sizeof(dsa_pointer_atomic) * hashtable->nbuckets;
- hashtable->spacePeak = Max(hashtable->spacePeak, size);
-}