]> granicus.if.org Git - postgresql/commitdiff
Fix integer-overflow problems in interval comparison.
authorTom Lane <tgl@sss.pgh.pa.us>
Thu, 6 Apr 2017 03:51:28 +0000 (23:51 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Thu, 6 Apr 2017 03:51:28 +0000 (23:51 -0400)
When using integer timestamps, the interval-comparison functions tried
to compute the overall magnitude of an interval as an int64 number of
microseconds.  As reported by Frazer McLean, this overflows for intervals
exceeding about 296000 years, which is bad since we nominally allow
intervals many times larger than that.  That results in wrong comparison
results, and possibly in corrupted btree indexes for columns containing
such large interval values.

To fix, compute the magnitude as int128 instead.  Although some compilers
have native support for int128 calculations, many don't, so create our
own support functions that can do 128-bit addition and multiplication
if the compiler support isn't there.  These support functions are designed
with an eye to allowing the int128 code paths in numeric.c to be rewritten
for use on all platforms, although this patch doesn't do that, or even
provide all the int128 primitives that will be needed for it.

Back-patch as far as 9.4.  Earlier releases did not guard against overflow
of interval values at all (commit 146604ec4 fixed that), so it seems not
very exciting to worry about overly-large intervals for them.

Before 9.6, we did not assume that unreferenced "static inline" functions
would not draw compiler warnings, so omit functions not directly referenced
by timestamp.c, the only present consumer of int128.h.  (We could have
omitted these functions in HEAD too, but since they were written and
debugged on the way to the present patch, and they look likely to be needed
by numeric.c, let's keep them in HEAD.)  I did not bother to try to prevent
such warnings in a --disable-integer-datetimes build, though.

Before 9.5, configure will never define HAVE_INT128, so the part of
int128.h that exploits a native int128 implementation is dead code in the
9.4 branch.  I didn't bother to remove it, thinking that keeping the file
looking similar in different branches is more useful.

In HEAD only, add a simple test harness for int128.h in src/tools/.

In back branches, this does not change the float-timestamps code path.
That's not subject to the same kind of overflow risk, since it computes
the interval magnitude as float8.  (No doubt, when this code was originally
written, overflow was disregarded for exactly that reason.)  There is a
precision hazard instead :-(, but we'll avert our eyes from that question,
since no complaints have been reported and that code's deprecated anyway.

Kyotaro Horiguchi and Tom Lane

Discussion: https://postgr.es/m/1490104629.422698.918452336.26FA96B7@webmail.messagingengine.com

src/backend/utils/adt/timestamp.c
src/include/common/int128.h [new file with mode: 0644]
src/test/regress/expected/interval.out
src/test/regress/sql/interval.sql

index 5818f072321e4138677d3f0186567fdc2ffc4883..e847d0e452b506dcae2ef665c2898b769b9734bc 100644 (file)
@@ -24,6 +24,7 @@
 #include "access/hash.h"
 #include "access/xact.h"
 #include "catalog/pg_type.h"
+#include "common/int128.h"
 #include "funcapi.h"
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
@@ -2562,19 +2563,47 @@ timestamptz_cmp_timestamp(PG_FUNCTION_ARGS)
 /*
  *             interval_relop  - is interval1 relop interval2
  *
- *             collate invalid interval at the end
+ * Interval comparison is based on converting interval values to a linear
+ * representation expressed in the units of the time field (microseconds,
+ * in the case of integer timestamps) with days assumed to be always 24 hours
+ * and months assumed to be always 30 days.  To avoid overflow, we need a
+ * wider-than-int64 datatype for the linear representation, so use INT128
+ * with integer timestamps.
+ *
+ * In the float8 case, our problems are not with overflow but with precision;
+ * but it's been like that since day one, so live with it.
  */
-static inline TimeOffset
+#ifdef HAVE_INT64_TIMESTAMP
+typedef INT128 IntervalOffset;
+#else
+typedef TimeOffset IntervalOffset;
+#endif
+
+static inline IntervalOffset
 interval_cmp_value(const Interval *interval)
 {
-       TimeOffset      span;
-
-       span = interval->time;
+       IntervalOffset span;
 
 #ifdef HAVE_INT64_TIMESTAMP
-       span += interval->month * INT64CONST(30) * USECS_PER_DAY;
-       span += interval->day * INT64CONST(24) * USECS_PER_HOUR;
+       int64           dayfraction;
+       int64           days;
+
+       /*
+        * Separate time field into days and dayfraction, then add the month and
+        * day fields to the days part.  We cannot overflow int64 days here.
+        */
+       dayfraction = interval->time % USECS_PER_DAY;
+       days = interval->time / USECS_PER_DAY;
+       days += interval->month * INT64CONST(30);
+       days += interval->day;
+
+       /* Widen dayfraction to 128 bits */
+       span = int64_to_int128(dayfraction);
+
+       /* Scale up days to microseconds, forming a 128-bit product */
+       int128_add_int64_mul_int64(&span, days, USECS_PER_DAY);
 #else
+       span = interval->time;
        span += interval->month * ((double) DAYS_PER_MONTH * SECS_PER_DAY);
        span += interval->day * ((double) HOURS_PER_DAY * SECS_PER_HOUR);
 #endif
@@ -2585,10 +2614,14 @@ interval_cmp_value(const Interval *interval)
 static int
 interval_cmp_internal(Interval *interval1, Interval *interval2)
 {
-       TimeOffset      span1 = interval_cmp_value(interval1);
-       TimeOffset      span2 = interval_cmp_value(interval2);
+       IntervalOffset span1 = interval_cmp_value(interval1);
+       IntervalOffset span2 = interval_cmp_value(interval2);
 
+#ifdef HAVE_INT64_TIMESTAMP
+       return int128_compare(span1, span2);
+#else
        return ((span1 < span2) ? -1 : (span1 > span2) ? 1 : 0);
+#endif
 }
 
 Datum
@@ -2665,10 +2698,20 @@ Datum
 interval_hash(PG_FUNCTION_ARGS)
 {
        Interval   *interval = PG_GETARG_INTERVAL_P(0);
-       TimeOffset      span = interval_cmp_value(interval);
+       IntervalOffset span = interval_cmp_value(interval);
 
 #ifdef HAVE_INT64_TIMESTAMP
-       return DirectFunctionCall1(hashint8, Int64GetDatumFast(span));
+       int64           span64;
+
+       /*
+        * Use only the least significant 64 bits for hashing.  The upper 64 bits
+        * seldom add any useful information, and besides we must do it like this
+        * for compatibility with hashes calculated before use of INT128 was
+        * introduced.
+        */
+       span64 = int128_to_int64(span);
+
+       return DirectFunctionCall1(hashint8, Int64GetDatumFast(span64));
 #else
        return DirectFunctionCall1(hashfloat8, Float8GetDatumFast(span));
 #endif
diff --git a/src/include/common/int128.h b/src/include/common/int128.h
new file mode 100644 (file)
index 0000000..4c46e26
--- /dev/null
@@ -0,0 +1,276 @@
+/*-------------------------------------------------------------------------
+ *
+ * int128.h
+ *       Roll-our-own 128-bit integer arithmetic.
+ *
+ * We make use of the native int128 type if there is one, otherwise
+ * implement things the hard way based on two int64 halves.
+ *
+ * See src/tools/testint128.c for a simple test harness for this file.
+ *
+ * Copyright (c) 2017, PostgreSQL Global Development Group
+ *
+ * src/include/common/int128.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef INT128_H
+#define INT128_H
+
+/*
+ * For testing purposes, use of native int128 can be switched on/off by
+ * predefining USE_NATIVE_INT128.
+ */
+#ifndef USE_NATIVE_INT128
+#ifdef HAVE_INT128
+#define USE_NATIVE_INT128 1
+#else
+#define USE_NATIVE_INT128 0
+#endif
+#endif
+
+
+#if USE_NATIVE_INT128
+
+typedef int128 INT128;
+
+/*
+ * Add an unsigned int64 value into an INT128 variable.
+ */
+static inline void
+int128_add_uint64(INT128 *i128, uint64 v)
+{
+       *i128 += v;
+}
+
+/*
+ * Add a signed int64 value into an INT128 variable.
+ */
+static inline void
+int128_add_int64(INT128 *i128, int64 v)
+{
+       *i128 += v;
+}
+
+/*
+ * Add the 128-bit product of two int64 values into an INT128 variable.
+ *
+ * XXX with a stupid compiler, this could actually be less efficient than
+ * the other implementation; maybe we should do it by hand always?
+ */
+static inline void
+int128_add_int64_mul_int64(INT128 *i128, int64 x, int64 y)
+{
+       *i128 += (int128) x *(int128) y;
+}
+
+/*
+ * Compare two INT128 values, return -1, 0, or +1.
+ */
+static inline int
+int128_compare(INT128 x, INT128 y)
+{
+       if (x < y)
+               return -1;
+       if (x > y)
+               return 1;
+       return 0;
+}
+
+/*
+ * Widen int64 to INT128.
+ */
+static inline INT128
+int64_to_int128(int64 v)
+{
+       return (INT128) v;
+}
+
+/*
+ * Convert INT128 to int64 (losing any high-order bits).
+ * This also works fine for casting down to uint64.
+ */
+static inline int64
+int128_to_int64(INT128 val)
+{
+       return (int64) val;
+}
+
+#else                                                  /* !USE_NATIVE_INT128 */
+
+/*
+ * We lay out the INT128 structure with the same content and byte ordering
+ * that a native int128 type would (probably) have.  This makes no difference
+ * for ordinary use of INT128, but allows union'ing INT128 with int128 for
+ * testing purposes.
+ */
+typedef struct
+{
+#ifdef WORDS_BIGENDIAN
+       int64           hi;                             /* most significant 64 bits, including sign */
+       uint64          lo;                             /* least significant 64 bits, without sign */
+#else
+       uint64          lo;                             /* least significant 64 bits, without sign */
+       int64           hi;                             /* most significant 64 bits, including sign */
+#endif
+} INT128;
+
+/*
+ * Add an unsigned int64 value into an INT128 variable.
+ */
+static inline void
+int128_add_uint64(INT128 *i128, uint64 v)
+{
+       /*
+        * First add the value to the .lo part, then check to see if a carry needs
+        * to be propagated into the .hi part.  A carry is needed if both inputs
+        * have high bits set, or if just one input has high bit set while the new
+        * .lo part doesn't.  Remember that .lo part is unsigned; we cast to
+        * signed here just as a cheap way to check the high bit.
+        */
+       uint64          oldlo = i128->lo;
+
+       i128->lo += v;
+       if (((int64) v < 0 && (int64) oldlo < 0) ||
+               (((int64) v < 0 || (int64) oldlo < 0) && (int64) i128->lo >= 0))
+               i128->hi++;
+}
+
+/*
+ * Add a signed int64 value into an INT128 variable.
+ */
+static inline void
+int128_add_int64(INT128 *i128, int64 v)
+{
+       /*
+        * This is much like the above except that the carry logic differs for
+        * negative v.  Ordinarily we'd need to subtract 1 from the .hi part
+        * (corresponding to adding the sign-extended bits of v to it); but if
+        * there is a carry out of the .lo part, that cancels and we do nothing.
+        */
+       uint64          oldlo = i128->lo;
+
+       i128->lo += v;
+       if (v >= 0)
+       {
+               if ((int64) oldlo < 0 && (int64) i128->lo >= 0)
+                       i128->hi++;
+       }
+       else
+       {
+               if (!((int64) oldlo < 0 || (int64) i128->lo >= 0))
+                       i128->hi--;
+       }
+}
+
+/*
+ * INT64_AU32 extracts the most significant 32 bits of int64 as int64, while
+ * INT64_AL32 extracts the least significant 32 bits as uint64.
+ */
+#define INT64_AU32(i64) ((i64) >> 32)
+#define INT64_AL32(i64) ((i64) & UINT64CONST(0xFFFFFFFF))
+
+/*
+ * Add the 128-bit product of two int64 values into an INT128 variable.
+ */
+static inline void
+int128_add_int64_mul_int64(INT128 *i128, int64 x, int64 y)
+{
+       /* INT64_AU32 must use arithmetic right shift */
+       StaticAssertStmt(((int64) -1 >> 1) == (int64) -1,
+                                        "arithmetic right shift is needed");
+
+       /*----------
+        * Form the 128-bit product x * y using 64-bit arithmetic.
+        * Considering each 64-bit input as having 32-bit high and low parts,
+        * we can compute
+        *
+        *       x * y = ((x.hi << 32) + x.lo) * (((y.hi << 32) + y.lo)
+        *                 = (x.hi * y.hi) << 64 +
+        *                       (x.hi * y.lo) << 32 +
+        *                       (x.lo * y.hi) << 32 +
+        *                       x.lo * y.lo
+        *
+        * Each individual product is of 32-bit terms so it won't overflow when
+        * computed in 64-bit arithmetic.  Then we just have to shift it to the
+        * correct position while adding into the 128-bit result.  We must also
+        * keep in mind that the "lo" parts must be treated as unsigned.
+        *----------
+        */
+
+       /* No need to work hard if product must be zero */
+       if (x != 0 && y != 0)
+       {
+               int64           x_u32 = INT64_AU32(x);
+               uint64          x_l32 = INT64_AL32(x);
+               int64           y_u32 = INT64_AU32(y);
+               uint64          y_l32 = INT64_AL32(y);
+               int64           tmp;
+
+               /* the first term */
+               i128->hi += x_u32 * y_u32;
+
+               /* the second term: sign-extend it only if x is negative */
+               tmp = x_u32 * y_l32;
+               if (x < 0)
+                       i128->hi += INT64_AU32(tmp);
+               else
+                       i128->hi += ((uint64) tmp) >> 32;
+               int128_add_uint64(i128, ((uint64) INT64_AL32(tmp)) << 32);
+
+               /* the third term: sign-extend it only if y is negative */
+               tmp = x_l32 * y_u32;
+               if (y < 0)
+                       i128->hi += INT64_AU32(tmp);
+               else
+                       i128->hi += ((uint64) tmp) >> 32;
+               int128_add_uint64(i128, ((uint64) INT64_AL32(tmp)) << 32);
+
+               /* the fourth term: always unsigned */
+               int128_add_uint64(i128, x_l32 * y_l32);
+       }
+}
+
+/*
+ * Compare two INT128 values, return -1, 0, or +1.
+ */
+static inline int
+int128_compare(INT128 x, INT128 y)
+{
+       if (x.hi < y.hi)
+               return -1;
+       if (x.hi > y.hi)
+               return 1;
+       if (x.lo < y.lo)
+               return -1;
+       if (x.lo > y.lo)
+               return 1;
+       return 0;
+}
+
+/*
+ * Widen int64 to INT128.
+ */
+static inline INT128
+int64_to_int128(int64 v)
+{
+       INT128          val;
+
+       val.lo = (uint64) v;
+       val.hi = (v < 0) ? -INT64CONST(1) : INT64CONST(0);
+       return val;
+}
+
+/*
+ * Convert INT128 to int64 (losing any high-order bits).
+ * This also works fine for casting down to uint64.
+ */
+static inline int64
+int128_to_int64(INT128 val)
+{
+       return (int64) val.lo;
+}
+
+#endif   /* USE_NATIVE_INT128 */
+
+#endif   /* INT128_H */
index 946c97ad9249b0d8b7051d7e3da4d8f8a076e0a7..f88f34550ad56b44d37785a772e329a87c0eedb9 100644 (file)
@@ -207,6 +207,70 @@ SELECT '' AS fortyfive, r1.*, r2.*
            | 34 years        | 6 years
 (45 rows)
 
+-- Test intervals that are large enough to overflow 64 bits in comparisons
+CREATE TEMP TABLE INTERVAL_TBL_OF (f1 interval);
+INSERT INTO INTERVAL_TBL_OF (f1) VALUES
+  ('2147483647 days 2147483647 months'),
+  ('2147483647 days -2147483648 months'),
+  ('1 year'),
+  ('-2147483648 days 2147483647 months'),
+  ('-2147483648 days -2147483648 months');
+-- these should fail as out-of-range
+INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('2147483648 days');
+ERROR:  interval field value out of range: "2147483648 days"
+LINE 1: INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('2147483648 days');
+                                                 ^
+INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('-2147483649 days');
+ERROR:  interval field value out of range: "-2147483649 days"
+LINE 1: INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('-2147483649 days')...
+                                                 ^
+INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('2147483647 years');
+ERROR:  interval out of range
+LINE 1: INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('2147483647 years')...
+                                                 ^
+INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('-2147483648 years');
+ERROR:  interval out of range
+LINE 1: INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('-2147483648 years'...
+                                                 ^
+SELECT r1.*, r2.*
+   FROM INTERVAL_TBL_OF r1, INTERVAL_TBL_OF r2
+   WHERE r1.f1 > r2.f1
+   ORDER BY r1.f1, r2.f1;
+                    f1                     |                    f1                     
+-------------------------------------------+-------------------------------------------
+ -178956970 years -8 mons +2147483647 days | -178956970 years -8 mons -2147483648 days
+ 1 year                                    | -178956970 years -8 mons -2147483648 days
+ 1 year                                    | -178956970 years -8 mons +2147483647 days
+ 178956970 years 7 mons -2147483648 days   | -178956970 years -8 mons -2147483648 days
+ 178956970 years 7 mons -2147483648 days   | -178956970 years -8 mons +2147483647 days
+ 178956970 years 7 mons -2147483648 days   | 1 year
+ 178956970 years 7 mons 2147483647 days    | -178956970 years -8 mons -2147483648 days
+ 178956970 years 7 mons 2147483647 days    | -178956970 years -8 mons +2147483647 days
+ 178956970 years 7 mons 2147483647 days    | 1 year
+ 178956970 years 7 mons 2147483647 days    | 178956970 years 7 mons -2147483648 days
+(10 rows)
+
+CREATE INDEX ON INTERVAL_TBL_OF USING btree (f1);
+SET enable_seqscan TO false;
+EXPLAIN (COSTS OFF)
+SELECT f1 FROM INTERVAL_TBL_OF r1 ORDER BY f1;
+                             QUERY PLAN                             
+--------------------------------------------------------------------
+ Index Only Scan using interval_tbl_of_f1_idx on interval_tbl_of r1
+(1 row)
+
+SELECT f1 FROM INTERVAL_TBL_OF r1 ORDER BY f1;
+                    f1                     
+-------------------------------------------
+ -178956970 years -8 mons -2147483648 days
+ -178956970 years -8 mons +2147483647 days
+ 1 year
+ 178956970 years 7 mons -2147483648 days
+ 178956970 years 7 mons 2147483647 days
+(5 rows)
+
+RESET enable_seqscan;
+DROP TABLE INTERVAL_TBL_OF;
 -- Test multiplication and division with intervals.
 -- Floating point arithmetic rounding errors can lead to unexpected results,
 -- though the code attempts to do the right thing and round up to days and
index cff9adab3213cafaaa7056fcd6380a137028b15d..bc5537d1b9c008dc9b63e732b6b31b338880f8af 100644 (file)
@@ -59,6 +59,33 @@ SELECT '' AS fortyfive, r1.*, r2.*
    WHERE r1.f1 > r2.f1
    ORDER BY r1.f1, r2.f1;
 
+-- Test intervals that are large enough to overflow 64 bits in comparisons
+CREATE TEMP TABLE INTERVAL_TBL_OF (f1 interval);
+INSERT INTO INTERVAL_TBL_OF (f1) VALUES
+  ('2147483647 days 2147483647 months'),
+  ('2147483647 days -2147483648 months'),
+  ('1 year'),
+  ('-2147483648 days 2147483647 months'),
+  ('-2147483648 days -2147483648 months');
+-- these should fail as out-of-range
+INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('2147483648 days');
+INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('-2147483649 days');
+INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('2147483647 years');
+INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('-2147483648 years');
+
+SELECT r1.*, r2.*
+   FROM INTERVAL_TBL_OF r1, INTERVAL_TBL_OF r2
+   WHERE r1.f1 > r2.f1
+   ORDER BY r1.f1, r2.f1;
+
+CREATE INDEX ON INTERVAL_TBL_OF USING btree (f1);
+SET enable_seqscan TO false;
+EXPLAIN (COSTS OFF)
+SELECT f1 FROM INTERVAL_TBL_OF r1 ORDER BY f1;
+SELECT f1 FROM INTERVAL_TBL_OF r1 ORDER BY f1;
+RESET enable_seqscan;
+
+DROP TABLE INTERVAL_TBL_OF;
 
 -- Test multiplication and division with intervals.
 -- Floating point arithmetic rounding errors can lead to unexpected results,