]> granicus.if.org Git - postgresql/commitdiff
Fix GiST index build for NaN values in geometric types.
authorTom Lane <tgl@sss.pgh.pa.us>
Thu, 14 Jul 2016 22:45:59 +0000 (18:45 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Thu, 14 Jul 2016 22:45:59 +0000 (18:45 -0400)
GiST index build could go into an infinite loop when presented with boxes
(or points, circles or polygons) containing NaN component values.  This
happened essentially because the code assumed that x == x is true for any
"double" value x; but it's not true for NaNs.  The looping behavior was not
the only problem though: we also attempted to sort the items using simple
double comparisons.  Since NaNs violate the trichotomy law, qsort could
(in principle at least) get arbitrarily confused and mess up the sorting of
ordinary values as well as NaNs.  And we based splitting choices on box size
calculations that could produce NaNs, again resulting in undesirable
behavior.

To fix, replace all comparisons of doubles in this logic with
float8_cmp_internal, which is NaN-aware and is careful to sort NaNs
consistently, higher than any non-NaN.  Also rearrange the box size
calculation to not produce NaNs; instead it should produce an infinity
for a box with NaN on one side and not-NaN on the other.

I don't by any means claim that this solves all problems with NaNs in
geometric values, but it should at least make GiST index insertion work
reliably with such data.  It's likely that the index search side of things
still needs some work, and probably regular geometric operations too.
But with this patch we're laying down a convention for how such cases
ought to behave.

Per bug #14238 from Guang-Dih Lei.  Back-patch to 9.2; the code used before
commit 7f3bd86843e5aad8 is quite different and doesn't lock up on my simple
test case, nor on the submitter's dataset.

Report: <20160708151747.1426.60150@wrigleys.postgresql.org>
Discussion: <28685.1468246504@sss.pgh.pa.us>

src/backend/access/gist/gistproc.c
src/backend/utils/adt/float.c
src/include/utils/builtins.h

index e8213e2baff89618a91aaa272d201b5a86187a20..d47211afc0e4f0338a8632905bb321b1f8b5673d 100644 (file)
  */
 #include "postgres.h"
 
+#include <math.h>
+
 #include "access/gist.h"
 #include "access/stratnum.h"
+#include "utils/builtins.h"
 #include "utils/geo_decls.h"
 
 
 static bool gist_box_leaf_consistent(BOX *key, BOX *query,
                                                 StrategyNumber strategy);
-static double size_box(BOX *box);
 static bool rtree_internal_consistent(BOX *key, BOX *query,
                                                  StrategyNumber strategy);
 
 /* Minimum accepted ratio of split */
 #define LIMIT_RATIO 0.3
 
+/* Convenience macros for NaN-aware comparisons */
+#define FLOAT8_EQ(a,b) (float8_cmp_internal(a, b) == 0)
+#define FLOAT8_LT(a,b) (float8_cmp_internal(a, b) < 0)
+#define FLOAT8_LE(a,b) (float8_cmp_internal(a, b) <= 0)
+#define FLOAT8_GT(a,b) (float8_cmp_internal(a, b) > 0)
+#define FLOAT8_GE(a,b) (float8_cmp_internal(a, b) >= 0)
+#define FLOAT8_MAX(a,b)  (FLOAT8_GT(a, b) ? (a) : (b))
+#define FLOAT8_MIN(a,b)  (FLOAT8_LT(a, b) ? (a) : (b))
+
 
 /**************************************************
  * Box ops
@@ -40,12 +51,53 @@ static bool rtree_internal_consistent(BOX *key, BOX *query,
  * Calculates union of two boxes, a and b. The result is stored in *n.
  */
 static void
-rt_box_union(BOX *n, BOX *a, BOX *b)
+rt_box_union(BOX *n, const BOX *a, const BOX *b)
+{
+       n->high.x = FLOAT8_MAX(a->high.x, b->high.x);
+       n->high.y = FLOAT8_MAX(a->high.y, b->high.y);
+       n->low.x = FLOAT8_MIN(a->low.x, b->low.x);
+       n->low.y = FLOAT8_MIN(a->low.y, b->low.y);
+}
+
+/*
+ * Size of a BOX for penalty-calculation purposes.
+ * The result can be +Infinity, but not NaN.
+ */
+static double
+size_box(const BOX *box)
+{
+       /*
+        * Check for zero-width cases.  Note that we define the size of a zero-
+        * by-infinity box as zero.  It's important to special-case this somehow,
+        * as naively multiplying infinity by zero will produce NaN.
+        *
+        * The less-than cases should not happen, but if they do, say "zero".
+        */
+       if (FLOAT8_LE(box->high.x, box->low.x) ||
+               FLOAT8_LE(box->high.y, box->low.y))
+               return 0.0;
+
+       /*
+        * We treat NaN as larger than +Infinity, so any distance involving a NaN
+        * and a non-NaN is infinite.  Note the previous check eliminated the
+        * possibility that the low fields are NaNs.
+        */
+       if (isnan(box->high.x) || isnan(box->high.y))
+               return get_float8_infinity();
+       return (box->high.x - box->low.x) * (box->high.y - box->low.y);
+}
+
+/*
+ * Return amount by which the union of the two boxes is larger than
+ * the original BOX's area.  The result can be +Infinity, but not NaN.
+ */
+static double
+box_penalty(const BOX *original, const BOX *new)
 {
-       n->high.x = Max(a->high.x, b->high.x);
-       n->high.y = Max(a->high.y, b->high.y);
-       n->low.x = Min(a->low.x, b->low.x);
-       n->low.y = Min(a->low.y, b->low.y);
+       BOX                     unionbox;
+
+       rt_box_union(&unionbox, original, new);
+       return size_box(&unionbox) - size_box(original);
 }
 
 /*
@@ -85,16 +137,19 @@ gist_box_consistent(PG_FUNCTION_ARGS)
                                                                                                 strategy));
 }
 
+/*
+ * Increase BOX b to include addon.
+ */
 static void
-adjustBox(BOX *b, BOX *addon)
+adjustBox(BOX *b, const BOX *addon)
 {
-       if (b->high.x < addon->high.x)
+       if (FLOAT8_LT(b->high.x, addon->high.x))
                b->high.x = addon->high.x;
-       if (b->low.x > addon->low.x)
+       if (FLOAT8_GT(b->low.x, addon->low.x))
                b->low.x = addon->low.x;
-       if (b->high.y < addon->high.y)
+       if (FLOAT8_LT(b->high.y, addon->high.y))
                b->high.y = addon->high.y;
-       if (b->low.y > addon->low.y)
+       if (FLOAT8_GT(b->low.y, addon->low.y))
                b->low.y = addon->low.y;
 }
 
@@ -174,10 +229,8 @@ gist_box_penalty(PG_FUNCTION_ARGS)
        float      *result = (float *) PG_GETARG_POINTER(2);
        BOX                *origbox = DatumGetBoxP(origentry->key);
        BOX                *newbox = DatumGetBoxP(newentry->key);
-       BOX                     unionbox;
 
-       rt_box_union(&unionbox, origbox, newbox);
-       *result = (float) (size_box(&unionbox) - size_box(origbox));
+       *result = (float) box_penalty(origbox, newbox);
        PG_RETURN_POINTER(result);
 }
 
