From: Andres Freund Date: Tue, 5 Dec 2017 18:55:56 +0000 (-0800) Subject: Fix EXPLAIN ANALYZE of hash join when the leader doesn't participate. X-Git-Tag: REL_11_BETA1~1110 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=5bcf389ecfd40daf92238e1abbff4fc4d3f18b33;p=postgresql Fix EXPLAIN ANALYZE of hash join when the leader doesn't participate. If a hash join appears in a parallel query, there may be no hash table available for explain.c to inspect even though a hash table may have been built in other processes. This could happen either because parallel_leader_participation was set to off or because the leader happened to hit the end of the outer relation immediately (even though the complete relation is not empty) and decided not to build the hash table. Commit bf11e7ee introduced a way for workers to exchange instrumentation via the DSM segment for Sort nodes even though they are not parallel-aware. This commit does the same for Hash nodes, so that explain.c has a way to find instrumentation data from an arbitrary participant that actually built the hash table. 
Author: Thomas Munro Reviewed-By: Andres Freund Discussion: https://postgr.es/m/CAEepm%3D3DUQC2-z252N55eOcZBer6DPdM%3DFzrxH9dZc5vYLsjaA%40mail.gmail.com --- diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 447f69d044..7e4fbafc53 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -19,7 +19,7 @@ #include "commands/createas.h" #include "commands/defrem.h" #include "commands/prepare.h" -#include "executor/hashjoin.h" +#include "executor/nodeHash.h" #include "foreign/fdwapi.h" #include "nodes/extensible.h" #include "nodes/nodeFuncs.h" @@ -2379,34 +2379,62 @@ show_sort_info(SortState *sortstate, ExplainState *es) static void show_hash_info(HashState *hashstate, ExplainState *es) { - HashJoinTable hashtable; + HashInstrumentation *hinstrument = NULL; - hashtable = hashstate->hashtable; + /* + * In a parallel query, the leader process may or may not have run the + * hash join, and even if it did it may not have built a hash table due to + * timing (if it started late it might have seen no tuples in the outer + * relation and skipped building the hash table). Therefore we have to be + * prepared to get instrumentation data from a worker if there is no hash + * table. + */ + if (hashstate->hashtable) + { + hinstrument = (HashInstrumentation *) + palloc(sizeof(HashInstrumentation)); + ExecHashGetInstrumentation(hinstrument, hashstate->hashtable); + } + else if (hashstate->shared_info) + { + SharedHashInfo *shared_info = hashstate->shared_info; + int i; + + /* Find the first worker that built a hash table. 
*/ + for (i = 0; i < shared_info->num_workers; ++i) + { + if (shared_info->hinstrument[i].nbatch > 0) + { + hinstrument = &shared_info->hinstrument[i]; + break; + } + } + } - if (hashtable) + if (hinstrument) { - long spacePeakKb = (hashtable->spacePeak + 1023) / 1024; + long spacePeakKb = (hinstrument->space_peak + 1023) / 1024; if (es->format != EXPLAIN_FORMAT_TEXT) { - ExplainPropertyLong("Hash Buckets", hashtable->nbuckets, es); + ExplainPropertyLong("Hash Buckets", hinstrument->nbuckets, es); ExplainPropertyLong("Original Hash Buckets", - hashtable->nbuckets_original, es); - ExplainPropertyLong("Hash Batches", hashtable->nbatch, es); + hinstrument->nbuckets_original, es); + ExplainPropertyLong("Hash Batches", hinstrument->nbatch, es); ExplainPropertyLong("Original Hash Batches", - hashtable->nbatch_original, es); + hinstrument->nbatch_original, es); ExplainPropertyLong("Peak Memory Usage", spacePeakKb, es); } - else if (hashtable->nbatch_original != hashtable->nbatch || - hashtable->nbuckets_original != hashtable->nbuckets) + else if (hinstrument->nbatch_original != hinstrument->nbatch || + hinstrument->nbuckets_original != hinstrument->nbuckets) { appendStringInfoSpaces(es->str, es->indent * 2); appendStringInfo(es->str, "Buckets: %d (originally %d) Batches: %d (originally %d) Memory Usage: %ldkB\n", - hashtable->nbuckets, - hashtable->nbuckets_original, - hashtable->nbatch, - hashtable->nbatch_original, + hinstrument->nbuckets, + hinstrument->nbuckets_original, + hinstrument->nbatch, + hinstrument->nbatch_original, spacePeakKb); } else @@ -2414,7 +2442,7 @@ show_hash_info(HashState *hashstate, ExplainState *es) appendStringInfoSpaces(es->str, es->indent * 2); appendStringInfo(es->str, "Buckets: %d Batches: %d Memory Usage: %ldkB\n", - hashtable->nbuckets, hashtable->nbatch, + hinstrument->nbuckets, hinstrument->nbatch, spacePeakKb); } } diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 53c5254be1..0aca00b0e6 100644 
--- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -29,6 +29,7 @@ #include "executor/nodeBitmapHeapscan.h" #include "executor/nodeCustom.h" #include "executor/nodeForeignscan.h" +#include "executor/nodeHash.h" #include "executor/nodeIndexscan.h" #include "executor/nodeIndexonlyscan.h" #include "executor/nodeSeqscan.h" @@ -259,8 +260,12 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate, e->pcxt); break; + case T_HashState: + /* even when not parallel-aware, for EXPLAIN ANALYZE */ + ExecHashEstimate((HashState *) planstate, e->pcxt); + break; case T_SortState: - /* even when not parallel-aware */ + /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecSortEstimate((SortState *) planstate, e->pcxt); break; @@ -458,8 +463,12 @@ ExecParallelInitializeDSM(PlanState *planstate, ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate, d->pcxt); break; + case T_HashState: + /* even when not parallel-aware, for EXPLAIN ANALYZE */ + ExecHashInitializeDSM((HashState *) planstate, d->pcxt); + break; case T_SortState: - /* even when not parallel-aware */ + /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecSortInitializeDSM((SortState *) planstate, d->pcxt); break; @@ -872,8 +881,12 @@ ExecParallelReInitializeDSM(PlanState *planstate, ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate, pcxt); break; + case T_HashState: + /* even when not parallel-aware, for EXPLAIN ANALYZE */ + ExecHashReInitializeDSM((HashState *) planstate, pcxt); + break; case T_SortState: - /* even when not parallel-aware */ + /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecSortReInitializeDSM((SortState *) planstate, pcxt); break; @@ -928,12 +941,18 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate, planstate->worker_instrument->num_workers = instrumentation->num_workers; memcpy(&planstate->worker_instrument->instrument, instrument, 
ibytes); - /* - * Perform any node-type-specific work that needs to be done. Currently, - * only Sort nodes need to do anything here. - */ - if (IsA(planstate, SortState)) - ExecSortRetrieveInstrumentation((SortState *) planstate); + /* Perform any node-type-specific work that needs to be done. */ + switch (nodeTag(planstate)) + { + case T_SortState: + ExecSortRetrieveInstrumentation((SortState *) planstate); + break; + case T_HashState: + ExecHashRetrieveInstrumentation((HashState *) planstate); + break; + default: + break; + } return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation, instrumentation); @@ -1160,8 +1179,12 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt) ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate, pwcxt); break; + case T_HashState: + /* even when not parallel-aware, for EXPLAIN ANALYZE */ + ExecHashInitializeWorker((HashState *) planstate, pwcxt); + break; case T_SortState: - /* even when not parallel-aware */ + /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecSortInitializeWorker((SortState *) planstate, pwcxt); break; diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index c1aa5064c9..9befca9016 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -751,6 +751,9 @@ ExecShutdownNode(PlanState *node) case T_GatherMergeState: ExecShutdownGatherMerge((GatherMergeState *) node); break; + case T_HashState: + ExecShutdownHash((HashState *) node); + break; default: break; } diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index f7cd8fb347..6fe5d69d55 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -1637,6 +1637,110 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) } } +/* + * Reserve space in the DSM segment for instrumentation data. 
+ */ +void +ExecHashEstimate(HashState *node, ParallelContext *pcxt) +{ + size_t size; + + size = mul_size(pcxt->nworkers, sizeof(HashInstrumentation)); + size = add_size(size, offsetof(SharedHashInfo, hinstrument)); + shm_toc_estimate_chunk(&pcxt->estimator, size); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +/* + * Set up a space in the DSM for all workers to record instrumentation data + * about their hash table. + */ +void +ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt) +{ + size_t size; + + size = offsetof(SharedHashInfo, hinstrument) + + pcxt->nworkers * sizeof(HashInstrumentation); + node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size); + memset(node->shared_info, 0, size); + node->shared_info->num_workers = pcxt->nworkers; + shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, + node->shared_info); +} + +/* + * Reset shared state before beginning a fresh scan. + */ +void +ExecHashReInitializeDSM(HashState *node, ParallelContext *pcxt) +{ + if (node->shared_info != NULL) + { + memset(node->shared_info->hinstrument, 0, + node->shared_info->num_workers * sizeof(HashInstrumentation)); + } +} + +/* + * Locate the DSM space for hash table instrumentation data that we'll write + * to at shutdown time. + */ +void +ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt) +{ + SharedHashInfo *shared_info; + + shared_info = (SharedHashInfo *) + shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, true); + node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber]; +} + +/* + * Copy instrumentation data from this worker's hash table (if it built one) + * to DSM memory so the leader can retrieve it. This must be done in an + * ExecShutdownHash() rather than ExecEndHash() because the latter runs after + * we've detached from the DSM segment. 
+ */ +void +ExecShutdownHash(HashState *node) +{ + if (node->hinstrument && node->hashtable) + ExecHashGetInstrumentation(node->hinstrument, node->hashtable); +} + +/* + * Retrieve instrumentation data from workers before the DSM segment is + * detached, so that EXPLAIN can access it. + */ +void +ExecHashRetrieveInstrumentation(HashState *node) +{ + SharedHashInfo *shared_info = node->shared_info; + size_t size; + + /* Replace node->shared_info with a copy in backend-local memory. */ + size = offsetof(SharedHashInfo, hinstrument) + + shared_info->num_workers * sizeof(HashInstrumentation); + node->shared_info = palloc(size); + memcpy(node->shared_info, shared_info, size); +} + +/* + * Copy the instrumentation data from 'hashtable' into a HashInstrumentation + * struct. + */ +void +ExecHashGetInstrumentation(HashInstrumentation *instrument, + HashJoinTable hashtable) +{ + instrument->nbuckets = hashtable->nbuckets; + instrument->nbuckets_original = hashtable->nbuckets_original; + instrument->nbatch = hashtable->nbatch; + instrument->nbatch_original = hashtable->nbatch_original; + instrument->space_peak = hashtable->spacePeak; +} + /* * Allocate 'size' bytes from the currently active HashMemoryChunk */ diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index 3ae556fb6c..75d4c70f6f 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -14,6 +14,7 @@ #ifndef NODEHASH_H #define NODEHASH_H +#include "access/parallel.h" #include "nodes/execnodes.h" extern HashState *ExecInitHash(Hash *node, EState *estate, int eflags); @@ -48,5 +49,13 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, int *numbatches, int *num_skew_mcvs); extern int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue); +extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt); +extern void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt); +extern void 
ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt); +extern void ExecHashReInitializeDSM(HashState *node, ParallelContext *pcxt); +extern void ExecHashRetrieveInstrumentation(HashState *node); +extern void ExecShutdownHash(HashState *node); +extern void ExecHashGetInstrumentation(HashInstrumentation *instrument, + HashJoinTable hashtable); #endif /* NODEHASH_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index e05bc04f52..084d59ef83 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1980,6 +1980,29 @@ typedef struct GatherMergeState struct binaryheap *gm_heap; /* binary heap of slot indices */ } GatherMergeState; +/* ---------------- + * Values displayed by EXPLAIN ANALYZE + * ---------------- + */ +typedef struct HashInstrumentation +{ + int nbuckets; /* number of buckets at end of execution */ + int nbuckets_original; /* planned number of buckets */ + int nbatch; /* number of batches at end of execution */ + int nbatch_original; /* planned number of batches */ + size_t space_peak; /* peak memory usage in bytes */ +} HashInstrumentation; + +/* ---------------- + * Shared memory container for per-worker hash information + * ---------------- + */ +typedef struct SharedHashInfo +{ + int num_workers; + HashInstrumentation hinstrument[FLEXIBLE_ARRAY_MEMBER]; +} SharedHashInfo; + /* ---------------- * HashState information * ---------------- @@ -1990,6 +2013,9 @@ typedef struct HashState HashJoinTable hashtable; /* hash table for the hashjoin */ List *hashkeys; /* list of ExprState nodes */ /* hashkeys is same as parent's hj_InnerHashKeys */ + + SharedHashInfo *shared_info; /* one entry per worker */ + HashInstrumentation *hinstrument; /* this worker's entry */ } HashState; /* ---------------- diff --git a/src/test/regress/expected/join.out b/src/test/regress/expected/join.out index b7d1790097..001d96dc2d 100644 --- a/src/test/regress/expected/join.out +++ 
b/src/test/regress/expected/join.out @@ -6173,6 +6173,21 @@ $$); rollback to settings; -- A couple of other hash join tests unrelated to work_mem management. +-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate +savepoint settings; +set local max_parallel_workers_per_gather = 2; +set local work_mem = '4MB'; +set local parallel_leader_participation = off; +select * from hash_join_batches( +$$ + select count(*) from simple r join simple s using (id); +$$); + original | final +----------+------- + 1 | 1 +(1 row) + +rollback to settings; -- A full outer join where every record is matched. -- non-parallel savepoint settings; diff --git a/src/test/regress/sql/join.sql b/src/test/regress/sql/join.sql index c6d4a513e8..882601b338 100644 --- a/src/test/regress/sql/join.sql +++ b/src/test/regress/sql/join.sql @@ -2159,6 +2159,17 @@ rollback to settings; -- A couple of other hash join tests unrelated to work_mem management. +-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate +savepoint settings; +set local max_parallel_workers_per_gather = 2; +set local work_mem = '4MB'; +set local parallel_leader_participation = off; +select * from hash_join_batches( +$$ + select count(*) from simple r join simple s using (id); +$$); +rollback to settings; + -- A full outer join where every record is matched. -- non-parallel