#include "access/xlog.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
+#include "pgstat.h"
+#include "storage/condition_variable.h"
#include "storage/indexfsm.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
MemoryContext pagedelcontext;
} BTVacState;
+/*
+ * BTPS_State -- status of the shared page-advance protocol, stored in
+ * BTParallelScanDescData.btps_pageStatus.
+ *
+ * BTPARALLEL_NOT_INITIALIZED indicates that the scan has not started.
+ * Set by btinitparallelscan, btparallelrescan and
+ * _bt_parallel_advance_array_keys.
+ *
+ * BTPARALLEL_ADVANCING indicates that some process is advancing the scan to
+ * a new page; others must wait. Set by _bt_parallel_seize.
+ *
+ * BTPARALLEL_IDLE indicates that no backend is currently advancing the scan
+ * to a new page; some process can start doing that. Set by
+ * _bt_parallel_release.
+ *
+ * BTPARALLEL_DONE indicates that the scan is complete (including error exit).
+ * We reach this state once for every distinct combination of array keys.
+ * Set by _bt_parallel_done.
+ */
+typedef enum
+{
+ BTPARALLEL_NOT_INITIALIZED,
+ BTPARALLEL_ADVANCING,
+ BTPARALLEL_IDLE,
+ BTPARALLEL_DONE
+} BTPS_State;
+
+/*
+ * BTParallelScanDescData contains btree specific shared information required
+ * for parallel scan.
+ *
+ * The area is sized by btestimateparallelscan and set up by
+ * btinitparallelscan. The other routines in this file locate it at
+ * ps_offset bytes past the start of the scan's ParallelIndexScanDesc.
+ *
+ * BTParallelScanDesc, declared right after the struct, is a pointer to it.
+ */
+typedef struct BTParallelScanDescData
+{
+ BlockNumber btps_scanPage; /* latest or next page to be scanned;
+ * InvalidBlockNumber while the scan has
+ * not started */
+ BTPS_State btps_pageStatus;/* indicates whether next page is available
+ * for scan. See BTPS_State for the possible
+ * states of a parallel scan. */
+ int btps_arrayKeyCount; /* count indicating number of array
+ * scan keys processed by parallel
+ * scan; bumped by
+ * _bt_parallel_advance_array_keys */
+ slock_t btps_mutex; /* protects above variables */
+ ConditionVariable btps_cv; /* used to synchronize parallel scan:
+ * _bt_parallel_seize sleeps on it,
+ * _bt_parallel_release signals it and
+ * _bt_parallel_done broadcasts on it */
+} BTParallelScanDescData;
+
+typedef struct BTParallelScanDescData *BTParallelScanDesc;
+
static void btbuildCallback(Relation index,
HeapTuple htup,
amroutine->amendscan = btendscan;
amroutine->ammarkpos = btmarkpos;
amroutine->amrestrpos = btrestrpos;
- amroutine->amestimateparallelscan = NULL;
- amroutine->aminitparallelscan = NULL;
- amroutine->amparallelrescan = NULL;
+ amroutine->amestimateparallelscan = btestimateparallelscan;
+ amroutine->aminitparallelscan = btinitparallelscan;
+ amroutine->amparallelrescan = btparallelrescan;
PG_RETURN_POINTER(amroutine);
}
}
so->markItemIndex = -1;
+ so->arrayKeyCount = 0;
BTScanPosUnpinIfPinned(so->markPos);
BTScanPosInvalidate(so->markPos);
}
}
+/*
+ * btestimateparallelscan -- estimate storage for BTParallelScanDescData
+ *
+ * Installed as the index AM's amestimateparallelscan callback. Takes no
+ * arguments and returns sizeof(BTParallelScanDescData), which is the size of
+ * the structure that btinitparallelscan fills in.
+ */
+Size
+btestimateparallelscan(void)
+{
+ return sizeof(BTParallelScanDescData);
+}
+
+/*
+ * btinitparallelscan -- initialize BTParallelScanDesc for parallel btree scan
+ *
+ * Installed as the index AM's aminitparallelscan callback.
+ *
+ * target is treated as a pointer to a BTParallelScanDescData; it must be at
+ * least sizeof(BTParallelScanDescData) bytes (see btestimateparallelscan).
+ *
+ * On return the spinlock and condition variable are initialized, the scan
+ * page is InvalidBlockNumber, the status is BTPARALLEL_NOT_INITIALIZED and
+ * the array key count is zero. Nothing is returned.
+ */
+void
+btinitparallelscan(void *target)
+{
+ BTParallelScanDesc bt_target = (BTParallelScanDesc) target;
+
+ SpinLockInit(&bt_target->btps_mutex);
+ bt_target->btps_scanPage = InvalidBlockNumber;
+ bt_target->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED;
+ bt_target->btps_arrayKeyCount = 0;
+ ConditionVariableInit(&bt_target->btps_cv);
+}
+
+/*
+ * btparallelrescan() -- reset parallel scan
+ *
+ * Installed as the index AM's amparallelrescan callback.
+ *
+ * scan->parallel_scan must be non-NULL (asserted). The btree shared state
+ * found at parallel_scan->ps_offset is put back into the same values that
+ * btinitparallelscan assigns: scan page InvalidBlockNumber, status
+ * BTPARALLEL_NOT_INITIALIZED, array key count zero. The spinlock and the
+ * condition variable themselves are not re-initialized here.
+ */
+void
+btparallelrescan(IndexScanDesc scan)
+{
+ BTParallelScanDesc btscan;
+ ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+
+ Assert(parallel_scan);
+
+ btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan,
+ parallel_scan->ps_offset);
+
+ /*
+ * In theory, we don't need to acquire the spinlock here, because there
+ * shouldn't be any other workers running at this point, but we do so for
+ * consistency.
+ */
+ SpinLockAcquire(&btscan->btps_mutex);
+ btscan->btps_scanPage = InvalidBlockNumber;
+ btscan->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED;
+ btscan->btps_arrayKeyCount = 0;
+ SpinLockRelease(&btscan->btps_mutex);
+}
+
+/*
+ * _bt_parallel_seize() -- Begin the process of advancing the scan to a new
+ * page. Other scans must wait until we call _bt_parallel_release() or
+ * _bt_parallel_done().
+ *
+ * The return value is true if we successfully seized the scan and false
+ * if we did not. The latter case occurs if no pages remain for the current
+ * set of scankeys.
+ *
+ * If the return value is true, *pageno returns the next or current page
+ * of the scan (depending on the scan direction). An invalid block number
+ * means the scan hasn't yet started, and P_NONE means we've reached the end.
+ * The first time a participating process reaches the last page, it will return
+ * true and set *pageno to P_NONE; after that, further attempts to seize the
+ * scan will return false.
+ *
+ * Callers should ignore the value of pageno if the return value is false.
+ *
+ * Each pass of the loop inspects the shared state under btps_mutex. If
+ * another process is currently advancing the scan (BTPARALLEL_ADVANCING),
+ * we release the spinlock, sleep on btps_cv and then re-check. The
+ * spinlock is never held while sleeping. ConditionVariableCancelSleep() is
+ * called on every exit path from the loop.
+ */
+bool
+_bt_parallel_seize(IndexScanDesc scan, BlockNumber *pageno)
+{
+ BTScanOpaque so = (BTScanOpaque) scan->opaque;
+ BTPS_State pageStatus;
+ bool exit_loop = false;
+ bool status = true;
+ ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+ BTParallelScanDesc btscan;
+
+ *pageno = P_NONE; /* stays P_NONE unless we seize the scan */
+
+ btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan,
+ parallel_scan->ps_offset);
+
+ while (1)
+ {
+ SpinLockAcquire(&btscan->btps_mutex);
+ pageStatus = btscan->btps_pageStatus;
+
+ if (so->arrayKeyCount < btscan->btps_arrayKeyCount)
+ {
+ /*
+ * Parallel scan has already advanced to a new set of scankeys:
+ * our local count is behind the shared one, so there is nothing
+ * left for us to seize for our current set.
+ */
+ status = false;
+ }
+ else if (pageStatus == BTPARALLEL_DONE)
+ {
+ /*
+ * We're done with this set of scankeys. This may be the end, or
+ * there could be more sets to try.
+ */
+ status = false;
+ }
+ else if (pageStatus != BTPARALLEL_ADVANCING)
+ {
+ /*
+ * We have successfully seized control of the scan for the purpose
+ * of advancing it to a new page!
+ *
+ * The status here is BTPARALLEL_NOT_INITIALIZED or
+ * BTPARALLEL_IDLE. Marking it ADVANCING makes other processes
+ * that reach this function wait until the status changes again.
+ */
+ btscan->btps_pageStatus = BTPARALLEL_ADVANCING;
+ *pageno = btscan->btps_scanPage;
+ exit_loop = true;
+ }
+ SpinLockRelease(&btscan->btps_mutex);
+ if (exit_loop || !status)
+ break;
+ ConditionVariableSleep(&btscan->btps_cv, WAIT_EVENT_BTREE_PAGE);
+ }
+ ConditionVariableCancelSleep();
+
+ return status;
+}
+
+/*
+ * _bt_parallel_release() -- Complete the process of advancing the scan to a
+ * new page. We now have the new value btps_scanPage; some other backend
+ * can now begin advancing the scan.
+ *
+ * scan_page is stored into the shared btps_scanPage and the shared status is
+ * set to BTPARALLEL_IDLE, both under btps_mutex. After the spinlock has
+ * been released, btps_cv is signalled so that a process sleeping in
+ * _bt_parallel_seize() can re-check the state.
+ *
+ * scan->parallel_scan is dereferenced without a NULL check, so this must
+ * only be called for parallel scans.
+ */
+void
+_bt_parallel_release(IndexScanDesc scan, BlockNumber scan_page)
+{
+ ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+ BTParallelScanDesc btscan;
+
+ btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan,
+ parallel_scan->ps_offset);
+
+ SpinLockAcquire(&btscan->btps_mutex);
+ btscan->btps_scanPage = scan_page;
+ btscan->btps_pageStatus = BTPARALLEL_IDLE;
+ SpinLockRelease(&btscan->btps_mutex);
+ ConditionVariableSignal(&btscan->btps_cv);
+}
+
+/*
+ * _bt_parallel_done() -- Mark the parallel scan as complete.
+ *
+ * When there are no pages left to scan, this function should be called to
+ * notify other workers. Otherwise, they might wait forever for the scan to
+ * advance to the next page.
+ *
+ * This is safe to call for a non-parallel scan (scan->parallel_scan == NULL),
+ * in which case it does nothing. It is also safe to call more than once:
+ * the status is changed, and waiters are woken, only by the call that
+ * actually moves the shared status to BTPARALLEL_DONE.
+ */
+void
+_bt_parallel_done(IndexScanDesc scan)
+{
+ BTScanOpaque so = (BTScanOpaque) scan->opaque;
+ ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+ BTParallelScanDesc btscan;
+ bool status_changed = false;
+
+ /* Do nothing, for non-parallel scans */
+ if (parallel_scan == NULL)
+ return;
+
+ btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan,
+ parallel_scan->ps_offset);
+
+ /*
+ * Mark the parallel scan as done for this combination of scan keys,
+ * unless some other process already did so. See also
+ * _bt_advance_array_keys.
+ *
+ * If our local arrayKeyCount is behind the shared count, the shared
+ * state already belongs to a later set of array keys, so we leave it
+ * alone.
+ */
+ SpinLockAcquire(&btscan->btps_mutex);
+ if (so->arrayKeyCount >= btscan->btps_arrayKeyCount &&
+ btscan->btps_pageStatus != BTPARALLEL_DONE)
+ {
+ btscan->btps_pageStatus = BTPARALLEL_DONE;
+ status_changed = true;
+ }
+ SpinLockRelease(&btscan->btps_mutex);
+
+ /* wake up all the workers associated with this parallel scan */
+ if (status_changed)
+ ConditionVariableBroadcast(&btscan->btps_cv);
+}
+
+/*
+ * _bt_parallel_advance_array_keys() -- Advances the parallel scan for array
+ * keys.
+ *
+ * Updates the count of array keys processed for both local and parallel
+ * scans.
+ *
+ * The local count (so->arrayKeyCount) is always incremented. The shared
+ * state is touched only if its status is BTPARALLEL_DONE: in that case the
+ * scan page goes back to InvalidBlockNumber, the status goes back to
+ * BTPARALLEL_NOT_INITIALIZED and the shared count is incremented. If the
+ * status is anything else the shared state is left unchanged.
+ *
+ * Unlike _bt_parallel_done(), there is no NULL check on scan->parallel_scan;
+ * it is dereferenced unconditionally.
+ * NOTE(review): callers are presumably expected to invoke this only for
+ * parallel scans -- confirm at the call site.
+ */
+void
+_bt_parallel_advance_array_keys(IndexScanDesc scan)
+{
+ BTScanOpaque so = (BTScanOpaque) scan->opaque;
+ ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+ BTParallelScanDesc btscan;
+
+ btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan,
+ parallel_scan->ps_offset);
+
+ so->arrayKeyCount++;
+ SpinLockAcquire(&btscan->btps_mutex);
+ if (btscan->btps_pageStatus == BTPARALLEL_DONE)
+ {
+ btscan->btps_scanPage = InvalidBlockNumber;
+ btscan->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED;
+ btscan->btps_arrayKeyCount++;
+ }
+ SpinLockRelease(&btscan->btps_mutex);
+}
+
/*
* Bulk deletion of all index entries pointing to a set of heap tuples.
* The set of target tuples is specified via a callback routine that tells
static void _bt_saveitem(BTScanOpaque so, int itemIndex,
OffsetNumber offnum, IndexTuple itup);
static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir);
+static bool _bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir);
+static bool _bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno,
+ ScanDirection dir);
static Buffer _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot);
static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
static void _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp);
+static inline void _bt_initialize_more_data(BTScanOpaque so, ScanDirection dir);
/*
ScanKeyData notnullkeys[INDEX_MAX_KEYS];
int keysCount = 0;
int i;
+ bool status = true;
StrategyNumber strat_total;
BTScanPosItem *currItem;
+ BlockNumber blkno;
Assert(!BTScanPosIsValid(so->currPos));
if (!so->qual_ok)
return false;
+ /*
+ * For parallel scans, get the starting page from shared state. If the
+ * scan has not started, proceed to find the first leaf page in the usual
+ * way while keeping other participating processes waiting. If the scan
+ * has already begun, use the page number from the shared structure.
+ */
+ if (scan->parallel_scan != NULL)
+ {
+ status = _bt_parallel_seize(scan, &blkno);
+ if (!status)
+ return false;
+ else if (blkno == P_NONE)
+ {
+ _bt_parallel_done(scan);
+ return false;
+ }
+ else if (blkno != InvalidBlockNumber)
+ {
+ if (!_bt_parallel_readpage(scan, blkno, dir))
+ return false;
+ goto readcomplete;
+ }
+ }
+
/*----------
* Examine the scan keys to discover where we need to start the scan.
*
* there.
*/
if (keysCount == 0)
- return _bt_endpoint(scan, dir);
+ {
+ bool match;
+
+ match = _bt_endpoint(scan, dir);
+
+ if (!match)
+ {
+ /* No match, so mark (parallel) scan finished */
+ _bt_parallel_done(scan);
+ }
+
+ return match;
+ }
/*
* We want to start the scan somewhere within the index. Set up an
Assert(subkey->sk_flags & SK_ROW_MEMBER);
if (subkey->sk_flags & SK_ISNULL)
+ {
+ _bt_parallel_done(scan);
return false;
+ }
memcpy(scankeys + i, subkey, sizeof(ScanKeyData));
/*
* because nothing finer to lock exists.
*/
PredicateLockRelation(rel, scan->xs_snapshot);
+
+ /*
+ * mark parallel scan as done, so that all the workers can finish
+ * their scan
+ */
+ _bt_parallel_done(scan);
+ BTScanPosInvalidate(so->currPos);
+
return false;
}
else
PredicateLockPage(rel, BufferGetBlockNumber(buf),
scan->xs_snapshot);
- /* initialize moreLeft/moreRight appropriately for scan direction */
- if (ScanDirectionIsForward(dir))
- {
- so->currPos.moreLeft = false;
- so->currPos.moreRight = true;
- }
- else
- {
- so->currPos.moreLeft = true;
- so->currPos.moreRight = false;
- }
- so->numKilled = 0; /* just paranoia */
- Assert(so->markItemIndex == -1);
+ _bt_initialize_more_data(so, dir);
/* position to the precise item on the page */
offnum = _bt_binsrch(rel, buf, keysCount, scankeys, nextkey);
_bt_drop_lock_and_maybe_pin(scan, &so->currPos);
}
+readcomplete:
/* OK, itemIndex says what to return */
currItem = &so->currPos.items[so->currPos.itemIndex];
scan->xs_ctup.t_self = currItem->heapTid;
* moreLeft or moreRight (as appropriate) is cleared if _bt_checkkeys reports
* that there can be no more matching tuples in the current scan direction.
*
+ * In the case of a parallel scan, caller must have called _bt_parallel_seize
+ * prior to calling this function; this function will invoke
+ * _bt_parallel_release before returning.
+ *
* Returns true if any matching items found on the page, false if none.
*/
static bool
page = BufferGetPage(so->currPos.buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+
+ /* allow the next page to be processed by a parallel worker */
+ if (scan->parallel_scan)
+ {
+ if (ScanDirectionIsForward(dir))
+ _bt_parallel_release(scan, opaque->btpo_next);
+ else
+ _bt_parallel_release(scan, BufferGetBlockNumber(so->currPos.buf));
+ }
+
minoff = P_FIRSTDATAKEY(opaque);
maxoff = PageGetMaxOffsetNumber(page);
* if pinned, we'll drop the pin before moving to next page. The buffer is
* not locked on entry.
*
- * On success exit, so->currPos is updated to contain data from the next
- * interesting page. For success on a scan using a non-MVCC snapshot we hold
- * a pin, but not a read lock, on that page. If we do not hold the pin, we
- * set so->currPos.buf to InvalidBuffer. We return TRUE to indicate success.
- *
- * If there are no more matching records in the given direction, we drop all
- * locks and pins, set so->currPos.buf to InvalidBuffer, and return FALSE.
+ * For success on a scan using a non-MVCC snapshot we hold a pin, but not a
+ * read lock, on that page. If we do not hold the pin, we set so->currPos.buf
+ * to InvalidBuffer. We return TRUE to indicate success.
*/
static bool
_bt_steppage(IndexScanDesc scan, ScanDirection dir)
{
BTScanOpaque so = (BTScanOpaque) scan->opaque;
- Relation rel;
- Page page;
- BTPageOpaque opaque;
+ BlockNumber blkno = InvalidBlockNumber;
+ bool status = true;
Assert(BTScanPosIsValid(so->currPos));
so->markItemIndex = -1;
}
- rel = scan->indexRelation;
-
if (ScanDirectionIsForward(dir))
{
/* Walk right to the next page with data */
- /* We must rely on the previously saved nextPage link! */
- BlockNumber blkno = so->currPos.nextPage;
+ if (scan->parallel_scan != NULL)
+ {
+ /*
+ * Seize the scan to get the next block number; if the scan has
+ * ended already, bail out.
+ */
+ status = _bt_parallel_seize(scan, &blkno);
+ if (!status)
+ {
+ /* release the previous buffer, if pinned */
+ BTScanPosUnpinIfPinned(so->currPos);
+ BTScanPosInvalidate(so->currPos);
+ return false;
+ }
+ }
+ else
+ {
+ /* Not parallel, so use the previously-saved nextPage link. */
+ blkno = so->currPos.nextPage;
+ }
/* Remember we left a page with data */
so->currPos.moreLeft = true;
/* release the previous buffer, if pinned */
BTScanPosUnpinIfPinned(so->currPos);
+ }
+ else
+ {
+ /* Remember we left a page with data */
+ so->currPos.moreRight = true;
+
+ if (scan->parallel_scan != NULL)
+ {
+ /*
+ * Seize the scan to get the current block number; if the scan has
+ * ended already, bail out.
+ */
+ status = _bt_parallel_seize(scan, &blkno);
+ BTScanPosUnpinIfPinned(so->currPos);
+ if (!status)
+ {
+ BTScanPosInvalidate(so->currPos);
+ return false;
+ }
+ }
+ else
+ {
+ /* Not parallel, so just use our own notion of the current page */
+ blkno = so->currPos.currPage;
+ }
+ }
+
+ if (!_bt_readnextpage(scan, blkno, dir))
+ return false;
+
+ /* Drop the lock, and maybe the pin, on the current page */
+ _bt_drop_lock_and_maybe_pin(scan, &so->currPos);
+ return true;
+}
+
+/*
+ * _bt_readnextpage() -- Read next page containing valid data for scan
+ *
+ * On success exit, so->currPos is updated to contain data from the next
+ * interesting page. The caller is responsible for releasing the lock and
+ * pin on the buffer on success. We return TRUE to indicate success.
+ *
+ * If there are no more matching records in the given direction, we drop all
+ * locks and pins, set so->currPos.buf to InvalidBuffer, and return FALSE.
+ */
+static bool
+_bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir)
+{
+ BTScanOpaque so = (BTScanOpaque) scan->opaque;
+ Relation rel;
+ Page page;
+ BTPageOpaque opaque;
+ bool status = true;
+
+ rel = scan->indexRelation;
+
+ if (ScanDirectionIsForward(dir))
+ {
for (;;)
{
- /* if we're at end of scan, give up */
+ /*
+ * if we're at end of scan, give up and mark parallel scan as
+ * done, so that all the workers can finish their scan
+ */
if (blkno == P_NONE || !so->currPos.moreRight)
{
+ _bt_parallel_done(scan);
BTScanPosInvalidate(so->currPos);
return false;
}
}
/* nope, keep going */
- blkno = opaque->btpo_next;
+ if (scan->parallel_scan != NULL)
+ {
+ status = _bt_parallel_seize(scan, &blkno);
+ if (!status)
+ {
+ _bt_relbuf(rel, so->currPos.buf);
+ BTScanPosInvalidate(so->currPos);
+ return false;
+ }
+ }
+ else
+ blkno = opaque->btpo_next;
_bt_relbuf(rel, so->currPos.buf);
}
}
else
{
- /* Remember we left a page with data */
- so->currPos.moreRight = true;
+ /*
+ * Should only happen in parallel cases, when some other backend
+ * advanced the scan.
+ */
+ if (so->currPos.currPage != blkno)
+ {
+ BTScanPosUnpinIfPinned(so->currPos);
+ so->currPos.currPage = blkno;
+ }
/*
* Walk left to the next page with data. This is much more complex
if (!so->currPos.moreLeft)
{
_bt_relbuf(rel, so->currPos.buf);
+ _bt_parallel_done(scan);
BTScanPosInvalidate(so->currPos);
return false;
}
/* if we're physically at end of index, return failure */
if (so->currPos.buf == InvalidBuffer)
{
+ _bt_parallel_done(scan);
BTScanPosInvalidate(so->currPos);
return false;
}
if (_bt_readpage(scan, dir, PageGetMaxOffsetNumber(page)))
break;
}
+
+ /*
+ * For parallel scans, get the last page scanned as it is quite
+ * possible that by the time we try to seize the scan, some other
+ * worker has already advanced the scan to a different page. We
+ * must continue based on the latest page scanned by any worker.
+ */
+ if (scan->parallel_scan != NULL)
+ {
+ _bt_relbuf(rel, so->currPos.buf);
+ status = _bt_parallel_seize(scan, &blkno);
+ if (!status)
+ {
+ BTScanPosInvalidate(so->currPos);
+ return false;
+ }
+ so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ);
+ }
}
}
+ return true;
+}
+
+/*
+ * _bt_parallel_readpage() -- Read current page containing valid data for scan
+ *
+ * On success, release lock and maybe pin on buffer. We return TRUE to
+ * indicate success.
+ */
+static bool
+_bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir)
+{
+ BTScanOpaque so = (BTScanOpaque) scan->opaque;
+
+ _bt_initialize_more_data(so, dir);
+
+ if (!_bt_readnextpage(scan, blkno, dir))
+ return false;
+
/* Drop the lock, and maybe the pin, on the current page */
_bt_drop_lock_and_maybe_pin(scan, &so->currPos);
/* remember which buffer we have pinned */
so->currPos.buf = buf;
- /* initialize moreLeft/moreRight appropriately for scan direction */
- if (ScanDirectionIsForward(dir))
- {
- so->currPos.moreLeft = false;
- so->currPos.moreRight = true;
- }
- else
- {
- so->currPos.moreLeft = true;
- so->currPos.moreRight = false;
- }
- so->numKilled = 0; /* just paranoia */
- so->markItemIndex = -1; /* ditto */
+ _bt_initialize_more_data(so, dir);
/*
* Now load data from the first page of the scan.
return true;
}
+
+/*
+ * _bt_initialize_more_data() -- initialize moreLeft/moreRight appropriately
+ * for scan direction
+ *
+ * For a forward scan, currPos.moreLeft is cleared and currPos.moreRight is
+ * set; for any other direction the two values are swapped. In addition,
+ * so->numKilled is reset to 0 and so->markItemIndex to -1.
+ */
+static inline void
+_bt_initialize_more_data(BTScanOpaque so, ScanDirection dir)
+{
+ /* initialize moreLeft/moreRight appropriately for scan direction */
+ if (ScanDirectionIsForward(dir))
+ {
+ so->currPos.moreLeft = false;
+ so->currPos.moreRight = true;
+ }
+ else
+ {
+ so->currPos.moreLeft = true;
+ so->currPos.moreRight = false;
+ }
+ so->numKilled = 0; /* just paranoia */
+ so->markItemIndex = -1; /* ditto */
+}