Use atomic ops to hand out pages to scan in parallel scan.
author    Heikki Linnakangas <heikki.linnakangas@iki.fi>
          Wed, 16 Aug 2017 13:18:41 +0000 (16:18 +0300)
committer Heikki Linnakangas <heikki.linnakangas@iki.fi>
          Wed, 16 Aug 2017 13:18:41 +0000 (16:18 +0300)
With a lot of CPUs, the spinlock that protects the current scan location
in a parallel scan can become a bottleneck. Use an atomic fetch-and-add
instruction instead.

David Rowley

Discussion: https://www.postgresql.org/message-id/CAKJS1f9tgsPhqBcoPjv9_KUPZvTLCZ4jy%3DB%3DbhqgaKn7cYzm-w@mail.gmail.com
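
The core of the change is easiest to see outside the patch context. The following is a minimal, standalone sketch of the same page-handout scheme, written with C11 atomics rather than PostgreSQL's port/atomics.h; the names scan_state, next_block and INVALID_BLOCK are illustrative stand-ins for ParallelHeapScanDescData, heap_parallelscan_nextpage and InvalidBlockNumber, not identifiers from the patch. Each call hands out one block with a single fetch-and-add; the counter is 64 bits wide because workers keep incrementing it after the last block has been given out.

#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

#define INVALID_BLOCK UINT32_MAX

typedef struct
{
	uint32_t	nblocks;			/* # of blocks in the relation */
	uint32_t	startblock;			/* block the scan starts at */
	_Atomic uint64_t nallocated;	/* blocks handed out so far; 64 bits so
									 * the over-count past nblocks can never
									 * wrap around */
} scan_state;

/*
 * Hand out the next block to scan, or INVALID_BLOCK once every block has
 * been allocated.  One atomic fetch-and-add replaces the spinlock-protected
 * "current block" field.
 */
static uint32_t
next_block(scan_state *scan)
{
	uint64_t	n = atomic_fetch_add(&scan->nallocated, 1);

	if (n >= scan->nblocks)
		return INVALID_BLOCK;	/* all blocks already allocated */

	/* wrap around the relation, starting from startblock */
	return (uint32_t) ((n + scan->startblock) % scan->nblocks);
}

int
main(void)
{
	scan_state	scan = {.nblocks = 8, .startblock = 5};
	uint32_t	blk;

	while ((blk = next_block(&scan)) != INVALID_BLOCK)
		printf("scan block %u\n", blk);
	return 0;
}

With several threads calling next_block() concurrently, each block number is still handed out exactly once, which is what allows the patch to drop the spinlock from the hot path that the old phs_cblock field needed.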

src/backend/access/heap/heapam.c
src/include/access/relscan.h

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index e283fe5b1f51ac8dbc2712b16f6774efda96ee87..7dea8472c17243dfd99d7db0ab77a8531d384d55 100644
@@ -58,6 +58,7 @@
 #include "catalog/namespace.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "port/atomics.h"
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
 #include "storage/lmgr.h"
@@ -89,6 +90,7 @@ static HeapScanDesc heap_beginscan_internal(Relation relation,
                                                bool is_bitmapscan,
                                                bool is_samplescan,
                                                bool temp_snap);
+static void heap_parallelscan_startblock_init(HeapScanDesc scan);
 static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan);
 static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
                                        TransactionId xid, CommandId cid, int options);
@@ -510,6 +512,8 @@ heapgettup(HeapScanDesc scan,
                        }
                        if (scan->rs_parallel != NULL)
                        {
+                               heap_parallelscan_startblock_init(scan);
+
                                page = heap_parallelscan_nextpage(scan);
 
                                /* Other processes might have already finished the scan. */
@@ -812,6 +816,8 @@ heapgettup_pagemode(HeapScanDesc scan,
                        }
                        if (scan->rs_parallel != NULL)
                        {
+                               heap_parallelscan_startblock_init(scan);
+
                                page = heap_parallelscan_nextpage(scan);
 
                                /* Other processes might have already finished the scan. */
@@ -1535,14 +1541,10 @@ heap_rescan(HeapScanDesc scan,
 
                /*
                 * Caller is responsible for making sure that all workers have
-                * finished the scan before calling this, so it really shouldn't be
-                * necessary to acquire the mutex at all.  We acquire it anyway, just
-                * to be tidy.
+                * finished the scan before calling this.
                 */
                parallel_scan = scan->rs_parallel;
-               SpinLockAcquire(&parallel_scan->phs_mutex);
-               parallel_scan->phs_cblock = parallel_scan->phs_startblock;
-               SpinLockRelease(&parallel_scan->phs_mutex);
+               pg_atomic_write_u64(&parallel_scan->phs_nallocated, 0);
        }
 }
 
@@ -1635,8 +1637,8 @@ heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
                !RelationUsesLocalBuffers(relation) &&
                target->phs_nblocks > NBuffers / 4;
        SpinLockInit(&target->phs_mutex);
-       target->phs_cblock = InvalidBlockNumber;
        target->phs_startblock = InvalidBlockNumber;
+       pg_atomic_write_u64(&target->phs_nallocated, 0);
        SerializeSnapshot(snapshot, target->phs_snapshot_data);
 }
 
@@ -1660,20 +1662,17 @@ heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan)
 }
 
 /* ----------------
- *             heap_parallelscan_nextpage - get the next page to scan
+ *             heap_parallelscan_startblock_init - find and set the scan's startblock
  *
- *             Get the next page to scan.  Even if there are no pages left to scan,
- *             another backend could have grabbed a page to scan and not yet finished
- *             looking at it, so it doesn't follow that the scan is done when the
- *             first backend gets an InvalidBlockNumber return.
+ *             Determine where the parallel seq scan should start.  This function may
+ *             be called many times, once by each parallel worker.  We must be careful
+ *             only to set the startblock once.
  * ----------------
  */
-static BlockNumber
-heap_parallelscan_nextpage(HeapScanDesc scan)
+static void
+heap_parallelscan_startblock_init(HeapScanDesc scan)
 {
-       BlockNumber page = InvalidBlockNumber;
        BlockNumber sync_startpage = InvalidBlockNumber;
-       BlockNumber report_page = InvalidBlockNumber;
        ParallelHeapScanDesc parallel_scan;
 
        Assert(scan->rs_parallel);
@@ -1705,46 +1704,63 @@ retry:
                        sync_startpage = ss_get_location(scan->rs_rd, scan->rs_nblocks);
                        goto retry;
                }
-               parallel_scan->phs_cblock = parallel_scan->phs_startblock;
        }
+       SpinLockRelease(&parallel_scan->phs_mutex);
+}
+
+/* ----------------
+ *             heap_parallelscan_nextpage - get the next page to scan
+ *
+ *             Get the next page to scan.  Even if there are no pages left to scan,
+ *             another backend could have grabbed a page to scan and not yet finished
+ *             looking at it, so it doesn't follow that the scan is done when the
+ *             first backend gets an InvalidBlockNumber return.
+ * ----------------
+ */
+static BlockNumber
+heap_parallelscan_nextpage(HeapScanDesc scan)
+{
+       BlockNumber page;
+       ParallelHeapScanDesc parallel_scan;
+       uint64          nallocated;
+
+       Assert(scan->rs_parallel);
+       parallel_scan = scan->rs_parallel;
 
        /*
-        * The current block number is the next one that needs to be scanned,
-        * unless it's InvalidBlockNumber already, in which case there are no more
-        * blocks to scan.  After remembering the current value, we must advance
-        * it so that the next call to this function returns the next block to be
-        * scanned.
+        * phs_nallocated tracks how many pages have been allocated to workers
+        * already.  When phs_nallocated >= rs_nblocks, all blocks have been
+        * allocated.
+        *
+        * Because we use an atomic fetch-and-add to fetch the current value, the
+        * phs_nallocated counter will exceed rs_nblocks, because workers will
+        * still increment the value, when they try to allocate the next block but
+        * all blocks have been allocated already. The counter must be 64 bits
+        * wide because of that, to avoid wrapping around when rs_nblocks is close
+        * to 2^32.
+        *
+        * The actual page to return is calculated by adding the counter to the
+        * starting block number, modulo nblocks.
         */
-       page = parallel_scan->phs_cblock;
-       if (page != InvalidBlockNumber)
-       {
-               parallel_scan->phs_cblock++;
-               if (parallel_scan->phs_cblock >= scan->rs_nblocks)
-                       parallel_scan->phs_cblock = 0;
-               if (parallel_scan->phs_cblock == parallel_scan->phs_startblock)
-               {
-                       parallel_scan->phs_cblock = InvalidBlockNumber;
-                       report_page = parallel_scan->phs_startblock;
-               }
-       }
-
-       /* Release the lock. */
-       SpinLockRelease(&parallel_scan->phs_mutex);
+       nallocated = pg_atomic_fetch_add_u64(&parallel_scan->phs_nallocated, 1);
+       if (nallocated >= scan->rs_nblocks)
+               page = InvalidBlockNumber;      /* all blocks have been allocated */
+       else
+               page = (nallocated + parallel_scan->phs_startblock) % scan->rs_nblocks;
 
        /*
         * Report scan location.  Normally, we report the current page number.
         * When we reach the end of the scan, though, we report the starting page,
         * not the ending page, just so the starting positions for later scans
         * doesn't slew backwards.  We only report the position at the end of the
-        * scan once, though: subsequent callers will have report nothing, since
-        * they will have page == InvalidBlockNumber.
+        * scan once, though: subsequent callers will report nothing.
         */
        if (scan->rs_syncscan)
        {
-               if (report_page == InvalidBlockNumber)
-                       report_page = page;
-               if (report_page != InvalidBlockNumber)
-                       ss_report_location(scan->rs_rd, report_page);
+               if (page != InvalidBlockNumber)
+                       ss_report_location(scan->rs_rd, page);
+               else if (nallocated == scan->rs_nblocks)
+                       ss_report_location(scan->rs_rd, parallel_scan->phs_startblock);
        }
 
        return page;
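
For the other half of the patch, here is a simplified, standalone sketch of the one-time start-block setup that heap_parallelscan_startblock_init performs, again outside PostgreSQL: a pthread mutex stands in for the spinlock, and sync_scan_location() is a hypothetical placeholder for ss_get_location(). The real function must release the spinlock before calling ss_get_location() and retry, since spinlocks cannot be held across non-trivial calls; a full mutex lets the sketch skip that retry loop. The point is only that every worker may call this, but just the first caller actually chooses the starting block.

#include <pthread.h>
#include <stdint.h>

#define INVALID_BLOCK UINT32_MAX

typedef struct
{
	pthread_mutex_t startblock_mutex;	/* protects startblock setup only */
	int			syncscan;				/* take part in synchronized scans? */
	uint32_t	nblocks;
	uint32_t	startblock;				/* INVALID_BLOCK until initialized */
	/* 64-bit atomic "nallocated" counter omitted; see the sketch above */
} shared_scan;

/* hypothetical stand-in for ss_get_location() */
static uint32_t
sync_scan_location(uint32_t nblocks)
{
	(void) nblocks;				/* a real implementation would use this */
	return 0;
}

/*
 * Called by every worker before it asks for its first block; only the first
 * caller sets startblock, later callers just see the value already chosen.
 */
static void
startblock_init(shared_scan *scan)
{
	pthread_mutex_lock(&scan->startblock_mutex);
	if (scan->startblock == INVALID_BLOCK)
		scan->startblock = scan->syncscan ?
			sync_scan_location(scan->nblocks) : 0;
	pthread_mutex_unlock(&scan->startblock_mutex);
}

int
main(void)
{
	static shared_scan scan = {
		.startblock_mutex = PTHREAD_MUTEX_INITIALIZER,
		.syncscan = 1,
		.nblocks = 8,
		.startblock = INVALID_BLOCK
	};

	startblock_init(&scan);		/* first caller picks the start position */
	startblock_init(&scan);		/* second call is a no-op */
	return scan.startblock == 0 ? 0 : 1;
}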
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index a20646b2b706191b442e98f72420238015488238..147f862a2b85fcf19ab3992b3c5d2eda6ecc6b5b 100644
@@ -35,9 +35,10 @@ typedef struct ParallelHeapScanDescData
        Oid                     phs_relid;              /* OID of relation to scan */
        bool            phs_syncscan;   /* report location to syncscan logic? */
        BlockNumber phs_nblocks;        /* # blocks in relation at start of scan */
-       slock_t         phs_mutex;              /* mutual exclusion for block number fields */
+       slock_t         phs_mutex;              /* mutual exclusion for setting startblock */
        BlockNumber phs_startblock; /* starting block number */
-       BlockNumber phs_cblock;         /* current block number */
+       pg_atomic_uint64 phs_nallocated;        /* number of blocks allocated to
+                                                                                * workers so far. */
        char            phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
 }                      ParallelHeapScanDescData;