]> granicus.if.org Git - postgresql/commitdiff
btree: Support parallel index scans.
authorRobert Haas <rhaas@postgresql.org>
Wed, 15 Feb 2017 12:41:14 +0000 (07:41 -0500)
committerRobert Haas <rhaas@postgresql.org>
Wed, 15 Feb 2017 12:41:14 +0000 (07:41 -0500)
This isn't exposed to the optimizer or the executor yet; we'll add
support for those things in a separate patch.  But this puts the
basic mechanism in place: several processes can attach to a parallel
btree index scan, and each one will get a subset of the tuples that
would have been produced by a non-parallel scan.  Each index page
becomes the responsibility of a single worker, which then returns
all of the TIDs on that page.

Rahila Syed, Amit Kapila, Robert Haas, reviewed and tested by
Anastasia Lubennikova, Tushar Ahuja, and Haribabu Kommi.

doc/src/sgml/monitoring.sgml
src/backend/access/nbtree/nbtree.c
src/backend/access/nbtree/nbtsearch.c
src/backend/access/nbtree/nbtutils.c
src/backend/postmaster/pgstat.c
src/include/access/nbtree.h
src/include/pgstat.h
src/tools/pgindent/typedefs.list

index 5b67defdb8918e6718a265b78c7bc8af4fb6bea5..fad5cb05b95dac91a03af111af0b179ea690bbe4 100644 (file)
@@ -1207,7 +1207,7 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
          <entry>Waiting in an extension.</entry>
         </row>
         <row>
-         <entry morerows="9"><literal>IPC</></entry>
+         <entry morerows="10"><literal>IPC</></entry>
          <entry><literal>BgWorkerShutdown</></entry>
          <entry>Waiting for background worker to shut down.</entry>
         </row>
@@ -1215,6 +1215,10 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
          <entry><literal>BgWorkerStartup</></entry>
          <entry>Waiting for background worker to start up.</entry>
         </row>
+        <row>
+         <entry><literal>BtreePage</></entry>
+         <entry>Waiting for the page number needed to continue a parallel btree scan to become available.</entry>
+        </row>
         <row>
          <entry><literal>ExecuteGather</></entry>
          <entry>Waiting for activity from child process when executing <literal>Gather</> node.</entry>
index 945e563fcc50bd5c5a629e792909f6fdca4f1bad..cbc575d5cf2e8b1149f542067e812d8590b0a0c9 100644 (file)
@@ -23,6 +23,8 @@
 #include "access/xlog.h"
 #include "catalog/index.h"
 #include "commands/vacuum.h"
+#include "pgstat.h"
+#include "storage/condition_variable.h"
 #include "storage/indexfsm.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
@@ -63,6 +65,45 @@ typedef struct
        MemoryContext pagedelcontext;
 } BTVacState;
 
+/*
+ * BTPARALLEL_NOT_INITIALIZED indicates that the scan has not started.
+ *
+ * BTPARALLEL_ADVANCING indicates that some process is advancing the scan to
+ * a new page; others must wait.
+ *
+ * BTPARALLEL_IDLE indicates that no backend is currently advancing the scan
+ * to a new page; some process can start doing that.
+ *
+ * BTPARALLEL_DONE indicates that the scan is complete (including error exit).
+ * We reach this state once for every distinct combination of array keys.
+ */
+typedef enum
+{
+       BTPARALLEL_NOT_INITIALIZED,
+       BTPARALLEL_ADVANCING,
+       BTPARALLEL_IDLE,
+       BTPARALLEL_DONE
+} BTPS_State;
+
+/*
+ * BTParallelScanDescData contains btree specific shared information required
+ * for parallel scan.
+ */
+typedef struct BTParallelScanDescData
+{
+       BlockNumber btps_scanPage;      /* latest or next page to be scanned */
+       BTPS_State      btps_pageStatus;/* indicates whether next page is available
+                                                                * for scan. see above for possible states of
+                                                                * parallel scan. */
+       int                     btps_arrayKeyCount;             /* count indicating number of array
+                                                                                * scan keys processed by parallel
+                                                                                * scan */
+       slock_t         btps_mutex;             /* protects above variables */
+       ConditionVariable btps_cv;      /* used to synchronize parallel scan */
+} BTParallelScanDescData;
+
+typedef struct BTParallelScanDescData *BTParallelScanDesc;
+
 
 static void btbuildCallback(Relation index,
                                HeapTuple htup,
@@ -118,9 +159,9 @@ bthandler(PG_FUNCTION_ARGS)
        amroutine->amendscan = btendscan;
        amroutine->ammarkpos = btmarkpos;
        amroutine->amrestrpos = btrestrpos;
-       amroutine->amestimateparallelscan = NULL;
-       amroutine->aminitparallelscan = NULL;
-       amroutine->amparallelrescan = NULL;
+       amroutine->amestimateparallelscan = btestimateparallelscan;
+       amroutine->aminitparallelscan = btinitparallelscan;
+       amroutine->amparallelrescan = btparallelrescan;
 
        PG_RETURN_POINTER(amroutine);
 }
