#include "commands/createas.h"
#include "commands/defrem.h"
#include "commands/prepare.h"
-#include "executor/hashjoin.h"
+#include "executor/nodeHash.h"
#include "foreign/fdwapi.h"
#include "nodes/extensible.h"
#include "nodes/nodeFuncs.h"
static void
show_hash_info(HashState *hashstate, ExplainState *es)
{
-	HashJoinTable hashtable;
+	HashInstrumentation *hinstrument = NULL;
-	hashtable = hashstate->hashtable;
+	/*
+	 * Pick the source of the bucket/batch counts and peak memory usage that
+	 * are reported below.  If neither source is available, hinstrument stays
+	 * NULL and nothing is printed.
+	 *
+	 * In a parallel query, the leader process may or may not have run the
+	 * hash join, and even if it did it may not have built a hash table due to
+	 * timing (if it started late it might have seen no tuples in the outer
+	 * relation and skipped building the hash table). Therefore we have to be
+	 * prepared to get instrumentation data from a worker if there is no hash
+	 * table.
+	 */
+	if (hashstate->hashtable)
+	{
+		hinstrument = (HashInstrumentation *)
+			palloc(sizeof(HashInstrumentation));
+		ExecHashGetInstrumentation(hinstrument, hashstate->hashtable);
+	}
+	else if (hashstate->shared_info)
+	{
+		SharedHashInfo *shared_info = hashstate->shared_info;
+		int			i;
+
+		/* Find the first worker that built a hash table. */
+		for (i = 0; i < shared_info->num_workers; ++i)
+		{
+			if (shared_info->hinstrument[i].nbatch > 0)
+			{
+				hinstrument = &shared_info->hinstrument[i];
+				break;
+			}
+		}
+	}
-	if (hashtable)
+	if (hinstrument)
	{
-		long spacePeakKb = (hashtable->spacePeak + 1023) / 1024;
+		long spacePeakKb = (hinstrument->space_peak + 1023) / 1024;
		if (es->format != EXPLAIN_FORMAT_TEXT)
		{
-			ExplainPropertyLong("Hash Buckets", hashtable->nbuckets, es);
+			ExplainPropertyLong("Hash Buckets", hinstrument->nbuckets, es);
			ExplainPropertyLong("Original Hash Buckets",
-								hashtable->nbuckets_original, es);
-			ExplainPropertyLong("Hash Batches", hashtable->nbatch, es);
+								hinstrument->nbuckets_original, es);
+			ExplainPropertyLong("Hash Batches", hinstrument->nbatch, es);
			ExplainPropertyLong("Original Hash Batches",
-								hashtable->nbatch_original, es);
+								hinstrument->nbatch_original, es);
			ExplainPropertyLong("Peak Memory Usage", spacePeakKb, es);
		}
