]> granicus.if.org Git - postgresql/commitdiff
Support parallel bitmap heap scans.
authorRobert Haas <rhaas@postgresql.org>
Wed, 8 Mar 2017 17:05:43 +0000 (12:05 -0500)
committerRobert Haas <rhaas@postgresql.org>
Wed, 8 Mar 2017 17:05:43 +0000 (12:05 -0500)
The index is scanned by a single process, but then all cooperating
processes can iterate jointly over the resulting set of heap blocks.
In the future, we might also want to support using a parallel bitmap
index scan to set up for a parallel bitmap heap scan, but that's a
job for another day.

Dilip Kumar, with some corrections and cosmetic changes by me.  The
larger patch set of which this is a part has been reviewed and tested
by (at least) Andres Freund, Amit Khandekar, Tushar Ahuja, Rafia
Sabih, Haribabu Kommi, Thomas Munro, and me.

Discussion: http://postgr.es/m/CAFiTN-uc4=0WxRGfCzs-xfkMYcSEWUC-Fon6thkJGjkh9i=13A@mail.gmail.com

24 files changed:
doc/src/sgml/monitoring.sgml
src/backend/access/heap/heapam.c
src/backend/executor/execParallel.c
src/backend/executor/nodeBitmapHeapscan.c
src/backend/executor/nodeBitmapIndexscan.c
src/backend/executor/nodeBitmapOr.c
src/backend/nodes/copyfuncs.c
src/backend/nodes/outfuncs.c
src/backend/nodes/readfuncs.c
src/backend/optimizer/path/allpaths.c
src/backend/optimizer/path/costsize.c
src/backend/optimizer/path/indxpath.c
src/backend/optimizer/plan/createplan.c
src/backend/optimizer/util/pathnode.c
src/backend/postmaster/pgstat.c
src/include/access/heapam.h
src/include/executor/nodeBitmapHeapscan.h
src/include/nodes/execnodes.h
src/include/nodes/plannodes.h
src/include/optimizer/pathnode.h
src/include/optimizer/paths.h
src/include/pgstat.h
src/test/regress/expected/select_parallel.out
src/test/regress/sql/select_parallel.sql

index 27ed35f0a7b756632af4f730a9acfa8ea495c92b..4d03531cc154b9bd06b75280b8d3314b1972f6f4 100644 (file)
@@ -1211,7 +1211,7 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
          <entry>Waiting in an extension.</entry>
         </row>
         <row>
-         <entry morerows="10"><literal>IPC</></entry>
+         <entry morerows="11"><literal>IPC</></entry>
          <entry><literal>BgWorkerShutdown</></entry>
          <entry>Waiting for background worker to shut down.</entry>
         </row>
@@ -1247,6 +1247,10 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
          <entry><literal>ParallelFinish</></entry>
          <entry>Waiting for parallel workers to finish computing.</entry>
         </row>
+        <row>
+         <entry><literal>ParallelBitmapPopulate</></entry>
+         <entry>Waiting for the leader to populate the TidBitmap.</entry>
+        </row>
         <row>
          <entry><literal>SafeSnapshot</></entry>
          <entry>Waiting for a snapshot for a <literal>READ ONLY DEFERRABLE</> transaction.</entry>
index af258366a207dcecdd22d29a640d0e35d06c3c77..bffc971d6894412b5acce3fe1f465f9a2401222f 100644 (file)
@@ -1753,6 +1753,22 @@ retry:
        return page;
 }
 
+/* ----------------
+ *             heap_update_snapshot
+ *
+ *             Register an MVCC snapshot, store it in the scan, and set rs_temp_snap.
+ * ----------------
+ */
+void
+heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot)
+{
+       Assert(IsMVCCSnapshot(snapshot));
+
+       RegisterSnapshot(snapshot);
+       scan->rs_snapshot = snapshot;
+       scan->rs_temp_snap = true;
+}
+
 /* ----------------
  *             heap_getnext    - retrieve next tuple in scan
  *
index de0e2bafe605b9117f57b66d5e304599d8b9ea6c..a1289e5f12ef513d4b573d8b9c3f6ccd7ffdf11c 100644 (file)
@@ -25,6 +25,7 @@
 
 #include "executor/execParallel.h"
 #include "executor/executor.h"
+#include "executor/nodeBitmapHeapscan.h"
 #include "executor/nodeCustom.h"
 #include "executor/nodeForeignscan.h"
 #include "executor/nodeSeqscan.h"
@@ -217,6 +218,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
                                ExecCustomScanEstimate((CustomScanState *) planstate,
                                                                           e->pcxt);
                                break;
+                       case T_BitmapHeapScanState:
+                               ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
+                                                                          e->pcxt);
+                               break;
                        default:
                                break;
                }
@@ -277,6 +282,11 @@ ExecParallelInitializeDSM(PlanState *planstate,
                                ExecCustomScanInitializeDSM((CustomScanState *) planstate,
                                                                                        d->pcxt);
                                break;
+                       case T_BitmapHeapScanState:
+                               ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
+                                                                                       d->pcxt);
+                               break;
+
                        default:
                                break;
                }
@@ -775,6 +785,10 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
                                ExecCustomScanInitializeWorker((CustomScanState *) planstate,
                                                                                           toc);
                                break;
+                       case T_BitmapHeapScanState:
+                               ExecBitmapHeapInitializeWorker(
+                                                                        (BitmapHeapScanState *) planstate, toc);
+                               break;
                        default:
                                break;
                }
index c1aa9f13bdf5b3df609ddef3138f34c532762e87..833a93e1b7d36317b1576bdff009d17375fd29d8 100644 (file)
 
 static TupleTableSlot *BitmapHeapNext(BitmapHeapScanState *node);
 static void bitgetpage(HeapScanDesc scan, TBMIterateResult *tbmres);
+static inline void BitmapDoneInitializingSharedState(
+                                                                 ParallelBitmapHeapState *pstate);
 static inline void BitmapAdjustPrefetchIterator(BitmapHeapScanState *node,
                                                         TBMIterateResult *tbmres);
 static inline void BitmapAdjustPrefetchTarget(BitmapHeapScanState *node);
 static inline void BitmapPrefetch(BitmapHeapScanState *node,
                           HeapScanDesc scan);
+static bool BitmapShouldInitializeSharedState(
+                                                                 ParallelBitmapHeapState *pstate);
 
 
 /* ----------------------------------------------------------------
@@ -73,9 +77,12 @@ BitmapHeapNext(BitmapHeapScanState *node)
        HeapScanDesc scan;
        TIDBitmap  *tbm;
        TBMIterator *tbmiterator;
+       TBMSharedIterator *shared_tbmiterator;
        TBMIterateResult *tbmres;
        OffsetNumber targoffset;
        TupleTableSlot *slot;
+       ParallelBitmapHeapState *pstate = node->pstate;
+       dsa_area   *dsa = node->ss.ps.state->es_query_dsa;
 
        /*
         * extract necessary information from index scan node
@@ -84,7 +91,10 @@ BitmapHeapNext(BitmapHeapScanState *node)
        slot = node->ss.ss_ScanTupleSlot;
        scan = node->ss.ss_currentScanDesc;
        tbm = node->tbm;
-       tbmiterator = node->tbmiterator;
+       if (pstate == NULL)
+               tbmiterator = node->tbmiterator;
+       else
+               shared_tbmiterator = node->shared_tbmiterator;
        tbmres = node->tbmres;
 
        /*
@@ -99,25 +109,82 @@ BitmapHeapNext(BitmapHeapScanState *node)
         * node->prefetch_maximum.  This is to avoid doing a lot of prefetching in
         * a scan that stops after a few tuples because of a LIMIT.
         */