@@ -491,6 +532,7 @@ btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
        }
 
        so->markItemIndex = -1;
+       so->arrayKeyCount = 0;
        BTScanPosUnpinIfPinned(so->markPos);
        BTScanPosInvalidate(so->markPos);
 
@@ -652,6 +694,217 @@ btrestrpos(IndexScanDesc scan)
        }
 }
 
+/*
+ * btestimateparallelscan -- estimate storage for BTParallelScanDescData
+ */
+Size
+btestimateparallelscan(void)
+{
+       return sizeof(BTParallelScanDescData);
+}
+
+/*
+ * btinitparallelscan -- initialize BTParallelScanDesc for parallel btree scan
+ */
+void
+btinitparallelscan(void *target)
+{
+       BTParallelScanDesc bt_target = (BTParallelScanDesc) target;
+
+       SpinLockInit(&bt_target->btps_mutex);
+       bt_target->btps_scanPage = InvalidBlockNumber;
+       bt_target->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED;
+       bt_target->btps_arrayKeyCount = 0;
+       ConditionVariableInit(&bt_target->btps_cv);
+}
+
+/*
+ *     btparallelrescan() -- reset parallel scan
+ */
+void
+btparallelrescan(IndexScanDesc scan)
+{
+       BTParallelScanDesc btscan;
+       ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+
+       Assert(parallel_scan);
+
+       btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan,
+                                                                                                 parallel_scan->ps_offset);
+
+       /*
+        * In theory, we don't need to acquire the spinlock here, because there
+        * shouldn't be any other workers running at this point, but we do so for
+        * consistency.
+        */
+       SpinLockAcquire(&btscan->btps_mutex);
+       btscan->btps_scanPage = InvalidBlockNumber;
+       btscan->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED;
+       btscan->btps_arrayKeyCount = 0;
+       SpinLockRelease(&btscan->btps_mutex);
+}
+
+/*
+ * _bt_parallel_seize() -- Begin the process of advancing the scan to a new
+ *             page.  Other scans must wait until we call bt_parallel_release() or
+ *             bt_parallel_done().
+ *
+ * The return value is true if we successfully seized the scan and false
+ * if we did not.  The latter case occurs if no pages remain for the current
+ * set of scankeys.
+ *
+ * If the return value is true, *pageno returns the next or current page
+ * of the scan (depending on the scan direction).  An invalid block number
+ * means the scan hasn't yet started, and P_NONE means we've reached the end.
+ * The first time a participating process reaches the last page, it will return
+ * true and set *pageno to P_NONE; after that, further attempts to seize the
+ * scan will return false.
+ *
+ * Callers should ignore the value of pageno if the return value is false.
+ */
+bool
+_bt_parallel_seize(IndexScanDesc scan, BlockNumber *pageno)
+{
+       BTScanOpaque so = (BTScanOpaque) scan->opaque;
+       BTPS_State      pageStatus;
+       bool            exit_loop = false;
+       bool            status = true;
+       ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+       BTParallelScanDesc btscan;
+
+       *pageno = P_NONE;
+
+       btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan,
+                                                                                                 parallel_scan->ps_offset);
+
+       while (1)
+       {
+               SpinLockAcquire(&btscan->btps_mutex);
+               pageStatus = btscan->btps_pageStatus;
+
+               if (so->arrayKeyCount < btscan->btps_arrayKeyCount)
+               {
+                       /* Parallel scan has already advanced to a new set of scankeys. */
+                       status = false;
+               }
+               else if (pageStatus == BTPARALLEL_DONE)
+               {
+                       /*
+                        * We're done with this set of scankeys.  This may be the end, or
+                        * there could be more sets to try.
+                        */
+                       status = false;
+               }
+               else if (pageStatus != BTPARALLEL_ADVANCING)
+               {
+                       /*
+                        * We have successfully seized control of the scan for the purpose
+                        * of advancing it to a new page!
+                        */
+                       btscan->btps_pageStatus = BTPARALLEL_ADVANCING;
+                       *pageno = btscan->btps_scanPage;
+                       exit_loop = true;
+               }
+               SpinLockRelease(&btscan->btps_mutex);
+               if (exit_loop || !status)
+                       break;
+               ConditionVariableSleep(&btscan->btps_cv, WAIT_EVENT_BTREE_PAGE);
+       }
+       ConditionVariableCancelSleep();
+
+       return status;
+}
+
+/*
+ * _bt_parallel_release() -- Complete the process of advancing the scan to a
+ *             new page.  We now have the new value btps_scanPage; some other backend
+ *             can now begin advancing the scan.
+ */
+void
+_bt_parallel_release(IndexScanDesc scan, BlockNumber scan_page)
+{
+       ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+       BTParallelScanDesc btscan;
+
+       btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan,
+                                                                                                 parallel_scan->ps_offset);
+
+       SpinLockAcquire(&btscan->btps_mutex);
+       btscan->btps_scanPage = scan_page;
+       btscan->btps_pageStatus = BTPARALLEL_IDLE;
+       SpinLockRelease(&btscan->btps_mutex);
+       ConditionVariableSignal(&btscan->btps_cv);
+}
+
+/*
+ * _bt_parallel_done() -- Mark the parallel scan as complete.
+ *
+ * When there are no pages left to scan, this function should be called to
+ * notify other workers.  Otherwise, they might wait forever for the scan to
+ * advance to the next page.
+ */
+void
+_bt_parallel_done(IndexScanDesc scan)
+{
+       BTScanOpaque so = (BTScanOpaque) scan->opaque;
+       ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+       BTParallelScanDesc btscan;
+       bool            status_changed = false;
+
+       /* Do nothing, for non-parallel scans */
+       if (parallel_scan == NULL)
+               return;
+
+       btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan,
+                                                                                                 parallel_scan->ps_offset);
+
+       /*
+        * Mark the parallel scan as done for this combination of scan keys,
+        * unless some other process already did so.  See also
+        * _bt_advance_array_keys.
+        */
+       SpinLockAcquire(&btscan->btps_mutex);
+       if (so->arrayKeyCount >= btscan->btps_arrayKeyCount &&
+               btscan->btps_pageStatus != BTPARALLEL_DONE)
+       {
+               btscan->btps_pageStatus = BTPARALLEL_DONE;
+               status_changed = true;
+       }
+       SpinLockRelease(&btscan->btps_mutex);
+
+       /* wake up all the workers associated with this parallel scan */
+       if (status_changed)
+               ConditionVariableBroadcast(&btscan->btps_cv);
+}
+
+/*
+ * _bt_parallel_advance_array_keys() -- Advances the parallel scan for array
+ *                     keys.
+ *
+ * Updates the count of array keys processed for both local and parallel
+ * scans.
+ */
+void
+_bt_parallel_advance_array_keys(IndexScanDesc scan)
+{
+       BTScanOpaque so = (BTScanOpaque) scan->opaque;
+       ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+       BTParallelScanDesc btscan;
+
+       btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan,
+                                                                                                 parallel_scan->ps_offset);
+
+       so->arrayKeyCount++;
+       SpinLockAcquire(&btscan->btps_mutex);
+       if (btscan->btps_pageStatus == BTPARALLEL_DONE)
+       {
+               btscan->btps_scanPage = InvalidBlockNumber;
+               btscan->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED;
+               btscan->btps_arrayKeyCount++;
+       }
+       SpinLockRelease(&btscan->btps_mutex);
+}
+
 /*
  * Bulk deletion of all index entries pointing to a set of heap tuples.
  * The set of target tuples is specified via a callback routine that tells
index b6459d2f2a0e434b816e95a34109d35e9f070600..2f32b2e78d26905fcacb96052776ae1d88bf369e 100644 (file)
@@ -30,9 +30,13 @@ static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir,
 static void _bt_saveitem(BTScanOpaque so, int itemIndex,
                         OffsetNumber offnum, IndexTuple itup);
 static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir);
+static bool _bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir);
+static bool _bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno,
+                                         ScanDirection dir);
 static Buffer _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot);
 static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
 static void _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp);
+static inline void _bt_initialize_more_data(BTScanOpaque so, ScanDirection dir);
 
 
 /*
@@ -544,8 +548,10 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
        ScanKeyData notnullkeys[INDEX_MAX_KEYS];
        int                     keysCount = 0;
        int                     i;
+       bool            status = true;
        StrategyNumber strat_total;
        BTScanPosItem *currItem;
+       BlockNumber blkno;
 
        Assert(!BTScanPosIsValid(so->currPos));
 
@@ -564,6 +570,30 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
        if (!so->qual_ok)
                return false;
 
+       /*
+        * For parallel scans, get the starting page from shared state. If the
+        * scan has not started, proceed to find out first leaf page in the usual
+        * way while keeping other participating processes waiting.  If the scan
+        * has already begun, use the page number from the shared structure.
+        */
+       if (scan->parallel_scan != NULL)
+       {
+               status = _bt_parallel_seize(scan, &blkno);
+               if (!status)
+                       return false;
+               else if (blkno == P_NONE)
+               {
+                       _bt_parallel_done(scan);
+                       return false;
+               }
+               else if (blkno != InvalidBlockNumber)
+               {
+                       if (!_bt_parallel_readpage(scan, blkno, dir))
+                               return false;
+                       goto readcomplete;
+               }
+       }
+
        /*----------
         * Examine the scan keys to discover where we need to start the scan.
         *
@@ -743,7 +773,19 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
         * there.
         */
        if (keysCount == 0)
