static TupleTableSlot *BitmapHeapNext(BitmapHeapScanState *node);
static void bitgetpage(HeapScanDesc scan, TBMIterateResult *tbmres);
+static inline void BitmapDoneInitializingSharedState(
+ ParallelBitmapHeapState *pstate);
static inline void BitmapAdjustPrefetchIterator(BitmapHeapScanState *node,
TBMIterateResult *tbmres);
static inline void BitmapAdjustPrefetchTarget(BitmapHeapScanState *node);
static inline void BitmapPrefetch(BitmapHeapScanState *node,
HeapScanDesc scan);
+static bool BitmapShouldInitializeSharedState(
+ ParallelBitmapHeapState *pstate);
/* ----------------------------------------------------------------
HeapScanDesc scan;
TIDBitmap *tbm;
TBMIterator *tbmiterator;
+ TBMSharedIterator *shared_tbmiterator;
TBMIterateResult *tbmres;
OffsetNumber targoffset;
TupleTableSlot *slot;
+ ParallelBitmapHeapState *pstate = node->pstate;
+ dsa_area *dsa = node->ss.ps.state->es_query_dsa;
/*
* extract necessary information from index scan node
slot = node->ss.ss_ScanTupleSlot;
scan = node->ss.ss_currentScanDesc;
tbm = node->tbm;
- tbmiterator = node->tbmiterator;
+ if (pstate == NULL)
+ tbmiterator = node->tbmiterator;
+ else
+ shared_tbmiterator = node->shared_tbmiterator;
tbmres = node->tbmres;
/*
* node->prefetch_maximum. This is to avoid doing a lot of prefetching in
* a scan that stops after a few tuples because of a LIMIT.
*/
- if (tbm == NULL)
+ if (!node->initialized)
{
- tbm = (TIDBitmap *) MultiExecProcNode(outerPlanState(node));
+ if (!pstate)
+ {
+ tbm = (TIDBitmap *) MultiExecProcNode(outerPlanState(node));
- if (!tbm || !IsA(tbm, TIDBitmap))
- elog(ERROR, "unrecognized result from subplan");
+ if (!tbm || !IsA(tbm, TIDBitmap))
+ elog(ERROR, "unrecognized result from subplan");
- node->tbm = tbm;
- node->tbmiterator = tbmiterator = tbm_begin_iterate(tbm);
- node->tbmres = tbmres = NULL;
+ node->tbm = tbm;
+ node->tbmiterator = tbmiterator = tbm_begin_iterate(tbm);
+ node->tbmres = tbmres = NULL;
#ifdef USE_PREFETCH
- if (node->prefetch_maximum > 0)
- {
- node->prefetch_iterator = tbm_begin_iterate(tbm);
- node->prefetch_pages = 0;
- node->prefetch_target = -1;
+ if (node->prefetch_maximum > 0)
+ {
+ node->prefetch_iterator = tbm_begin_iterate(tbm);
+ node->prefetch_pages = 0;
+ node->prefetch_target = -1;
+ }
+#endif /* USE_PREFETCH */
}
+ else
+ {
+ /*
+ * The leader will immediately come out of the function, but
+ * others will be blocked until leader populates the TBM and wakes
+ * them up.
+ */
+ if (BitmapShouldInitializeSharedState(pstate))
+ {
+ tbm = (TIDBitmap *) MultiExecProcNode(outerPlanState(node));
+ if (!tbm || !IsA(tbm, TIDBitmap))
+ elog(ERROR, "unrecognized result from subplan");
+
+ node->tbm = tbm;
+
+ /*
+ * Prepare to iterate over the TBM. This will return the
+ * dsa_pointer of the iterator state which will be used by
+ * multiple processes to iterate jointly.
+ */
+ pstate->tbmiterator = tbm_prepare_shared_iterate(tbm);
+#ifdef USE_PREFETCH
+ if (node->prefetch_maximum > 0)
+ {
+ pstate->prefetch_iterator =
+ tbm_prepare_shared_iterate(tbm);
+
+ /*
+ * We don't need the mutex here as we haven't yet woke up
+ * others.
+ */
+ pstate->prefetch_pages = 0;
+ pstate->prefetch_target = -1;
+ }
+#endif
+
+ /* We have initialized the shared state so wake up others. */
+ BitmapDoneInitializingSharedState(pstate);
+ }
+
+ /* Allocate a private iterator and attach the shared state to it */
+ node->shared_tbmiterator = shared_tbmiterator =
+ tbm_attach_shared_iterate(dsa, pstate->tbmiterator);
+ node->tbmres = tbmres = NULL;
+
+#ifdef USE_PREFETCH
+ if (node->prefetch_maximum > 0)
+ {
+ node->shared_prefetch_iterator =
+ tbm_attach_shared_iterate(dsa, pstate->prefetch_iterator);
+ }
#endif /* USE_PREFETCH */
+ }
+ node->initialized = true;
}
for (;;)
*/
if (tbmres == NULL)
{
- node->tbmres = tbmres = tbm_iterate(tbmiterator);
+ if (!pstate)
+ node->tbmres = tbmres = tbm_iterate(tbmiterator);
+ else
+ node->tbmres = tbmres = tbm_shared_iterate(shared_tbmiterator);
if (tbmres == NULL)
{
/* no more entries in the bitmap */
* Try to prefetch at least a few pages even before we get to the
* second page if we don't stop reading after the first tuple.
*/
- if (node->prefetch_target < node->prefetch_maximum)
- node->prefetch_target++;
+ if (!pstate)
+ {
+ if (node->prefetch_target < node->prefetch_maximum)
+ node->prefetch_target++;
+ }
+ else if (pstate->prefetch_target < node->prefetch_maximum)
+ {
+ /* take spinlock while updating shared state */
+ SpinLockAcquire(&pstate->mutex);
+ if (pstate->prefetch_target < node->prefetch_maximum)
+ pstate->prefetch_target++;
+ SpinLockRelease(&pstate->mutex);
+ }
#endif /* USE_PREFETCH */
}
scan->rs_ntuples = ntup;
}
+/*
+ * BitmapDoneInitializingSharedState - Shared state is initialized
+ *
+ * By this time the leader has already populated the TBM and initialized the
+ * shared state so wake up other processes.
+ */
+static inline void
+BitmapDoneInitializingSharedState(ParallelBitmapHeapState *pstate)
+{
+ SpinLockAcquire(&pstate->mutex);
+ pstate->state = BM_FINISHED;
+ SpinLockRelease(&pstate->mutex);
+ ConditionVariableBroadcast(&pstate->cv);
+}
+
/*
* BitmapAdjustPrefetchIterator - Adjust the prefetch iterator
*/
TBMIterateResult *tbmres)
{
#ifdef USE_PREFETCH
- TBMIterator *prefetch_iterator = node->prefetch_iterator;
+ ParallelBitmapHeapState *pstate = node->pstate;
- if (node->prefetch_pages > 0)
+ if (pstate == NULL)
{
- /* The main iterator has closed the distance by one page */
- node->prefetch_pages--;
+ TBMIterator *prefetch_iterator = node->prefetch_iterator;
+
+ if (node->prefetch_pages > 0)
+ {
+ /* The main iterator has closed the distance by one page */
+ node->prefetch_pages--;
+ }
+ else if (prefetch_iterator)
+ {
+ /* Do not let the prefetch iterator get behind the main one */
+ TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);
+
+ if (tbmpre == NULL || tbmpre->blockno != tbmres->blockno)
+ elog(ERROR, "prefetch and main iterators are out of sync");
+ }
+ return;
}
- else if (prefetch_iterator)
+
+ if (node->prefetch_maximum > 0)
{
- /* Do not let the prefetch iterator get behind the main one */
- TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);
+ TBMSharedIterator *prefetch_iterator = node->shared_prefetch_iterator;
+
+ SpinLockAcquire(&pstate->mutex);
+ if (pstate->prefetch_pages > 0)
+ {
+ node->prefetch_pages--;
+ SpinLockRelease(&pstate->mutex);
+ }
+ else
+ {
+ /* Release the mutex before iterating */
+ SpinLockRelease(&pstate->mutex);
- if (tbmpre == NULL || tbmpre->blockno != tbmres->blockno)
- elog(ERROR, "prefetch and main iterators are out of sync");
+ /*
+ * In case of shared mode, we can not ensure that the current
+ * blockno of the main iterator and that of the prefetch iterator
+ * are same. It's possible that whatever blockno we are
+ * prefetching will be processed by another process. Therefore, we
+ * don't validate the blockno here as we do in non-parallel case.
+ */
+ if (prefetch_iterator)
+ tbm_shared_iterate(prefetch_iterator);
+ }
}
#endif /* USE_PREFETCH */
}
BitmapAdjustPrefetchTarget(BitmapHeapScanState *node)
{
#ifdef USE_PREFETCH
- if (node->prefetch_target >= node->prefetch_maximum)
- /* don't increase any further */ ;
- else if (node->prefetch_target >= node->prefetch_maximum / 2)
- node->prefetch_target = node->prefetch_maximum;
- else if (node->prefetch_target > 0)
- node->prefetch_target *= 2;
- else
- node->prefetch_target++;
+ ParallelBitmapHeapState *pstate = node->pstate;
+
+ if (pstate == NULL)
+ {
+ if (node->prefetch_target >= node->prefetch_maximum)
+ /* don't increase any further */ ;
+ else if (node->prefetch_target >= node->prefetch_maximum / 2)
+ node->prefetch_target = node->prefetch_maximum;
+ else if (node->prefetch_target > 0)
+ node->prefetch_target *= 2;
+ else
+ node->prefetch_target++;
+ return;
+ }
+
+ /* Do an unlocked check first to save spinlock acquisitions. */
+ if (pstate->prefetch_target < node->prefetch_maximum)
+ {
+ SpinLockAcquire(&pstate->mutex);
+ if (pstate->prefetch_target >= node->prefetch_maximum)
+ /* don't increase any further */ ;
+ else if (pstate->prefetch_target >= node->prefetch_maximum / 2)
+ pstate->prefetch_target = node->prefetch_maximum;
+ else if (pstate->prefetch_target > 0)
+ pstate->prefetch_target *= 2;
+ else
+ pstate->prefetch_target++;
+ SpinLockRelease(&pstate->mutex);
+ }
#endif /* USE_PREFETCH */
}
BitmapPrefetch(BitmapHeapScanState *node, HeapScanDesc scan)
{
#ifdef USE_PREFETCH
- TBMIterator *prefetch_iterator = node->prefetch_iterator;
+ ParallelBitmapHeapState *pstate = node->pstate;
- if (prefetch_iterator)
+ if (pstate == NULL)
{
- while (node->prefetch_pages < node->prefetch_target)
+ TBMIterator *prefetch_iterator = node->prefetch_iterator;
+
+ if (prefetch_iterator)
{
- TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);
+ while (node->prefetch_pages < node->prefetch_target)
+ {
+ TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);
+
+ if (tbmpre == NULL)
+ {
+ /* No more pages to prefetch */
+ tbm_end_iterate(prefetch_iterator);
+ node->prefetch_iterator = NULL;
+ break;
+ }
+ node->prefetch_pages++;
+ PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno);
+ }
+ }
+
+ return;
+ }
- if (tbmpre == NULL)
+ if (pstate->prefetch_pages < pstate->prefetch_target)
+ {
+ TBMSharedIterator *prefetch_iterator = node->shared_prefetch_iterator;
+
+ if (prefetch_iterator)
+ {
+ while (1)
{
- /* No more pages to prefetch */
- tbm_end_iterate(prefetch_iterator);
- node->prefetch_iterator = NULL;
- break;
+ TBMIterateResult *tbmpre;
+ bool do_prefetch = false;
+
+ /*
+ * Recheck under the mutex. If some other process has already
+ * done enough prefetching then we need not to do anything.
+ */
+ SpinLockAcquire(&pstate->mutex);
+ if (pstate->prefetch_pages < pstate->prefetch_target)
+ {
+ pstate->prefetch_pages++;
+ do_prefetch = true;
+ }
+ SpinLockRelease(&pstate->mutex);
+
+ if (!do_prefetch)
+ return;
+
+ tbmpre = tbm_shared_iterate(prefetch_iterator);
+ if (tbmpre == NULL)
+ {
+ /* No more pages to prefetch */
+ tbm_end_shared_iterate(prefetch_iterator);
+ node->shared_prefetch_iterator = NULL;
+ break;
+ }
+
+ PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno);
}
- node->prefetch_pages++;
- PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno);
}
}
#endif /* USE_PREFETCH */
tbm_end_iterate(node->tbmiterator);
if (node->prefetch_iterator)
tbm_end_iterate(node->prefetch_iterator);
+ if (node->shared_tbmiterator)
+ tbm_end_shared_iterate(node->shared_tbmiterator);
+ if (node->shared_prefetch_iterator)
+ tbm_end_shared_iterate(node->shared_prefetch_iterator);
if (node->tbm)
tbm_free(node->tbm);
node->tbm = NULL;
node->tbmiterator = NULL;
node->tbmres = NULL;
node->prefetch_iterator = NULL;
+ node->initialized = false;
+ node->shared_tbmiterator = NULL;
+ node->shared_prefetch_iterator = NULL;
+
+ /* Reset parallel bitmap state, if present */
+ if (node->pstate)
+ {
+ dsa_area *dsa = node->ss.ps.state->es_query_dsa;
+
+ node->pstate->state = BM_INITIAL;
+
+ if (DsaPointerIsValid(node->pstate->tbmiterator))
+ tbm_free_shared_area(dsa, node->pstate->tbmiterator);
+
+ if (DsaPointerIsValid(node->pstate->prefetch_iterator))
+ tbm_free_shared_area(dsa, node->pstate->prefetch_iterator);
+
+ node->pstate->tbmiterator = InvalidDsaPointer;
+ node->pstate->prefetch_iterator = InvalidDsaPointer;
+ }
ExecScanReScan(&node->ss);
tbm_end_iterate(node->prefetch_iterator);
if (node->tbm)
tbm_free(node->tbm);
+ if (node->shared_tbmiterator)
+ tbm_end_shared_iterate(node->shared_tbmiterator);
+ if (node->shared_prefetch_iterator)
+ tbm_end_shared_iterate(node->shared_prefetch_iterator);
/*
* close heap scan
scanstate->prefetch_target = 0;
/* may be updated below */
scanstate->prefetch_maximum = target_prefetch_pages;
+ scanstate->pscan_len = 0;
+ scanstate->initialized = false;
+ scanstate->shared_tbmiterator = NULL;
+ scanstate->pstate = NULL;
/*
* Miscellaneous initialization
*/
return scanstate;
}
+
+/*----------------
+ * BitmapShouldInitializeSharedState
+ *
+ * The first process to come here and see the state to the BM_INITIAL
+ * will become the leader for the parallel bitmap scan and will be
+ * responsible for populating the TIDBitmap. The other processes will
+ * be blocked by the condition variable until the leader wakes them up.
+ * ---------------
+ */
+static bool
+BitmapShouldInitializeSharedState(ParallelBitmapHeapState *pstate)
+{
+ SharedBitmapState state;
+
+ while (1)
+ {
+ SpinLockAcquire(&pstate->mutex);
+ state = pstate->state;
+ if (pstate->state == BM_INITIAL)
+ pstate->state = BM_INPROGRESS;
+ SpinLockRelease(&pstate->mutex);
+
+ /* Exit if bitmap is done, or if we're the leader. */
+ if (state != BM_INPROGRESS)
+ break;
+
+ /* Wait for the leader to wake us up. */
+ ConditionVariableSleep(&pstate->cv, WAIT_EVENT_PARALLEL_BITMAP_SCAN);
+ }
+
+ ConditionVariableCancelSleep();
+
+ return (state == BM_INITIAL);
+}
+
+/* ----------------------------------------------------------------
+ * ExecBitmapHeapEstimate
+ *
+ * estimates the space required to serialize bitmap scan node.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBitmapHeapEstimate(BitmapHeapScanState *node,
+ ParallelContext *pcxt)
+{
+ EState *estate = node->ss.ps.state;
+
+ node->pscan_len = add_size(offsetof(ParallelBitmapHeapState,
+ phs_snapshot_data),
+ EstimateSnapshotSpace(estate->es_snapshot));
+
+ shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ * ExecBitmapHeapInitializeDSM
+ *
+ * Set up a parallel bitmap heap scan descriptor.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
+ ParallelContext *pcxt)
+{
+ ParallelBitmapHeapState *pstate;
+ EState *estate = node->ss.ps.state;
+
+ pstate = shm_toc_allocate(pcxt->toc, node->pscan_len);
+
+ pstate->tbmiterator = 0;
+ pstate->prefetch_iterator = 0;
+
+ /* Initialize the mutex */
+ SpinLockInit(&pstate->mutex);
+ pstate->prefetch_pages = 0;
+ pstate->prefetch_target = 0;
+ pstate->state = BM_INITIAL;
+
+ ConditionVariableInit(&pstate->cv);
+ SerializeSnapshot(estate->es_snapshot, pstate->phs_snapshot_data);
+
+ shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pstate);
+ node->pstate = pstate;
+}
+
+/* ----------------------------------------------------------------
+ * ExecBitmapHeapInitializeWorker
+ *
+ * Copy relevant information from TOC into planstate.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, shm_toc *toc)
+{
+ ParallelBitmapHeapState *pstate;
+ Snapshot snapshot;
+
+ pstate = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id);
+ node->pstate = pstate;
+
+ snapshot = RestoreSnapshot(pstate->phs_snapshot_data);
+ heap_update_snapshot(node->ss.ss_currentScanDesc, snapshot);
+}