granicus.if.org Git - postgresql/commitdiff
Add a C API for parallel heap scans.
author: Robert Haas <rhaas@postgresql.org>
Fri, 16 Oct 2015 21:25:02 +0000 (17:25 -0400)
committer: Robert Haas <rhaas@postgresql.org>
Fri, 16 Oct 2015 21:33:18 +0000 (17:33 -0400)
Using this API, one backend can set up a ParallelHeapScanDesc to
which multiple backends can then attach.  Each tuple in the relation
will be returned to exactly one of the scanning backends.  Only
forward scans are supported, and rescans must be carefully
coordinated.

This is not exposed to the planner or executor yet.

The original version of this code was written by me.  Amit Kapila
reviewed it, tested it, and improved it, including adding support for
synchronized scans, per review comments from Jeff Davis.  Extensive
testing of this and related patches was performed by Haribabu Kommi.
Final cleanup of this patch by me.

src/backend/access/heap/heapam.c
src/include/access/heapam.h
src/include/access/relscan.h

index bcf987124fded27ba94c4b174ad002223fdba6c6..66deb1faee01f765624febc0954a3c26b3b47404 100644 (file)
@@ -63,6 +63,7 @@
 #include "storage/predicate.h"
 #include "storage/procarray.h"
 #include "storage/smgr.h"
+#include "storage/spin.h"
 #include "storage/standby.h"
 #include "utils/datum.h"
 #include "utils/inval.h"
@@ -80,12 +81,14 @@ bool                synchronize_seqscans = true;
 static HeapScanDesc heap_beginscan_internal(Relation relation,
                                                Snapshot snapshot,
                                                int nkeys, ScanKey key,
+                                               ParallelHeapScanDesc parallel_scan,
                                                bool allow_strat,
                                                bool allow_sync,
                                                bool allow_pagemode,
                                                bool is_bitmapscan,
                                                bool is_samplescan,
                                                bool temp_snap);
+static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan);
 static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
                                        TransactionId xid, CommandId cid, int options);
 static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
@@ -226,7 +229,10 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
         * results for a non-MVCC snapshot, the caller must hold some higher-level
         * lock that ensures the interesting tuple(s) won't change.)
         */
-       scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
+       if (scan->rs_parallel != NULL)
+               scan->rs_nblocks = scan->rs_parallel->phs_nblocks;
+       else
+               scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
 
        /*
         * If the table is large relative to NBuffers, use a bulk-read access
@@ -237,7 +243,8 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
         * behaviors, independently of the size of the table; also there is a GUC
         * variable that can disable synchronized scanning.)
         *
-        * During a rescan, don't make a new strategy object if we don't have to.
+        * Note that heap_parallelscan_initialize has a very similar test; if you
+        * change this, consider changing that one, too.
         */
        if (!RelationUsesLocalBuffers(scan->rs_rd) &&
                scan->rs_nblocks > NBuffers / 4)
@@ -250,6 +257,7 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
 
        if (allow_strat)
        {
+               /* During a rescan, keep the previous strategy object. */
                if (scan->rs_strategy == NULL)
                        scan->rs_strategy = GetAccessStrategy(BAS_BULKREAD);
        }
@@ -260,7 +268,12 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
                scan->rs_strategy = NULL;
        }
 