-               return _bt_endpoint(scan, dir);
+       {
+               bool            match;
+
+               match = _bt_endpoint(scan, dir);
+
+               if (!match)
+               {
+                       /* No match, so mark (parallel) scan finished */
+                       _bt_parallel_done(scan);
+               }
+
+               return match;
+       }
 
        /*
         * We want to start the scan somewhere within the index.  Set up an
@@ -773,7 +815,10 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
 
                        Assert(subkey->sk_flags & SK_ROW_MEMBER);
                        if (subkey->sk_flags & SK_ISNULL)
+                       {
+                               _bt_parallel_done(scan);
                                return false;
+                       }
                        memcpy(scankeys + i, subkey, sizeof(ScanKeyData));
 
                        /*
@@ -993,25 +1038,21 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
                 * because nothing finer to lock exists.
                 */
                PredicateLockRelation(rel, scan->xs_snapshot);
+
+               /*
+                * mark parallel scan as done, so that all the workers can finish
+                * their scan
+                */
+               _bt_parallel_done(scan);
+               BTScanPosInvalidate(so->currPos);
+
                return false;
        }
        else
                PredicateLockPage(rel, BufferGetBlockNumber(buf),
                                                  scan->xs_snapshot);
 
-       /* initialize moreLeft/moreRight appropriately for scan direction */
-       if (ScanDirectionIsForward(dir))
-       {
-               so->currPos.moreLeft = false;
-               so->currPos.moreRight = true;
-       }
-       else
-       {
-               so->currPos.moreLeft = true;
-               so->currPos.moreRight = false;
-       }
-       so->numKilled = 0;                      /* just paranoia */
-       Assert(so->markItemIndex == -1);
+       _bt_initialize_more_data(so, dir);
 
        /* position to the precise item on the page */
        offnum = _bt_binsrch(rel, buf, keysCount, scankeys, nextkey);
