granicus.if.org Git - postgresql/commitdiff
Fix EXPLAIN ANALYZE of hash join when the leader doesn't participate.
author Andres Freund <andres@anarazel.de>
Tue, 5 Dec 2017 18:55:56 +0000 (10:55 -0800)
committer Andres Freund <andres@anarazel.de>
Tue, 5 Dec 2017 18:55:56 +0000 (10:55 -0800)
If a hash join appears in a parallel query, there may be no hash table
available for explain.c to inspect even though a hash table may have
been built in other processes.  This could happen either because
parallel_leader_participation was set to off or because the leader
happened to hit the end of the outer relation immediately (even though
the complete relation is not empty) and decided not to build the hash
table.

Commit bf11e7ee introduced a way for workers to exchange
instrumentation via the DSM segment for Sort nodes even though they
are not parallel-aware.  This commit does the same for Hash nodes, so
that explain.c has a way to find instrumentation data from an
arbitrary participant that actually built the hash table.

Author: Thomas Munro
Reviewed-By: Andres Freund
Discussion: https://postgr.es/m/CAEepm%3D3DUQC2-z252N55eOcZBer6DPdM%3DFzrxH9dZc5vYLsjaA%40mail.gmail.com

src/backend/commands/explain.c
src/backend/executor/execParallel.c
src/backend/executor/execProcnode.c
src/backend/executor/nodeHash.c
src/include/executor/nodeHash.h
src/include/nodes/execnodes.h
src/test/regress/expected/join.out
src/test/regress/sql/join.sql

index 447f69d044e96f713f525131665fc5cdfb1b0a26..7e4fbafc535a83c71cb41edace346e9a02cb35e5 100644 (file)
@@ -19,7 +19,7 @@
 #include "commands/createas.h"
 #include "commands/defrem.h"
 #include "commands/prepare.h"
-#include "executor/hashjoin.h"
+#include "executor/nodeHash.h"
 #include "foreign/fdwapi.h"
 #include "nodes/extensible.h"
 #include "nodes/nodeFuncs.h"
@@ -2379,34 +2379,62 @@ show_sort_info(SortState *sortstate, ExplainState *es)
 static void
 show_hash_info(HashState *hashstate, ExplainState *es)
 {
-       HashJoinTable hashtable;
+       HashInstrumentation *hinstrument = NULL;
 
-       hashtable = hashstate->hashtable;
+       /*
+        * In a parallel query, the leader process may or may not have run the
+        * hash join, and even if it did it may not have built a hash table due to
+        * timing (if it started late it might have seen no tuples in the outer
+        * relation and skipped building the hash table).  Therefore we have to be
+        * prepared to get instrumentation data from a worker if there is no hash
+        * table.
+        */
+       if (hashstate->hashtable)
+       {
+               hinstrument = (HashInstrumentation *)
+                       palloc(sizeof(HashInstrumentation));
+               ExecHashGetInstrumentation(hinstrument, hashstate->hashtable);
+       }
+       else if (hashstate->shared_info)
+       {
+               SharedHashInfo *shared_info = hashstate->shared_info;
+               int             i;
+
+               /* Find the first worker that built a hash table. */
+               for (i = 0; i < shared_info->num_workers; ++i)
+               {
+                       if (shared_info->hinstrument[i].nbatch > 0)
+                       {
+                               hinstrument = &shared_info->hinstrument[i];
+                               break;
+                       }
+               }
+       }
 
-       if (hashtable)
+       if (hinstrument)
        {
-               long            spacePeakKb = (hashtable->spacePeak + 1023) / 1024;
+               long            spacePeakKb = (hinstrument->space_peak + 1023) / 1024;
 
                if (es->format != EXPLAIN_FORMAT_TEXT)
                {
-                       ExplainPropertyLong("Hash Buckets", hashtable->nbuckets, es);
+                       ExplainPropertyLong("Hash Buckets", hinstrument->nbuckets, es);
                        ExplainPropertyLong("Original Hash Buckets",
-                                                               hashtable->nbuckets_original, es);
-                       ExplainPropertyLong("Hash Batches", hashtable->nbatch, es);
+                                                               hinstrument->nbuckets_original, es);
+                       ExplainPropertyLong("Hash Batches", hinstrument->nbatch, es);
                        ExplainPropertyLong("Original Hash Batches",
-                                                               hashtable->nbatch_original, es);
+                                                               hinstrument->nbatch_original, es);
                        ExplainPropertyLong("Peak Memory Usage", spacePeakKb, es);
                }
