From: Heikki Linnakangas Date: Wed, 16 Aug 2017 13:18:41 +0000 (+0300) Subject: Use atomic ops to hand out pages to scan in parallel scan. X-Git-Tag: REL_11_BETA1~1782 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=3cda10f41bfed7e34b0db7013b66dd40a5f75926;p=postgresql Use atomic ops to hand out pages to scan in parallel scan. With a lot of CPUs, the spinlock that protects the current scan location in a parallel scan can become a bottleneck. Use an atomic fetch-and-add instruction instead. David Rowley Discussion: https://www.postgresql.org/message-id/CAKJS1f9tgsPhqBcoPjv9_KUPZvTLCZ4jy%3DB%3DbhqgaKn7cYzm-w@mail.gmail.com --- diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index e283fe5b1f..7dea8472c1 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -58,6 +58,7 @@ #include "catalog/namespace.h" #include "miscadmin.h" #include "pgstat.h" +#include "port/atomics.h" #include "storage/bufmgr.h" #include "storage/freespace.h" #include "storage/lmgr.h" @@ -89,6 +90,7 @@ static HeapScanDesc heap_beginscan_internal(Relation relation, bool is_bitmapscan, bool is_samplescan, bool temp_snap); +static void heap_parallelscan_startblock_init(HeapScanDesc scan); static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan); static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, CommandId cid, int options); @@ -510,6 +512,8 @@ heapgettup(HeapScanDesc scan, } if (scan->rs_parallel != NULL) { + heap_parallelscan_startblock_init(scan); + page = heap_parallelscan_nextpage(scan); /* Other processes might have already finished the scan. */ @@ -812,6 +816,8 @@ heapgettup_pagemode(HeapScanDesc scan, } if (scan->rs_parallel != NULL) { + heap_parallelscan_startblock_init(scan); + page = heap_parallelscan_nextpage(scan); /* Other processes might have already finished the scan. */ @@ -1535,14 +1541,10 @@ heap_rescan(HeapScanDesc scan, /* * Caller is responsible for making sure that all workers have - * finished the scan before calling this, so it really shouldn't be - * necessary to acquire the mutex at all. We acquire it anyway, just - * to be tidy. + * finished the scan before calling this. */ parallel_scan = scan->rs_parallel; - SpinLockAcquire(¶llel_scan->phs_mutex); - parallel_scan->phs_cblock = parallel_scan->phs_startblock; - SpinLockRelease(¶llel_scan->phs_mutex); + pg_atomic_write_u64(¶llel_scan->phs_nallocated, 0); } } @@ -1635,8 +1637,8 @@ heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation, !RelationUsesLocalBuffers(relation) && target->phs_nblocks > NBuffers / 4; SpinLockInit(&target->phs_mutex); - target->phs_cblock = InvalidBlockNumber; target->phs_startblock = InvalidBlockNumber; + pg_atomic_write_u64(&target->phs_nallocated, 0); SerializeSnapshot(snapshot, target->phs_snapshot_data); } @@ -1660,20 +1662,17 @@ heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan) } /* ---------------- - * heap_parallelscan_nextpage - get the next page to scan + * heap_parallelscan_startblock_init - find and set the scan's startblock * - * Get the next page to scan. Even if there are no pages left to scan, - * another backend could have grabbed a page to scan and not yet finished - * looking at it, so it doesn't follow that the scan is done when the - * first backend gets an InvalidBlockNumber return. + * Determine where the parallel seq scan should start. This function may + * be called many times, once by each parallel worker. We must be careful + * only to set the startblock once. * ---------------- */ -static BlockNumber -heap_parallelscan_nextpage(HeapScanDesc scan) +static void +heap_parallelscan_startblock_init(HeapScanDesc scan) { - BlockNumber page = InvalidBlockNumber; BlockNumber sync_startpage = InvalidBlockNumber; - BlockNumber report_page = InvalidBlockNumber; ParallelHeapScanDesc parallel_scan; Assert(scan->rs_parallel); @@ -1705,46 +1704,63 @@ retry: sync_startpage = ss_get_location(scan->rs_rd, scan->rs_nblocks); goto retry; } - parallel_scan->phs_cblock = parallel_scan->phs_startblock; } + SpinLockRelease(¶llel_scan->phs_mutex); +} + +/* ---------------- + * heap_parallelscan_nextpage - get the next page to scan + * + * Get the next page to scan. Even if there are no pages left to scan, + * another backend could have grabbed a page to scan and not yet finished + * looking at it, so it doesn't follow that the scan is done when the + * first backend gets an InvalidBlockNumber return. + * ---------------- + */ +static BlockNumber +heap_parallelscan_nextpage(HeapScanDesc scan) +{ + BlockNumber page; + ParallelHeapScanDesc parallel_scan; + uint64 nallocated; + + Assert(scan->rs_parallel); + parallel_scan = scan->rs_parallel; /* - * The current block number is the next one that needs to be scanned, - * unless it's InvalidBlockNumber already, in which case there are no more - * blocks to scan. After remembering the current value, we must advance - * it so that the next call to this function returns the next block to be - * scanned. + * phs_nallocated tracks how many pages have been allocated to workers + * already. When phs_nallocated >= rs_nblocks, all blocks have been + * allocated. + * + * Because we use an atomic fetch-and-add to fetch the current value, the + * phs_nallocated counter will exceed rs_nblocks, because workers will + * still increment the value, when they try to allocate the next block but + * all blocks have been allocated already. The counter must be 64 bits + * wide because of that, to avoid wrapping around when rs_nblocks is close + * to 2^32. + * + * The actual page to return is calculated by adding the counter to the + * starting block number, modulo nblocks. */ - page = parallel_scan->phs_cblock; - if (page != InvalidBlockNumber) - { - parallel_scan->phs_cblock++; - if (parallel_scan->phs_cblock >= scan->rs_nblocks) - parallel_scan->phs_cblock = 0; - if (parallel_scan->phs_cblock == parallel_scan->phs_startblock) - { - parallel_scan->phs_cblock = InvalidBlockNumber; - report_page = parallel_scan->phs_startblock; - } - } - - /* Release the lock. */ - SpinLockRelease(¶llel_scan->phs_mutex); + nallocated = pg_atomic_fetch_add_u64(¶llel_scan->phs_nallocated, 1); + if (nallocated >= scan->rs_nblocks) + page = InvalidBlockNumber; /* all blocks have been allocated */ + else + page = (nallocated + parallel_scan->phs_startblock) % scan->rs_nblocks; /* * Report scan location. Normally, we report the current page number. * When we reach the end of the scan, though, we report the starting page, * not the ending page, just so the starting positions for later scans * doesn't slew backwards. We only report the position at the end of the - * scan once, though: subsequent callers will have report nothing, since - * they will have page == InvalidBlockNumber. + * scan once, though: subsequent callers will report nothing. */ if (scan->rs_syncscan) { - if (report_page == InvalidBlockNumber) - report_page = page; - if (report_page != InvalidBlockNumber) - ss_report_location(scan->rs_rd, report_page); + if (page != InvalidBlockNumber) + ss_report_location(scan->rs_rd, page); + else if (nallocated == scan->rs_nblocks) + ss_report_location(scan->rs_rd, parallel_scan->phs_startblock); } return page; diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h index a20646b2b7..147f862a2b 100644 --- a/src/include/access/relscan.h +++ b/src/include/access/relscan.h @@ -35,9 +35,10 @@ typedef struct ParallelHeapScanDescData Oid phs_relid; /* OID of relation to scan */ bool phs_syncscan; /* report location to syncscan logic? */ BlockNumber phs_nblocks; /* # blocks in relation at start of scan */ - slock_t phs_mutex; /* mutual exclusion for block number fields */ + slock_t phs_mutex; /* mutual exclusion for setting startblock */ BlockNumber phs_startblock; /* starting block number */ - BlockNumber phs_cblock; /* current block number */ + pg_atomic_uint64 phs_nallocated; /* number of blocks allocated to + * workers so far. */ char phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER]; } ParallelHeapScanDescData;