]> granicus.if.org Git - postgresql/commitdiff
Extend index AM API for parallel index scans.
authorRobert Haas <rhaas@postgresql.org>
Tue, 24 Jan 2017 21:42:58 +0000 (16:42 -0500)
committerRobert Haas <rhaas@postgresql.org>
Tue, 24 Jan 2017 21:42:58 +0000 (16:42 -0500)
This patch doesn't actually make any index AM parallel-aware, but it
provides the necessary functions at the AM layer to do so.

Rahila Syed, Amit Kapila, Robert Haas

14 files changed:
contrib/bloom/blutils.c
doc/src/sgml/indexam.sgml
src/backend/access/brin/brin.c
src/backend/access/gin/ginutil.c
src/backend/access/gist/gist.c
src/backend/access/hash/hash.c
src/backend/access/index/indexam.c
src/backend/access/nbtree/nbtree.c
src/backend/access/spgist/spgutils.c
src/include/access/amapi.h
src/include/access/genam.h
src/include/access/relscan.h
src/include/c.h
src/tools/pgindent/typedefs.list

index 06077afed6952ebc4733cada17fe111c8cd8a3fe..858798db85c6693e290dab64b01f1805a3db5ad3 100644 (file)
@@ -138,6 +138,9 @@ blhandler(PG_FUNCTION_ARGS)
        amroutine->amendscan = blendscan;
        amroutine->ammarkpos = NULL;
        amroutine->amrestrpos = NULL;
+       amroutine->amestimateparallelscan = NULL;
+       amroutine->aminitparallelscan = NULL;
+       amroutine->amparallelrescan = NULL;
 
        PG_RETURN_POINTER(amroutine);
 }
index 40f201b11be211944e0a00f9dc010858795c7b7f..5d8e5574608316ec248f01549497bdae5ecc70fb 100644 (file)
@@ -131,6 +131,11 @@ typedef struct IndexAmRoutine
     amendscan_function amendscan;
     ammarkpos_function ammarkpos;       /* can be NULL */
     amrestrpos_function amrestrpos;     /* can be NULL */
+
+    /* interface functions to support parallel index scans */
+    amestimateparallelscan_function amestimateparallelscan;    /* can be NULL */
+    aminitparallelscan_function aminitparallelscan;    /* can be NULL */
+    amparallelrescan_function amparallelrescan;    /* can be NULL */
 } IndexAmRoutine;
 </programlisting>
   </para>
@@ -624,6 +629,68 @@ amrestrpos (IndexScanDesc scan);
    the <structfield>amrestrpos</> field in its <structname>IndexAmRoutine</>
    struct may be set to NULL.
   </para>
+
+  <para>
+   In addition to supporting ordinary index scans, some types of index
+   may wish to support <firstterm>parallel index scans</>, which allow
+   multiple backends to cooperate in performing an index scan.  The
+   index access method should arrange things so that each cooperating
+   process returns a subset of the tuples that would be performed by
+   an ordinary, non-parallel index scan, but in such a way that the
+   union of those subsets is equal to the set of tuples that would be
+   returned by an ordinary, non-parallel index scan.  Furthermore, while
+   there need not be any global ordering of tuples returned by a parallel
+   scan, the ordering of that subset of tuples returned within each
+   cooperating backend must match the requested ordering.  The following
+   functions may be implemented to support parallel index scans:
+  </para>
+
+  <para>
+<programlisting>
+Size
+amestimateparallelscan (void);
+</programlisting>
+   Estimate and return the number of bytes of dynamic shared memory which
+   the access method will be needed to perform a parallel scan.  (This number
+   is in addition to, not in lieu of, the amount of space needed for
+   AM-independent data in <structname>ParallelIndexScanDescData</>.)
+  </para>
+
+  <para>
+   It is not necessary to implement this function for access methods which
+   do not support parallel scans or for which the number of additional bytes
+   of storage required is zero.
+  </para>
+
+  <para>
+<programlisting>
+void
+aminitparallelscan (void *target);
+</programlisting>
+   This function will be called to initialize dynamic shared memory at the
+   beginning of a parallel scan.  <parameter>target</> will point to at least
+   the number of bytes previously returned by
+   <function>amestimateparallelscan</>, and this function may use that
+   amount of space to store whatever data it wishes.
+  </para>
+
+  <para>
+   It is not necessary to implement this function for access methods which
+   do not support parallel scans or in cases where the shared memory space
+   required needs no initialization.
+  </para>
+
+  <para>
+<programlisting>
+void
+amparallelrescan (IndexScanDesc scan);
+</programlisting>
+   This function, if implemented, will be called when a parallel index scan
+   must be restarted.  It should reset any shared state set up by
+   <function>aminitparallelscan</> such that the scan will be restarted from
+   the beginning.
+  </para>
+
  </sect1>
 
  <sect1 id="index-scanning">