-       if (tbm == NULL)
+       if (!node->initialized)
        {
-               tbm = (TIDBitmap *) MultiExecProcNode(outerPlanState(node));
+               if (!pstate)
+               {
+                       tbm = (TIDBitmap *) MultiExecProcNode(outerPlanState(node));
 
-               if (!tbm || !IsA(tbm, TIDBitmap))
-                       elog(ERROR, "unrecognized result from subplan");
+                       if (!tbm || !IsA(tbm, TIDBitmap))
+                               elog(ERROR, "unrecognized result from subplan");
 
-               node->tbm = tbm;
-               node->tbmiterator = tbmiterator = tbm_begin_iterate(tbm);
-               node->tbmres = tbmres = NULL;
+                       node->tbm = tbm;
+                       node->tbmiterator = tbmiterator = tbm_begin_iterate(tbm);
+                       node->tbmres = tbmres = NULL;
 
 #ifdef USE_PREFETCH
-               if (node->prefetch_maximum > 0)
-               {
-                       node->prefetch_iterator = tbm_begin_iterate(tbm);
-                       node->prefetch_pages = 0;
-                       node->prefetch_target = -1;
+                       if (node->prefetch_maximum > 0)
+                       {
+                               node->prefetch_iterator = tbm_begin_iterate(tbm);
+                               node->prefetch_pages = 0;
+                               node->prefetch_target = -1;
+                       }
+#endif   /* USE_PREFETCH */
                }
+               else
+               {
+                       /*
+                        * The leader will immediately come out of the function, but
+                        * others will be blocked until leader populates the TBM and wakes
+                        * them up.
+                        */
+                       if (BitmapShouldInitializeSharedState(pstate))
+                       {
+                               tbm = (TIDBitmap *) MultiExecProcNode(outerPlanState(node));
+                               if (!tbm || !IsA(tbm, TIDBitmap))
+                                       elog(ERROR, "unrecognized result from subplan");
+
+                               node->tbm = tbm;
+
+                               /*
+                                * Prepare to iterate over the TBM. This will return the
+                                * dsa_pointer of the iterator state which will be used by
+                                * multiple processes to iterate jointly.
+                                */
+                               pstate->tbmiterator = tbm_prepare_shared_iterate(tbm);
+#ifdef USE_PREFETCH
+                               if (node->prefetch_maximum > 0)
+                               {
+                                       pstate->prefetch_iterator =
+                                               tbm_prepare_shared_iterate(tbm);
+
+                                       /*
+                                        * We don't need the mutex here as we haven't yet woken up
+                                        * others.
+                                        */
+                                       pstate->prefetch_pages = 0;
+                                       pstate->prefetch_target = -1;
+                               }
+#endif
+
+                               /* We have initialized the shared state so wake up others. */
+                               BitmapDoneInitializingSharedState(pstate);
+                       }
+
+                       /* Allocate a private iterator and attach the shared state to it */
+                       node->shared_tbmiterator = shared_tbmiterator =
+                               tbm_attach_shared_iterate(dsa, pstate->tbmiterator);
+                       node->tbmres = tbmres = NULL;
+
+#ifdef USE_PREFETCH
+                       if (node->prefetch_maximum > 0)
+                       {
+                               node->shared_prefetch_iterator =
+                                       tbm_attach_shared_iterate(dsa, pstate->prefetch_iterator);
+                       }
 #endif   /* USE_PREFETCH */
+               }
+               node->initialized = true;
        }
 
        for (;;)
@@ -130,7 +197,10 @@ BitmapHeapNext(BitmapHeapScanState *node)
                 */
                if (tbmres == NULL)
                {
-                       node->tbmres = tbmres = tbm_iterate(tbmiterator);
+                       if (!pstate)
+                               node->tbmres = tbmres = tbm_iterate(tbmiterator);
+                       else
+                               node->tbmres = tbmres = tbm_shared_iterate(shared_tbmiterator);
                        if (tbmres == NULL)
                        {
                                /* no more entries in the bitmap */
@@ -182,8 +252,19 @@ BitmapHeapNext(BitmapHeapScanState *node)
                         * Try to prefetch at least a few pages even before we get to the
                         * second page if we don't stop reading after the first tuple.
                         */
-                       if (node->prefetch_target < node->prefetch_maximum)
-                               node->prefetch_target++;
+                       if (!pstate)
+                       {
+                               if (node->prefetch_target < node->prefetch_maximum)
+                                       node->prefetch_target++;
+                       }
+                       else if (pstate->prefetch_target < node->prefetch_maximum)
+                       {
+                               /* take spinlock while updating shared state */
+                               SpinLockAcquire(&pstate->mutex);
+                               if (pstate->prefetch_target < node->prefetch_maximum)
+                                       pstate->prefetch_target++;
+                               SpinLockRelease(&pstate->mutex);
+                       }
 #endif   /* USE_PREFETCH */
                }
 
@@ -361,6 +442,21 @@ bitgetpage(HeapScanDesc scan, TBMIterateResult *tbmres)
        scan->rs_ntuples = ntup;
 }
 
+/*
+ *     BitmapDoneInitializingSharedState - Shared state is initialized
+ *
+ *     Called once the leader has populated the TBM: marks the shared state
+ *     BM_FINISHED under the mutex, then broadcasts on the condition variable.
+ */
+static inline void
+BitmapDoneInitializingSharedState(ParallelBitmapHeapState *pstate)
+{
+       SpinLockAcquire(&pstate->mutex);
+       pstate->state = BM_FINISHED;
+       SpinLockRelease(&pstate->mutex);
+       ConditionVariableBroadcast(&pstate->cv);
+}
+
 /*
  *     BitmapAdjustPrefetchIterator - Adjust the prefetch iterator
  */
@@ -369,20 +465,53 @@ BitmapAdjustPrefetchIterator(BitmapHeapScanState *node,
                                                         TBMIterateResult *tbmres)
 {
 #ifdef USE_PREFETCH
-       TBMIterator *prefetch_iterator = node->prefetch_iterator;
+       ParallelBitmapHeapState *pstate = node->pstate;
 
-       if (node->prefetch_pages > 0)
+       if (pstate == NULL)
        {
-               /* The main iterator has closed the distance by one page */
-               node->prefetch_pages--;
+               TBMIterator *prefetch_iterator = node->prefetch_iterator;
+
+               if (node->prefetch_pages > 0)
+               {
+                       /* The main iterator has closed the distance by one page */
+                       node->prefetch_pages--;
+               }
+               else if (prefetch_iterator)
+               {
+                       /* Do not let the prefetch iterator get behind the main one */
+                       TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);
+
+                       if (tbmpre == NULL || tbmpre->blockno != tbmres->blockno)
+                               elog(ERROR, "prefetch and main iterators are out of sync");
+               }
+               return;
        }
-       else if (prefetch_iterator)
+
+       if (node->prefetch_maximum > 0)
        {
-               /* Do not let the prefetch iterator get behind the main one */
-               TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);
+               TBMSharedIterator *prefetch_iterator = node->shared_prefetch_iterator;
+
+               SpinLockAcquire(&pstate->mutex);
+               if (pstate->prefetch_pages > 0)
+               {
+                       pstate->prefetch_pages--;
+                       SpinLockRelease(&pstate->mutex);
+               }
+               else
+               {
+                       /* Release the mutex before iterating */
+                       SpinLockRelease(&pstate->mutex);
 
-               if (tbmpre == NULL || tbmpre->blockno != tbmres->blockno)
-                       elog(ERROR, "prefetch and main iterators are out of sync");
+                       /*
+                        * In case of shared mode, we can not ensure that the current
+                        * blockno of the main iterator and that of the prefetch iterator
+                        * are same.  It's possible that whatever blockno we are
+                        * prefetching will be processed by another process.  Therefore, we
+                        * don't validate the blockno here as we do in non-parallel case.
+                        */
+                       if (prefetch_iterator)
+                               tbm_shared_iterate(prefetch_iterator);
+               }
        }
 #endif   /* USE_PREFETCH */
 }