-       if (keep_startblock)
+       if (scan->rs_parallel != NULL)
+       {
+               /* For parallel scan, believe whatever ParallelHeapScanDesc says. */
+               scan->rs_syncscan = scan->rs_parallel->phs_syncscan;
+       }
+       else if (keep_startblock)
        {
                /*
                 * When rescanning, we want to keep the previous startblock setting,
@@ -496,7 +509,20 @@ heapgettup(HeapScanDesc scan,
                                tuple->t_data = NULL;
                                return;
                        }
-                       page = scan->rs_startblock; /* first page */
+                       if (scan->rs_parallel != NULL)
+                       {
+                               page = heap_parallelscan_nextpage(scan);
+
+                               /* Other processes might have already finished the scan. */
+                               if (page == InvalidBlockNumber)
+                               {
+                                       Assert(!BufferIsValid(scan->rs_cbuf));
+                                       tuple->t_data = NULL;
+                                       return;
+                               }
+                       }
+                       else
+                               page = scan->rs_startblock;             /* first page */
                        heapgetpage(scan, page);
                        lineoff = FirstOffsetNumber;            /* first offnum */
                        scan->rs_inited = true;
@@ -519,6 +545,9 @@ heapgettup(HeapScanDesc scan,
        }
        else if (backward)
        {
+               /* backward parallel scan not supported */
+               Assert(scan->rs_parallel == NULL);
+
                if (!scan->rs_inited)
                {
                        /*
@@ -669,6 +698,11 @@ heapgettup(HeapScanDesc scan,
                                page = scan->rs_nblocks;
                        page--;
                }
+               else if (scan->rs_parallel != NULL)
+               {
+                       page = heap_parallelscan_nextpage(scan);
+                       finished = (page == InvalidBlockNumber);
+               }
                else
                {
                        page++;
@@ -773,7 +807,20 @@ heapgettup_pagemode(HeapScanDesc scan,
                                tuple->t_data = NULL;
                                return;
                        }
-                       page = scan->rs_startblock; /* first page */
+                       if (scan->rs_parallel != NULL)
+                       {
+                               page = heap_parallelscan_nextpage(scan);
+
+                               /* Other processes might have already finished the scan. */
+                               if (page == InvalidBlockNumber)
+                               {
+                                       Assert(!BufferIsValid(scan->rs_cbuf));
+                                       tuple->t_data = NULL;
+                                       return;
+                               }
+                       }
+                       else
+                               page = scan->rs_startblock;             /* first page */
                        heapgetpage(scan, page);
                        lineindex = 0;
                        scan->rs_inited = true;
@@ -793,6 +840,9 @@ heapgettup_pagemode(HeapScanDesc scan,
        }
        else if (backward)
        {
+               /* backward parallel scan not supported */
+               Assert(scan->rs_parallel == NULL);
+
                if (!scan->rs_inited)
                {
                        /*
@@ -932,6 +982,11 @@ heapgettup_pagemode(HeapScanDesc scan,
                                page = scan->rs_nblocks;
                        page--;
                }
+               else if (scan->rs_parallel != NULL)
+               {
+                       page = heap_parallelscan_nextpage(scan);
+                       finished = (page == InvalidBlockNumber);
+               }
                else
                {
                        page++;
@@ -1341,7 +1396,7 @@ HeapScanDesc
 heap_beginscan(Relation relation, Snapshot snapshot,
                           int nkeys, ScanKey key)
 {
-       return heap_beginscan_internal(relation, snapshot, nkeys, key,
+       return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
                                                                   true, true, true, false, false, false);
 }
 
@@ -1351,7 +1406,7 @@ heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key)
        Oid                     relid = RelationGetRelid(relation);
        Snapshot        snapshot = RegisterSnapshot(GetCatalogSnapshot(relid));
 
-       return heap_beginscan_internal(relation, snapshot, nkeys, key,
+       return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
                                                                   true, true, true, false, false, true);
 }
 
@@ -1360,7 +1415,7 @@ heap_beginscan_strat(Relation relation, Snapshot snapshot,
                                         int nkeys, ScanKey key,
                                         bool allow_strat, bool allow_sync)
 {
-       return heap_beginscan_internal(relation, snapshot, nkeys, key,
+       return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
                                                                   allow_strat, allow_sync, true,
                                                                   false, false, false);
 }
@@ -1369,7 +1424,7 @@ HeapScanDesc
 heap_beginscan_bm(Relation relation, Snapshot snapshot,
                                  int nkeys, ScanKey key)
 {
-       return heap_beginscan_internal(relation, snapshot, nkeys, key,
+       return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
                                                                   false, false, true, true, false, false);
 }
 
@@ -1378,7 +1433,7 @@ heap_beginscan_sampling(Relation relation, Snapshot snapshot,
                                                int nkeys, ScanKey key,
                                          bool allow_strat, bool allow_sync, bool allow_pagemode)
 {
-       return heap_beginscan_internal(relation, snapshot, nkeys, key,
+       return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
                                                                   allow_strat, allow_sync, allow_pagemode,
                                                                   false, true, false);
 }
@@ -1386,6 +1441,7 @@ heap_beginscan_sampling(Relation relation, Snapshot snapshot,
 static HeapScanDesc
 heap_beginscan_internal(Relation relation, Snapshot snapshot,
                                                int nkeys, ScanKey key,
+                                               ParallelHeapScanDesc parallel_scan,
                                                bool allow_strat,
                                                bool allow_sync,
                                                bool allow_pagemode,
@@ -1418,6 +1474,7 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot,
        scan->rs_allow_strat = allow_strat;
        scan->rs_allow_sync = allow_sync;
        scan->rs_temp_snap = temp_snap;
+       scan->rs_parallel = parallel_scan;
 
        /*
         * we can use page-at-a-time mode if it's an MVCC-safe snapshot
@@ -1473,6 +1530,25 @@ heap_rescan(HeapScanDesc scan,
         * reinitialize scan descriptor
         */
        initscan(scan, key, true);
+
+       /*
+        * reset parallel scan, if present
+        */
+       if (scan->rs_parallel != NULL)
+       {
+               ParallelHeapScanDesc parallel_scan;
+
+               /*
+                * Caller is responsible for making sure that all workers have
+                * finished the scan before calling this, so it really shouldn't be
+                * necessary to acquire the mutex at all.  We acquire it anyway, just
+                * to be tidy.
+                */
+               parallel_scan = scan->rs_parallel;
+               SpinLockAcquire(&parallel_scan->phs_mutex);
+               parallel_scan->phs_cblock = parallel_scan->phs_startblock;
+               SpinLockRelease(&parallel_scan->phs_mutex);
+       }
 }
 
 /* ----------------
@@ -1531,6 +1607,154 @@ heap_endscan(HeapScanDesc scan)
        pfree(scan);
 }
 
+/* ----------------
+ *             heap_parallelscan_estimate - estimate storage for ParallelHeapScanDesc
+ *
+ *             Sadly, this doesn't reduce to a constant, because the size required
+ *             to serialize the snapshot can vary.
+ * ----------------
+ */
+Size
+heap_parallelscan_estimate(Snapshot snapshot)
+{
+       return add_size(offsetof(ParallelHeapScanDescData, phs_snapshot_data),
+                                       EstimateSnapshotSpace(snapshot));
+}
+
+/* ----------------
+ *             heap_parallelscan_initialize - initialize ParallelHeapScanDesc
+ *
+ *             Must allow as many bytes of shared memory as returned by
+ *             heap_parallelscan_estimate.  Call this just once in the leader
+ *             process; then, individual workers attach via heap_beginscan_parallel.
+ * ----------------
+ */
+void
+heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
+                                                        Snapshot snapshot)
+{
+       target->phs_relid = RelationGetRelid(relation);
+       target->phs_nblocks = RelationGetNumberOfBlocks(relation);
+       /* compare phs_syncscan initialization to similar logic in initscan */
+       target->phs_syncscan = synchronize_seqscans &&
+               !RelationUsesLocalBuffers(relation) &&
+               target->phs_nblocks > NBuffers / 4;
+       SpinLockInit(&target->phs_mutex);
+       target->phs_cblock = InvalidBlockNumber;
+       target->phs_startblock = InvalidBlockNumber;
+       SerializeSnapshot(snapshot, target->phs_snapshot_data);
+}
+
+/* ----------------
+ *             heap_beginscan_parallel - join a parallel scan
+ *
+ *             Caller must hold a suitable lock on the correct relation.
+ * ----------------
+ */
+HeapScanDesc
+heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan)
+{
+       Snapshot        snapshot;
+
+       Assert(RelationGetRelid(relation) == parallel_scan->phs_relid);
+       snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data);
+       RegisterSnapshot(snapshot);
+
+       return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan,
+                                                                  true, true, true, false, false, true);
+}
+
+/* ----------------
+ *             heap_parallelscan_nextpage - get the next page to scan
+ *
+ *             Get the next page to scan.  Even if there are no pages left to scan,
+ *             another backend could have grabbed a page to scan and not yet finished
+ *             looking at it, so it doesn't follow that the scan is done when the
+ *             first backend gets an InvalidBlockNumber return.
+ * ----------------
+ */
+static BlockNumber
+heap_parallelscan_nextpage(HeapScanDesc scan)
+{
+       BlockNumber page = InvalidBlockNumber;
+       BlockNumber sync_startpage = InvalidBlockNumber;
+       BlockNumber     report_page = InvalidBlockNumber;
+       ParallelHeapScanDesc parallel_scan;
+
+       Assert(scan->rs_parallel);
+       parallel_scan = scan->rs_parallel;
+
+retry:
+       /* Grab the spinlock. */
+       SpinLockAcquire(&parallel_scan->phs_mutex);
+
+       /*
+        * If the scan's startblock has not yet been initialized, we must do so
+        * now.  If this is not a synchronized scan, we just start at block 0, but
+        * if it is a synchronized scan, we must get the starting position from
+        * the synchronized scan machinery.  We can't hold the spinlock while
+        * doing that, though, so release the spinlock, get the information we
+        * need, and retry.  If nobody else has initialized the scan in the
+        * meantime, we'll fill in the value we fetched on the second time
+        * through.
+        */
+       if (parallel_scan->phs_startblock == InvalidBlockNumber)
+       {
+               if (!parallel_scan->phs_syncscan)
+                       parallel_scan->phs_startblock = 0;
+               else if (sync_startpage != InvalidBlockNumber)
+                       parallel_scan->phs_startblock = sync_startpage;
+               else
+               {
+                       SpinLockRelease(&parallel_scan->phs_mutex);
+                       sync_startpage = ss_get_location(scan->rs_rd, scan->rs_nblocks);
+                       goto retry;
+               }
+               parallel_scan->phs_cblock = parallel_scan->phs_startblock;
+       }
+
+       /*
+        * The current block number is the next one that needs to be scanned,
+        * unless it's InvalidBlockNumber already, in which case there are no more
+        * blocks to scan.  After remembering the current value, we must advance
+        * it so that the next call to this function returns the next block to be
+        * scanned.
+        */
+       page = parallel_scan->phs_cblock;
+       if (page != InvalidBlockNumber)
+       {
+               parallel_scan->phs_cblock++;
+               if (parallel_scan->phs_cblock >= scan->rs_nblocks)
+                       parallel_scan->phs_cblock = 0;
+               if (parallel_scan->phs_cblock == parallel_scan->phs_startblock)
+               {
+                       parallel_scan->phs_cblock = InvalidBlockNumber;
+                       report_page = parallel_scan->phs_startblock;
+               }
+       }
+
+       /* Release the lock. */
+       SpinLockRelease(&parallel_scan->phs_mutex);
+
+       /*
+        * Report scan location.  Normally, we report the current page number.
+        * When we reach the end of the scan, though, we report the starting page,
+        * not the ending page, just so the starting positions for later scans
+        * don't slew backwards.  We only report the position at the end of the
+        * scan once, though: subsequent callers will report nothing, since
+        * they will have page == InvalidBlockNumber.
+        */
+       if (scan->rs_syncscan)
+       {
+               if (report_page == InvalidBlockNumber)
+                       report_page = page;
+               if (report_page != InvalidBlockNumber)
+                       ss_report_location(scan->rs_rd, report_page);
+       }
+
+       return page;
+}
+
 /* ----------------
  *             heap_getnext    - retrieve next tuple in scan
  *
index 75e6b72f9e0204913254548a42322a6fa7708d63..98eeadd23f82ca227ab7b2610130e1148e4f6586 100644 (file)
@@ -96,8 +96,9 @@ extern Relation heap_openrv_extended(const RangeVar *relation,
 
 #define heap_close(r,l)  relation_close(r,l)
 
-/* struct definition appears in relscan.h */
+/* struct definitions appear in relscan.h */
 typedef struct HeapScanDescData *HeapScanDesc;