index d60ddd242cb91bd29173c4b9100e1e4330266ffe..b2afdb7bedb04933e6bfc3e875f80ae71868ee04 100644 (file)
@@ -112,6 +112,9 @@ brinhandler(PG_FUNCTION_ARGS)
        amroutine->amendscan = brinendscan;
        amroutine->ammarkpos = NULL;
        amroutine->amrestrpos = NULL;
+       amroutine->amestimateparallelscan = NULL;
+       amroutine->aminitparallelscan = NULL;
+       amroutine->amparallelrescan = NULL;
 
        PG_RETURN_POINTER(amroutine);
 }
index 3909638906b087bb417f4ee8f1d4898142cf2aed..02d920bb9dbb5b87ac4dcac5ab941e766ac11e08 100644 (file)
@@ -68,6 +68,9 @@ ginhandler(PG_FUNCTION_ARGS)
        amroutine->amendscan = ginendscan;
        amroutine->ammarkpos = NULL;
        amroutine->amrestrpos = NULL;
+       amroutine->amestimateparallelscan = NULL;
+       amroutine->aminitparallelscan = NULL;
+       amroutine->amparallelrescan = NULL;
 
        PG_RETURN_POINTER(amroutine);
 }
index 597056ae440b05f0f68f44a5f98853c8009d4355..c2247ad2f782ff866788a049d49c09c79ec3ac21 100644 (file)
@@ -89,6 +89,9 @@ gisthandler(PG_FUNCTION_ARGS)
        amroutine->amendscan = gistendscan;
        amroutine->ammarkpos = NULL;
        amroutine->amrestrpos = NULL;
+       amroutine->amestimateparallelscan = NULL;
+       amroutine->aminitparallelscan = NULL;
+       amroutine->amparallelrescan = NULL;
 
        PG_RETURN_POINTER(amroutine);
 }
index a64a9b9696a3b3a635b5279f471ceb02894731a1..ec8ed33c7087e22f90973d713b1a2daaf96160d5 100644 (file)
@@ -86,6 +86,9 @@ hashhandler(PG_FUNCTION_ARGS)
        amroutine->amendscan = hashendscan;
        amroutine->ammarkpos = NULL;
        amroutine->amrestrpos = NULL;
+       amroutine->amestimateparallelscan = NULL;
+       amroutine->aminitparallelscan = NULL;
+       amroutine->amparallelrescan = NULL;
 
        PG_RETURN_POINTER(amroutine);
 }
index 4822af95a32e28d529e9b6f57279b943a7b46bc7..ba27c1e86d9f64de25aa2db6cd8da9b31eb6453b 100644 (file)
  *             index_insert    - insert an index tuple into a relation
  *             index_markpos   - mark a scan position
  *             index_restrpos  - restore a scan position
+ *             index_parallelscan_estimate - estimate shared memory for parallel scan
+ *             index_parallelscan_initialize - initialize parallel scan
+ *             index_parallelrescan  - (re)start a parallel scan of an index
+ *             index_beginscan_parallel - join parallel index scan
  *             index_getnext_tid       - get the next TID from a scan
  *             index_fetch_heap                - get the scan's next heap tuple
  *             index_getnext   - get the next heap tuple from a scan