@@ -399,14 +528,35 @@ static inline void
 BitmapAdjustPrefetchTarget(BitmapHeapScanState *node)
 {
 #ifdef USE_PREFETCH
-       if (node->prefetch_target >= node->prefetch_maximum)
-                /* don't increase any further */ ;
-       else if (node->prefetch_target >= node->prefetch_maximum / 2)
-               node->prefetch_target = node->prefetch_maximum;
-       else if (node->prefetch_target > 0)
-               node->prefetch_target *= 2;
-       else
-               node->prefetch_target++;
+       ParallelBitmapHeapState *pstate = node->pstate;
+
+       if (pstate == NULL)
+       {
+               if (node->prefetch_target >= node->prefetch_maximum)
+                        /* don't increase any further */ ;
+               else if (node->prefetch_target >= node->prefetch_maximum / 2)
+                       node->prefetch_target = node->prefetch_maximum;
+               else if (node->prefetch_target > 0)
+                       node->prefetch_target *= 2;
+               else
+                       node->prefetch_target++;
+               return;
+       }
+
+       /* Do an unlocked check first to save spinlock acquisitions. */
+       if (pstate->prefetch_target < node->prefetch_maximum)
+       {
+               SpinLockAcquire(&pstate->mutex);
+               if (pstate->prefetch_target >= node->prefetch_maximum)
+                        /* don't increase any further */ ;
+               else if (pstate->prefetch_target >= node->prefetch_maximum / 2)
+                       pstate->prefetch_target = node->prefetch_maximum;
+               else if (pstate->prefetch_target > 0)
+                       pstate->prefetch_target *= 2;
+               else
+                       pstate->prefetch_target++;
+               SpinLockRelease(&pstate->mutex);
+       }
 #endif   /* USE_PREFETCH */
 }
 
@@ -417,23 +567,70 @@ static inline void
 BitmapPrefetch(BitmapHeapScanState *node, HeapScanDesc scan)
 {
 #ifdef USE_PREFETCH
-       TBMIterator *prefetch_iterator = node->prefetch_iterator;
+       ParallelBitmapHeapState *pstate = node->pstate;
 
-       if (prefetch_iterator)
+       if (pstate == NULL)
        {
-               while (node->prefetch_pages < node->prefetch_target)
+               TBMIterator *prefetch_iterator = node->prefetch_iterator;
+
+               if (prefetch_iterator)
                {
-                       TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);
+                       while (node->prefetch_pages < node->prefetch_target)
+                       {
+                               TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);
+
+                               if (tbmpre == NULL)
+                               {
+                                       /* No more pages to prefetch */
+                                       tbm_end_iterate(prefetch_iterator);
+                                       node->prefetch_iterator = NULL;
+                                       break;
+                               }
+                               node->prefetch_pages++;
+                               PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno);
+                       }
+               }
+
+               return;
+       }
 
