]> granicus.if.org Git - postgresql/commitdiff
SP-GiST support of the range adjacent operator -|-
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Fri, 8 Mar 2013 13:03:19 +0000 (15:03 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Fri, 8 Mar 2013 13:03:19 +0000 (15:03 +0200)
Alexander Korotkov, reviewed by Jeff Davis.

src/backend/utils/adt/rangetypes.c
src/backend/utils/adt/rangetypes_spgist.c
src/include/catalog/pg_amop.h
src/include/utils/rangetypes.h
src/test/regress/expected/opr_sanity.out

index c440847916b9ccb70a5755ccc82d0c44e957364a..84a4aca16c097a69af89898b476bda0e19b2d698 100644 (file)
@@ -709,6 +709,64 @@ range_after(PG_FUNCTION_ARGS)
        PG_RETURN_BOOL(range_after_internal(typcache, r1, r2));
 }
 
+/*
+ * Check if two bounds A and B are "adjacent", where A is an upper bound and B
+ * is a lower bound. For the bounds to be adjacent, each subtype value must
+ * satisfy strictly one of the bounds: there are no values which satisfy both
+ * bounds (i.e. less than A and greater than B); and there are no values which
+ * satisfy neither bound (i.e. greater than A and less than B).
+ *
+ * For discrete ranges, we rely on the canonicalization function to see if A..B
+ * normalizes to empty. (If there is no canonicalization function, it's
+ * impossible for such a range to normalize to empty, so we needn't bother to
+ * try.)
+ *
+ * If A == B, the ranges are adjacent only if the bounds have different
+ * inclusive flags (i.e., exactly one of the ranges includes the common
+ * boundary point).
+ *
+ * And if A > B then the ranges are not adjacent in this order.
+ */
+bool
+bounds_adjacent(TypeCacheEntry *typcache, RangeBound boundA, RangeBound boundB)
+{
+       int                     cmp;
+
+       Assert(!boundA.lower && boundB.lower);
+
+       cmp = range_cmp_bound_values(typcache, &boundA, &boundB);
+       if (cmp < 0)
+       {
+               RangeType *r;
+
+               /*
+                * Bounds do not overlap; see if there are points in between.
+                */
+
+               /* in a continuous subtype, there are assumed to be points between */
+               if (!OidIsValid(typcache->rng_canonical_finfo.fn_oid))
+                       return false;
+
+               /*
+                * The bounds are of a discrete range type; so make a range A..B and
+                * see if it's empty.
+                */
+
+               /* flip the inclusion flags */
+               boundA.inclusive = !boundA.inclusive;
+               boundB.inclusive = !boundB.inclusive;
+               /* change upper/lower labels to avoid Assert failures */
+               boundA.lower = true;
+               boundB.lower = false;
+               r = make_range(typcache, &boundA, &boundB, false);
+               return RangeIsEmpty(r);
+       }
+       else if (cmp == 0)
+               return boundA.inclusive != boundB.inclusive;
+       else
+               return false;           /* bounds overlap */
+}
+
 /* adjacent to (but not overlapping)? (internal version) */
 bool
 range_adjacent_internal(TypeCacheEntry *typcache, RangeType *r1, RangeType *r2)
@@ -719,8 +777,6 @@ range_adjacent_internal(TypeCacheEntry *typcache, RangeType *r1, RangeType *r2)
                                upper2;
        bool            empty1,
                                empty2;
-       RangeType  *r3;
-       int                     cmp;
 
        /* Different types should be prevented by ANYRANGE matching rules */
        if (RangeTypeGetOid(r1) != RangeTypeGetOid(r2))
