From df1a699e5ba3232f373790b2c9485ddf720c4a70 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Wed, 5 Apr 2017 23:51:27 -0400 Subject: [PATCH] Fix integer-overflow problems in interval comparison. When using integer timestamps, the interval-comparison functions tried to compute the overall magnitude of an interval as an int64 number of microseconds. As reported by Frazer McLean, this overflows for intervals exceeding about 296000 years, which is bad since we nominally allow intervals many times larger than that. That results in wrong comparison results, and possibly in corrupted btree indexes for columns containing such large interval values. To fix, compute the magnitude as int128 instead. Although some compilers have native support for int128 calculations, many don't, so create our own support functions that can do 128-bit addition and multiplication if the compiler support isn't there. These support functions are designed with an eye to allowing the int128 code paths in numeric.c to be rewritten for use on all platforms, although this patch doesn't do that, or even provide all the int128 primitives that will be needed for it. Back-patch as far as 9.4. Earlier releases did not guard against overflow of interval values at all (commit 146604ec4 fixed that), so it seems not very exciting to worry about overly-large intervals for them. Before 9.6, we did not assume that unreferenced "static inline" functions would not draw compiler warnings, so omit functions not directly referenced by timestamp.c, the only present consumer of int128.h. (We could have omitted these functions in HEAD too, but since they were written and debugged on the way to the present patch, and they look likely to be needed by numeric.c, let's keep them in HEAD.) I did not bother to try to prevent such warnings in a --disable-integer-datetimes build, though. Before 9.5, configure will never define HAVE_INT128, so the part of int128.h that exploits a native int128 implementation is dead code in the 9.4 branch. I didn't bother to remove it, thinking that keeping the file looking similar in different branches is more useful. In HEAD only, add a simple test harness for int128.h in src/tools/. In back branches, this does not change the float-timestamps code path. That's not subject to the same kind of overflow risk, since it computes the interval magnitude as float8. (No doubt, when this code was originally written, overflow was disregarded for exactly that reason.) There is a precision hazard instead :-(, but we'll avert our eyes from that question, since no complaints have been reported and that code's deprecated anyway. Kyotaro Horiguchi and Tom Lane Discussion: https://postgr.es/m/1490104629.422698.918452336.26FA96B7@webmail.messagingengine.com --- src/backend/utils/adt/timestamp.c | 50 ++++- src/include/common/int128.h | 276 +++++++++++++++++++++++++ src/test/regress/expected/interval.out | 64 ++++++ src/test/regress/sql/interval.sql | 27 +++ src/tools/testint128.c | 183 ++++++++++++++++ 5 files changed, 590 insertions(+), 10 deletions(-) create mode 100644 src/include/common/int128.h create mode 100644 src/tools/testint128.c diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c index 4be1999119..3f6e0d4497 100644 --- a/src/backend/utils/adt/timestamp.c +++ b/src/backend/utils/adt/timestamp.c @@ -24,6 +24,7 @@ #include "access/hash.h" #include "access/xact.h" #include "catalog/pg_type.h" +#include "common/int128.h" #include "funcapi.h" #include "libpq/pqformat.h" #include "miscadmin.h" @@ -2288,15 +2289,35 @@ timestamptz_cmp_timestamp(PG_FUNCTION_ARGS) /* * interval_relop - is interval1 relop interval2 + * + * Interval comparison is based on converting interval values to a linear + * representation expressed in the units of the time field (microseconds, + * in the case of integer timestamps) with days assumed to be always 24 hours + * and months assumed to be always 30 days. To avoid overflow, we need a + * wider-than-int64 datatype for the linear representation, so use INT128. */ -static inline TimeOffset + +static inline INT128 interval_cmp_value(const Interval *interval) { - TimeOffset span; + INT128 span; + int64 dayfraction; + int64 days; + + /* + * Separate time field into days and dayfraction, then add the month and + * day fields to the days part. We cannot overflow int64 days here. + */ + dayfraction = interval->time % USECS_PER_DAY; + days = interval->time / USECS_PER_DAY; + days += interval->month * INT64CONST(30); + days += interval->day; - span = interval->time; - span += interval->month * INT64CONST(30) * USECS_PER_DAY; - span += interval->day * INT64CONST(24) * USECS_PER_HOUR; + /* Widen dayfraction to 128 bits */ + span = int64_to_int128(dayfraction); + + /* Scale up days to microseconds, forming a 128-bit product */ + int128_add_int64_mul_int64(&span, days, USECS_PER_DAY); return span; } @@ -2304,10 +2325,10 @@ interval_cmp_value(const Interval *interval) static int interval_cmp_internal(Interval *interval1, Interval *interval2) { - TimeOffset span1 = interval_cmp_value(interval1); - TimeOffset span2 = interval_cmp_value(interval2); + INT128 span1 = interval_cmp_value(interval1); + INT128 span2 = interval_cmp_value(interval2); - return ((span1 < span2) ? -1 : (span1 > span2) ? 1 : 0); + return int128_compare(span1, span2); } Datum @@ -2384,9 +2405,18 @@ Datum interval_hash(PG_FUNCTION_ARGS) { Interval *interval = PG_GETARG_INTERVAL_P(0); - TimeOffset span = interval_cmp_value(interval); + INT128 span = interval_cmp_value(interval); + int64 span64; + + /* + * Use only the least significant 64 bits for hashing. The upper 64 bits + * seldom add any useful information, and besides we must do it like this + * for compatibility with hashes calculated before use of INT128 was + * introduced. + */ + span64 = int128_to_int64(span); - return DirectFunctionCall1(hashint8, Int64GetDatumFast(span)); + return DirectFunctionCall1(hashint8, Int64GetDatumFast(span64)); } /* overlaps_timestamp() --- implements the SQL OVERLAPS operator. diff --git a/src/include/common/int128.h b/src/include/common/int128.h new file mode 100644 index 0000000000..4c46e26f40 --- /dev/null +++ b/src/include/common/int128.h @@ -0,0 +1,276 @@ +/*------------------------------------------------------------------------- + * + * int128.h + * Roll-our-own 128-bit integer arithmetic. + * + * We make use of the native int128 type if there is one, otherwise + * implement things the hard way based on two int64 halves. + * + * See src/tools/testint128.c for a simple test harness for this file. + * + * Copyright (c) 2017, PostgreSQL Global Development Group + * + * src/include/common/int128.h + * + *------------------------------------------------------------------------- + */ +#ifndef INT128_H +#define INT128_H + +/* + * For testing purposes, use of native int128 can be switched on/off by + * predefining USE_NATIVE_INT128. + */ +#ifndef USE_NATIVE_INT128 +#ifdef HAVE_INT128 +#define USE_NATIVE_INT128 1 +#else +#define USE_NATIVE_INT128 0 +#endif +#endif + + +#if USE_NATIVE_INT128 + +typedef int128 INT128; + +/* + * Add an unsigned int64 value into an INT128 variable. + */ +static inline void +int128_add_uint64(INT128 *i128, uint64 v) +{ + *i128 += v; +} + +/* + * Add a signed int64 value into an INT128 variable. + */ +static inline void +int128_add_int64(INT128 *i128, int64 v) +{ + *i128 += v; +} + +/* + * Add the 128-bit product of two int64 values into an INT128 variable. + * + * XXX with a stupid compiler, this could actually be less efficient than + * the other implementation; maybe we should do it by hand always? + */ +static inline void +int128_add_int64_mul_int64(INT128 *i128, int64 x, int64 y) +{ + *i128 += (int128) x *(int128) y; +} + +/* + * Compare two INT128 values, return -1, 0, or +1. + */ +static inline int +int128_compare(INT128 x, INT128 y) +{ + if (x < y) + return -1; + if (x > y) + return 1; + return 0; +} + +/* + * Widen int64 to INT128. + */ +static inline INT128 +int64_to_int128(int64 v) +{ + return (INT128) v; +} + +/* + * Convert INT128 to int64 (losing any high-order bits). + * This also works fine for casting down to uint64. + */ +static inline int64 +int128_to_int64(INT128 val) +{ + return (int64) val; +} + +#else /* !USE_NATIVE_INT128 */ + +/* + * We lay out the INT128 structure with the same content and byte ordering + * that a native int128 type would (probably) have. This makes no difference + * for ordinary use of INT128, but allows union'ing INT128 with int128 for + * testing purposes. + */ +typedef struct +{ +#ifdef WORDS_BIGENDIAN + int64 hi; /* most significant 64 bits, including sign */ + uint64 lo; /* least significant 64 bits, without sign */ +#else + uint64 lo; /* least significant 64 bits, without sign */ + int64 hi; /* most significant 64 bits, including sign */ +#endif +} INT128; + +/* + * Add an unsigned int64 value into an INT128 variable. + */ +static inline void +int128_add_uint64(INT128 *i128, uint64 v) +{ + /* + * First add the value to the .lo part, then check to see if a carry needs + * to be propagated into the .hi part. A carry is needed if both inputs + * have high bits set, or if just one input has high bit set while the new + * .lo part doesn't. Remember that .lo part is unsigned; we cast to + * signed here just as a cheap way to check the high bit. + */ + uint64 oldlo = i128->lo; + + i128->lo += v; + if (((int64) v < 0 && (int64) oldlo < 0) || + (((int64) v < 0 || (int64) oldlo < 0) && (int64) i128->lo >= 0)) + i128->hi++; +} + +/* + * Add a signed int64 value into an INT128 variable. + */ +static inline void +int128_add_int64(INT128 *i128, int64 v) +{ + /* + * This is much like the above except that the carry logic differs for + * negative v. Ordinarily we'd need to subtract 1 from the .hi part + * (corresponding to adding the sign-extended bits of v to it); but if + * there is a carry out of the .lo part, that cancels and we do nothing. + */ + uint64 oldlo = i128->lo; + + i128->lo += v; + if (v >= 0) + { + if ((int64) oldlo < 0 && (int64) i128->lo >= 0) + i128->hi++; + } + else + { + if (!((int64) oldlo < 0 || (int64) i128->lo >= 0)) + i128->hi--; + } +} + +/* + * INT64_AU32 extracts the most significant 32 bits of int64 as int64, while + * INT64_AL32 extracts the least significant 32 bits as uint64. + */ +#define INT64_AU32(i64) ((i64) >> 32) +#define INT64_AL32(i64) ((i64) & UINT64CONST(0xFFFFFFFF)) + +/* + * Add the 128-bit product of two int64 values into an INT128 variable. + */ +static inline void +int128_add_int64_mul_int64(INT128 *i128, int64 x, int64 y) +{ + /* INT64_AU32 must use arithmetic right shift */ + StaticAssertStmt(((int64) -1 >> 1) == (int64) -1, + "arithmetic right shift is needed"); + + /*---------- + * Form the 128-bit product x * y using 64-bit arithmetic. + * Considering each 64-bit input as having 32-bit high and low parts, + * we can compute + * + * x * y = ((x.hi << 32) + x.lo) * (((y.hi << 32) + y.lo) + * = (x.hi * y.hi) << 64 + + * (x.hi * y.lo) << 32 + + * (x.lo * y.hi) << 32 + + * x.lo * y.lo + * + * Each individual product is of 32-bit terms so it won't overflow when + * computed in 64-bit arithmetic. Then we just have to shift it to the + * correct position while adding into the 128-bit result. We must also + * keep in mind that the "lo" parts must be treated as unsigned. + *---------- + */ + + /* No need to work hard if product must be zero */ + if (x != 0 && y != 0) + { + int64 x_u32 = INT64_AU32(x); + uint64 x_l32 = INT64_AL32(x); + int64 y_u32 = INT64_AU32(y); + uint64 y_l32 = INT64_AL32(y); + int64 tmp; + + /* the first term */ + i128->hi += x_u32 * y_u32; + + /* the second term: sign-extend it only if x is negative */ + tmp = x_u32 * y_l32; + if (x < 0) + i128->hi += INT64_AU32(tmp); + else + i128->hi += ((uint64) tmp) >> 32; + int128_add_uint64(i128, ((uint64) INT64_AL32(tmp)) << 32); + + /* the third term: sign-extend it only if y is negative */ + tmp = x_l32 * y_u32; + if (y < 0) + i128->hi += INT64_AU32(tmp); + else + i128->hi += ((uint64) tmp) >> 32; + int128_add_uint64(i128, ((uint64) INT64_AL32(tmp)) << 32); + + /* the fourth term: always unsigned */ + int128_add_uint64(i128, x_l32 * y_l32); + } +} + +/* + * Compare two INT128 values, return -1, 0, or +1. + */ +static inline int +int128_compare(INT128 x, INT128 y) +{ + if (x.hi < y.hi) + return -1; + if (x.hi > y.hi) + return 1; + if (x.lo < y.lo) + return -1; + if (x.lo > y.lo) + return 1; + return 0; +} + +/* + * Widen int64 to INT128. + */ +static inline INT128 +int64_to_int128(int64 v) +{ + INT128 val; + + val.lo = (uint64) v; + val.hi = (v < 0) ? -INT64CONST(1) : INT64CONST(0); + return val; +} + +/* + * Convert INT128 to int64 (losing any high-order bits). + * This also works fine for casting down to uint64. + */ +static inline int64 +int128_to_int64(INT128 val) +{ + return (int64) val.lo; +} + +#endif /* USE_NATIVE_INT128 */ + +#endif /* INT128_H */ diff --git a/src/test/regress/expected/interval.out b/src/test/regress/expected/interval.out index 946c97ad92..f88f34550a 100644 --- a/src/test/regress/expected/interval.out +++ b/src/test/regress/expected/interval.out @@ -207,6 +207,70 @@ SELECT '' AS fortyfive, r1.*, r2.* | 34 years | 6 years (45 rows) +-- Test intervals that are large enough to overflow 64 bits in comparisons +CREATE TEMP TABLE INTERVAL_TBL_OF (f1 interval); +INSERT INTO INTERVAL_TBL_OF (f1) VALUES + ('2147483647 days 2147483647 months'), + ('2147483647 days -2147483648 months'), + ('1 year'), + ('-2147483648 days 2147483647 months'), + ('-2147483648 days -2147483648 months'); +-- these should fail as out-of-range +INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('2147483648 days'); +ERROR: interval field value out of range: "2147483648 days" +LINE 1: INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('2147483648 days'); + ^ +INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('-2147483649 days'); +ERROR: interval field value out of range: "-2147483649 days" +LINE 1: INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('-2147483649 days')... + ^ +INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('2147483647 years'); +ERROR: interval out of range +LINE 1: INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('2147483647 years')... + ^ +INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('-2147483648 years'); +ERROR: interval out of range +LINE 1: INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('-2147483648 years'... + ^ +SELECT r1.*, r2.* + FROM INTERVAL_TBL_OF r1, INTERVAL_TBL_OF r2 + WHERE r1.f1 > r2.f1 + ORDER BY r1.f1, r2.f1; + f1 | f1 +-------------------------------------------+------------------------------------------- + -178956970 years -8 mons +2147483647 days | -178956970 years -8 mons -2147483648 days + 1 year | -178956970 years -8 mons -2147483648 days + 1 year | -178956970 years -8 mons +2147483647 days + 178956970 years 7 mons -2147483648 days | -178956970 years -8 mons -2147483648 days + 178956970 years 7 mons -2147483648 days | -178956970 years -8 mons +2147483647 days + 178956970 years 7 mons -2147483648 days | 1 year + 178956970 years 7 mons 2147483647 days | -178956970 years -8 mons -2147483648 days + 178956970 years 7 mons 2147483647 days | -178956970 years -8 mons +2147483647 days + 178956970 years 7 mons 2147483647 days | 1 year + 178956970 years 7 mons 2147483647 days | 178956970 years 7 mons -2147483648 days +(10 rows) + +CREATE INDEX ON INTERVAL_TBL_OF USING btree (f1); +SET enable_seqscan TO false; +EXPLAIN (COSTS OFF) +SELECT f1 FROM INTERVAL_TBL_OF r1 ORDER BY f1; + QUERY PLAN +-------------------------------------------------------------------- + Index Only Scan using interval_tbl_of_f1_idx on interval_tbl_of r1 +(1 row) + +SELECT f1 FROM INTERVAL_TBL_OF r1 ORDER BY f1; + f1 +------------------------------------------- + -178956970 years -8 mons -2147483648 days + -178956970 years -8 mons +2147483647 days + 1 year + 178956970 years 7 mons -2147483648 days + 178956970 years 7 mons 2147483647 days +(5 rows) + +RESET enable_seqscan; +DROP TABLE INTERVAL_TBL_OF; -- Test multiplication and division with intervals. -- Floating point arithmetic rounding errors can lead to unexpected results, -- though the code attempts to do the right thing and round up to days and diff --git a/src/test/regress/sql/interval.sql b/src/test/regress/sql/interval.sql index cff9adab32..bc5537d1b9 100644 --- a/src/test/regress/sql/interval.sql +++ b/src/test/regress/sql/interval.sql @@ -59,6 +59,33 @@ SELECT '' AS fortyfive, r1.*, r2.* WHERE r1.f1 > r2.f1 ORDER BY r1.f1, r2.f1; +-- Test intervals that are large enough to overflow 64 bits in comparisons +CREATE TEMP TABLE INTERVAL_TBL_OF (f1 interval); +INSERT INTO INTERVAL_TBL_OF (f1) VALUES + ('2147483647 days 2147483647 months'), + ('2147483647 days -2147483648 months'), + ('1 year'), + ('-2147483648 days 2147483647 months'), + ('-2147483648 days -2147483648 months'); +-- these should fail as out-of-range +INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('2147483648 days'); +INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('-2147483649 days'); +INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('2147483647 years'); +INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('-2147483648 years'); + +SELECT r1.*, r2.* + FROM INTERVAL_TBL_OF r1, INTERVAL_TBL_OF r2 + WHERE r1.f1 > r2.f1 + ORDER BY r1.f1, r2.f1; + +CREATE INDEX ON INTERVAL_TBL_OF USING btree (f1); +SET enable_seqscan TO false; +EXPLAIN (COSTS OFF) +SELECT f1 FROM INTERVAL_TBL_OF r1 ORDER BY f1; +SELECT f1 FROM INTERVAL_TBL_OF r1 ORDER BY f1; +RESET enable_seqscan; + +DROP TABLE INTERVAL_TBL_OF; -- Test multiplication and division with intervals. -- Floating point arithmetic rounding errors can lead to unexpected results, diff --git a/src/tools/testint128.c b/src/tools/testint128.c new file mode 100644 index 0000000000..f57806367c --- /dev/null +++ b/src/tools/testint128.c @@ -0,0 +1,183 @@ +/*------------------------------------------------------------------------- + * + * testint128.c + * Testbed for roll-our-own 128-bit integer arithmetic. + * + * This is a standalone test program that compares the behavior of an + * implementation in int128.h to an (assumed correct) int128 native type. + * + * Copyright (c) 2017, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * src/tools/testint128.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +/* + * By default, we test the non-native implementation in int128.h; but + * by predefining USE_NATIVE_INT128 to 1, you can test the native + * implementation, just to be sure. + */ +#ifndef USE_NATIVE_INT128 +#define USE_NATIVE_INT128 0 +#endif + +#include "common/int128.h" + +/* + * We assume the parts of this union are laid out compatibly. + */ +typedef union +{ + int128 i128; + INT128 I128; + union + { +#ifdef WORDS_BIGENDIAN + int64 hi; + uint64 lo; +#else + uint64 lo; + int64 hi; +#endif + } hl; +} test128; + + +/* + * Control version of comparator. + */ +static inline int +my_int128_compare(int128 x, int128 y) +{ + if (x < y) + return -1; + if (x > y) + return 1; + return 0; +} + +/* + * Get a random uint64 value. + * We don't assume random() is good for more than 16 bits. + */ +static uint64 +get_random_uint64(void) +{ + uint64 x; + + x = (uint64) (random() & 0xFFFF) << 48; + x |= (uint64) (random() & 0xFFFF) << 32; + x |= (uint64) (random() & 0xFFFF) << 16; + x |= (uint64) (random() & 0xFFFF); + return x; +} + +/* + * Main program. + * + * Generates a lot of random numbers and tests the implementation for each. + * The results should be reproducible, since we don't call srandom(). + * + * You can give a loop count if you don't like the default 1B iterations. + */ +int +main(int argc, char **argv) +{ + long count; + + if (argc >= 2) + count = strtol(argv[1], NULL, 0); + else + count = 1000000000; + + while (count-- > 0) + { + int64 x = get_random_uint64(); + int64 y = get_random_uint64(); + int64 z = get_random_uint64(); + test128 t1; + test128 t2; + + /* check unsigned addition */ + t1.hl.hi = x; + t1.hl.lo = y; + t2 = t1; + t1.i128 += (int128) (uint64) z; + int128_add_uint64(&t2.I128, (uint64) z); + + if (t1.hl.hi != t2.hl.hi || t1.hl.lo != t2.hl.lo) + { + printf("%016lX%016lX + unsigned %lX\n", x, y, z); + printf("native = %016lX%016lX\n", t1.hl.hi, t1.hl.lo); + printf("result = %016lX%016lX\n", t2.hl.hi, t2.hl.lo); + return 1; + } + + /* check signed addition */ + t1.hl.hi = x; + t1.hl.lo = y; + t2 = t1; + t1.i128 += (int128) z; + int128_add_int64(&t2.I128, z); + + if (t1.hl.hi != t2.hl.hi || t1.hl.lo != t2.hl.lo) + { + printf("%016lX%016lX + signed %lX\n", x, y, z); + printf("native = %016lX%016lX\n", t1.hl.hi, t1.hl.lo); + printf("result = %016lX%016lX\n", t2.hl.hi, t2.hl.lo); + return 1; + } + + /* check multiplication */ + t1.i128 = (int128) x *(int128) y; + + t2.hl.hi = t2.hl.lo = 0; + int128_add_int64_mul_int64(&t2.I128, x, y); + + if (t1.hl.hi != t2.hl.hi || t1.hl.lo != t2.hl.lo) + { + printf("%lX * %lX\n", x, y); + printf("native = %016lX%016lX\n", t1.hl.hi, t1.hl.lo); + printf("result = %016lX%016lX\n", t2.hl.hi, t2.hl.lo); + return 1; + } + + /* check comparison */ + t1.hl.hi = x; + t1.hl.lo = y; + t2.hl.hi = z; + t2.hl.lo = get_random_uint64(); + + if (my_int128_compare(t1.i128, t2.i128) != + int128_compare(t1.I128, t2.I128)) + { + printf("comparison failure: %d vs %d\n", + my_int128_compare(t1.i128, t2.i128), + int128_compare(t1.I128, t2.I128)); + printf("arg1 = %016lX%016lX\n", t1.hl.hi, t1.hl.lo); + printf("arg2 = %016lX%016lX\n", t2.hl.hi, t2.hl.lo); + return 1; + } + + /* check case with identical hi parts; above will hardly ever hit it */ + t2.hl.hi = x; + + if (my_int128_compare(t1.i128, t2.i128) != + int128_compare(t1.I128, t2.I128)) + { + printf("comparison failure: %d vs %d\n", + my_int128_compare(t1.i128, t2.i128), + int128_compare(t1.I128, t2.I128)); + printf("arg1 = %016lX%016lX\n", t1.hl.hi, t1.hl.lo); + printf("arg2 = %016lX%016lX\n", t2.hl.hi, t2.hl.lo); + return 1; + } + } + + return 0; +} -- 2.40.0