+typedef struct ParallelHeapScanDescData *ParallelHeapScanDesc;
 
 /*
  * HeapScanIsValid
@@ -126,6 +127,11 @@ extern void heap_rescan_set_params(HeapScanDesc scan, ScanKey key,
 extern void heap_endscan(HeapScanDesc scan);
 extern HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction);
 
+extern Size heap_parallelscan_estimate(Snapshot snapshot);
+extern void heap_parallelscan_initialize(ParallelHeapScanDesc target,
+                                                        Relation relation, Snapshot snapshot);
+extern HeapScanDesc heap_beginscan_parallel(Relation, ParallelHeapScanDesc);
+
 extern bool heap_fetch(Relation relation, Snapshot snapshot,
                   HeapTuple tuple, Buffer *userbuf, bool keep_buf,
                   Relation stats_relation);
index 6e6231971fdca3ef0a780f179b7cda515b4835a3..356c7e6b048f98d7a03a5d1475cea1f7c4fb9ed8 100644 (file)
 #include "access/itup.h"
 #include "access/tupdesc.h"
 
+/*
+ * Shared state for parallel heap scan.
+ *
+ * Each backend participating in a parallel heap scan has its own
+ * HeapScanDesc in backend-private memory, and those objects all contain
+ * a pointer to this structure.  The information here must be sufficient
+ * to properly initialize each new HeapScanDesc as workers join the scan,
+ * and it must act as a font of block numbers for those workers.
+ */
+typedef struct ParallelHeapScanDescData
+{
+       Oid                     phs_relid;              /* OID of relation to scan */
+       bool            phs_syncscan;   /* report location to syncscan logic? */
+       BlockNumber phs_nblocks;        /* # blocks in relation at start of scan */
+       slock_t         phs_mutex;              /* mutual exclusion for block number fields */
+       BlockNumber phs_startblock; /* starting block number */
+       BlockNumber phs_cblock;         /* current block number */
+       char            phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
+}      ParallelHeapScanDescData;
 
 typedef struct HeapScanDescData
 {
@@ -49,6 +68,7 @@ typedef struct HeapScanDescData
        BlockNumber rs_cblock;          /* current block # in scan, if any */
        Buffer          rs_cbuf;                /* current buffer in scan, if any */
        /* NB: if rs_cbuf is not InvalidBuffer, we hold a pin on that buffer */
+       ParallelHeapScanDesc rs_parallel;       /* parallel scan information */
 
        /* these fields only used in page-at-a-time mode and for bitmap scans */
        int                     rs_cindex;              /* current tuple's index in vistuples */