-                       if (tbmpre == NULL)
+       if (pstate->prefetch_pages < pstate->prefetch_target)
+       {
+               TBMSharedIterator *prefetch_iterator = node->shared_prefetch_iterator;
+
+               if (prefetch_iterator)
+               {
+                       while (1)
                        {
-                               /* No more pages to prefetch */
-                               tbm_end_iterate(prefetch_iterator);
-                               node->prefetch_iterator = NULL;
-                               break;
+                               TBMIterateResult *tbmpre;
+                               bool            do_prefetch = false;
+
+                               /*
+                                * Recheck under the mutex. If some other process has already
+                                * done enough prefetching then we need not do anything.
+                                */
+                               SpinLockAcquire(&pstate->mutex);
+                               if (pstate->prefetch_pages < pstate->prefetch_target)
+                               {
+                                       pstate->prefetch_pages++;
+                                       do_prefetch = true;
+                               }
+                               SpinLockRelease(&pstate->mutex);
+
+                               if (!do_prefetch)
+                                       return;
+
+                               tbmpre = tbm_shared_iterate(prefetch_iterator);
+                               if (tbmpre == NULL)
+                               {
+                                       /* No more pages to prefetch */
+                                       tbm_end_shared_iterate(prefetch_iterator);
+                                       node->shared_prefetch_iterator = NULL;
+                                       break;
+                               }
+
+                               PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno);
                        }
-                       node->prefetch_pages++;
-                       PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno);
                }
        }
 #endif   /* USE_PREFETCH */
@@ -488,12 +685,36 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node)
                tbm_end_iterate(node->tbmiterator);
        if (node->prefetch_iterator)
                tbm_end_iterate(node->prefetch_iterator);
+       if (node->shared_tbmiterator)
+               tbm_end_shared_iterate(node->shared_tbmiterator);
+       if (node->shared_prefetch_iterator)
+               tbm_end_shared_iterate(node->shared_prefetch_iterator);
        if (node->tbm)
                tbm_free(node->tbm);
        node->tbm = NULL;
        node->tbmiterator = NULL;
        node->tbmres = NULL;
        node->prefetch_iterator = NULL;
+       node->initialized = false;
+       node->shared_tbmiterator = NULL;
+       node->shared_prefetch_iterator = NULL;
+
+       /* Reset parallel bitmap state, if present */
+       if (node->pstate)
+       {
+               dsa_area   *dsa = node->ss.ps.state->es_query_dsa;
+
+               node->pstate->state = BM_INITIAL;
+
+               if (DsaPointerIsValid(node->pstate->tbmiterator))
+                       tbm_free_shared_area(dsa, node->pstate->tbmiterator);
+
+               if (DsaPointerIsValid(node->pstate->prefetch_iterator))
+                       tbm_free_shared_area(dsa, node->pstate->prefetch_iterator);
+
+               node->pstate->tbmiterator = InvalidDsaPointer;
+               node->pstate->prefetch_iterator = InvalidDsaPointer;
+       }
 
        ExecScanReScan(&node->ss);
 
@@ -546,6 +767,10 @@ ExecEndBitmapHeapScan(BitmapHeapScanState *node)
                tbm_end_iterate(node->prefetch_iterator);
        if (node->tbm)
                tbm_free(node->tbm);
+       if (node->shared_tbmiterator)
+               tbm_end_shared_iterate(node->shared_tbmiterator);
+       if (node->shared_prefetch_iterator)
+               tbm_end_shared_iterate(node->shared_prefetch_iterator);
 
        /*
         * close heap scan
@@ -597,6 +822,10 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags)
        scanstate->prefetch_target = 0;
        /* may be updated below */
        scanstate->prefetch_maximum = target_prefetch_pages;
+       scanstate->pscan_len = 0;
+       scanstate->initialized = false;
+       scanstate->shared_tbmiterator = NULL;
+       scanstate->pstate = NULL;
 
        /*
         * Miscellaneous initialization
@@ -681,3 +910,108 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags)
         */
        return scanstate;
 }