@@ -734,62 +790,11 @@ range_adjacent_internal(TypeCacheEntry *typcache, RangeType *r1, RangeType *r2)
                return false;
 
        /*
-        * Given two ranges A..B and C..D, where B < C, the ranges are adjacent if
-        * and only if the range B..C is empty, where inclusivity of these two
-        * bounds is inverted compared to the original bounds.  For discrete
-        * ranges, we have to rely on the canonicalization function to normalize
-        * B..C to empty if it contains no elements of the subtype.  (If there is
-        * no canonicalization function, it's impossible for such a range to
-        * normalize to empty, so we needn't bother to try.)
-        *
-        * If B == C, the ranges are adjacent only if these bounds have different
-        * inclusive flags (i.e., exactly one of the ranges includes the common
-        * boundary point).
-        *
-        * And if B > C then the ranges cannot be adjacent in this order, but we
-        * must consider the other order (i.e., check D <= A).
+        * Given two ranges A..B and C..D, the ranges are adjacent if and only if
+        * B is adjacent to C, or D is adjacent to A.
         */
-       cmp = range_cmp_bound_values(typcache, &upper1, &lower2);
-       if (cmp < 0)
-       {
-               /* in a continuous subtype, there are assumed to be points between */
-               if (!OidIsValid(typcache->rng_canonical_finfo.fn_oid))
-                       return (false);
-               /* flip the inclusion flags */
-               upper1.inclusive = !upper1.inclusive;
-               lower2.inclusive = !lower2.inclusive;
-               /* change upper/lower labels to avoid Assert failures */
-               upper1.lower = true;
-               lower2.lower = false;
-               r3 = make_range(typcache, &upper1, &lower2, false);
-               return RangeIsEmpty(r3);
-       }
-       if (cmp == 0)
-       {
-               return (upper1.inclusive != lower2.inclusive);
-       }
-
-       cmp = range_cmp_bound_values(typcache, &upper2, &lower1);
-       if (cmp < 0)
-       {
-               /* in a continuous subtype, there are assumed to be points between */
-               if (!OidIsValid(typcache->rng_canonical_finfo.fn_oid))
-                       return (false);
-               /* flip the inclusion flags */
-               upper2.inclusive = !upper2.inclusive;
-               lower1.inclusive = !lower1.inclusive;
-               /* change upper/lower labels to avoid Assert failures */
-               upper2.lower = true;
-               lower1.lower = false;
-               r3 = make_range(typcache, &upper2, &lower1, false);
-               return RangeIsEmpty(r3);
-       }
-       if (cmp == 0)
-       {
-               return (upper2.inclusive != lower1.inclusive);
-       }
-
-       return false;
+       return (bounds_adjacent(typcache, upper1, lower2) ||
+                       bounds_adjacent(typcache, upper2, lower1));
 }
 
 /* adjacent to (but not overlapping)? */
index d395ef47180d1f5032b001392337cce42cbb2ad9..9a7f20d9f37ec00b77c199004f7926a92c1d0319 100644 (file)
@@ -305,6 +305,16 @@ spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)
        int                     which;
        int                     i;
 
+       /*
+        * For adjacent search we need also previous centroid (if any) to improve
+        * the precision of the consistent check. In this case needPrevious flag is
+        * set and centroid is passed into reconstructedValues. This is not the
+        * intended purpose of reconstructedValues (because we already have the
+        * full value available at the leaf), but it's a convenient place to store
+        * state while traversing the tree.
+        */
+       bool            needPrevious = false;
+
        if (in->allTheSame)
        {
                /* Report that all nodes should be visited */
@@ -351,6 +361,7 @@ spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)
                                case RANGESTRAT_OVERLAPS:
                                case RANGESTRAT_OVERRIGHT:
                                case RANGESTRAT_AFTER:
+                               case RANGESTRAT_ADJACENT:
                                        /* These strategies return false if any argument is empty */
                                        if (empty)
                                                which = 0;
@@ -435,6 +446,9 @@ spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)
                        /* Are the restrictions on range bounds inclusive? */
                        bool            inclusive = true;
                        bool            strictEmpty = true;
+                       int                     cmp,
+                                               which1,
+                                               which2;
 
                        strategy = in->scankeys[i].sk_strategy;
 
@@ -522,6 +536,118 @@ spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)
                                        inclusive = false;
                                        break;
 