@@ -120,7 +124,8 @@ do { \
 } while(0)
 
 static IndexScanDesc index_beginscan_internal(Relation indexRelation,
-                                                int nkeys, int norderbys, Snapshot snapshot);
+                                                int nkeys, int norderbys, Snapshot snapshot,
+                                                ParallelIndexScanDesc pscan, bool temp_snap);
 
 
 /* ----------------------------------------------------------------
@@ -219,7 +224,7 @@ index_beginscan(Relation heapRelation,
 {
        IndexScanDesc scan;
 
-       scan = index_beginscan_internal(indexRelation, nkeys, norderbys, snapshot);
+       scan = index_beginscan_internal(indexRelation, nkeys, norderbys, snapshot, NULL, false);
 
        /*
         * Save additional parameters into the scandesc.  Everything else was set
@@ -244,7 +249,7 @@ index_beginscan_bitmap(Relation indexRelation,
 {
        IndexScanDesc scan;
 
-       scan = index_beginscan_internal(indexRelation, nkeys, 0, snapshot);
+       scan = index_beginscan_internal(indexRelation, nkeys, 0, snapshot, NULL, false);
 
        /*
         * Save additional parameters into the scandesc.  Everything else was set
@@ -260,8 +265,11 @@ index_beginscan_bitmap(Relation indexRelation,
  */
 static IndexScanDesc
 index_beginscan_internal(Relation indexRelation,
-                                                int nkeys, int norderbys, Snapshot snapshot)
+                                                int nkeys, int norderbys, Snapshot snapshot,
+                                                ParallelIndexScanDesc pscan, bool temp_snap)
 {
+       IndexScanDesc scan;
+
        RELATION_CHECKS;
        CHECK_REL_PROCEDURE(ambeginscan);
 
@@ -276,8 +284,13 @@ index_beginscan_internal(Relation indexRelation,
        /*
         * Tell the AM to open a scan.
         */
-       return indexRelation->rd_amroutine->ambeginscan(indexRelation, nkeys,
+       scan = indexRelation->rd_amroutine->ambeginscan(indexRelation, nkeys,
                                                                                                        norderbys);
+       /* Initialize information for parallel scan. */
+       scan->parallel_scan = pscan;
+       scan->xs_temp_snap = temp_snap;
+
+       return scan;
 }
 
 /* ----------------
@@ -341,6 +354,9 @@ index_endscan(IndexScanDesc scan)
        /* Release index refcount acquired by index_beginscan */
        RelationDecrementReferenceCount(scan->indexRelation);
 
+       if (scan->xs_temp_snap)
+               UnregisterSnapshot(scan->xs_snapshot);
+
        /* Release the scan data structure itself */
        IndexScanEnd(scan);
 }
@@ -389,6 +405,115 @@ index_restrpos(IndexScanDesc scan)
        scan->indexRelation->rd_amroutine->amrestrpos(scan);
 }
 