+
+/*----------------
+ *             BitmapShouldInitializeSharedState
+ *
+ *             The first process to arrive and find the state BM_INITIAL switches it
+ *             to BM_INPROGRESS and becomes the leader (returns true); it must then
+ *             populate the TIDBitmap.  Other processes sleep on the condition
+ *             variable while the state is BM_INPROGRESS and return false.
+ * ---------------
+ */
+static bool
+BitmapShouldInitializeSharedState(ParallelBitmapHeapState *pstate)
+{
+       SharedBitmapState state;
+
+       while (1)
+       {
+               SpinLockAcquire(&pstate->mutex);
+               state = pstate->state;
+               if (pstate->state == BM_INITIAL)
+                       pstate->state = BM_INPROGRESS;
+               SpinLockRelease(&pstate->mutex);
+
+               /* Exit if bitmap is done, or if we're the leader. */
+               if (state != BM_INPROGRESS)
+                       break;
+
+               /* Wait for the leader to wake us up. */
+               ConditionVariableSleep(&pstate->cv, WAIT_EVENT_PARALLEL_BITMAP_SCAN);
+       }
+
+       ConditionVariableCancelSleep();
+
+       return (state == BM_INITIAL);
+}
+
+/* ----------------------------------------------------------------
+ *             ExecBitmapHeapEstimate
+ *
+ *             Estimate shared space for the parallel state plus serialized snapshot.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBitmapHeapEstimate(BitmapHeapScanState *node,
+                                          ParallelContext *pcxt)
+{
+       EState     *estate = node->ss.ps.state;
+
+       node->pscan_len = add_size(offsetof(ParallelBitmapHeapState,
+                                                                               phs_snapshot_data),
+                                                          EstimateSnapshotSpace(estate->es_snapshot));
+
+       shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+       shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ *             ExecBitmapHeapInitializeDSM
+ *
+ *             Allocate and initialize the shared parallel bitmap heap state in the TOC.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
+                                                       ParallelContext *pcxt)
+{
+       ParallelBitmapHeapState *pstate;
+       EState     *estate = node->ss.ps.state;
+
+       pstate = shm_toc_allocate(pcxt->toc, node->pscan_len);
+
+       pstate->tbmiterator = 0;
+       pstate->prefetch_iterator = 0;
+
+       /* Initialize the mutex */
+       SpinLockInit(&pstate->mutex);
+       pstate->prefetch_pages = 0;
+       pstate->prefetch_target = 0;
+       pstate->state = BM_INITIAL;
+
+       ConditionVariableInit(&pstate->cv);
+       SerializeSnapshot(estate->es_snapshot, pstate->phs_snapshot_data);
+
+       shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pstate);
+       node->pstate = pstate;
+}
+
+/* ----------------------------------------------------------------
+ *             ExecBitmapHeapInitializeWorker
+ *
+ *             Attach the shared state from the TOC and install the restored snapshot.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, shm_toc *toc)
+{
+       ParallelBitmapHeapState *pstate;
+       Snapshot        snapshot;
+
+       pstate = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id);
+       node->pstate = pstate;
+
+       snapshot = RestoreSnapshot(pstate->phs_snapshot_data);
+       heap_update_snapshot(node->ss.ss_currentScanDesc, snapshot);
+}
index 94bb289f1bb32d31d83446d9da97476f67330989..ce2f3210a4a01d00d30b48b8430be5dac625346b 100644 (file)
@@ -78,7 +78,9 @@ MultiExecBitmapIndexScan(BitmapIndexScanState *node)
        else
        {
                /* XXX should we use less than work_mem for this? */
-               tbm = tbm_create(work_mem * 1024L, NULL);
+               tbm = tbm_create(work_mem * 1024L,
+                                                ((BitmapIndexScan *) node->ss.ps.plan)->isshared ?
+                                                node->ss.ps.state->es_query_dsa : NULL);
        }
 
        /*
index 1d280beddc171e00290bd4cb68716059b1379cf7..c0f261407bfb1072914ad18cbe563e130894e34f 100644 (file)
@@ -129,7 +129,9 @@ MultiExecBitmapOr(BitmapOrState *node)
                        if (result == NULL) /* first subplan */
                        {
                                /* XXX should we use less than work_mem for this? */
-                               result = tbm_create(work_mem * 1024L, NULL);
+                               result = tbm_create(work_mem * 1024L,
+                                                                       ((BitmapOr *) node->ps.plan)->isshared ?
+                                                                       node->ps.state->es_query_dsa : NULL);
                        }
 
                        ((BitmapIndexScanState *) subnode)->biss_result = result;
index b3eac06c50ee664ec225d43e48b8f6d16792a95b..ac8e50ef1dc98b89e71dbcd9d52d84fa987f3918 100644 (file)
@@ -331,6 +331,7 @@ _copyBitmapOr(const BitmapOr *from)
        /*
         * copy remainder of node
         */
+       COPY_SCALAR_FIELD(isshared);
        COPY_NODE_FIELD(bitmapplans);
 
        return newnode;
@@ -496,6 +497,7 @@ _copyBitmapIndexScan(const BitmapIndexScan *from)
         * copy remainder of node
         */
        COPY_SCALAR_FIELD(indexid);
+       COPY_SCALAR_FIELD(isshared);
        COPY_NODE_FIELD(indexqual);
        COPY_NODE_FIELD(indexqualorig);
 
index d4297d11b1890b4a2769ea33e3b6ded089dfa085..825a7b283a348148e6935604eb95983e1bd57e17 100644 (file)
@@ -441,6 +441,7 @@ _outBitmapOr(StringInfo str, const BitmapOr *node)
 
        _outPlanInfo(str, (const Plan *) node);
 
+       WRITE_BOOL_FIELD(isshared);
        WRITE_NODE_FIELD(bitmapplans);
 }
 
@@ -520,6 +521,7 @@ _outBitmapIndexScan(StringInfo str, const BitmapIndexScan *node)
        _outScanInfo(str, (const Scan *) node);
 
        WRITE_OID_FIELD(indexid);
+       WRITE_BOOL_FIELD(isshared);
        WRITE_NODE_FIELD(indexqual);
        WRITE_NODE_FIELD(indexqualorig);
 }
index b02d9fa24693562f689c5a5c45211194b80ec5dd..8f39d93a123c99abe734da3dbda1b121446d206e 100644 (file)
@@ -1633,6 +1633,7 @@ _readBitmapOr(void)
 
        ReadCommonPlan(&local_node->plan);
 
+       READ_BOOL_FIELD(isshared);
        READ_NODE_FIELD(bitmapplans);
 
        READ_DONE();
@@ -1744,6 +1745,7 @@ _readBitmapIndexScan(void)
        ReadCommonScan(&local_node->scan);
 
        READ_OID_FIELD(indexid);
+       READ_BOOL_FIELD(isshared);
        READ_NODE_FIELD(indexqual);
        READ_NODE_FIELD(indexqualorig);
 
index 932c84c949befb99a0014c46bc651230ea7e1762..fbb2cda9d73bd89579ee0c2fa75c38b16ca1397e 100644 (file)
@@ -2911,6 +2911,30 @@ remove_unused_subquery_outputs(Query *subquery, RelOptInfo *rel)
        }
 }
 