@@ -1060,6 +1101,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
                _bt_drop_lock_and_maybe_pin(scan, &so->currPos);
        }
 
+readcomplete:
        /* OK, itemIndex says what to return */
        currItem = &so->currPos.items[so->currPos.itemIndex];
        scan->xs_ctup.t_self = currItem->heapTid;
@@ -1132,6 +1174,10 @@ _bt_next(IndexScanDesc scan, ScanDirection dir)
  * moreLeft or moreRight (as appropriate) is cleared if _bt_checkkeys reports
  * that there can be no more matching tuples in the current scan direction.
  *
+ * In the case of a parallel scan, caller must have called _bt_parallel_seize
+ * prior to calling this function; this function will invoke
+ * _bt_parallel_release before returning.
+ *
  * Returns true if any matching items found on the page, false if none.
  */
 static bool
@@ -1154,6 +1200,16 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 
        page = BufferGetPage(so->currPos.buf);
        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+
+       /* allow next page be processed by parallel worker */
+       if (scan->parallel_scan)
+       {
+               if (ScanDirectionIsForward(dir))
+                       _bt_parallel_release(scan, opaque->btpo_next);
+               else
+                       _bt_parallel_release(scan, BufferGetBlockNumber(so->currPos.buf));
+       }
+
        minoff = P_FIRSTDATAKEY(opaque);
        maxoff = PageGetMaxOffsetNumber(page);
 