-               else if (hashtable->nbatch_original != hashtable->nbatch ||
-                                hashtable->nbuckets_original != hashtable->nbuckets)
+               else if (hinstrument->nbatch_original != hinstrument->nbatch ||
+                                hinstrument->nbuckets_original != hinstrument->nbuckets)
                {
                        appendStringInfoSpaces(es->str, es->indent * 2);
                        appendStringInfo(es->str,
                                                         "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
-                                                        hashtable->nbuckets,
-                                                        hashtable->nbuckets_original,
-                                                        hashtable->nbatch,
-                                                        hashtable->nbatch_original,
+                                                        hinstrument->nbuckets,
+                                                        hinstrument->nbuckets_original,
+                                                        hinstrument->nbatch,
+                                                        hinstrument->nbatch_original,
                                                         spacePeakKb);
                }
                else
@@ -2414,7 +2442,7 @@ show_hash_info(HashState *hashstate, ExplainState *es)
                        appendStringInfoSpaces(es->str, es->indent * 2);
                        appendStringInfo(es->str,
                                                         "Buckets: %d  Batches: %d  Memory Usage: %ldkB\n",
-                                                        hashtable->nbuckets, hashtable->nbatch,
+                                                        hinstrument->nbuckets, hinstrument->nbatch,
                                                         spacePeakKb);
                }
        }
index 53c5254be13c43b8a2959a04c68ed2bcf97be11b..0aca00b0e68ca2608ed9bff4ed79e0355fde0165 100644 (file)
@@ -29,6 +29,7 @@
 #include "executor/nodeBitmapHeapscan.h"
 #include "executor/nodeCustom.h"
 #include "executor/nodeForeignscan.h"
+#include "executor/nodeHash.h"
 #include "executor/nodeIndexscan.h"
 #include "executor/nodeIndexonlyscan.h"
 #include "executor/nodeSeqscan.h"
@@ -259,8 +260,12 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
                                ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
                                                                           e->pcxt);
                        break;
+               case T_HashState:
+                       /* even when not parallel-aware, for EXPLAIN ANALYZE */
+                       ExecHashEstimate((HashState *) planstate, e->pcxt);
+                       break;
                case T_SortState:
-                       /* even when not parallel-aware */
+                       /* even when not parallel-aware, for EXPLAIN ANALYZE */
                        ExecSortEstimate((SortState *) planstate, e->pcxt);
                        break;
 
@@ -458,8 +463,12 @@ ExecParallelInitializeDSM(PlanState *planstate,
                                ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
                                                                                        d->pcxt);
                        break;
+               case T_HashState:
+                       /* even when not parallel-aware, for EXPLAIN ANALYZE */
+                       ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
+                       break;
                case T_SortState:
-                       /* even when not parallel-aware */
+                       /* even when not parallel-aware, for EXPLAIN ANALYZE */
                        ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
                        break;
 
@@ -872,8 +881,12 @@ ExecParallelReInitializeDSM(PlanState *planstate,
                                ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
                                                                                          pcxt);
                        break;
+               case T_HashState:
+                       /* even when not parallel-aware, for EXPLAIN ANALYZE */
+                       ExecHashReInitializeDSM((HashState *) planstate, pcxt);
+                       break;
                case T_SortState:
-                       /* even when not parallel-aware */
+                       /* even when not parallel-aware, for EXPLAIN ANALYZE */
                        ExecSortReInitializeDSM((SortState *) planstate, pcxt);
                        break;
 
@@ -928,12 +941,18 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
        planstate->worker_instrument->num_workers = instrumentation->num_workers;
        memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
 
-       /*
-        * Perform any node-type-specific work that needs to be done.  Currently,
-        * only Sort nodes need to do anything here.
-        */
-       if (IsA(planstate, SortState))
-               ExecSortRetrieveInstrumentation((SortState *) planstate);
+       /* Perform any node-type-specific work that needs to be done. */
+       switch (nodeTag(planstate))
+       {
+               case T_SortState:
+                       ExecSortRetrieveInstrumentation((SortState *) planstate);
+                       break;
+               case T_HashState:
+                       ExecHashRetrieveInstrumentation((HashState *) planstate);
+                       break;
+               default:
+                       break;
+       }
 
        return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
                                                                 instrumentation);