+/*
+ * index_parallelscan_estimate - estimate shared memory for parallel scan
+ *
+ * Returns the number of bytes to reserve for the parallel scan descriptor
+ * of a scan on "indexRelation" using "snapshot".  The result is the sum of:
+ *
+ *   - the fixed part of ParallelIndexScanDescData, up to ps_snapshot_data;
+ *   - the space EstimateSnapshotSpace reports for "snapshot";
+ *   - padding to round the above up with MAXALIGN;
+ *   - whatever the index AM's amestimateparallelscan callback returns, if
+ *     the AM provides that callback.
+ *
+ * Currently, we don't pass any information to the AM-specific estimator,
+ * so it can probably only return a constant.  In the future, we might need
+ * to pass more information.
+ */
+Size
+index_parallelscan_estimate(Relation indexRelation, Snapshot snapshot)
+{
+       Size            nbytes;
+
+       RELATION_CHECKS;
+
+       /*
+        * AM-independent part.  This is the same computation that
+        * index_parallelscan_initialize uses to decide where the AM-specific
+        * area begins, so the two must be kept in step.
+        */
+       nbytes = offsetof(ParallelIndexScanDescData, ps_snapshot_data);
+       nbytes = add_size(nbytes, EstimateSnapshotSpace(snapshot));
+       nbytes = MAXALIGN(nbytes);
+
+       /*
+        * If amestimateparallelscan is not provided, assume there is no
+        * AM-specific data needed.  (It's hard to believe that could work, but
+        * it's easy enough to cater to it here.)
+        */
+       if (indexRelation->rd_amroutine->amestimateparallelscan != NULL)
+               nbytes = add_size(nbytes,
+                                         indexRelation->rd_amroutine->amestimateparallelscan());
+
+       return nbytes;
+}
+
+/*
+ * index_parallelscan_initialize - initialize parallel scan
+ *
+ * We initialize both the ParallelIndexScanDesc proper and the AM-specific
+ * information which follows it.
+ *
+ * "target" is the descriptor to fill in.  We record the OIDs of the heap
+ * and the index, serialize "snapshot" into ps_snapshot_data, and store in
+ * ps_offset the MAXALIGNed offset at which the AM-specific area starts.
+ *
+ * This function then calls the access-method-specific initialization
+ * routine, if there is one, to initialize the AM-specific information.
+ * Call this just once in the leader process; then, individual workers
+ * attach via index_beginscan_parallel.
+ */
+void
+index_parallelscan_initialize(Relation heapRelation, Relation indexRelation,
+                                                         Snapshot snapshot, ParallelIndexScanDesc target)
+{
+       Size            offset;
+
+       RELATION_CHECKS;
+
+       /*
+        * Start of the AM-specific area: just past the serialized snapshot,
+        * rounded up with MAXALIGN.  index_parallelscan_estimate sizes the
+        * AM-independent part with the same computation.
+        */
+       offset = add_size(offsetof(ParallelIndexScanDescData, ps_snapshot_data),
+                                         EstimateSnapshotSpace(snapshot));
+       offset = MAXALIGN(offset);
+
+       target->ps_relid = RelationGetRelid(heapRelation);
+       target->ps_indexid = RelationGetRelid(indexRelation);
+       target->ps_offset = offset;
+       SerializeSnapshot(snapshot, target->ps_snapshot_data);
+
+       /* aminitparallelscan is optional; assume no-op if not provided by AM */
+       if (indexRelation->rd_amroutine->aminitparallelscan != NULL)
+       {
+               void       *amtarget;
+
+               /* hand the AM a pointer to its own area, not to the whole descriptor */
+               amtarget = OffsetToPointer(target, offset);
+               indexRelation->rd_amroutine->aminitparallelscan(amtarget);
+       }
+}
+
+/* ----------------
+ *             index_parallelrescan  - (re)start a parallel scan of an index
+ *
+ * Invokes the index AM's amparallelrescan callback for "scan", if the AM
+ * provides one.  This function itself does not touch the scan descriptor.
+ * ----------------
+ */
+void
+index_parallelrescan(IndexScanDesc scan)
+{
+       SCAN_CHECKS;
+
+       /* amparallelrescan is optional; assume no-op if not provided by AM */
+       if (scan->indexRelation->rd_amroutine->amparallelrescan != NULL)
+               scan->indexRelation->rd_amroutine->amparallelrescan(scan);
+}
+
+/*
+ * index_beginscan_parallel - join parallel index scan
+ *
+ * Begins a scan of "indexrel" that is attached to the shared descriptor
+ * "pscan" and returns its scan descriptor.  The snapshot to use is
+ * restored from pscan->ps_snapshot_data and registered here; the scan is
+ * created with xs_temp_snap set, so index_endscan unregisters that
+ * snapshot when the scan ends.
+ *
+ * Caller must be holding suitable locks on the heap and the index.
+ */
+IndexScanDesc
+index_beginscan_parallel(Relation heaprel, Relation indexrel, int nkeys,
+                                                int norderbys, ParallelIndexScanDesc pscan)
+{
+       Snapshot        snapshot;
+       IndexScanDesc scan;
+
+       /* NOTE(review): only the heap OID is cross-checked; ps_indexid is not */
+       Assert(RelationGetRelid(heaprel) == pscan->ps_relid);
+       snapshot = RestoreSnapshot(pscan->ps_snapshot_data);
+       RegisterSnapshot(snapshot);
+       /* the final "true" is temp_snap: index_endscan will unregister it */
+       scan = index_beginscan_internal(indexrel, nkeys, norderbys, snapshot,
+                                                                       pscan, true);
+
+       /*
+        * Save additional parameters into the scandesc.  Everything else was set
+        * up by index_beginscan_internal.
+        */
+       scan->heapRelation = heaprel;
+       scan->xs_snapshot = snapshot;
+
+       return scan;
+}
+
 /* ----------------
  * index_getnext_tid - get the next TID from a scan
  *
index 1bb1acfea6ae468c8d16bcb91966877f2837dc39..469e7abe4dff2612eef8f63d223cd96033287125 100644 (file)
@@ -118,6 +118,9 @@ bthandler(PG_FUNCTION_ARGS)
        amroutine->amendscan = btendscan;
        amroutine->ammarkpos = btmarkpos;
        amroutine->amrestrpos = btrestrpos;
+       amroutine->amestimateparallelscan = NULL;
+       amroutine->aminitparallelscan = NULL;
+       amroutine->amparallelrescan = NULL;
 
        PG_RETURN_POINTER(amroutine);
 }
index ca4b0bdbe4f9f64a945e6c09e847777e18baf099..78846bec6665ab3ad52bb5116ab17a80f7f8b72f 100644 (file)
@@ -68,6 +68,9 @@ spghandler(PG_FUNCTION_ARGS)
        amroutine->amendscan = spgendscan;
        amroutine->ammarkpos = NULL;
        amroutine->amrestrpos = NULL;
+       amroutine->amestimateparallelscan = NULL;
+       amroutine->aminitparallelscan = NULL;
+       amroutine->amparallelrescan = NULL;
 
        PG_RETURN_POINTER(amroutine);
 }
index 6a5f279e7f9ddd7ecae7e43421fb95090a9c6224..e91e41dc0f405e741bc239adab30eb321a5449df 100644 (file)
@@ -137,6 +137,18 @@ typedef void (*ammarkpos_function) (IndexScanDesc scan);
 /* restore marked scan position */
 typedef void (*amrestrpos_function) (IndexScanDesc scan);
 