@@ -1278,21 +1334,16 @@ _bt_saveitem(BTScanOpaque so, int itemIndex,
  * if pinned, we'll drop the pin before moving to next page.  The buffer is
  * not locked on entry.
  *
- * On success exit, so->currPos is updated to contain data from the next
- * interesting page.  For success on a scan using a non-MVCC snapshot we hold
- * a pin, but not a read lock, on that page.  If we do not hold the pin, we
- * set so->currPos.buf to InvalidBuffer.  We return TRUE to indicate success.
- *
- * If there are no more matching records in the given direction, we drop all
- * locks and pins, set so->currPos.buf to InvalidBuffer, and return FALSE.
+ * For success on a scan using a non-MVCC snapshot we hold a pin, but not a
+ * read lock, on that page.  If we do not hold the pin, we set so->currPos.buf
+ * to InvalidBuffer.  We return TRUE to indicate success.
  */
 static bool
 _bt_steppage(IndexScanDesc scan, ScanDirection dir)
 {
        BTScanOpaque so = (BTScanOpaque) scan->opaque;
-       Relation        rel;
-       Page            page;
-       BTPageOpaque opaque;
+       BlockNumber blkno = InvalidBlockNumber;
+       bool            status = true;
 
        Assert(BTScanPosIsValid(so->currPos));
 
@@ -1319,25 +1370,103 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
                so->markItemIndex = -1;
        }
 
-       rel = scan->indexRelation;
-
        if (ScanDirectionIsForward(dir))
        {
                /* Walk right to the next page with data */
-               /* We must rely on the previously saved nextPage link! */
-               BlockNumber blkno = so->currPos.nextPage;
+               if (scan->parallel_scan != NULL)
+               {
+                       /*
+                        * Seize the scan to get the next block number; if the scan has
+                        * ended already, bail out.
+                        */
+                       status = _bt_parallel_seize(scan, &blkno);
+                       if (!status)
+                       {
+                               /* release the previous buffer, if pinned */
+                               BTScanPosUnpinIfPinned(so->currPos);
+                               BTScanPosInvalidate(so->currPos);
+                               return false;
+                       }
+               }
+               else
+               {
+                       /* Not parallel, so use the previously-saved nextPage link. */
+                       blkno = so->currPos.nextPage;
+               }
 
                /* Remember we left a page with data */
                so->currPos.moreLeft = true;
 
                /* release the previous buffer, if pinned */
                BTScanPosUnpinIfPinned(so->currPos);
+       }
+       else
+       {
+               /* Remember we left a page with data */
+               so->currPos.moreRight = true;
+
+               if (scan->parallel_scan != NULL)
+               {
+                       /*
+                        * Seize the scan to get the current block number; if the scan has
+                        * ended already, bail out.
+                        */
+                       status = _bt_parallel_seize(scan, &blkno);
+                       BTScanPosUnpinIfPinned(so->currPos);
+                       if (!status)
+                       {
+                               BTScanPosInvalidate(so->currPos);
+                               return false;
+                       }
+               }
+               else
+               {
+                       /* Not parallel, so just use our own notion of the current page */
+                       blkno = so->currPos.currPage;
+               }
+       }
+
+       if (!_bt_readnextpage(scan, blkno, dir))
+               return false;
+
+       /* Drop the lock, and maybe the pin, on the current page */
+       _bt_drop_lock_and_maybe_pin(scan, &so->currPos);
 
+       return true;
+}
+
+/*
+ *     _bt_readnextpage() -- Read next page containing valid data for scan
+ *
+ * On success exit, so->currPos is updated to contain data from the next
+ * interesting page.  Caller is responsible to release lock and pin on
+ * buffer on success.  We return TRUE to indicate success.
+ *
+ * If there are no more matching records in the given direction, we drop all
+ * locks and pins, set so->currPos.buf to InvalidBuffer, and return FALSE.
+ */
+static bool
+_bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir)
+{
+       BTScanOpaque so = (BTScanOpaque) scan->opaque;
+       Relation        rel;
+       Page            page;
+       BTPageOpaque opaque;
+       bool            status = true;
+
+       rel = scan->indexRelation;
+
+       if (ScanDirectionIsForward(dir))
+       {
                for (;;)
                {
-                       /* if we're at end of scan, give up */
+                       /*
+                        * if we're at end of scan, give up and mark parallel scan as
+                        * done, so that all the workers can finish their scan
+                        */
                        if (blkno == P_NONE || !so->currPos.moreRight)
                        {
+                               _bt_parallel_done(scan);
                                BTScanPosInvalidate(so->currPos);
                                return false;
                        }
@@ -1359,14 +1488,32 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
                        }
 
                        /* nope, keep going */
-                       blkno = opaque->btpo_next;
+                       if (scan->parallel_scan != NULL)
+                       {
+                               status = _bt_parallel_seize(scan, &blkno);
+                               if (!status)
+                               {
+                                       _bt_relbuf(rel, so->currPos.buf);
+                                       BTScanPosInvalidate(so->currPos);
+                                       return false;
+                               }
+                       }
+                       else
+                               blkno = opaque->btpo_next;
                        _bt_relbuf(rel, so->currPos.buf);
                }
        }
        else
        {
-               /* Remember we left a page with data */
-               so->currPos.moreRight = true;
+               /*
+                * Should only happen in parallel cases, when some other backend
+                * advanced the scan.
+                */
+               if (so->currPos.currPage != blkno)
+               {
+                       BTScanPosUnpinIfPinned(so->currPos);
+                       so->currPos.currPage = blkno;
+               }
 
                /*
                 * Walk left to the next page with data.  This is much more complex
@@ -1401,6 +1548,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
                        if (!so->currPos.moreLeft)
                        {
                                _bt_relbuf(rel, so->currPos.buf);
+                               _bt_parallel_done(scan);
                                BTScanPosInvalidate(so->currPos);
                                return false;
                        }
@@ -1412,6 +1560,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
                        /* if we're physically at end of index, return failure */
                        if (so->currPos.buf == InvalidBuffer)
                        {
+                               _bt_parallel_done(scan);
                                BTScanPosInvalidate(so->currPos);
                                return false;
                        }
@@ -1432,9 +1581,46 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
                                if (_bt_readpage(scan, dir, PageGetMaxOffsetNumber(page)))
                                        break;
                        }