@@ -1160,8 +1179,12 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
                                ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
                                                                                           pwcxt);
                        break;
+               case T_HashState:
+                       /* even when not parallel-aware, for EXPLAIN ANALYZE */
+                       ExecHashInitializeWorker((HashState *) planstate, pwcxt);
+                       break;
                case T_SortState:
-                       /* even when not parallel-aware */
+                       /* even when not parallel-aware, for EXPLAIN ANALYZE */
                        ExecSortInitializeWorker((SortState *) planstate, pwcxt);
                        break;
 
index c1aa5064c909776780e33ad72fbfb9556dec5392..9befca901615cbd9234c60f1761e178d68595c44 100644 (file)
@@ -751,6 +751,9 @@ ExecShutdownNode(PlanState *node)
                case T_GatherMergeState:
                        ExecShutdownGatherMerge((GatherMergeState *) node);
                        break;
+               case T_HashState:
+                       ExecShutdownHash((HashState *) node);
+                       break;
                default:
                        break;
        }
index f7cd8fb34725596c1956e3d8c232bb0bf1b45bd2..6fe5d69d5589086da7524d09dbba98242916aed1 100644 (file)
@@ -1637,6 +1637,110 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
        }
 }
 
+/*
+ * Reserve space in the DSM segment for instrumentation data.
+ */
+void
+ExecHashEstimate(HashState *node, ParallelContext *pcxt)
+{
+       size_t          size;
+
+       size = mul_size(pcxt->nworkers, sizeof(HashInstrumentation));
+       size = add_size(size, offsetof(SharedHashInfo, hinstrument));
+       shm_toc_estimate_chunk(&pcxt->estimator, size);
+       shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/*
+ * Set up a space in the DSM for all workers to record instrumentation data
+ * about their hash table.
+ */
+void
+ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
+{
+       size_t          size;
+
+       size = offsetof(SharedHashInfo, hinstrument) +
+               pcxt->nworkers * sizeof(HashInstrumentation);
+       node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size);
+       memset(node->shared_info, 0, size);
+       node->shared_info->num_workers = pcxt->nworkers;
+       shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id,
+                                  node->shared_info);
+}
+
+/*
+ * Reset shared state before beginning a fresh scan.
+ */
+void
+ExecHashReInitializeDSM(HashState *node, ParallelContext *pcxt)
+{
+       if (node->shared_info != NULL)
+       {
+               memset(node->shared_info->hinstrument, 0,
+                          node->shared_info->num_workers * sizeof(HashInstrumentation));
+       }
+}
+
+/*
+ * Locate the DSM space for hash table instrumentation data that we'll write
+ * to at shutdown time.
+ */
+void
+ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)
+{
+       SharedHashInfo *shared_info;
+
+       shared_info = (SharedHashInfo *)
+               shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, true);
+       node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber];
+}
+
+/*
+ * Copy instrumentation data from this worker's hash table (if it built one)
+ * to DSM memory so the leader can retrieve it.  This must be done in an
+ * ExecShutdownHash() rather than ExecEndHash() because the latter runs after
+ * we've detached from the DSM segment.
+ */
+void
+ExecShutdownHash(HashState *node)
+{
+       if (node->hinstrument && node->hashtable)
+               ExecHashGetInstrumentation(node->hinstrument, node->hashtable);
+}
+
+/*
+ * Retrieve instrumentation data from workers before the DSM segment is
+ * detached, so that EXPLAIN can access it.
+ */
+void
+ExecHashRetrieveInstrumentation(HashState *node)
+{
+       SharedHashInfo *shared_info = node->shared_info;
+       size_t          size;
+
+       /* Replace node->shared_info with a copy in backend-local memory. */
+       size = offsetof(SharedHashInfo, hinstrument) +
+               shared_info->num_workers * sizeof(HashInstrumentation);
+       node->shared_info = palloc(size);
+       memcpy(node->shared_info, shared_info, size);
+}
+
+/*
+ * Copy the instrumentation data from 'hashtable' into a HashInstrumentation
+ * struct.
+ */
+void
+ExecHashGetInstrumentation(HashInstrumentation *instrument,
+                                                  HashJoinTable hashtable)
+{
+       instrument->nbuckets = hashtable->nbuckets;
+       instrument->nbuckets_original = hashtable->nbuckets_original;
+       instrument->nbatch = hashtable->nbatch;
+       instrument->nbatch_original = hashtable->nbatch_original;
+       instrument->space_peak = hashtable->spacePeak;
+}
+
 /*
  * Allocate 'size' bytes from the currently active HashMemoryChunk
  */