+/*
+ * Callback function signatures - for parallel index scans.
+ *
+ * All three are optional: the corresponding IndexAmRoutine fields can be
+ * NULL.
+ */
+
+/*
+ * estimate size of parallel scan descriptor: returns the number of bytes
+ * of shared memory the AM needs, in addition to the AM-independent data in
+ * ParallelIndexScanDescData
+ */
+typedef Size (*amestimateparallelscan_function) (void);
+
+/*
+ * prepare for parallel index scan: "target" points to the AM's area of
+ * shared memory, which the AM may fill with whatever data it wishes
+ */
+typedef void (*aminitparallelscan_function) (void *target);
+
+/*
+ * (re)start parallel index scan: reset the AM's shared state so that the
+ * scan will be restarted from the beginning
+ */
+typedef void (*amparallelrescan_function) (IndexScanDesc scan);
 
 /*
  * API struct for an index AM.  Note this must be stored in a single palloc'd
@@ -196,6 +208,11 @@ typedef struct IndexAmRoutine
        amendscan_function amendscan;
        ammarkpos_function ammarkpos;           /* can be NULL */
        amrestrpos_function amrestrpos;         /* can be NULL */
+
+       /* interface functions to support parallel index scans */
+       amestimateparallelscan_function amestimateparallelscan;         /* can be NULL */
+       aminitparallelscan_function aminitparallelscan;         /* can be NULL */
+       amparallelrescan_function amparallelrescan; /* can be NULL */
 } IndexAmRoutine;
 
 
index b2e078aed2e666fc355af0459e8e83cafbf823ff..51466b96e8b079e24f10169c367ff971007db015 100644 (file)
@@ -83,6 +83,8 @@ typedef bool (*IndexBulkDeleteCallback) (ItemPointer itemptr, void *state);
 typedef struct IndexScanDescData *IndexScanDesc;
 typedef struct SysScanDescData *SysScanDesc;
 