+                               case RANGESTRAT_ADJACENT:
+                                       if (empty)
+                                               break;                          /* Skip to strictEmpty check. */
+
+                                       /*
+                                        * which1 is bitmask for possibility to be adjacent with
+                                        * lower bound of argument. which2 is bitmask for
+                                        * possibility to be adjacent with upper bound of argument.
+                                        */
+                                       which1 = which2 = (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4);
+
+                                       /*
+                                        * Previously selected quadrant could exclude possibility
+                                        * for lower or upper bounds to be adjacent. Deserialize
+                                        * previous centroid range if present for checking this.
+                                        */
+                                       if (in->reconstructedValue != (Datum) 0)
+                                       {
+                                               RangeType  *prevCentroid;
+                                               RangeBound      prevLower,
+                                                                       prevUpper;
+                                               bool            prevEmpty;
+                                               int                     cmp1,
+                                                                       cmp2;
+
+                                               prevCentroid = DatumGetRangeType(in->reconstructedValue);
+                                               range_deserialize(typcache, prevCentroid,
+                                                                                 &prevLower, &prevUpper, &prevEmpty);
+
+                                               /*
+                                                * Check if lower bound of argument is not in a
+                                                * quadrant we visited in the previous step.
+                                                */
+                                               cmp1 = range_cmp_bounds(typcache, &lower, &prevUpper);
+                                               cmp2 = range_cmp_bounds(typcache, &centroidUpper,
+                                                                                               &prevUpper);
+                                               if ((cmp2 < 0 && cmp1 > 0) || (cmp2 > 0 && cmp1 < 0))
+                                                       which1 = 0;
+
+                                               /*
+                                                * Check if upper bound of argument is not in a
+                                                * quadrant we visited in the previous step.
+                                                */
+                                               cmp1 = range_cmp_bounds(typcache, &upper, &prevLower);
+                                               cmp2 = range_cmp_bounds(typcache, &centroidLower,
+                                                                                               &prevLower);
+                                               if ((cmp2 < 0 && cmp1 > 0) || (cmp2 > 0 && cmp1 < 0))
+                                                       which2 = 0;
+                                       }
+
+                                       if (which1)
+                                       {
+                                               /*
+                                                * For a range's upper bound to be adjacent to the
+                                                * argument's lower bound, it will be found along the
+                                                * line adjacent to (and just below) Y=lower.
+                                                * Therefore, if the argument's lower bound is less
+                                                * than the centroid's upper bound, the line falls in
+                                                * quadrants 2 and 3; if greater, the line falls in
+                                                * quadrants 1 and 4.
+                                                *
+                                                * The above is true even when the argument's lower
+                                                * bound is greater and adjacent to the centroid's
+                                                * upper bound. If the argument's lower bound is
+                                                * greater than the centroid's upper bound, then the
+                                                * lowest value that an adjacent range could have is
+                                                * that of the centroid's upper bound, which still
+                                                * falls in quadrants 1 and 4.
+                                                *
+                                                * In the edge case, where the argument's lower bound
+                                                * is equal to the cetroid's upper bound, there may be
+                                                * adjacent ranges in any quadrant.
+                                                */
+                                               cmp = range_cmp_bounds(typcache, &lower,
+                                                                                          &centroidUpper);
+                                               if (cmp < 0)
+                                                       which1 &= (1 << 2) | (1 << 3);
+                                               else if (cmp > 0)
+                                                       which1 &= (1 << 1) | (1 << 4);
+                                       }
+
+                                       if (which2)
+                                       {
+                                               /*
+                                                * For a range's lower bound to be adjacent to the
+                                                * argument's upper bound, it will be found along the
+                                                * line adjacent to (and just right of)
+                                                * X=upper. Therefore, if the argument's upper bound is
+                                                * less than (and not adjacent to) the centroid's upper
+                                                * bound, the line falls in quadrants 3 and 4; if
+                                                * greater or equal to, the line falls in quadrants 1
+                                                * and 2.
+                                                *
+                                                * The edge case is when the argument's upper bound is
+                                                * less than and adjacent to the centroid's lower
+                                                * bound. In that case, adjacent ranges may be in any
+                                                * quadrant.
+                                                */
+                                               cmp = range_cmp_bounds(typcache, &lower,
+                                                                                          &centroidUpper);
+                                               if (cmp < 0 &&
+                                                       !bounds_adjacent(typcache, upper, centroidLower))
+                                                       which1 &= (1 << 3) | (1 << 4);
+                                               else if (cmp > 0)
+                                                       which1 &= (1 << 1) | (1 << 2);
+                                       }
+
+                                       which &= which1 | which2;
+
+                                       needPrevious = true;
+                                       break;
+
                                case RANGESTRAT_CONTAINS:
                                        /*
                                         * Non-empty range A contains non-empty range B if lower
@@ -652,11 +778,18 @@ spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)
 
        /* We must descend into the quadrant(s) identified by 'which' */
        out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