+/*
+ * create_partial_bitmap_paths
+ *       Build partial bitmap heap path for the relation
+ */
+void
+create_partial_bitmap_paths(PlannerInfo *root, RelOptInfo *rel,
+                                                       Path *bitmapqual)
+{
+       int                     parallel_workers;
+       double          pages_fetched;
+
+       /* Compute heap pages for bitmap heap scan */
+       pages_fetched = compute_bitmap_pages(root, rel, bitmapqual, 1.0,
+                                                                                NULL, NULL);
+
+       parallel_workers = compute_parallel_worker(rel, pages_fetched, 0);
+
+       if (parallel_workers <= 0)
+               return;
+
+       add_partial_path(rel, (Path *) create_bitmap_heap_path(root, rel,
+                                       bitmapqual, rel->lateral_relids, 1.0, parallel_workers));
+}
+
 /*
  * Compute the number of parallel workers that should be used to scan a
  * relation.  We compute the parallel workers based on the size of the heap to
index 3eaed5af7a56a38e6400fe13b3efc4c777e2719a..627e3f1b954121337783db5834b7385f9b3c2750 100644 (file)
@@ -860,6 +860,7 @@ cost_bitmap_heap_scan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
        QualCost        qpqual_cost;
        Cost            cpu_per_tuple;
        Cost            cost_per_page;
+       Cost            cpu_run_cost;
        double          tuples_fetched;
        double          pages_fetched;
        double          spc_seq_page_cost,
@@ -921,8 +922,21 @@ cost_bitmap_heap_scan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
 
        startup_cost += qpqual_cost.startup;
        cpu_per_tuple = cpu_tuple_cost + qpqual_cost.per_tuple;
+       cpu_run_cost = cpu_per_tuple * tuples_fetched;
+
+       /* Adjust costing for parallelism, if used. */
+       if (path->parallel_workers > 0)
+       {
+               double          parallel_divisor = get_parallel_divisor(path);
+
+               /* The CPU cost is divided among all the workers. */
+               cpu_run_cost /= parallel_divisor;
 
-       run_cost += cpu_per_tuple * tuples_fetched;
+               path->rows = clamp_row_est(path->rows / parallel_divisor);
+       }
+
+
+       run_cost += cpu_run_cost;
 
        /* tlist eval costs are paid per output row, not per tuple scanned */
        startup_cost += path->pathtarget->cost.startup;
index d8e5b811268be0c7390433200c17c0c443c64137..c2b72d410af0a916f5b782064f1dc53b49813f64 100644 (file)
@@ -337,8 +337,12 @@ create_index_paths(PlannerInfo *root, RelOptInfo *rel)
 
                bitmapqual = choose_bitmap_and(root, rel, bitindexpaths);
                bpath = create_bitmap_heap_path(root, rel, bitmapqual,
-                                                                               rel->lateral_relids, 1.0);
+                                                                               rel->lateral_relids, 1.0, 0);
                add_path(rel, (Path *) bpath);