index 3ae556fb6c5cc5355df8327906cd58029008e6da..75d4c70f6f6f15aeb451fe0ef65823088fe749fd 100644 (file)
@@ -14,6 +14,7 @@
 #ifndef NODEHASH_H
 #define NODEHASH_H
 
+#include "access/parallel.h"
 #include "nodes/execnodes.h"
 
 extern HashState *ExecInitHash(Hash *node, EState *estate, int eflags);
@@ -48,5 +49,13 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                                                int *numbatches,
                                                int *num_skew_mcvs);
 extern int     ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
+extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt);
+extern void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt);
+extern void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt);
+extern void ExecHashReInitializeDSM(HashState *node, ParallelContext *pcxt);
+extern void ExecHashRetrieveInstrumentation(HashState *node);
+extern void ExecShutdownHash(HashState *node);
+extern void ExecHashGetInstrumentation(HashInstrumentation *instrument,
+                                                                          HashJoinTable hashtable);
 
 #endif                                                 /* NODEHASH_H */
index e05bc04f5259e9b51dcb076e24dcab8137cfcb4a..084d59ef834af0a218894c448ea6463f42a277f4 100644 (file)
@@ -1980,6 +1980,29 @@ typedef struct GatherMergeState
        struct binaryheap *gm_heap; /* binary heap of slot indices */
 } GatherMergeState;
 
+/* ----------------
+ *      Values displayed by EXPLAIN ANALYZE
+ * ----------------
+ */
+typedef struct HashInstrumentation
+{
+       int                     nbuckets;               /* number of buckets at end of execution */
+       int                     nbuckets_original;      /* planned number of buckets */
+       int                     nbatch;                 /* number of batches at end of execution */
+       int                     nbatch_original;        /* planned number of batches */
+       size_t          space_peak;             /* peak memory usage in bytes */
+} HashInstrumentation;
+
+/* ----------------
+ *      Shared memory container for per-worker hash information
+ * ----------------
+ */
+typedef struct SharedHashInfo
+{
+       int                     num_workers;
+       HashInstrumentation hinstrument[FLEXIBLE_ARRAY_MEMBER];
+} SharedHashInfo;
+
 /* ----------------
  *      HashState information
  * ----------------
@@ -1990,6 +2013,9 @@ typedef struct HashState
        HashJoinTable hashtable;        /* hash table for the hashjoin */
        List       *hashkeys;           /* list of ExprState nodes */
        /* hashkeys is same as parent's hj_InnerHashKeys */
+
+       SharedHashInfo *shared_info;    /* one entry per worker */
+       HashInstrumentation *hinstrument;       /* this worker's entry */
 } HashState;
 
 /* ----------------
index b7d1790097869fbc2bee7f54cad7ed04c1d632be..001d96dc2d886f6b0e6fc8cf1255294d1edcdcc6 100644 (file)
@@ -6173,6 +6173,21 @@ $$);
 
 rollback to settings;
 -- A couple of other hash join tests unrelated to work_mem management.
+-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local parallel_leader_participation = off;
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ original | final 
+----------+-------
+        1 |     1
+(1 row)
+
+rollback to settings;
 -- A full outer join where every record is matched.
 -- non-parallel
 savepoint settings;
index c6d4a513e86e7497b1715c666455ddceb69a94e4..882601b33882f7675f9b2cec5c0aab80f467ce50 100644 (file)
@@ -2159,6 +2159,17 @@ rollback to settings;
 
 -- A couple of other hash join tests unrelated to work_mem management.
 
+-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local parallel_leader_participation = off;
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
 -- A full outer join where every record is matched.
 
 -- non-parallel