@@ -290,12 +343,7 @@ interval_cmp_lower(const void *i1, const void *i2)
        double          lower1 = ((const SplitInterval *) i1)->lower,
                                lower2 = ((const SplitInterval *) i2)->lower;
 
-       if (lower1 < lower2)
-               return -1;
-       else if (lower1 > lower2)
-               return 1;
-       else
-               return 0;
+       return float8_cmp_internal(lower1, lower2);
 }
 
 /*
@@ -307,16 +355,11 @@ interval_cmp_upper(const void *i1, const void *i2)
        double          upper1 = ((const SplitInterval *) i1)->upper,
                                upper2 = ((const SplitInterval *) i2)->upper;
 
-       if (upper1 < upper2)
-               return -1;
-       else if (upper1 > upper2)
-               return 1;
-       else
-               return 0;
+       return float8_cmp_internal(upper1, upper2);
 }
 
 /*
- * Replace negative value with zero.
+ * Replace negative (or NaN) value with zero.
  */
 static inline float
 non_negative(float val)
@@ -435,25 +478,9 @@ g_box_consider_split(ConsiderSplitContext *context, int dimNum,
        }
 }
 
-/*
- * Return increase of original BOX area by new BOX area insertion.
- */
-static double
-box_penalty(BOX *original, BOX *new)
-{
-       double          union_width,
-                               union_height;
-
-       union_width = Max(original->high.x, new->high.x) -
-               Min(original->low.x, new->low.x);
-       union_height = Max(original->high.y, new->high.y) -
-               Min(original->low.y, new->low.y);
-       return union_width * union_height - (original->high.x - original->low.x) *
-               (original->high.y - original->low.y);
-}
-
 /*
  * Compare common entries by their deltas.
+ * (We assume the deltas can't be NaN.)
  */
 static int
 common_entry_cmp(const void *i1, const void *i2)
@@ -615,9 +642,11 @@ gist_box_picksplit(PG_FUNCTION_ARGS)
                        /*
                         * Find next lower bound of right group.
                         */
-                       while (i1 < nentries && rightLower == intervalsLower[i1].lower)
+                       while (i1 < nentries &&
+                                  FLOAT8_EQ(rightLower, intervalsLower[i1].lower))
                        {
-                               leftUpper = Max(leftUpper, intervalsLower[i1].upper);
+                               if (FLOAT8_LT(leftUpper, intervalsLower[i1].upper))
+                                       leftUpper = intervalsLower[i1].upper;
                                i1++;
                        }
                        if (i1 >= nentries)
@@ -628,7 +657,8 @@ gist_box_picksplit(PG_FUNCTION_ARGS)
                         * Find count of intervals which anyway should be placed to the
                         * left group.
                         */
-                       while (i2 < nentries && intervalsUpper[i2].upper <= leftUpper)
+                       while (i2 < nentries &&
+                                  FLOAT8_LE(intervalsUpper[i2].upper, leftUpper))
                                i2++;
 
                        /*
@@ -650,9 +680,10 @@ gist_box_picksplit(PG_FUNCTION_ARGS)
                        /*
                         * Find next upper bound of left group.
                         */