+
+                       /*
+                        * For parallel scans, get the last page scanned as it is quite
+                        * possible that by the time we try to seize the scan, some other
+                        * worker has already advanced the scan to a different page.  We
+                        * must continue based on the latest page scanned by any worker.
+                        */
+                       if (scan->parallel_scan != NULL)
+                       {
+                               _bt_relbuf(rel, so->currPos.buf);
+                               status = _bt_parallel_seize(scan, &blkno);
+                               if (!status)
+                               {
+                                       BTScanPosInvalidate(so->currPos);
+                                       return false;
+                               }
+                               so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ);
+                       }
                }
        }
 
+       return true;
+}
+
+/*
+ *     _bt_parallel_readpage() -- Read current page containing valid data for scan
+ *
+ * On success, release lock and maybe pin on buffer.  We return TRUE to
+ * indicate success.
+ */
+static bool
+_bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir)
+{
+       BTScanOpaque so = (BTScanOpaque) scan->opaque;
+
+       _bt_initialize_more_data(so, dir);
+
+       if (!_bt_readnextpage(scan, blkno, dir))
+               return false;
+
        /* Drop the lock, and maybe the pin, on the current page */
        _bt_drop_lock_and_maybe_pin(scan, &so->currPos);
 
@@ -1712,19 +1898,7 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
        /* remember which buffer we have pinned */
        so->currPos.buf = buf;
 
-       /* initialize moreLeft/moreRight appropriately for scan direction */
-       if (ScanDirectionIsForward(dir))
-       {
-               so->currPos.moreLeft = false;
-               so->currPos.moreRight = true;
-       }
-       else
-       {
-               so->currPos.moreLeft = true;
-               so->currPos.moreRight = false;
-       }
-       so->numKilled = 0;                      /* just paranoia */
-       so->markItemIndex = -1;         /* ditto */
+       _bt_initialize_more_data(so, dir);
 
        /*
         * Now load data from the first page of the scan.
@@ -1753,3 +1927,25 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
 
        return true;
 }
+
+/*
+ * _bt_initialize_more_data() -- initialize moreLeft/moreRight appropriately
+ * for scan direction
+ */
+static inline void
+_bt_initialize_more_data(BTScanOpaque so, ScanDirection dir)
+{
+       /* initialize moreLeft/moreRight appropriately for scan direction */
+       if (ScanDirectionIsForward(dir))
+       {
+               so->currPos.moreLeft = false;
+               so->currPos.moreRight = true;
+       }
+       else
+       {
+               so->currPos.moreLeft = true;
+               so->currPos.moreRight = false;
+       }
+       so->numKilled = 0;                      /* just paranoia */
+       so->markItemIndex = -1;         /* ditto */
+}
index da0f330c9680d2b6773b760483945d64869c35b7..5b259a31d99a6e822b9bf5fee4616f9447dc19f6 100644 (file)
@@ -590,6 +590,10 @@ _bt_advance_array_keys(IndexScanDesc scan, ScanDirection dir)
                        break;
        }
 
