From 569174f1be92be93f5366212cc46960d28a5c5cd Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Wed, 15 Feb 2017 07:41:14 -0500 Subject: [PATCH] btree: Support parallel index scans. This isn't exposed to the optimizer or the executor yet; we'll add support for those things in a separate patch. But this puts the basic mechanism in place: several processes can attach to a parallel btree index scan, and each one will get a subset of the tuples that would have been produced by a non-parallel scan. Each index page becomes the responsibility of a single worker, which then returns all of the TIDs on that page. Rahila Syed, Amit Kapila, Robert Haas, reviewed and tested by Anastasia Lubennikova, Tushar Ahuja, and Haribabu Kommi. --- doc/src/sgml/monitoring.sgml | 6 +- src/backend/access/nbtree/nbtree.c | 259 ++++++++++++++++++++++- src/backend/access/nbtree/nbtsearch.c | 286 ++++++++++++++++++++++---- src/backend/access/nbtree/nbtutils.c | 4 + src/backend/postmaster/pgstat.c | 3 + src/include/access/nbtree.h | 15 +- src/include/pgstat.h | 1 + src/tools/pgindent/typedefs.list | 3 + 8 files changed, 527 insertions(+), 50 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 5b67defdb8..fad5cb05b9 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1207,7 +1207,7 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser Waiting in an extension. - IPC + IPC BgWorkerShutdown Waiting for background worker to shut down. @@ -1215,6 +1215,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser BgWorkerStartup Waiting for background worker to start up. + + BtreePage + Waiting for the page number needed to continue a parallel btree scan to become available. + ExecuteGather Waiting for activity from child process when executing Gather node. diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c index 945e563fcc..cbc575d5cf 100644 --- a/src/backend/access/nbtree/nbtree.c +++ b/src/backend/access/nbtree/nbtree.c @@ -23,6 +23,8 @@ #include "access/xlog.h" #include "catalog/index.h" #include "commands/vacuum.h" +#include "pgstat.h" +#include "storage/condition_variable.h" #include "storage/indexfsm.h" #include "storage/ipc.h" #include "storage/lmgr.h" @@ -63,6 +65,45 @@ typedef struct MemoryContext pagedelcontext; } BTVacState; +/* + * BTPARALLEL_NOT_INITIALIZED indicates that the scan has not started. + * + * BTPARALLEL_ADVANCING indicates that some process is advancing the scan to + * a new page; others must wait. + * + * BTPARALLEL_IDLE indicates that no backend is currently advancing the scan + * to a new page; some process can start doing that. + * + * BTPARALLEL_DONE indicates that the scan is complete (including error exit). + * We reach this state once for every distinct combination of array keys. + */ +typedef enum +{ + BTPARALLEL_NOT_INITIALIZED, + BTPARALLEL_ADVANCING, + BTPARALLEL_IDLE, + BTPARALLEL_DONE +} BTPS_State; + +/* + * BTParallelScanDescData contains btree specific shared information required + * for parallel scan. + */ +typedef struct BTParallelScanDescData +{ + BlockNumber btps_scanPage; /* latest or next page to be scanned */ + BTPS_State btps_pageStatus;/* indicates whether next page is available + * for scan. see above for possible states of + * parallel scan. */ + int btps_arrayKeyCount; /* count indicating number of array + * scan keys processed by parallel + * scan */ + slock_t btps_mutex; /* protects above variables */ + ConditionVariable btps_cv; /* used to synchronize parallel scan */ +} BTParallelScanDescData; + +typedef struct BTParallelScanDescData *BTParallelScanDesc; + static void btbuildCallback(Relation index, HeapTuple htup, @@ -118,9 +159,9 @@ bthandler(PG_FUNCTION_ARGS) amroutine->amendscan = btendscan; amroutine->ammarkpos = btmarkpos; amroutine->amrestrpos = btrestrpos; - amroutine->amestimateparallelscan = NULL; - amroutine->aminitparallelscan = NULL; - amroutine->amparallelrescan = NULL; + amroutine->amestimateparallelscan = btestimateparallelscan; + amroutine->aminitparallelscan = btinitparallelscan; + amroutine->amparallelrescan = btparallelrescan; PG_RETURN_POINTER(amroutine); } @@ -491,6 +532,7 @@ btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys, } so->markItemIndex = -1; + so->arrayKeyCount = 0; BTScanPosUnpinIfPinned(so->markPos); BTScanPosInvalidate(so->markPos); @@ -652,6 +694,217 @@ btrestrpos(IndexScanDesc scan) } } +/* + * btestimateparallelscan -- estimate storage for BTParallelScanDescData + */ +Size +btestimateparallelscan(void) +{ + return sizeof(BTParallelScanDescData); +} + +/* + * btinitparallelscan -- initialize BTParallelScanDesc for parallel btree scan + */ +void +btinitparallelscan(void *target) +{ + BTParallelScanDesc bt_target = (BTParallelScanDesc) target; + + SpinLockInit(&bt_target->btps_mutex); + bt_target->btps_scanPage = InvalidBlockNumber; + bt_target->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED; + bt_target->btps_arrayKeyCount = 0; + ConditionVariableInit(&bt_target->btps_cv); +} + +/* + * btparallelrescan() -- reset parallel scan + */ +void +btparallelrescan(IndexScanDesc scan) +{ + BTParallelScanDesc btscan; + ParallelIndexScanDesc parallel_scan = scan->parallel_scan; + + Assert(parallel_scan); + + btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan, + parallel_scan->ps_offset); + + /* + * In theory, we don't need to acquire the spinlock here, because there + * shouldn't be any other workers running at this point, but we do so for + * consistency. + */ + SpinLockAcquire(&btscan->btps_mutex); + btscan->btps_scanPage = InvalidBlockNumber; + btscan->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED; + btscan->btps_arrayKeyCount = 0; + SpinLockRelease(&btscan->btps_mutex); +} + +/* + * _bt_parallel_seize() -- Begin the process of advancing the scan to a new + * page. Other scans must wait until we call bt_parallel_release() or + * bt_parallel_done(). + * + * The return value is true if we successfully seized the scan and false + * if we did not. The latter case occurs if no pages remain for the current + * set of scankeys. + * + * If the return value is true, *pageno returns the next or current page + * of the scan (depending on the scan direction). An invalid block number + * means the scan hasn't yet started, and P_NONE means we've reached the end. + * The first time a participating process reaches the last page, it will return + * true and set *pageno to P_NONE; after that, further attempts to seize the + * scan will return false. + * + * Callers should ignore the value of pageno if the return value is false. + */ +bool +_bt_parallel_seize(IndexScanDesc scan, BlockNumber *pageno) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + BTPS_State pageStatus; + bool exit_loop = false; + bool status = true; + ParallelIndexScanDesc parallel_scan = scan->parallel_scan; + BTParallelScanDesc btscan; + + *pageno = P_NONE; + + btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan, + parallel_scan->ps_offset); + + while (1) + { + SpinLockAcquire(&btscan->btps_mutex); + pageStatus = btscan->btps_pageStatus; + + if (so->arrayKeyCount < btscan->btps_arrayKeyCount) + { + /* Parallel scan has already advanced to a new set of scankeys. */ + status = false; + } + else if (pageStatus == BTPARALLEL_DONE) + { + /* + * We're done with this set of scankeys. This may be the end, or + * there could be more sets to try. + */ + status = false; + } + else if (pageStatus != BTPARALLEL_ADVANCING) + { + /* + * We have successfully seized control of the scan for the purpose + * of advancing it to a new page! + */ + btscan->btps_pageStatus = BTPARALLEL_ADVANCING; + *pageno = btscan->btps_scanPage; + exit_loop = true; + } + SpinLockRelease(&btscan->btps_mutex); + if (exit_loop || !status) + break; + ConditionVariableSleep(&btscan->btps_cv, WAIT_EVENT_BTREE_PAGE); + } + ConditionVariableCancelSleep(); + + return status; +} + +/* + * _bt_parallel_release() -- Complete the process of advancing the scan to a + * new page. We now have the new value btps_scanPage; some other backend + * can now begin advancing the scan. + */ +void +_bt_parallel_release(IndexScanDesc scan, BlockNumber scan_page) +{ + ParallelIndexScanDesc parallel_scan = scan->parallel_scan; + BTParallelScanDesc btscan; + + btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan, + parallel_scan->ps_offset); + + SpinLockAcquire(&btscan->btps_mutex); + btscan->btps_scanPage = scan_page; + btscan->btps_pageStatus = BTPARALLEL_IDLE; + SpinLockRelease(&btscan->btps_mutex); + ConditionVariableSignal(&btscan->btps_cv); +} + +/* + * _bt_parallel_done() -- Mark the parallel scan as complete. + * + * When there are no pages left to scan, this function should be called to + * notify other workers. Otherwise, they might wait forever for the scan to + * advance to the next page. + */ +void +_bt_parallel_done(IndexScanDesc scan) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + ParallelIndexScanDesc parallel_scan = scan->parallel_scan; + BTParallelScanDesc btscan; + bool status_changed = false; + + /* Do nothing, for non-parallel scans */ + if (parallel_scan == NULL) + return; + + btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan, + parallel_scan->ps_offset); + + /* + * Mark the parallel scan as done for this combination of scan keys, + * unless some other process already did so. See also + * _bt_advance_array_keys. + */ + SpinLockAcquire(&btscan->btps_mutex); + if (so->arrayKeyCount >= btscan->btps_arrayKeyCount && + btscan->btps_pageStatus != BTPARALLEL_DONE) + { + btscan->btps_pageStatus = BTPARALLEL_DONE; + status_changed = true; + } + SpinLockRelease(&btscan->btps_mutex); + + /* wake up all the workers associated with this parallel scan */ + if (status_changed) + ConditionVariableBroadcast(&btscan->btps_cv); +} + +/* + * _bt_parallel_advance_array_keys() -- Advances the parallel scan for array + * keys. + * + * Updates the count of array keys processed for both local and parallel + * scans. + */ +void +_bt_parallel_advance_array_keys(IndexScanDesc scan) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + ParallelIndexScanDesc parallel_scan = scan->parallel_scan; + BTParallelScanDesc btscan; + + btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan, + parallel_scan->ps_offset); + + so->arrayKeyCount++; + SpinLockAcquire(&btscan->btps_mutex); + if (btscan->btps_pageStatus == BTPARALLEL_DONE) + { + btscan->btps_scanPage = InvalidBlockNumber; + btscan->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED; + btscan->btps_arrayKeyCount++; + } + SpinLockRelease(&btscan->btps_mutex); +} + /* * Bulk deletion of all index entries pointing to a set of heap tuples. * The set of target tuples is specified via a callback routine that tells diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c index b6459d2f2a..2f32b2e78d 100644 --- a/src/backend/access/nbtree/nbtsearch.c +++ b/src/backend/access/nbtree/nbtsearch.c @@ -30,9 +30,13 @@ static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir, static void _bt_saveitem(BTScanOpaque so, int itemIndex, OffsetNumber offnum, IndexTuple itup); static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir); +static bool _bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir); +static bool _bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno, + ScanDirection dir); static Buffer _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot); static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir); static void _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp); +static inline void _bt_initialize_more_data(BTScanOpaque so, ScanDirection dir); /* @@ -544,8 +548,10 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) ScanKeyData notnullkeys[INDEX_MAX_KEYS]; int keysCount = 0; int i; + bool status = true; StrategyNumber strat_total; BTScanPosItem *currItem; + BlockNumber blkno; Assert(!BTScanPosIsValid(so->currPos)); @@ -564,6 +570,30 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) if (!so->qual_ok) return false; + /* + * For parallel scans, get the starting page from shared state. If the + * scan has not started, proceed to find out first leaf page in the usual + * way while keeping other participating processes waiting. If the scan + * has already begun, use the page number from the shared structure. + */ + if (scan->parallel_scan != NULL) + { + status = _bt_parallel_seize(scan, &blkno); + if (!status) + return false; + else if (blkno == P_NONE) + { + _bt_parallel_done(scan); + return false; + } + else if (blkno != InvalidBlockNumber) + { + if (!_bt_parallel_readpage(scan, blkno, dir)) + return false; + goto readcomplete; + } + } + /*---------- * Examine the scan keys to discover where we need to start the scan. * @@ -743,7 +773,19 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) * there. */ if (keysCount == 0) - return _bt_endpoint(scan, dir); + { + bool match; + + match = _bt_endpoint(scan, dir); + + if (!match) + { + /* No match, so mark (parallel) scan finished */ + _bt_parallel_done(scan); + } + + return match; + } /* * We want to start the scan somewhere within the index. Set up an @@ -773,7 +815,10 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) Assert(subkey->sk_flags & SK_ROW_MEMBER); if (subkey->sk_flags & SK_ISNULL) + { + _bt_parallel_done(scan); return false; + } memcpy(scankeys + i, subkey, sizeof(ScanKeyData)); /* @@ -993,25 +1038,21 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) * because nothing finer to lock exists. */ PredicateLockRelation(rel, scan->xs_snapshot); + + /* + * mark parallel scan as done, so that all the workers can finish + * their scan + */ + _bt_parallel_done(scan); + BTScanPosInvalidate(so->currPos); + return false; } else PredicateLockPage(rel, BufferGetBlockNumber(buf), scan->xs_snapshot); - /* initialize moreLeft/moreRight appropriately for scan direction */ - if (ScanDirectionIsForward(dir)) - { - so->currPos.moreLeft = false; - so->currPos.moreRight = true; - } - else - { - so->currPos.moreLeft = true; - so->currPos.moreRight = false; - } - so->numKilled = 0; /* just paranoia */ - Assert(so->markItemIndex == -1); + _bt_initialize_more_data(so, dir); /* position to the precise item on the page */ offnum = _bt_binsrch(rel, buf, keysCount, scankeys, nextkey); @@ -1060,6 +1101,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) _bt_drop_lock_and_maybe_pin(scan, &so->currPos); } +readcomplete: /* OK, itemIndex says what to return */ currItem = &so->currPos.items[so->currPos.itemIndex]; scan->xs_ctup.t_self = currItem->heapTid; @@ -1132,6 +1174,10 @@ _bt_next(IndexScanDesc scan, ScanDirection dir) * moreLeft or moreRight (as appropriate) is cleared if _bt_checkkeys reports * that there can be no more matching tuples in the current scan direction. * + * In the case of a parallel scan, caller must have called _bt_parallel_seize + * prior to calling this function; this function will invoke + * _bt_parallel_release before returning. + * * Returns true if any matching items found on the page, false if none. */ static bool @@ -1154,6 +1200,16 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum) page = BufferGetPage(so->currPos.buf); opaque = (BTPageOpaque) PageGetSpecialPointer(page); + + /* allow next page be processed by parallel worker */ + if (scan->parallel_scan) + { + if (ScanDirectionIsForward(dir)) + _bt_parallel_release(scan, opaque->btpo_next); + else + _bt_parallel_release(scan, BufferGetBlockNumber(so->currPos.buf)); + } + minoff = P_FIRSTDATAKEY(opaque); maxoff = PageGetMaxOffsetNumber(page); @@ -1278,21 +1334,16 @@ _bt_saveitem(BTScanOpaque so, int itemIndex, * if pinned, we'll drop the pin before moving to next page. The buffer is * not locked on entry. * - * On success exit, so->currPos is updated to contain data from the next - * interesting page. For success on a scan using a non-MVCC snapshot we hold - * a pin, but not a read lock, on that page. If we do not hold the pin, we - * set so->currPos.buf to InvalidBuffer. We return TRUE to indicate success. - * - * If there are no more matching records in the given direction, we drop all - * locks and pins, set so->currPos.buf to InvalidBuffer, and return FALSE. + * For success on a scan using a non-MVCC snapshot we hold a pin, but not a + * read lock, on that page. If we do not hold the pin, we set so->currPos.buf + * to InvalidBuffer. We return TRUE to indicate success. */ static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir) { BTScanOpaque so = (BTScanOpaque) scan->opaque; - Relation rel; - Page page; - BTPageOpaque opaque; + BlockNumber blkno = InvalidBlockNumber; + bool status = true; Assert(BTScanPosIsValid(so->currPos)); @@ -1319,25 +1370,103 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir) so->markItemIndex = -1; } - rel = scan->indexRelation; - if (ScanDirectionIsForward(dir)) { /* Walk right to the next page with data */ - /* We must rely on the previously saved nextPage link! */ - BlockNumber blkno = so->currPos.nextPage; + if (scan->parallel_scan != NULL) + { + /* + * Seize the scan to get the next block number; if the scan has + * ended already, bail out. + */ + status = _bt_parallel_seize(scan, &blkno); + if (!status) + { + /* release the previous buffer, if pinned */ + BTScanPosUnpinIfPinned(so->currPos); + BTScanPosInvalidate(so->currPos); + return false; + } + } + else + { + /* Not parallel, so use the previously-saved nextPage link. */ + blkno = so->currPos.nextPage; + } /* Remember we left a page with data */ so->currPos.moreLeft = true; /* release the previous buffer, if pinned */ BTScanPosUnpinIfPinned(so->currPos); + } + else + { + /* Remember we left a page with data */ + so->currPos.moreRight = true; + + if (scan->parallel_scan != NULL) + { + /* + * Seize the scan to get the current block number; if the scan has + * ended already, bail out. + */ + status = _bt_parallel_seize(scan, &blkno); + BTScanPosUnpinIfPinned(so->currPos); + if (!status) + { + BTScanPosInvalidate(so->currPos); + return false; + } + } + else + { + /* Not parallel, so just use our own notion of the current page */ + blkno = so->currPos.currPage; + } + } + + if (!_bt_readnextpage(scan, blkno, dir)) + return false; + + /* Drop the lock, and maybe the pin, on the current page */ + _bt_drop_lock_and_maybe_pin(scan, &so->currPos); + return true; +} + +/* + * _bt_readnextpage() -- Read next page containing valid data for scan + * + * On success exit, so->currPos is updated to contain data from the next + * interesting page. Caller is responsible to release lock and pin on + * buffer on success. We return TRUE to indicate success. + * + * If there are no more matching records in the given direction, we drop all + * locks and pins, set so->currPos.buf to InvalidBuffer, and return FALSE. + */ +static bool +_bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + Relation rel; + Page page; + BTPageOpaque opaque; + bool status = true; + + rel = scan->indexRelation; + + if (ScanDirectionIsForward(dir)) + { for (;;) { - /* if we're at end of scan, give up */ + /* + * if we're at end of scan, give up and mark parallel scan as + * done, so that all the workers can finish their scan + */ if (blkno == P_NONE || !so->currPos.moreRight) { + _bt_parallel_done(scan); BTScanPosInvalidate(so->currPos); return false; } @@ -1359,14 +1488,32 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir) } /* nope, keep going */ - blkno = opaque->btpo_next; + if (scan->parallel_scan != NULL) + { + status = _bt_parallel_seize(scan, &blkno); + if (!status) + { + _bt_relbuf(rel, so->currPos.buf); + BTScanPosInvalidate(so->currPos); + return false; + } + } + else + blkno = opaque->btpo_next; _bt_relbuf(rel, so->currPos.buf); } } else { - /* Remember we left a page with data */ - so->currPos.moreRight = true; + /* + * Should only happen in parallel cases, when some other backend + * advanced the scan. + */ + if (so->currPos.currPage != blkno) + { + BTScanPosUnpinIfPinned(so->currPos); + so->currPos.currPage = blkno; + } /* * Walk left to the next page with data. This is much more complex @@ -1401,6 +1548,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir) if (!so->currPos.moreLeft) { _bt_relbuf(rel, so->currPos.buf); + _bt_parallel_done(scan); BTScanPosInvalidate(so->currPos); return false; } @@ -1412,6 +1560,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir) /* if we're physically at end of index, return failure */ if (so->currPos.buf == InvalidBuffer) { + _bt_parallel_done(scan); BTScanPosInvalidate(so->currPos); return false; } @@ -1432,9 +1581,46 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir) if (_bt_readpage(scan, dir, PageGetMaxOffsetNumber(page))) break; } + + /* + * For parallel scans, get the last page scanned as it is quite + * possible that by the time we try to seize the scan, some other + * worker has already advanced the scan to a different page. We + * must continue based on the latest page scanned by any worker. + */ + if (scan->parallel_scan != NULL) + { + _bt_relbuf(rel, so->currPos.buf); + status = _bt_parallel_seize(scan, &blkno); + if (!status) + { + BTScanPosInvalidate(so->currPos); + return false; + } + so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ); + } } } + return true; +} + +/* + * _bt_parallel_readpage() -- Read current page containing valid data for scan + * + * On success, release lock and maybe pin on buffer. We return TRUE to + * indicate success. + */ +static bool +_bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + + _bt_initialize_more_data(so, dir); + + if (!_bt_readnextpage(scan, blkno, dir)) + return false; + /* Drop the lock, and maybe the pin, on the current page */ _bt_drop_lock_and_maybe_pin(scan, &so->currPos); @@ -1712,19 +1898,7 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir) /* remember which buffer we have pinned */ so->currPos.buf = buf; - /* initialize moreLeft/moreRight appropriately for scan direction */ - if (ScanDirectionIsForward(dir)) - { - so->currPos.moreLeft = false; - so->currPos.moreRight = true; - } - else - { - so->currPos.moreLeft = true; - so->currPos.moreRight = false; - } - so->numKilled = 0; /* just paranoia */ - so->markItemIndex = -1; /* ditto */ + _bt_initialize_more_data(so, dir); /* * Now load data from the first page of the scan. @@ -1753,3 +1927,25 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir) return true; } + +/* + * _bt_initialize_more_data() -- initialize moreLeft/moreRight appropriately + * for scan direction + */ +static inline void +_bt_initialize_more_data(BTScanOpaque so, ScanDirection dir) +{ + /* initialize moreLeft/moreRight appropriately for scan direction */ + if (ScanDirectionIsForward(dir)) + { + so->currPos.moreLeft = false; + so->currPos.moreRight = true; + } + else + { + so->currPos.moreLeft = true; + so->currPos.moreRight = false; + } + so->numKilled = 0; /* just paranoia */ + so->markItemIndex = -1; /* ditto */ +} diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c index da0f330c96..5b259a31d9 100644 --- a/src/backend/access/nbtree/nbtutils.c +++ b/src/backend/access/nbtree/nbtutils.c @@ -590,6 +590,10 @@ _bt_advance_array_keys(IndexScanDesc scan, ScanDirection dir) break; } + /* advance parallel scan */ + if (scan->parallel_scan != NULL) + _bt_parallel_advance_array_keys(scan); + return found; } diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 7176cf1bbe..ada374c0c4 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3374,6 +3374,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_BGWORKER_STARTUP: event_name = "BgWorkerStartup"; break; + case WAIT_EVENT_BTREE_PAGE: + event_name = "BtreePage"; + break; case WAIT_EVENT_EXECUTE_GATHER: event_name = "ExecuteGather"; break; diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h index 25a1dc818c..6289ffa9bd 100644 --- a/src/include/access/nbtree.h +++ b/src/include/access/nbtree.h @@ -383,6 +383,8 @@ typedef struct BTScanOpaqueData ScanKey arrayKeyData; /* modified copy of scan->keyData */ int numArrayKeys; /* number of equality-type array keys (-1 if * there are any unsatisfiable array keys) */ + int arrayKeyCount; /* count indicating number of array scan keys + * processed */ BTArrayKeyInfo *arrayKeys; /* info about each equality-type array key */ MemoryContext arrayContext; /* scan-lifespan context for array data */ @@ -426,7 +428,7 @@ typedef BTScanOpaqueData *BTScanOpaque; #define SK_BT_NULLS_FIRST (INDOPTION_NULLS_FIRST << SK_BT_INDOPTION_SHIFT) /* - * prototypes for functions in nbtree.c (external entry points for btree) + * external entry points for btree, in nbtree.c */ extern IndexBuildResult *btbuild(Relation heap, Relation index, struct IndexInfo *indexInfo); @@ -436,10 +438,13 @@ extern bool btinsert(Relation rel, Datum *values, bool *isnull, IndexUniqueCheck checkUnique, struct IndexInfo *indexInfo); extern IndexScanDesc btbeginscan(Relation rel, int nkeys, int norderbys); +extern Size btestimateparallelscan(void); +extern void btinitparallelscan(void *target); extern bool btgettuple(IndexScanDesc scan, ScanDirection dir); extern int64 btgetbitmap(IndexScanDesc scan, TIDBitmap *tbm); extern void btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys, ScanKey orderbys, int norderbys); +extern void btparallelrescan(IndexScanDesc scan); extern void btendscan(IndexScanDesc scan); extern void btmarkpos(IndexScanDesc scan); extern void btrestrpos(IndexScanDesc scan); @@ -451,6 +456,14 @@ extern IndexBulkDeleteResult *btvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats); extern bool btcanreturn(Relation index, int attno); +/* + * prototypes for internal functions in nbtree.c + */ +extern bool _bt_parallel_seize(IndexScanDesc scan, BlockNumber *pageno); +extern void _bt_parallel_release(IndexScanDesc scan, BlockNumber scan_page); +extern void _bt_parallel_done(IndexScanDesc scan); +extern void _bt_parallel_advance_array_keys(IndexScanDesc scan); + /* * prototypes for functions in nbtinsert.c */ diff --git a/src/include/pgstat.h b/src/include/pgstat.h index de8225b989..8b710ecb24 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -780,6 +780,7 @@ typedef enum { WAIT_EVENT_BGWORKER_SHUTDOWN = PG_WAIT_IPC, WAIT_EVENT_BGWORKER_STARTUP, + WAIT_EVENT_BTREE_PAGE, WAIT_EVENT_EXECUTE_GATHER, WAIT_EVENT_MQ_INTERNAL, WAIT_EVENT_MQ_PUT_MESSAGE, diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index c4235ae63a..9f876ae264 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -161,6 +161,9 @@ BTPageOpaque BTPageOpaqueData BTPageStat BTPageState +BTParallelScanDesc +BTParallelScanDescData +BTPS_State BTScanOpaque BTScanOpaqueData BTScanPos -- 2.40.0