#include "storage/predicate.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
+#include "storage/spin.h"
#include "storage/standby.h"
#include "utils/datum.h"
#include "utils/inval.h"
static HeapScanDesc heap_beginscan_internal(Relation relation,
Snapshot snapshot,
int nkeys, ScanKey key,
+ ParallelHeapScanDesc parallel_scan,
bool allow_strat,
bool allow_sync,
bool allow_pagemode,
bool is_bitmapscan,
bool is_samplescan,
bool temp_snap);
+static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan);
static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
TransactionId xid, CommandId cid, int options);
static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
* results for a non-MVCC snapshot, the caller must hold some higher-level
* lock that ensures the interesting tuple(s) won't change.)
*/
- scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
+ if (scan->rs_parallel != NULL)
+ scan->rs_nblocks = scan->rs_parallel->phs_nblocks;
+ else
+ scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
/*
* If the table is large relative to NBuffers, use a bulk-read access
* behaviors, independently of the size of the table; also there is a GUC
* variable that can disable synchronized scanning.)
*
- * During a rescan, don't make a new strategy object if we don't have to.
+ * Note that heap_parallelscan_initialize has a very similar test; if you
+ * change this, consider changing that one, too.
*/
if (!RelationUsesLocalBuffers(scan->rs_rd) &&
scan->rs_nblocks > NBuffers / 4)
if (allow_strat)
{
+ /* During a rescan, keep the previous strategy object. */
if (scan->rs_strategy == NULL)
scan->rs_strategy = GetAccessStrategy(BAS_BULKREAD);
}
scan->rs_strategy = NULL;
}
- if (keep_startblock)
+ if (scan->rs_parallel != NULL)
+ {
+ /* For parallel scan, believe whatever ParallelHeapScanDesc says. */
+ scan->rs_syncscan = scan->rs_parallel->phs_syncscan;
+ }
+ else if (keep_startblock)
{
/*
* When rescanning, we want to keep the previous startblock setting,
tuple->t_data = NULL;
return;
}
- page = scan->rs_startblock; /* first page */
+ if (scan->rs_parallel != NULL)
+ {
+ page = heap_parallelscan_nextpage(scan);
+
+ /* Other processes might have already finished the scan. */
+ if (page == InvalidBlockNumber)
+ {
+ Assert(!BufferIsValid(scan->rs_cbuf));
+ tuple->t_data = NULL;
+ return;
+ }
+ }
+ else
+ page = scan->rs_startblock; /* first page */
heapgetpage(scan, page);
lineoff = FirstOffsetNumber; /* first offnum */
scan->rs_inited = true;
}
else if (backward)
{
+ /* backward parallel scan not supported */
+ Assert(scan->rs_parallel == NULL);
+
if (!scan->rs_inited)
{
/*
page = scan->rs_nblocks;
page--;
}
+ else if (scan->rs_parallel != NULL)
+ {
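+ /* parallel scan: get the next block to scan from the shared scan state */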
+ page = heap_parallelscan_nextpage(scan);
+ finished = (page == InvalidBlockNumber);
+ }
else
{
page++;
tuple->t_data = NULL;
return;
}
- page = scan->rs_startblock; /* first page */
+ if (scan->rs_parallel != NULL)
+ {
+ page = heap_parallelscan_nextpage(scan);
+
+ /* Other processes might have already finished the scan. */
+ if (page == InvalidBlockNumber)
+ {
+ Assert(!BufferIsValid(scan->rs_cbuf));
+ tuple->t_data = NULL;
+ return;
+ }
+ }
+ else
+ page = scan->rs_startblock; /* first page */
heapgetpage(scan, page);
lineindex = 0;
scan->rs_inited = true;
}
else if (backward)
{
+ /* backward parallel scan not supported */
+ Assert(scan->rs_parallel == NULL);
+
if (!scan->rs_inited)
{
/*
page = scan->rs_nblocks;
page--;
}
+ else if (scan->rs_parallel != NULL)
+ {
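+ /* parallel scan: get the next block to scan from the shared scan state */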
+ page = heap_parallelscan_nextpage(scan);
+ finished = (page == InvalidBlockNumber);
+ }
else
{
page++;
heap_beginscan(Relation relation, Snapshot snapshot,
int nkeys, ScanKey key)
{
- return heap_beginscan_internal(relation, snapshot, nkeys, key,
+ return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
true, true, true, false, false, false);
}
Oid relid = RelationGetRelid(relation);
Snapshot snapshot = RegisterSnapshot(GetCatalogSnapshot(relid));
- return heap_beginscan_internal(relation, snapshot, nkeys, key,
+ return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
true, true, true, false, false, true);
}
int nkeys, ScanKey key,
bool allow_strat, bool allow_sync)
{
- return heap_beginscan_internal(relation, snapshot, nkeys, key,
+ return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
allow_strat, allow_sync, true,
false, false, false);
}
heap_beginscan_bm(Relation relation, Snapshot snapshot,
int nkeys, ScanKey key)
{
- return heap_beginscan_internal(relation, snapshot, nkeys, key,
+ return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
false, false, true, true, false, false);
}
int nkeys, ScanKey key,
bool allow_strat, bool allow_sync, bool allow_pagemode)
{
- return heap_beginscan_internal(relation, snapshot, nkeys, key,
+ return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
allow_strat, allow_sync, allow_pagemode,
false, true, false);
}
static HeapScanDesc
heap_beginscan_internal(Relation relation, Snapshot snapshot,
int nkeys, ScanKey key,
+ ParallelHeapScanDesc parallel_scan,
bool allow_strat,
bool allow_sync,
bool allow_pagemode,
scan->rs_allow_strat = allow_strat;
scan->rs_allow_sync = allow_sync;
scan->rs_temp_snap = temp_snap;
+ scan->rs_parallel = parallel_scan;
/*
* we can use page-at-a-time mode if it's an MVCC-safe snapshot
* reinitialize scan descriptor
*/
initscan(scan, key, true);
+
+ /*
+ * reset parallel scan, if present
+ */
+ if (scan->rs_parallel != NULL)
+ {
+ ParallelHeapScanDesc parallel_scan;
+
+ /*
+ * Caller is responsible for making sure that all workers have
+ * finished the scan before calling this, so it really shouldn't be
+ * necessary to acquire the mutex at all. We acquire it anyway, just
+ * to be tidy.
+ */
+ parallel_scan = scan->rs_parallel;
+ SpinLockAcquire(&parallel_scan->phs_mutex);
+ parallel_scan->phs_cblock = parallel_scan->phs_startblock;
+ SpinLockRelease(&parallel_scan->phs_mutex);
+ }
}
/* ----------------
pfree(scan);
}
+/* ----------------
+ * heap_parallelscan_estimate - estimate storage for ParallelHeapScanDesc
+ *
+ * Sadly, this doesn't reduce to a constant, because the size required
+ * to serialize the snapshot can vary.
+ * ----------------
+ */
+Size
+heap_parallelscan_estimate(Snapshot snapshot)
+{
+ return add_size(offsetof(ParallelHeapScanDescData, phs_snapshot_data),
+ EstimateSnapshotSpace(snapshot));
+}
+
+/* ----------------
+ * heap_parallelscan_initialize - initialize ParallelHeapScanDesc
+ *
+ * Must allow as many bytes of shared memory as returned by
+ * heap_parallelscan_estimate. Call this just once in the leader
+ * process; then, individual workers attach via heap_beginscan_parallel.
+ * ----------------
+ */
+void
+heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
+ Snapshot snapshot)
+{
+ target->phs_relid = RelationGetRelid(relation);
+ target->phs_nblocks = RelationGetNumberOfBlocks(relation);
+ /* compare phs_syncscan initialization to similar logic in initscan */
+ target->phs_syncscan = synchronize_seqscans &&
+ !RelationUsesLocalBuffers(relation) &&
+ target->phs_nblocks > NBuffers / 4;
+ SpinLockInit(&target->phs_mutex);
+ target->phs_cblock = InvalidBlockNumber;
+ target->phs_startblock = InvalidBlockNumber;
+ SerializeSnapshot(snapshot, target->phs_snapshot_data);
+}
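+
+/*
+ * Illustrative sketch (assumptions, not part of the surrounding code): in the
+ * leader, the estimate/initialize pair is typically used together, placing the
+ * descriptor in dynamic shared memory for workers to find.  The toc key
+ * PARALLEL_KEY_SCAN and the parallel context "pcxt" are placeholders.
+ *
+ *		Size		size = heap_parallelscan_estimate(snapshot);
+ *		ParallelHeapScanDesc pscan;
+ *
+ *		pscan = (ParallelHeapScanDesc) shm_toc_allocate(pcxt->toc, size);
+ *		heap_parallelscan_initialize(pscan, relation, snapshot);
+ *		shm_toc_insert(pcxt->toc, PARALLEL_KEY_SCAN, pscan);
+ */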
+
+/* ----------------
+ * heap_beginscan_parallel - join a parallel scan
+ *
+ * Caller must hold a suitable lock on the correct relation.
+ * ----------------
+ */
+HeapScanDesc
+heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan)
+{
+ Snapshot snapshot;
+
+ Assert(RelationGetRelid(relation) == parallel_scan->phs_relid);
+ snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data);
+ RegisterSnapshot(snapshot);
+
+ return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan,
+ true, true, true, false, false, true);
+}
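+
+/*
+ * Illustrative sketch (assumptions, not part of the surrounding code): a
+ * worker locates the shared descriptor and joins the scan roughly as below.
+ * PARALLEL_KEY_SCAN is a placeholder key and "toc" the worker's shm_toc.
+ *
+ *		HeapScanDesc scan;
+ *		HeapTuple	tuple;
+ *		ParallelHeapScanDesc pscan;
+ *
+ *		pscan = (ParallelHeapScanDesc) shm_toc_lookup(toc, PARALLEL_KEY_SCAN);
+ *		scan = heap_beginscan_parallel(relation, pscan);
+ *
+ *		while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ *			... process tuple ...
+ *
+ *		heap_endscan(scan);
+ */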
+
+/* ----------------
+ * heap_parallelscan_nextpage - get the next page to scan
+ *
+ * Get the next page to scan. Even if there are no pages left to scan,
+ * another backend could have grabbed a page to scan and not yet finished
+ * looking at it, so it doesn't follow that the scan is done when the
+ * first backend gets an InvalidBlockNumber return.
+ * ----------------
+ */
+static BlockNumber
+heap_parallelscan_nextpage(HeapScanDesc scan)
+{
+ BlockNumber page = InvalidBlockNumber;
+ BlockNumber sync_startpage = InvalidBlockNumber;
+ BlockNumber report_page = InvalidBlockNumber;
+ ParallelHeapScanDesc parallel_scan;
+
+ Assert(scan->rs_parallel);
+ parallel_scan = scan->rs_parallel;
+
+retry:
+ /* Grab the spinlock. */
+ SpinLockAcquire(&parallel_scan->phs_mutex);
+
+ /*
+ * If the scan's startblock has not yet been initialized, we must do so
+ * now. If this is not a synchronized scan, we just start at block 0, but
+ * if it is a synchronized scan, we must get the starting position from
+ * the synchronized scan machinery. We can't hold the spinlock while
+ * doing that, though, so release the spinlock, get the information we
+ * need, and retry. If nobody else has initialized the scan in the
+ * meantime, we'll fill in the value we fetched on the second time
+ * through.
+ */
+ if (parallel_scan->phs_startblock == InvalidBlockNumber)
+ {
+ if (!parallel_scan->phs_syncscan)
+ parallel_scan->phs_startblock = 0;
+ else if (sync_startpage != InvalidBlockNumber)
+ parallel_scan->phs_startblock = sync_startpage;
+ else
+ {
+ SpinLockRelease(&parallel_scan->phs_mutex);
+ sync_startpage = ss_get_location(scan->rs_rd, scan->rs_nblocks);
+ goto retry;
+ }
+ parallel_scan->phs_cblock = parallel_scan->phs_startblock;
+ }
+
+ /*
+ * The current block number is the next one that needs to be scanned,
+ * unless it's InvalidBlockNumber already, in which case there are no more
+ * blocks to scan. After remembering the current value, we must advance
+ * it so that the next call to this function returns the next block to be
+ * scanned.
+ */
+ page = parallel_scan->phs_cblock;
+ if (page != InvalidBlockNumber)
+ {
+ parallel_scan->phs_cblock++;
+ if (parallel_scan->phs_cblock >= scan->rs_nblocks)
+ parallel_scan->phs_cblock = 0;
+ if (parallel_scan->phs_cblock == parallel_scan->phs_startblock)
+ {
+ parallel_scan->phs_cblock = InvalidBlockNumber;
+ report_page = parallel_scan->phs_startblock;
+ }
+ }
+
+ /* Release the lock. */
+ SpinLockRelease(&parallel_scan->phs_mutex);
+
+ /*
+ * Report scan location. Normally, we report the current page number.
+ * When we reach the end of the scan, though, we report the starting page,
+ * not the ending page, just so the starting positions for later scans
+ * don't slew backwards. We only report the position at the end of the
+ * scan once, though: subsequent callers will report nothing, since
+ * they will have page == InvalidBlockNumber.
+ */
+ if (scan->rs_syncscan)
+ {
+ if (report_page == InvalidBlockNumber)
+ report_page = page;
+ if (report_page != InvalidBlockNumber)
+ ss_report_location(scan->rs_rd, report_page);
+ }
+
+ return page;
+}
+
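+/*
+ * Worked example of the hand-out logic above (illustration only): with
+ * rs_nblocks = 8 and phs_startblock = 5, successive calls return blocks
+ * 5, 6, 7, 0, 1, 2, 3, 4 and then InvalidBlockNumber.  The call returning
+ * block 4 wraps phs_cblock back to the start block, so for a synchronized
+ * scan it reports block 5, not block 4, as the final scan position.
+ */
+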
/* ----------------
* heap_getnext - retrieve next tuple in scan
*