+       /* advance parallel scan */
+       if (scan->parallel_scan != NULL)
+               _bt_parallel_advance_array_keys(scan);
+
        return found;
 }
 
index 7176cf1bbeb52de251c179650b34c02ea66bc673..ada374c0c4402da9e0cccaaf4baf2c7ccf8dd063 100644 (file)
@@ -3374,6 +3374,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
                case WAIT_EVENT_BGWORKER_STARTUP:
                        event_name = "BgWorkerStartup";
                        break;
+               case WAIT_EVENT_BTREE_PAGE:
+                       event_name = "BtreePage";
+                       break;
                case WAIT_EVENT_EXECUTE_GATHER:
                        event_name = "ExecuteGather";
                        break;
index 25a1dc818cf90e890f0eeee454184b7a7523561f..6289ffa9bd44fb75a5aa7313dfe8817893c9851a 100644 (file)
@@ -383,6 +383,8 @@ typedef struct BTScanOpaqueData
        ScanKey         arrayKeyData;   /* modified copy of scan->keyData */
        int                     numArrayKeys;   /* number of equality-type array keys (-1 if
                                                                 * there are any unsatisfiable array keys) */
+       int                     arrayKeyCount;  /* count indicating number of array scan keys
+                                                                * processed */
        BTArrayKeyInfo *arrayKeys;      /* info about each equality-type array key */
        MemoryContext arrayContext; /* scan-lifespan context for array data */
 