+
+               /* create a partial bitmap heap path */
+               if (rel->consider_parallel && rel->lateral_relids == NULL)
+                       create_partial_bitmap_paths(root, rel, bitmapqual);
        }
 
        /*
@@ -410,7 +414,7 @@ create_index_paths(PlannerInfo *root, RelOptInfo *rel)
                        required_outer = get_bitmap_tree_required_outer(bitmapqual);
                        loop_count = get_loop_count(root, rel->relid, required_outer);
                        bpath = create_bitmap_heap_path(root, rel, bitmapqual,
-                                                                                       required_outer, loop_count);
+                                                                                       required_outer, loop_count, 0);
                        add_path(rel, (Path *) bpath);
                }
        }
@@ -1617,6 +1621,11 @@ bitmap_scan_cost_est(PlannerInfo *root, RelOptInfo *rel, Path *ipath)
        bpath.path.pathkeys = NIL;
        bpath.bitmapqual = ipath;
 
+       /*
+        * Check the cost of the temporary path without considering parallelism.
+        * The parallel bitmap heap path will be considered at a later stage.
+        */
+       bpath.path.parallel_workers = 0;
        cost_bitmap_heap_scan(&bpath.path, root, rel,
                                                  bpath.path.param_info,
                                                  ipath,
@@ -1659,6 +1668,12 @@ bitmap_and_cost_est(PlannerInfo *root, RelOptInfo *rel, List *paths)
        bpath.path.pathkeys = NIL;
        bpath.bitmapqual = (Path *) &apath;
 
+       /*
+        * Check the cost of the temporary path without considering parallelism.
+        * The parallel bitmap heap path will be considered at a later stage.
+        */
+       bpath.path.parallel_workers = 0;
+
        /* Now we can do cost_bitmap_heap_scan */
        cost_bitmap_heap_scan(&bpath.path, root, rel,
                                                  bpath.path.param_info,
index f1c7f609c0a706d1225304fbc129c80dad0ba703..8f8663c1e1408be5c10b5734046d988f6cfdd8d1 100644 (file)
@@ -125,6 +125,7 @@ static BitmapHeapScan *create_bitmap_scan_plan(PlannerInfo *root,
                                                List *tlist, List *scan_clauses);
 static Plan *create_bitmap_subplan(PlannerInfo *root, Path *bitmapqual,
                                          List **qual, List **indexqual, List **indexECs);
+static void bitmap_subplan_mark_shared(Plan *plan);
 static TidScan *create_tidscan_plan(PlannerInfo *root, TidPath *best_path,
                                        List *tlist, List *scan_clauses);
 static SubqueryScan *create_subqueryscan_plan(PlannerInfo *root,
@@ -2590,6 +2591,9 @@ create_bitmap_scan_plan(PlannerInfo *root,
                                                                                   &bitmapqualorig, &indexquals,
                                                                                   &indexECs);
 
+       if (best_path->path.parallel_aware)
+               bitmap_subplan_mark_shared(bitmapqualplan);
+
        /*
         * The qpqual list must contain all restrictions not automatically handled
         * by the index, other than pseudoconstant clauses which will be handled
@@ -4756,6 +4760,24 @@ label_sort_with_costsize(PlannerInfo *root, Sort *plan, double limit_tuples)
        plan->plan.parallel_aware = false;
 }
 
+/*
+ * bitmap_subplan_mark_shared
+ *   Set isshared flag in bitmap subplan so that its bitmap will be
+ *      created in shared memory.
+ */
+static void
+bitmap_subplan_mark_shared(Plan *plan)
+{
+       if (IsA(plan, BitmapAnd))
+               bitmap_subplan_mark_shared(
+                                                               linitial(((BitmapAnd *) plan)->bitmapplans));
+       else if (IsA(plan, BitmapOr))
+               ((BitmapOr *) plan)->isshared = true;
+       else if (IsA(plan, BitmapIndexScan))
+               ((BitmapIndexScan *) plan)->isshared = true;
+       else
+               elog(ERROR, "unrecognized node type: %d", nodeTag(plan));
+}
 
 /*****************************************************************************
  *
index 86aee2f8ec37cc1e44b23dfb0ad9a1806613afd7..0d925c6fcbf816690e1cd32604ace7cdda9b946b 100644 (file)
@@ -1068,7 +1068,8 @@ create_bitmap_heap_path(PlannerInfo *root,
                                                RelOptInfo *rel,
                                                Path *bitmapqual,
                                                Relids required_outer,
-                                               double loop_count)
+                                               double loop_count,
+                                               int parallel_degree)
 {
        BitmapHeapPath *pathnode = makeNode(BitmapHeapPath);
 
@@ -1077,9 +1078,9 @@ create_bitmap_heap_path(PlannerInfo *root,
        pathnode->path.pathtarget = rel->reltarget;
        pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
                                                                                                                  required_outer);
-       pathnode->path.parallel_aware = false;
+       pathnode->path.parallel_aware = parallel_degree > 0 ? true : false;
        pathnode->path.parallel_safe = rel->consider_parallel;
-       pathnode->path.parallel_workers = 0;
+       pathnode->path.parallel_workers = parallel_degree;
        pathnode->path.pathkeys = NIL;          /* always unordered */
 
        pathnode->bitmapqual = bitmapqual;
@@ -3281,7 +3282,7 @@ reparameterize_path(PlannerInfo *root, Path *path,
                                                                                                                rel,
                                                                                                                bpath->bitmapqual,
                                                                                                                required_outer,
-                                                                                                               loop_count);
+                                                                                                               loop_count, 0);
                        }
                case T_SubqueryScan:
                        {
index 2fb9a8bf580639d50ca1ea224cae434d9678bfd1..7cacb1e9b24989e91369abc6c64517d166276e0d 100644 (file)
@@ -3395,6 +3395,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
                case WAIT_EVENT_PARALLEL_FINISH:
                        event_name = "ParallelFinish";
                        break;
+               case WAIT_EVENT_PARALLEL_BITMAP_SCAN:
+                       event_name = "ParallelBitmapScan";
+                       break;
                case WAIT_EVENT_SAFE_SNAPSHOT:
                        event_name = "SafeSnapshot";
                        break;
index a864f7860d8b6ac16f91d77a341b4931048f0f65..7e85510d2fd4c305a26f500db689adca9b341742 100644 (file)
@@ -179,6 +179,7 @@ extern void simple_heap_update(Relation relation, ItemPointer otid,
                                   HeapTuple tup);
 
 extern void heap_sync(Relation relation);
+extern void heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot);
 
 /* in heap/pruneheap.c */
 extern void heap_page_prune_opt(Relation relation, Buffer buffer);
index d7659b94e61591d54c20b5636210d8f0ac3b70df..465c58e6ee54f6720aeea00f2700324273cf4006 100644 (file)
 #define NODEBITMAPHEAPSCAN_H
 
 #include "nodes/execnodes.h"
+#include "access/parallel.h"
 
 extern BitmapHeapScanState *ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecBitmapHeapScan(BitmapHeapScanState *node);
 extern void ExecEndBitmapHeapScan(BitmapHeapScanState *node);
 extern void ExecReScanBitmapHeapScan(BitmapHeapScanState *node);
+extern void ExecBitmapHeapEstimate(BitmapHeapScanState *node,
+                                          ParallelContext *pcxt);
+extern void ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
+                                                       ParallelContext *pcxt);
+extern void ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node,
+                                                          shm_toc *toc);
 
 #endif   /* NODEBITMAPHEAPSCAN_H */
index 2fde67a9c8d9fd605b7b237af9e8abe1a63e365a..6a0d590ef2974c55e937ca694fda94e01cec40a7 100644 (file)
@@ -26,6 +26,8 @@
 #include "utils/sortsupport.h"
 #include "utils/tuplestore.h"
 #include "utils/tuplesort.h"
+#include "nodes/tidbitmap.h"
+#include "storage/condition_variable.h"
 
 
 /* ----------------
@@ -1464,6 +1466,51 @@ typedef struct BitmapIndexScanState
        IndexScanDesc biss_ScanDesc;
 } BitmapIndexScanState;
 
+/* ----------------
+ *      SharedBitmapState information
+ *
+ *             BM_INITIAL              TIDBitmap creation is not yet started, so first worker
+ *                                             to see this state will set the state to BM_INPROGRESS
+ *                                             and that process will be responsible for creating
+ *                                             TIDBitmap.
+ *             BM_INPROGRESS   TIDBitmap creation is in progress; workers need to
+ *                                             sleep until it's finished.
+ *             BM_FINISHED             TIDBitmap creation is done, so now all workers can
+ *                                             proceed to iterate over TIDBitmap.
+ * ----------------
+ */
+typedef enum
+{
+       BM_INITIAL,
+       BM_INPROGRESS,
+       BM_FINISHED
+} SharedBitmapState;
+
+/* ----------------
+ *      ParallelBitmapHeapState information
+ *             tbmiterator                             iterator for scanning current pages
+ *             prefetch_iterator               iterator for prefetching ahead of current page
+ *             mutex                                   mutual exclusion for the prefetching variables
+ *                                                             and state
+ *             prefetch_pages                  # pages prefetch iterator is ahead of current
+ *             prefetch_target                 current target prefetch distance
+ *             state                                   current state of the TIDBitmap
+ *             cv                                              condition variable
+ *             phs_snapshot_data               snapshot data shared to workers
+ * ----------------
+ */
+typedef struct ParallelBitmapHeapState
+{
+       dsa_pointer tbmiterator;
+       dsa_pointer prefetch_iterator;
+       slock_t         mutex;
+       int                     prefetch_pages;
+       int                     prefetch_target;
+       SharedBitmapState state;
+       ConditionVariable cv;
+       char            phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
+} ParallelBitmapHeapState;
+
 /* ----------------
  *      BitmapHeapScanState information
  *
@@ -1477,6 +1524,11 @@ typedef struct BitmapIndexScanState
  *             prefetch_pages     # pages prefetch iterator is ahead of current
  *             prefetch_target    current target prefetch distance
  *             prefetch_maximum   maximum value for prefetch_target
+ *             pscan_len                  size of the shared memory for parallel bitmap
+ *             initialized                is node ready to iterate
+ *             shared_tbmiterator         shared iterator
+ *             shared_prefetch_iterator shared iterator for prefetching
+ *             pstate                     shared state for parallel bitmap scan
  * ----------------
  */
 typedef struct BitmapHeapScanState