-		else if (hashtable->nbatch_original != hashtable->nbatch ||
-				 hashtable->nbuckets_original != hashtable->nbuckets)
+		else if (hinstrument->nbatch_original != hinstrument->nbatch ||
+				 hinstrument->nbuckets_original != hinstrument->nbuckets)
		{
			appendStringInfoSpaces(es->str, es->indent * 2);
			appendStringInfo(es->str,
							 "Buckets: %d (originally %d) Batches: %d (originally %d) Memory Usage: %ldkB\n",
-							 hashtable->nbuckets,
-							 hashtable->nbuckets_original,
-							 hashtable->nbatch,
-							 hashtable->nbatch_original,
+							 hinstrument->nbuckets,
+							 hinstrument->nbuckets_original,
+							 hinstrument->nbatch,
+							 hinstrument->nbatch_original,
							 spacePeakKb);
		}
		else
		{
+			/* text format, sizes unchanged from the plan: print the short form */
			appendStringInfoSpaces(es->str, es->indent * 2);
			appendStringInfo(es->str,
							 "Buckets: %d Batches: %d Memory Usage: %ldkB\n",
-							 hashtable->nbuckets, hashtable->nbatch,
+							 hinstrument->nbuckets, hinstrument->nbatch,
							 spacePeakKb);
		}
	}
}
#include "executor/nodeBitmapHeapscan.h"
#include "executor/nodeCustom.h"
#include "executor/nodeForeignscan.h"
+#include "executor/nodeHash.h"
#include "executor/nodeIndexscan.h"
#include "executor/nodeIndexonlyscan.h"
#include "executor/nodeSeqscan.h"
ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
e->pcxt);
break;
+ case T_HashState:
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
+ ExecHashEstimate((HashState *) planstate, e->pcxt);
+ break;
case T_SortState:
- /* even when not parallel-aware */
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecSortEstimate((SortState *) planstate, e->pcxt);
break;
ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
d->pcxt);
break;
+ case T_HashState:
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
+ ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
+ break;
case T_SortState:
- /* even when not parallel-aware */
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
break;
ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
pcxt);
break;
+ case T_HashState:
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
+ ExecHashReInitializeDSM((HashState *) planstate, pcxt);
+ break;
case T_SortState:
- /* even when not parallel-aware */
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecSortReInitializeDSM((SortState *) planstate, pcxt);
break;
planstate->worker_instrument->num_workers = instrumentation->num_workers;
memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
- /*
- * Perform any node-type-specific work that needs to be done. Currently,
- * only Sort nodes need to do anything here.
- */
- if (IsA(planstate, SortState))
- ExecSortRetrieveInstrumentation((SortState *) planstate);
+ /* Perform any node-type-specific work that needs to be done. */
+ switch (nodeTag(planstate))
+ {
+ case T_SortState:
+ ExecSortRetrieveInstrumentation((SortState *) planstate);
+ break;
+ case T_HashState:
+ ExecHashRetrieveInstrumentation((HashState *) planstate);
+ break;
+ default:
+ break;
+ }
return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
instrumentation);
ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
pwcxt);
break;
+ case T_HashState:
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
+ ExecHashInitializeWorker((HashState *) planstate, pwcxt);
+ break;
case T_SortState:
- /* even when not parallel-aware */
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecSortInitializeWorker((SortState *) planstate, pwcxt);
break;
case T_GatherMergeState:
ExecShutdownGatherMerge((GatherMergeState *) node);
break;
+ case T_HashState:
+ ExecShutdownHash((HashState *) node);
+ break;
default:
break;
}
}
}
+/*
+ * Tell the parallel context's estimator how much DSM space this node needs
+ * for instrumentation: the SharedHashInfo header followed by one
+ * HashInstrumentation slot per worker, registered under a single TOC key.
+ */
+void
+ExecHashEstimate(HashState *node, ParallelContext *pcxt)
+{
+	size_t		nbytes;
+
+	/* overflow-checked: header size + nworkers * slot size */
+	nbytes = add_size(offsetof(SharedHashInfo, hinstrument),
+					  mul_size(pcxt->nworkers, sizeof(HashInstrumentation)));
+	shm_toc_estimate_chunk(&pcxt->estimator, nbytes);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/*
+ * Carve out and zero the shared instrumentation area (header plus one slot
+ * per worker), remember it in the node, and publish it in the TOC under this
+ * plan node's id so that workers can find it.
+ */
+void
+ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
+{
+	SharedHashInfo *info;
+	size_t		nbytes = offsetof(SharedHashInfo, hinstrument) +
+		pcxt->nworkers * sizeof(HashInstrumentation);
+
+	info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, nbytes);
+	memset(info, 0, nbytes);
+	info->num_workers = pcxt->nworkers;
+
+	node->shared_info = info;
+	shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, info);
+}
+
+/*
+ * Before a rescan, wipe every worker's instrumentation slot.  The header
+ * (num_workers) is left untouched.  Does nothing if no shared area exists.
+ */
+void
+ExecHashReInitializeDSM(HashState *node, ParallelContext *pcxt)
+{
+	SharedHashInfo *info = node->shared_info;
+
+	if (info == NULL)
+		return;
+
+	memset(info->hinstrument, 0,
+		   info->num_workers * sizeof(HashInstrumentation));
+}
+
+/*
+ * Locate the DSM space for hash table instrumentation data that we'll write
+ * to at shutdown time.
+ *
+ * The TOC lookup is made with noError = true, so it can come back NULL when
+ * there is no entry for this plan node.  In that case leave node->hinstrument
+ * NULL, which makes ExecShutdownHash() skip the copy, instead of forming a
+ * slot address from a NULL base pointer.
+ */
+void
+ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)
+{
+	SharedHashInfo *shared_info;
+
+	shared_info = (SharedHashInfo *)
+		shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, true);
+	if (shared_info == NULL)
+	{
+		node->hinstrument = NULL;
+		return;
+	}
+
+	/* each worker owns the slot at its own worker number */
+	node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber];
+}
+
+/*
+ * Publish this worker's hash table statistics into its DSM slot so the
+ * leader can read them.  Nothing happens when the node has no slot or never
+ * built a hash table.  The copy has to happen here, at shutdown, and not in
+ * ExecEndHash(), because the latter runs after we've detached from the DSM
+ * segment.
+ */
+void
+ExecShutdownHash(HashState *node)
+{
+	if (node->hinstrument == NULL || node->hashtable == NULL)
+		return;
+
+	ExecHashGetInstrumentation(node->hinstrument, node->hashtable);
+}
+
+/*
+ * Retrieve instrumentation data from workers before the DSM segment is
+ * detached, so that EXPLAIN can access it.
+ *
+ * On return node->shared_info points at a palloc'd copy of the shared area
+ * (header plus all per-worker slots).  If the node has no shared area there
+ * is nothing to copy and the node is left unchanged, matching the NULL
+ * handling in ExecHashReInitializeDSM().
+ */
+void
+ExecHashRetrieveInstrumentation(HashState *node)
+{
+	SharedHashInfo *shared_info = node->shared_info;
+	size_t		size;
+
+	/* Nothing to copy if no shared area was set up for this node. */
+	if (shared_info == NULL)
+		return;
+
+	/* Replace node->shared_info with a copy in backend-local memory. */
+	size = offsetof(SharedHashInfo, hinstrument) +
+		shared_info->num_workers * sizeof(HashInstrumentation);
+	node->shared_info = palloc(size);
+	memcpy(node->shared_info, shared_info, size);
+}
+
+/*
+ * Snapshot the EXPLAIN-relevant counters of 'hashtable' into 'instrument':
+ * planned and final bucket/batch counts, and peak space used.
+ */
+void
+ExecHashGetInstrumentation(HashInstrumentation *instrument,
+						   HashJoinTable hashtable)
+{
+	/* sizes chosen at planning time */
+	instrument->nbuckets_original = hashtable->nbuckets_original;
+	instrument->nbatch_original = hashtable->nbatch_original;
+
+	/* sizes the table currently has */
+	instrument->nbuckets = hashtable->nbuckets;
+	instrument->nbatch = hashtable->nbatch;
+
+	/* memory high-water mark */
+	instrument->space_peak = hashtable->spacePeak;
+}
+
/*
* Allocate 'size' bytes from the currently active HashMemoryChunk
*/
#ifndef NODEHASH_H
#define NODEHASH_H
+#include "access/parallel.h"
#include "nodes/execnodes.h"
extern HashState *ExecInitHash(Hash *node, EState *estate, int eflags);
int *numbatches,
int *num_skew_mcvs);
extern int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
+extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt); /* reserve DSM space for instrumentation */
+extern void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt); /* allocate, zero and publish the shared area */
+extern void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt); /* find this worker's slot */
+extern void ExecHashReInitializeDSM(HashState *node, ParallelContext *pcxt); /* zero the per-worker slots */
+extern void ExecHashRetrieveInstrumentation(HashState *node); /* copy shared area to local memory */
+extern void ExecShutdownHash(HashState *node); /* write hash table stats to this worker's slot */
+extern void ExecHashGetInstrumentation(HashInstrumentation *instrument,
+ HashJoinTable hashtable); /* copy counters from hashtable into *instrument */
#endif /* NODEHASH_H */
struct binaryheap *gm_heap; /* binary heap of slot indices */
} GatherMergeState;
+/* ----------------
+ * Values displayed by EXPLAIN ANALYZE
+ *
+ * Filled in from a HashJoinTable by ExecHashGetInstrumentation().
+ * ----------------
+ */
+typedef struct HashInstrumentation
+{
+ int nbuckets; /* number of buckets at end of execution */
+ int nbuckets_original; /* planned number of buckets */
+ int nbatch; /* number of batches at end of execution */
+ int nbatch_original; /* planned number of batches */
+ size_t space_peak; /* peak memory usage in bytes */
+} HashInstrumentation;
+
+/* ----------------
+ * Shared memory container for per-worker hash information
+ *
+ * Allocated with room for num_workers entries in the trailing array.
+ * ----------------
+ */
+typedef struct SharedHashInfo
+{
+ int num_workers; /* number of entries in hinstrument[] */
+ HashInstrumentation hinstrument[FLEXIBLE_ARRAY_MEMBER]; /* one slot per worker, indexed by ParallelWorkerNumber */
+} SharedHashInfo;
+
/* ----------------
* HashState information
* ----------------
HashJoinTable hashtable; /* hash table for the hashjoin */
List *hashkeys; /* list of ExprState nodes */
/* hashkeys is same as parent's hj_InnerHashKeys */
+
+ SharedHashInfo *shared_info; /* one entry per worker */
+ HashInstrumentation *hinstrument; /* this worker's entry */
} HashState;
/* ----------------
rollback to settings;
-- A couple of other hash join tests unrelated to work_mem management.
+-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local parallel_leader_participation = off;
+select * from hash_join_batches(
+$$
+ select count(*) from simple r join simple s using (id);
+$$);
+ original | final
+----------+-------
+ 1 | 1
+(1 row)
+
+rollback to settings;
-- A full outer join where every record is matched.
-- non-parallel
savepoint settings;
-- A couple of other hash join tests unrelated to work_mem management.
+-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local parallel_leader_participation = off;
+select * from hash_join_batches(
+$$
+ select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
-- A full outer join where every record is matched.
-- non-parallel