@@ -426,7 +428,7 @@ typedef BTScanOpaqueData *BTScanOpaque;
 #define SK_BT_NULLS_FIRST      (INDOPTION_NULLS_FIRST << SK_BT_INDOPTION_SHIFT)
 
 /*
- * prototypes for functions in nbtree.c (external entry points for btree)
+ * external entry points for btree, in nbtree.c
  */
 extern IndexBuildResult *btbuild(Relation heap, Relation index,
                struct IndexInfo *indexInfo);
@@ -436,10 +438,13 @@ extern bool btinsert(Relation rel, Datum *values, bool *isnull,
                 IndexUniqueCheck checkUnique,
                 struct IndexInfo *indexInfo);
 extern IndexScanDesc btbeginscan(Relation rel, int nkeys, int norderbys);
+extern Size btestimateparallelscan(void);
+extern void btinitparallelscan(void *target);
 extern bool btgettuple(IndexScanDesc scan, ScanDirection dir);
 extern int64 btgetbitmap(IndexScanDesc scan, TIDBitmap *tbm);
 extern void btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
                 ScanKey orderbys, int norderbys);
+extern void btparallelrescan(IndexScanDesc scan);
 extern void btendscan(IndexScanDesc scan);
 extern void btmarkpos(IndexScanDesc scan);
 extern void btrestrpos(IndexScanDesc scan);
@@ -451,6 +456,14 @@ extern IndexBulkDeleteResult *btvacuumcleanup(IndexVacuumInfo *info,
                                IndexBulkDeleteResult *stats);
 extern bool btcanreturn(Relation index, int attno);
 
+/*
+ * prototypes for internal functions in nbtree.c
+ */
+extern bool _bt_parallel_seize(IndexScanDesc scan, BlockNumber *pageno);
+extern void _bt_parallel_release(IndexScanDesc scan, BlockNumber scan_page);
+extern void _bt_parallel_done(IndexScanDesc scan);
+extern void _bt_parallel_advance_array_keys(IndexScanDesc scan);
+
 /*
  * prototypes for functions in nbtinsert.c
  */
index de8225b9890a3b43ffc8f37d63c48b79e2c5bac9..8b710ecb24e2011e2b9861fc7d73c7e2466467b3 100644 (file)
@@ -780,6 +780,7 @@ typedef enum
 {
        WAIT_EVENT_BGWORKER_SHUTDOWN = PG_WAIT_IPC,
        WAIT_EVENT_BGWORKER_STARTUP,
+       WAIT_EVENT_BTREE_PAGE,
        WAIT_EVENT_EXECUTE_GATHER,
        WAIT_EVENT_MQ_INTERNAL,
        WAIT_EVENT_MQ_PUT_MESSAGE,
index c4235ae63a4e544bba1bec004128350adadbcd1b..9f876ae264f5afbd36d182eeecc3ce0e0a3daa22 100644 (file)
@@ -161,6 +161,9 @@ BTPageOpaque
 BTPageOpaqueData
 BTPageStat
 BTPageState
+BTParallelScanDesc
+BTParallelScanDescData
+BTPS_State
 BTScanOpaque
 BTScanOpaqueData
 BTScanPos