@@ -1492,6 +1544,11 @@ typedef struct BitmapHeapScanState
        int                     prefetch_pages;
        int                     prefetch_target;
        int                     prefetch_maximum;
+       Size            pscan_len;
+       bool            initialized;
+       TBMSharedIterator *shared_tbmiterator;
+       TBMSharedIterator *shared_prefetch_iterator;
+       ParallelBitmapHeapState *pstate;
 } BitmapHeapScanState;
 
 /* ----------------
index e30ed6aa29b54ce5caae32d26aec7eaeddbc1b84..7fbb0c2c77e550321fc25fef293fd1244fd80256 100644 (file)
@@ -292,6 +292,7 @@ typedef struct BitmapAnd
 typedef struct BitmapOr
 {
        Plan            plan;
+       bool            isshared;
        List       *bitmapplans;
 } BitmapOr;
 
@@ -420,6 +421,7 @@ typedef struct BitmapIndexScan
 {
        Scan            scan;
        Oid                     indexid;                /* OID of index to scan */
+       bool            isshared;               /* Create shared bitmap if set */
        List       *indexqual;          /* list of index quals (OpExprs) */
        List       *indexqualorig;      /* the same in original form */
 } BitmapIndexScan;
index befe5781416f1a71eea32b5d8befca6aca3f4737..f0fe8307224218bbf9042ea7746b3550e1c556c5 100644 (file)
@@ -53,7 +53,8 @@ extern BitmapHeapPath *create_bitmap_heap_path(PlannerInfo *root,
                                                RelOptInfo *rel,
                                                Path *bitmapqual,
                                                Relids required_outer,
-                                               double loop_count);
+                                               double loop_count,
+                                               int parallel_degree);
 extern BitmapAndPath *create_bitmap_and_path(PlannerInfo *root,
                                           RelOptInfo *rel,
                                           List *bitmapquals);
index bc0dcf4468f2eb1d780fb35d5a826e6bbda2ecd3..247fd118793229c5eb53e4e5a150be49a963e895 100644 (file)
@@ -56,6 +56,8 @@ extern RelOptInfo *standard_join_search(PlannerInfo *root, int levels_needed,
 extern void generate_gather_paths(PlannerInfo *root, RelOptInfo *rel);
 extern int compute_parallel_worker(RelOptInfo *rel, BlockNumber heap_pages,
                                                BlockNumber index_pages);
+extern void create_partial_bitmap_paths(PlannerInfo *root, RelOptInfo *rel,
+                                                                               Path *bitmapqual);
 
 #ifdef OPTIMIZER_DEBUG
 extern void debug_print_rel(PlannerInfo *root, RelOptInfo *rel);
index 0062fb8af24d0af80e4acd089f83b32013b81168..60c78d118f9ca55f80e66c9cd4de484b4b6da353 100644 (file)
@@ -787,6 +787,7 @@ typedef enum
        WAIT_EVENT_MQ_RECEIVE,
        WAIT_EVENT_MQ_SEND,
        WAIT_EVENT_PARALLEL_FINISH,
+       WAIT_EVENT_PARALLEL_BITMAP_SCAN,
        WAIT_EVENT_SAFE_SNAPSHOT,
        WAIT_EVENT_SYNC_REP
 } WaitEventIPC;
index 75558d05e0816244de30a42100a8cfbfd8308593..290b735b6b59527156b546e4f15b694fef0ee19c 100644 (file)
@@ -169,6 +169,25 @@ select  count(*) from tenk1 where thousand > 95;
 
 reset enable_seqscan;
 reset enable_bitmapscan;
+-- test parallel bitmap heap scan.
+set enable_seqscan to off;
+set enable_indexscan to off;
+explain (costs off)
+       select  count((unique1)) from tenk1 where hundred > 1;
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 4
+         ->  Partial Aggregate
+               ->  Parallel Bitmap Heap Scan on tenk1
+                     Recheck Cond: (hundred > 1)
+                     ->  Bitmap Index Scan on tenk1_hundred
+                           Index Cond: (hundred > 1)
+(8 rows)
+
+reset enable_seqscan;
+reset enable_indexscan;
 -- test parallel merge join path.
 set enable_hashjoin to off;
 set enable_nestloop to off;
index ebdae7e9391c5bda7e85b8d9b0324042d2b0cf5f..80412b990d24c8cd1dd683e1a2a3d72a35989dd5 100644 (file)
@@ -64,6 +64,16 @@ select  count(*) from tenk1 where thousand > 95;
 reset enable_seqscan;
 reset enable_bitmapscan;
 
+-- test parallel bitmap heap scan.
+set enable_seqscan to off;
+set enable_indexscan to off;
+
+explain (costs off)
+       select  count((unique1)) from tenk1 where hundred > 1;
+
+reset enable_seqscan;
+reset enable_indexscan;
+
 -- test parallel merge join path.
 set enable_hashjoin to off;
 set enable_nestloop to off;