-                       while (i2 >= 0 && leftUpper == intervalsUpper[i2].upper)
+                       while (i2 >= 0 && FLOAT8_EQ(leftUpper, intervalsUpper[i2].upper))
                        {
-                               rightLower = Min(rightLower, intervalsUpper[i2].lower);
+                               if (FLOAT8_GT(rightLower, intervalsUpper[i2].lower))
+                                       rightLower = intervalsUpper[i2].lower;
                                i2--;
                        }
                        if (i2 < 0)
@@ -663,7 +694,7 @@ gist_box_picksplit(PG_FUNCTION_ARGS)
                         * Find count of intervals which anyway should be placed to the
                         * right group.
                         */
-                       while (i1 >= 0 && intervalsLower[i1].lower >= rightLower)
+                       while (i1 >= 0 && FLOAT8_GE(intervalsLower[i1].lower, rightLower))
                                i1--;
 
                        /*
@@ -751,10 +782,10 @@ gist_box_picksplit(PG_FUNCTION_ARGS)
                        upper = box->high.y;
                }
 
-               if (upper <= context.leftUpper)
+               if (FLOAT8_LE(upper, context.leftUpper))
                {
                        /* Fits to the left group */
-                       if (lower >= context.rightLower)
+                       if (FLOAT8_GE(lower, context.rightLower))
                        {
                                /* Fits also to the right group, so "common entry" */
                                commonEntries[commonEntriesCount++].index = i;
@@ -772,7 +803,7 @@ gist_box_picksplit(PG_FUNCTION_ARGS)
                         * entry didn't fit on the left group, it better fit in the right
                         * group.
                         */
-                       Assert(lower >= context.rightLower);
+                       Assert(FLOAT8_GE(lower, context.rightLower));
 
                        /* Doesn't fit to the left group, so join to the right group */
                        PLACE_RIGHT(box, i);
@@ -856,8 +887,10 @@ gist_box_same(PG_FUNCTION_ARGS)
        bool       *result = (bool *) PG_GETARG_POINTER(2);
 
        if (b1 && b2)
-               *result = (b1->low.x == b2->low.x && b1->low.y == b2->low.y &&
-                                  b1->high.x == b2->high.x && b1->high.y == b2->high.y);
+               *result = (FLOAT8_EQ(b1->low.x, b2->low.x) &&
+                                  FLOAT8_EQ(b1->low.y, b2->low.y) &&
+                                  FLOAT8_EQ(b1->high.x, b2->high.x) &&
+                                  FLOAT8_EQ(b1->high.y, b2->high.y));
        else
                *result = (b1 == NULL && b2 == NULL);
        PG_RETURN_POINTER(result);
@@ -943,14 +976,6 @@ gist_box_leaf_consistent(BOX *key, BOX *query, StrategyNumber strategy)
        return retval;
 }
 
-static double
-size_box(BOX *box)
-{
-       if (box->high.x <= box->low.x || box->high.y <= box->low.y)
-               return 0.0;
-       return (box->high.x - box->low.x) * (box->high.y - box->low.y);
-}
-
 /*****************************************
  * Common rtree functions (for boxes, polygons, and circles)
  *****************************************/
index cdc5d52b63342223874b68ad15062a88ff27ca94..8aa17e1dcb903a6051afb2ff7429808b5b816c54 100644 (file)
@@ -90,8 +90,6 @@ float8                degree_c_one_half = 0.5;
 float8         degree_c_one = 1.0;
 
 /* Local function prototypes */
-static int     float4_cmp_internal(float4 a, float4 b);
-static int     float8_cmp_internal(float8 a, float8 b);
 static double sind_q1(double x);
 static double cosd_q1(double x);
 static void init_degree_constants(void);
@@ -936,7 +934,7 @@ float8div(PG_FUNCTION_ARGS)
 /*
  *             float4{eq,ne,lt,le,gt,ge}               - float4/float4 comparison operations
  */
-static int
+int
 float4_cmp_internal(float4 a, float4 b)
 {
        /*
@@ -1050,7 +1048,7 @@ btfloat4sortsupport(PG_FUNCTION_ARGS)
 /*
  *             float8{eq,ne,lt,le,gt,ge}               - float8/float8 comparison operations
  */
-static int
+int
 float8_cmp_internal(float8 a, float8 b)
 {
        /*
index 01976a1f9b91d03d97c6d60c9e6adf42b0b27125..8cebc861f47a568fb7e2046fa20ebfddc6702a31 100644 (file)
@@ -346,6 +346,8 @@ extern int  is_infinite(double val);
 extern double float8in_internal(char *num, char **endptr_p,
                                  const char *type_name, const char *orig_string);
 extern char *float8out_internal(double num);
+extern int     float4_cmp_internal(float4 a, float4 b);
+extern int     float8_cmp_internal(float8 a, float8 b);
 
 extern Datum float4in(PG_FUNCTION_ARGS);
 extern Datum float4out(PG_FUNCTION_ARGS);