#include "catalog/namespace.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "port/atomics.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "storage/lmgr.h"
bool is_bitmapscan,
bool is_samplescan,
bool temp_snap);
+static void heap_parallelscan_startblock_init(HeapScanDesc scan);
static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan);
static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
TransactionId xid, CommandId cid, int options);
}
if (scan->rs_parallel != NULL)
{
+ heap_parallelscan_startblock_init(scan);
+
page = heap_parallelscan_nextpage(scan);
/* Other processes might have already finished the scan. */
}
if (scan->rs_parallel != NULL)
{
+ heap_parallelscan_startblock_init(scan);
+
page = heap_parallelscan_nextpage(scan);
/* Other processes might have already finished the scan. */
/*
* Caller is responsible for making sure that all workers have
- * finished the scan before calling this, so it really shouldn't be
- * necessary to acquire the mutex at all. We acquire it anyway, just
- * to be tidy.
+ * finished the scan before calling this.
*/
parallel_scan = scan->rs_parallel;
- SpinLockAcquire(&parallel_scan->phs_mutex);
- parallel_scan->phs_cblock = parallel_scan->phs_startblock;
- SpinLockRelease(&parallel_scan->phs_mutex);
+ pg_atomic_write_u64(&parallel_scan->phs_nallocated, 0);
}
}
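A plain atomic write is enough here because of the caller-side ordering the comment describes: the counter is only reset after every worker has finished the previous pass. A hypothetical leader-side sequence, purely to illustrate that ordering (not code from this patch, and not literally how the executor drives a parallel rescan):

    WaitForParallelWorkersToFinish(pcxt);   /* no worker can still be allocating blocks */
    heap_rescan(scan, NULL);                /* safe to reset phs_nallocated to 0 */
    LaunchParallelWorkers(pcxt);            /* the new pass starts from a clean counter */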
!RelationUsesLocalBuffers(relation) &&
target->phs_nblocks > NBuffers / 4;
SpinLockInit(&target->phs_mutex);
- target->phs_cblock = InvalidBlockNumber;
target->phs_startblock = InvalidBlockNumber;
+ pg_atomic_init_u64(&target->phs_nallocated, 0);
SerializeSnapshot(snapshot, target->phs_snapshot_data);
}
}
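The initialization above replaces the old shared block cursor with a 64-bit atomic allocation counter. For orientation, a sketch of the shared descriptor fields these hunks rely on; only members actually referenced in the diff are listed, their order and the omitted members are assumptions, and the authoritative definition lives in the access/relscan.h header:

    typedef struct ParallelHeapScanDescData
    {
        BlockNumber      phs_nblocks;     /* # blocks in relation at start of scan */
        slock_t          phs_mutex;       /* after this patch, only guards startblock setup */
        BlockNumber      phs_startblock;  /* starting block number, set exactly once */
        pg_atomic_uint64 phs_nallocated;  /* number of blocks allocated to workers so far */
        /* ... other members omitted ... */
        char             phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
    } ParallelHeapScanDescData;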
/* ----------------
- * heap_parallelscan_nextpage - get the next page to scan
+ * heap_parallelscan_startblock_init - find and set the scan's startblock
*
- * Get the next page to scan. Even if there are no pages left to scan,
- * another backend could have grabbed a page to scan and not yet finished
- * looking at it, so it doesn't follow that the scan is done when the
- * first backend gets an InvalidBlockNumber return.
+ * Determine where the parallel seq scan should start. This function may
+ * be called many times, once by each parallel worker. We must be careful
+ * only to set the startblock once.
* ----------------
*/
-static BlockNumber
-heap_parallelscan_nextpage(HeapScanDesc scan)
+static void
+heap_parallelscan_startblock_init(HeapScanDesc scan)
{
- BlockNumber page = InvalidBlockNumber;
BlockNumber sync_startpage = InvalidBlockNumber;
- BlockNumber report_page = InvalidBlockNumber;
ParallelHeapScanDesc parallel_scan;
Assert(scan->rs_parallel);
sync_startpage = ss_get_location(scan->rs_rd, scan->rs_nblocks);
goto retry;
}
- parallel_scan->phs_cblock = parallel_scan->phs_startblock;
}
+ SpinLockRelease(&parallel_scan->phs_mutex);
+}
+
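The hunk above shows only fragments of the new function; the initialize-once protocol it implements looks roughly like the following (a condensed sketch, omitting the non-synchronized-scan branch that simply starts at block 0):

    retry:
        SpinLockAcquire(&parallel_scan->phs_mutex);
        if (parallel_scan->phs_startblock == InvalidBlockNumber)
        {
            if (sync_startpage != InvalidBlockNumber)
                parallel_scan->phs_startblock = sync_startpage;
            else
            {
                /*
                 * The syncscan machinery cannot be consulted while holding a
                 * spinlock, so drop it, fetch a candidate start page, and retry.
                 * Another worker may have set phs_startblock in the meantime,
                 * in which case the retry finds it already initialized.
                 */
                SpinLockRelease(&parallel_scan->phs_mutex);
                sync_startpage = ss_get_location(scan->rs_rd, scan->rs_nblocks);
                goto retry;
            }
        }
        SpinLockRelease(&parallel_scan->phs_mutex);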
+/* ----------------
+ * heap_parallelscan_nextpage - get the next page to scan
+ *
+ * Get the next page to scan. Even if there are no pages left to scan,
+ * another backend could have grabbed a page to scan and not yet finished
+ * looking at it, so it doesn't follow that the scan is done when the
+ * first backend gets an InvalidBlockNumber return.
+ * ----------------
+ */
+static BlockNumber
+heap_parallelscan_nextpage(HeapScanDesc scan)
+{
+ BlockNumber page;
+ ParallelHeapScanDesc parallel_scan;
+ uint64 nallocated;
+
+ Assert(scan->rs_parallel);
+ parallel_scan = scan->rs_parallel;
/*
- * The current block number is the next one that needs to be scanned,
- * unless it's InvalidBlockNumber already, in which case there are no more
- * blocks to scan. After remembering the current value, we must advance
- * it so that the next call to this function returns the next block to be
- * scanned.
+ * phs_nallocated tracks how many pages have been allocated to workers
+ * already. When phs_nallocated >= rs_nblocks, all blocks have been
+ * allocated.
+ *
+ * Because we use an atomic fetch-and-add to fetch the current value, the
+ * phs_nallocated counter can exceed rs_nblocks: workers keep incrementing
+ * it when they try to allocate the next block even after all blocks have
+ * been allocated. The counter must therefore be 64 bits wide, to avoid
+ * wrapping around when rs_nblocks is close to 2^32.
+ *
+ * The actual page to return is calculated by adding the counter to the
+ * starting block number, modulo nblocks.
*/
- page = parallel_scan->phs_cblock;
- if (page != InvalidBlockNumber)
- {
- parallel_scan->phs_cblock++;
- if (parallel_scan->phs_cblock >= scan->rs_nblocks)
- parallel_scan->phs_cblock = 0;
- if (parallel_scan->phs_cblock == parallel_scan->phs_startblock)
- {
- parallel_scan->phs_cblock = InvalidBlockNumber;
- report_page = parallel_scan->phs_startblock;
- }
- }
-
- /* Release the lock. */
- SpinLockRelease(&parallel_scan->phs_mutex);
+ nallocated = pg_atomic_fetch_add_u64(&parallel_scan->phs_nallocated, 1);
+ if (nallocated >= scan->rs_nblocks)
+ page = InvalidBlockNumber; /* all blocks have been allocated */
+ else
+ page = (nallocated + parallel_scan->phs_startblock) % scan->rs_nblocks;
/*
* Report scan location. Normally, we report the current page number.
* When we reach the end of the scan, though, we report the starting page,
* not the ending page, just so the starting positions for later scans
* doesn't slew backwards. We only report the position at the end of the
- * scan once, though: subsequent callers will report nothing, since
- * they will have page == InvalidBlockNumber.
+ * scan once, though: subsequent callers will report nothing.
*/
if (scan->rs_syncscan)
{
- if (report_page == InvalidBlockNumber)
- report_page = page;
- if (report_page != InvalidBlockNumber)
- ss_report_location(scan->rs_rd, report_page);
+ if (page != InvalidBlockNumber)
+ ss_report_location(scan->rs_rd, page);
+ else if (nallocated == scan->rs_nblocks)
+ ss_report_location(scan->rs_rd, parallel_scan->phs_startblock);
}
return page;
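To see the allocation scheme in isolation, here is a self-contained model of the logic above, using C11 atomics in place of PostgreSQL's pg_atomic_* wrappers; the struct and names are illustrative only:

    #include <stdatomic.h>
    #include <stdint.h>

    #define INVALID_BLOCK ((uint32_t) 0xFFFFFFFF)   /* stand-in for InvalidBlockNumber */

    typedef struct
    {
        uint32_t         nblocks;      /* rs_nblocks: blocks in the relation */
        uint32_t         startblock;   /* phs_startblock: where the scan began */
        _Atomic uint64_t nallocated;   /* phs_nallocated: blocks handed out so far */
    } scan_state;

    /*
     * Hand out the next block number, or INVALID_BLOCK once the whole relation
     * has been allocated.  The counter keeps growing past nblocks because every
     * late caller still increments it; making it 64 bits wide guarantees it can
     * never wrap back into the valid range, even with nblocks close to 2^32.
     */
    static uint32_t
    next_block(scan_state *s)
    {
        uint64_t n = atomic_fetch_add(&s->nallocated, 1);

        if (n >= s->nblocks)
            return INVALID_BLOCK;
        return (uint32_t) ((n + s->startblock) % s->nblocks);
    }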