+       if (needPrevious)
+               out->reconstructedValues = (Datum *) palloc(sizeof(Datum) * in->nNodes);
        out->nNodes = 0;
        for (i = 1; i <= in->nNodes; i++)
        {
                if (which & (1 << i))
+               {
+                       /* Save previous prefix if needed */
+                       if (needPrevious)
+                               out->reconstructedValues[out->nNodes] = in->prefixDatum;
                        out->nodeNumbers[out->nNodes++] = i - 1;
+               }
        }
 
        PG_RETURN_VOID();
@@ -713,6 +846,10 @@ spg_range_quad_leaf_consistent(PG_FUNCTION_ARGS)
                                res = range_after_internal(typcache, leafRange,
                                                                                   DatumGetRangeType(keyDatum));
                                break;
+                       case RANGESTRAT_ADJACENT:
+                               res = range_adjacent_internal(typcache, leafRange,
+                                                                                  DatumGetRangeType(keyDatum));
+                               break;
                        case RANGESTRAT_CONTAINS:
                                res = range_contains_internal(typcache, leafRange,
                                                                                          DatumGetRangeType(keyDatum));
index 5d451e36c11b9a61f77e1d80cf224889deacafd2..d2003487472a0c1ba67d88cb3e9b70f2857e6a4b 100644 (file)
@@ -775,6 +775,7 @@ DATA(insert (       3474   3831 3831 2 s    3895 4000 0 ));
 DATA(insert (  3474   3831 3831 3 s    3888 4000 0 ));
 DATA(insert (  3474   3831 3831 4 s    3896 4000 0 ));
 DATA(insert (  3474   3831 3831 5 s    3894 4000 0 ));
+DATA(insert (  3474   3831 3831 6 s    3897 4000 0 ));
 DATA(insert (  3474   3831 3831 7 s    3890 4000 0 ));
 DATA(insert (  3474   3831 3831 8 s    3892 4000 0 ));
 DATA(insert (  3474   3831 2283 16 s   3889 4000 0 ));
index 67b1a7ce61101cc9fb4749fb977b52ba8b6471d3..f6b941d3acc7bcaaf890e69c93c8cdfe29896d0f 100644 (file)
@@ -201,6 +201,8 @@ extern int range_cmp_bounds(TypeCacheEntry *typcache, RangeBound *b1,
                                 RangeBound *b2);
 extern int range_cmp_bound_values(TypeCacheEntry *typcache, RangeBound *b1,
                                           RangeBound *b2);
+extern bool bounds_adjacent(TypeCacheEntry *typcache, RangeBound bound1,
+                               RangeBound bound2);
 extern RangeType *make_empty_range(TypeCacheEntry *typcache);
 
 /* GiST support (in rangetypes_gist.c) */
index 6faaf4272f8fe2f64b85dd98fa20d260c79d4cbf..256b7196d0d67ae0406af84c353483b6765e7538 100644 (file)
@@ -1076,6 +1076,7 @@ ORDER BY 1, 2, 3;
        4000 |            4 | ~>=~
        4000 |            5 | >>
        4000 |            5 | ~>~
+       4000 |            6 | -|-
        4000 |            6 | ~=
        4000 |            7 | @>
        4000 |            8 | <@
@@ -1087,7 +1088,7 @@ ORDER BY 1, 2, 3;
        4000 |           15 | >
        4000 |           16 | @>
        4000 |           18 | =
-(61 rows)
+(62 rows)
 
 -- Check that all opclass search operators have selectivity estimators.
 -- This is not absolutely required, but it seems a reasonable thing