+typedef struct ParallelIndexScanDescData *ParallelIndexScanDesc;
+
 /*
  * Enumeration specifying the type of uniqueness check to perform in
  * index_insert().
@@ -144,6 +146,13 @@ extern void index_rescan(IndexScanDesc scan,
 extern void index_endscan(IndexScanDesc scan);
 extern void index_markpos(IndexScanDesc scan);
 extern void index_restrpos(IndexScanDesc scan);
+extern Size index_parallelscan_estimate(Relation indexrel, Snapshot snapshot);
+extern void index_parallelscan_initialize(Relation heaprel, Relation indexrel,
+                                                       Snapshot snapshot, ParallelIndexScanDesc target);
+extern void index_parallelrescan(IndexScanDesc scan);
+extern IndexScanDesc index_beginscan_parallel(Relation heaprel,
+                                                Relation indexrel, int nkeys, int norderbys,
+                                                ParallelIndexScanDesc pscan);
 extern ItemPointer index_getnext_tid(IndexScanDesc scan,
                                  ScanDirection direction);
 extern HeapTuple index_fetch_heap(IndexScanDesc scan);
index 8746045d8d8c12f86a344a6815bf5c302701e520..ce3ca8d4ac27bf574f0376577e568a037902a693 100644 (file)
@@ -93,6 +93,7 @@ typedef struct IndexScanDescData
        ScanKey         keyData;                /* array of index qualifier descriptors */
        ScanKey         orderByData;    /* array of ordering op descriptors */
        bool            xs_want_itup;   /* caller requests index tuples */
+       bool            xs_temp_snap;   /* unregister snapshot at scan end? */
 
        /* signaling to index AM about killing index tuples */
        bool            kill_prior_tuple;               /* last-returned tuple is dead */
@@ -126,8 +127,20 @@ typedef struct IndexScanDescData
 
        /* state data for traversing HOT chains in index_getnext */
        bool            xs_continue_hot;        /* T if must keep walking HOT chain */
+
+       /* parallel index scan information, in shared memory */
+       ParallelIndexScanDesc parallel_scan;
 }      IndexScanDescData;
 
+/*
+ * Generic structure for parallel scans.
+ *
+ * Filled in by index_parallelscan_initialize.  The serialized snapshot is
+ * stored in ps_snapshot_data; any AM-specific data begins ps_offset bytes
+ * from the start of this struct.
+ */
+typedef struct ParallelIndexScanDescData
+{
+       Oid                     ps_relid;               /* OID of the heap relation */
+       Oid                     ps_indexid;             /* OID of the index relation */
+       Size            ps_offset;              /* Offset in bytes of am specific structure */
+       char            ps_snapshot_data[FLEXIBLE_ARRAY_MEMBER];        /* serialized snapshot */
+} ParallelIndexScanDescData;
+
 /* Struct for heap-or-index scans of system tables */
 typedef struct SysScanDescData
 {
index efbb77f540a4afe004188b3af060db5b2835d39f..a2c043adfbfe314a3b7d59b3645e1da9fbd99d58 100644 (file)
@@ -527,6 +527,9 @@ typedef NameData *Name;
 #define PointerIsAligned(pointer, type) \
                (((uintptr_t)(pointer) % (sizeof (type))) == 0)
 
+/*
+ * OffsetToPointer
+ *             Return a pointer that is "offset" bytes past "base".
+ *
+ * Both arguments are parenthesized so that passing an expression (for
+ * example "a + b" or "cond ? x : y") cannot change how the cast and the
+ * addition bind.  Each argument is evaluated exactly once.
+ */
+#define OffsetToPointer(base, offset) \
+               ((void *)((char *) (base) + (offset)))
+
 #define OidIsValid(objectId)  ((bool) ((objectId) != InvalidOid))
 
 #define RegProcedureIsValid(p) OidIsValid(p)
index 993880da43ead9a5925a1bc1a278c51555befabf..c4235ae63a4e544bba1bec004128350adadbcd1b 100644 (file)
@@ -1264,6 +1264,8 @@ OverrideSearchPath
 OverrideStackEntry
 PACE_HEADER
 PACL
+ParallelIndexScanDesc
+ParallelIndexScanDescData
 PATH
 PBOOL
 PCtxtHandle