-/* ----------
+/*-------------------------------------------------------------------------
+ *
* numeric.c
+ * An exact numeric data type for the Postgres database system
*
- * An exact numeric data type for the Postgres database system
+ * Original coding 1998, Jan Wieck. Heavily revised 2003, Tom Lane.
*
- * 1998 Jan Wieck
+ * Many of the algorithmic ideas are borrowed from David M. Smith's "FM"
+ * multiple-precision math library, most recently published as Algorithm
+ * 786: Multiple-Precision Complex Arithmetic and Functions, ACM
+ * Transactions on Mathematical Software, Vol. 24, No. 4, December 1998,
+ * pages 359-367.
*
- * $Header: /cvsroot/pgsql/src/backend/utils/adt/numeric.c,v 1.41 2001/05/03 19:00:36 tgl Exp $
+ * Copyright (c) 1998-2010, PostgreSQL Global Development Group
*
- * ----------
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/utils/adt/numeric.c,v 1.121 2010/01/07 04:53:34 tgl Exp $
+ *
+ *-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <ctype.h>
#include <float.h>
+#include <limits.h>
#include <math.h>
-#include <errno.h>
-#include <sys/types.h>
+#include "access/hash.h"
+#include "catalog/pg_type.h"
+#include "libpq/pqformat.h"
+#include "miscadmin.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/int8.h"
/* ----------
- * Local definitions
+ * Local data types
+ *
+ * Numeric values are represented in a base-NBASE floating point format.
+ * Each "digit" ranges from 0 to NBASE-1. The type NumericDigit is signed
+ * and wide enough to store a digit. We assume that NBASE*NBASE can fit in
+ * an int. Although the purely calculational routines could handle any even
+ * NBASE that's less than sqrt(INT_MAX), in practice we are only interested
+ * in NBASE a power of ten, so that I/O conversions and decimal rounding
+ * are easy. Also, it's actually more efficient if NBASE is rather less than
+ * sqrt(INT_MAX), so that there is "headroom" for mul_var and div_var_fast to
+ * postpone processing carries.
* ----------
*/
-#ifndef MIN
-#define MIN(a,b) (((a)<(b)) ? (a) : (b))
+
+#if 0
+#define NBASE 10
+#define HALF_NBASE 5
+#define DEC_DIGITS 1 /* decimal digits per NBASE digit */
+#define MUL_GUARD_DIGITS 4 /* these are measured in NBASE digits */
+#define DIV_GUARD_DIGITS 8
+
+typedef signed char NumericDigit;
#endif
-#ifndef MAX
-#define MAX(a,b) (((a)>(b)) ? (a) : (b))
+
+#if 0
+#define NBASE 100
+#define HALF_NBASE 50
+#define DEC_DIGITS 2 /* decimal digits per NBASE digit */
+#define MUL_GUARD_DIGITS 3 /* these are measured in NBASE digits */
+#define DIV_GUARD_DIGITS 6
+
+typedef signed char NumericDigit;
#endif
-#ifndef NAN
-#define NAN (0.0/0.0)
+#if 1
+#define NBASE 10000
+#define HALF_NBASE 5000
+#define DEC_DIGITS 4 /* decimal digits per NBASE digit */
+#define MUL_GUARD_DIGITS 2 /* these are measured in NBASE digits */
+#define DIV_GUARD_DIGITS 4
+
+typedef int16 NumericDigit;
#endif
/* ----------
- * Local data types
- *
- * Note: the first digit of a NumericVar's value is assumed to be multiplied
- * by 10 ** weight. Another way to say it is that there are weight+1 digits
- * before the decimal point. It is possible to have weight < 0.
+ * NumericVar is the format we use for arithmetic. The digit-array part
+ * is the same as the NumericData storage format, but the header is more
+ * complex.
*
* The value represented by a NumericVar is determined by the sign, weight,
- * ndigits, and digits[] array. The rscale and dscale are carried along,
- * but they are just auxiliary information until rounding is done before
- * final storage or display. (Scales are the number of digits wanted
- * *after* the decimal point. Scales are always >= 0.)
+ * ndigits, and digits[] array.
+ * Note: the first digit of a NumericVar's value is assumed to be multiplied
+ * by NBASE ** weight. Another way to say it is that there are weight+1
+ * digits before the decimal point. It is possible to have weight < 0.
*
* buf points at the physical start of the palloc'd digit buffer for the
* NumericVar. digits points at the first digit in actual use (the one
- * with the specified weight). We normally leave an unused byte or two
+ * with the specified weight). We normally leave an unused digit or two
* (preset to zeroes) between buf and digits, so that there is room to store
- * a carry out of the top digit without special pushups. We just need to
+ * a carry out of the top digit without reallocating space. We just need to
* decrement digits (and increment weight) to make room for the carry digit.
+ * (There is no such extra space in a numeric value stored in the database,
+ * only in a NumericVar in memory.)
*
* If buf is NULL then the digit buffer isn't actually palloc'd and should
* not be freed --- see the constants below for an example.
*
+ * dscale, or display scale, is the nominal precision expressed as number
+ * of digits after the decimal point (it must always be >= 0 at present).
+ * dscale may be more than the number of physically stored fractional digits,
+ * implying that we have suppressed storage of significant trailing zeroes.
+ * It should never be less than the number of stored digits, since that would
+ * imply hiding digits that are present. NOTE that dscale is always expressed
+ * in *decimal* digits, and so it may correspond to a fractional number of
+ * base-NBASE digits --- divide by DEC_DIGITS to convert to NBASE digits.
+ *
+ * rscale, or result scale, is the target precision for a computation.
+ * Like dscale it is expressed as number of *decimal* digits after the decimal
+ * point, and is always >= 0 at present.
+ * Note that rscale is not stored in variables --- it's figured on-the-fly
+ * from the dscales of the inputs.
+ *
* NB: All the variable-level functions are written in a style that makes it
* possible to give one and the same variable as argument and destination.
* This is feasible because the digit buffer is separate from the variable.
* ----------
*/
-typedef unsigned char NumericDigit;
-
typedef struct NumericVar
{
- int ndigits; /* number of digits in digits[] - can be
- * 0! */
+ int ndigits; /* # of digits in digits[] - can be 0! */
int weight; /* weight of first digit */
- int rscale; /* result scale */
+ int sign; /* NUMERIC_POS, NUMERIC_NEG, or NUMERIC_NAN */
int dscale; /* display scale */
- int sign; /* NUMERIC_POS, NUMERIC_NEG, or
- * NUMERIC_NAN */
NumericDigit *buf; /* start of palloc'd space for digits[] */
- NumericDigit *digits; /* decimal digits */
+ NumericDigit *digits; /* base-NBASE digits */
} NumericVar;
/* ----------
- * Local data
- * ----------
- */
-static int global_rscale = NUMERIC_MIN_RESULT_SCALE;
-
-/* ----------
- * Some preinitialized variables we need often
+ * Some preinitialized constants
* ----------
*/
static NumericDigit const_zero_data[1] = {0};
static NumericVar const_zero =
-{0, 0, 0, 0, NUMERIC_POS, NULL, const_zero_data};
+{0, 0, NUMERIC_POS, 0, NULL, const_zero_data};
static NumericDigit const_one_data[1] = {1};
static NumericVar const_one =
-{1, 0, 0, 0, NUMERIC_POS, NULL, const_one_data};
+{1, 0, NUMERIC_POS, 0, NULL, const_one_data};
static NumericDigit const_two_data[1] = {2};
static NumericVar const_two =
-{1, 0, 0, 0, NUMERIC_POS, NULL, const_two_data};
+{1, 0, NUMERIC_POS, 0, NULL, const_two_data};
+
+#if DEC_DIGITS == 4
+static NumericDigit const_zero_point_five_data[1] = {5000};
+#elif DEC_DIGITS == 2
+static NumericDigit const_zero_point_five_data[1] = {50};
+#elif DEC_DIGITS == 1
+static NumericDigit const_zero_point_five_data[1] = {5};
+#endif
+static NumericVar const_zero_point_five =
+{1, -1, NUMERIC_POS, 1, NULL, const_zero_point_five_data};
+
+#if DEC_DIGITS == 4
+static NumericDigit const_zero_point_nine_data[1] = {9000};
+#elif DEC_DIGITS == 2
+static NumericDigit const_zero_point_nine_data[1] = {90};
+#elif DEC_DIGITS == 1
+static NumericDigit const_zero_point_nine_data[1] = {9};
+#endif
+static NumericVar const_zero_point_nine =
+{1, -1, NUMERIC_POS, 1, NULL, const_zero_point_nine_data};
+
+#if DEC_DIGITS == 4
+static NumericDigit const_zero_point_01_data[1] = {100};
+static NumericVar const_zero_point_01 =
+{1, -1, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
+#elif DEC_DIGITS == 2
+static NumericDigit const_zero_point_01_data[1] = {1};
+static NumericVar const_zero_point_01 =
+{1, -1, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
+#elif DEC_DIGITS == 1
+static NumericDigit const_zero_point_01_data[1] = {1};
+static NumericVar const_zero_point_01 =
+{1, -2, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
+#endif
+
+#if DEC_DIGITS == 4
+static NumericDigit const_one_point_one_data[2] = {1, 1000};
+#elif DEC_DIGITS == 2
+static NumericDigit const_one_point_one_data[2] = {1, 10};
+#elif DEC_DIGITS == 1
+static NumericDigit const_one_point_one_data[2] = {1, 1};
+#endif
+static NumericVar const_one_point_one =
+{2, 0, NUMERIC_POS, 1, NULL, const_one_point_one_data};
static NumericVar const_nan =
-{0, 0, 0, 0, NUMERIC_NAN, NULL, NULL};
+{0, 0, NUMERIC_NAN, 0, NULL, NULL};
+#if DEC_DIGITS == 4
+static const int round_powers[4] = {0, 1000, 100, 10};
+#endif
/* ----------
*/
#ifdef NUMERIC_DEBUG
-static void dump_numeric(char *str, Numeric num);
-static void dump_var(char *str, NumericVar *var);
-
+static void dump_numeric(const char *str, Numeric num);
+static void dump_var(const char *str, NumericVar *var);
#else
#define dump_numeric(s,n)
#define dump_var(s,v)
#endif
-#define digitbuf_alloc(size) ((NumericDigit *) palloc(size))
+#define digitbuf_alloc(ndigits) \
+ ((NumericDigit *) palloc((ndigits) * sizeof(NumericDigit)))
#define digitbuf_free(buf) \
do { \
if ((buf) != NULL) \
pfree(buf); \
} while (0)
-#define init_var(v) memset(v,0,sizeof(NumericVar))
+#define init_var(v) MemSetAligned(v, 0, sizeof(NumericVar))
+
+#define NUMERIC_DIGITS(num) ((NumericDigit *)(num)->n_data)
+#define NUMERIC_NDIGITS(num) \
+ ((VARSIZE(num) - NUMERIC_HDRSZ) / sizeof(NumericDigit))
+
static void alloc_var(NumericVar *var, int ndigits);
static void free_var(NumericVar *var);
static void zero_var(NumericVar *var);
-static void set_var_from_str(char *str, NumericVar *dest);
+static const char *set_var_from_str(const char *str, const char *cp,
+ NumericVar *dest);
static void set_var_from_num(Numeric value, NumericVar *dest);
static void set_var_from_var(NumericVar *value, NumericVar *dest);
static char *get_str_from_var(NumericVar *var, int dscale);
+static char *get_str_from_var_sci(NumericVar *var, int rscale);
static Numeric make_result(NumericVar *var);
static void apply_typmod(NumericVar *var, int32 typmod);
+static int32 numericvar_to_int4(NumericVar *var);
+static bool numericvar_to_int8(NumericVar *var, int64 *result);
+static void int8_to_numericvar(int64 val, NumericVar *var);
+static double numeric_to_double_no_overflow(Numeric num);
+static double numericvar_to_double_no_overflow(NumericVar *var);
+
static int cmp_numerics(Numeric num1, Numeric num2);
static int cmp_var(NumericVar *var1, NumericVar *var2);
+static int cmp_var_common(const NumericDigit *var1digits, int var1ndigits,
+ int var1weight, int var1sign,
+ const NumericDigit *var2digits, int var2ndigits,
+ int var2weight, int var2sign);
static void add_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
static void sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
-static void mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
-static void div_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
+static void mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
+ int rscale);
+static void div_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
+ int rscale, bool round);
+static void div_var_fast(NumericVar *var1, NumericVar *var2, NumericVar *result,
+ int rscale, bool round);
+static int select_div_scale(NumericVar *var1, NumericVar *var2);
static void mod_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
static void ceil_var(NumericVar *var, NumericVar *result);
static void floor_var(NumericVar *var, NumericVar *result);
-static void sqrt_var(NumericVar *arg, NumericVar *result);
-static void exp_var(NumericVar *arg, NumericVar *result);
-static void ln_var(NumericVar *arg, NumericVar *result);
+static void sqrt_var(NumericVar *arg, NumericVar *result, int rscale);
+static void exp_var(NumericVar *arg, NumericVar *result, int rscale);
+static void exp_var_internal(NumericVar *arg, NumericVar *result, int rscale);
+static void ln_var(NumericVar *arg, NumericVar *result, int rscale);
static void log_var(NumericVar *base, NumericVar *num, NumericVar *result);
static void power_var(NumericVar *base, NumericVar *exp, NumericVar *result);
+static void power_var_int(NumericVar *base, int exp, NumericVar *result,
+ int rscale);
static int cmp_abs(NumericVar *var1, NumericVar *var2);
+static int cmp_abs_common(const NumericDigit *var1digits, int var1ndigits,
+ int var1weight,
+ const NumericDigit *var2digits, int var2ndigits,
+ int var2weight);
static void add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result);
static void sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result);
-
+static void round_var(NumericVar *var, int rscale);
+static void trunc_var(NumericVar *var, int rscale);
+static void strip_var(NumericVar *var);
+static void compute_bucket(Numeric operand, Numeric bound1, Numeric bound2,
+ NumericVar *count_var, NumericVar *result_var);
/* ----------------------------------------------------------------------
*/
-/* ----------
+/*
* numeric_in() -
*
* Input function for numeric data type
- * ----------
*/
Datum
numeric_in(PG_FUNCTION_ARGS)
#ifdef NOT_USED
Oid typelem = PG_GETARG_OID(1);
-
#endif
int32 typmod = PG_GETARG_INT32(2);
- NumericVar value;
Numeric res;
+ const char *cp;
+
+ /* Skip leading spaces */
+ cp = str;
+ while (*cp)
+ {
+ if (!isspace((unsigned char) *cp))
+ break;
+ cp++;
+ }
/*
* Check for NaN
*/
- if (strcmp(str, "NaN") == 0)
- PG_RETURN_NUMERIC(make_result(&const_nan));
+ if (pg_strncasecmp(cp, "NaN", 3) == 0)
+ {
+ res = make_result(&const_nan);
- /*
- * Use set_var_from_str() to parse the input string and return it in
- * the packed DB storage format
- */
- init_var(&value);
- set_var_from_str(str, &value);
+ /* Should be nothing left but spaces */
+ cp += 3;
+ while (*cp)
+ {
+ if (!isspace((unsigned char) *cp))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid input syntax for type numeric: \"%s\"",
+ str)));
+ cp++;
+ }
+ }
+ else
+ {
+ /*
+ * Use set_var_from_str() to parse a normal numeric value
+ */
+ NumericVar value;
- apply_typmod(&value, typmod);
+ init_var(&value);
- res = make_result(&value);
- free_var(&value);
+ cp = set_var_from_str(str, cp, &value);
+
+ /*
+ * We duplicate a few lines of code here because we would like to
+ * throw any trailing-junk syntax error before any semantic error
+ * resulting from apply_typmod. We can't easily fold the two cases
+ * together because we mustn't apply apply_typmod to a NaN.
+ */
+ while (*cp)
+ {
+ if (!isspace((unsigned char) *cp))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid input syntax for type numeric: \"%s\"",
+ str)));
+ cp++;
+ }
+
+ apply_typmod(&value, typmod);
+
+ res = make_result(&value);
+ free_var(&value);
+ }
PG_RETURN_NUMERIC(res);
}
-/* ----------
+/*
* numeric_out() -
*
* Output function for numeric data type
- * ----------
*/
Datum
numeric_out(PG_FUNCTION_ARGS)
* Get the number in the variable format.
*
* Even if we didn't need to change format, we'd still need to copy the
- * value to have a modifiable copy for rounding. set_var_from_num()
- * also guarantees there is extra digit space in case we produce a
- * carry out from rounding.
+ * value to have a modifiable copy for rounding. set_var_from_num() also
+ * guarantees there is extra digit space in case we produce a carry out
+ * from rounding.
*/
init_var(&x);
set_var_from_num(num, &x);
PG_RETURN_CSTRING(str);
}
+/*
+ * numeric_out_sci() -
+ *
+ * Output function for numeric data type in scientific notation.
+ */
+char *
+numeric_out_sci(Numeric num, int scale)
+{
+ NumericVar x;
+ char *str;
+
+ /*
+ * Handle NaN
+ */
+ if (NUMERIC_IS_NAN(num))
+ return pstrdup("NaN");
-/* ----------
+ init_var(&x);
+ set_var_from_num(num, &x);
+
+ str = get_str_from_var_sci(&x, scale);
+
+ free_var(&x);
+ return str;
+}
+
+/*
+ * numeric_recv - converts external binary format to numeric
+ *
+ * External format is a sequence of int16's:
+ * ndigits, weight, sign, dscale, NumericDigits.
+ */
+Datum
+numeric_recv(PG_FUNCTION_ARGS)
+{
+ StringInfo buf = (StringInfo) PG_GETARG_POINTER(0);
+
+#ifdef NOT_USED
+ Oid typelem = PG_GETARG_OID(1);
+#endif
+ int32 typmod = PG_GETARG_INT32(2);
+ NumericVar value;
+ Numeric res;
+ int len,
+ i;
+
+ init_var(&value);
+
+ len = (uint16) pq_getmsgint(buf, sizeof(uint16));
+ if (len < 0 || len > NUMERIC_MAX_PRECISION + NUMERIC_MAX_RESULT_SCALE)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
+ errmsg("invalid length in external \"numeric\" value")));
+
+ alloc_var(&value, len);
+
+ value.weight = (int16) pq_getmsgint(buf, sizeof(int16));
+ value.sign = (uint16) pq_getmsgint(buf, sizeof(uint16));
+ if (!(value.sign == NUMERIC_POS ||
+ value.sign == NUMERIC_NEG ||
+ value.sign == NUMERIC_NAN))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
+ errmsg("invalid sign in external \"numeric\" value")));
+
+ value.dscale = (uint16) pq_getmsgint(buf, sizeof(uint16));
+ for (i = 0; i < len; i++)
+ {
+ NumericDigit d = pq_getmsgint(buf, sizeof(NumericDigit));
+
+ if (d < 0 || d >= NBASE)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
+ errmsg("invalid digit in external \"numeric\" value")));
+ value.digits[i] = d;
+ }
+
+ apply_typmod(&value, typmod);
+
+ res = make_result(&value);
+ free_var(&value);
+
+ PG_RETURN_NUMERIC(res);
+}
+
+/*
+ * numeric_send - converts numeric to binary format
+ */
+Datum
+numeric_send(PG_FUNCTION_ARGS)
+{
+ Numeric num = PG_GETARG_NUMERIC(0);
+ NumericVar x;
+ StringInfoData buf;
+ int i;
+
+ init_var(&x);
+ set_var_from_num(num, &x);
+
+ pq_begintypsend(&buf);
+
+ pq_sendint(&buf, x.ndigits, sizeof(int16));
+ pq_sendint(&buf, x.weight, sizeof(int16));
+ pq_sendint(&buf, x.sign, sizeof(int16));
+ pq_sendint(&buf, x.dscale, sizeof(int16));
+ for (i = 0; i < x.ndigits; i++)
+ pq_sendint(&buf, x.digits[i], sizeof(NumericDigit));
+
+ free_var(&x);
+
+ PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
+}
+
+
+/*
* numeric() -
*
* This is a special function called by the Postgres database system
- * before a value is stored in a tuples attribute. The precision and
+ * before a value is stored in a tuple's attribute. The precision and
* scale of the attribute have to be applied on the value.
- * ----------
*/
Datum
-numeric(PG_FUNCTION_ARGS)
+numeric (PG_FUNCTION_ARGS)
{
Numeric num = PG_GETARG_NUMERIC(0);
int32 typmod = PG_GETARG_INT32(1);
int32 tmp_typmod;
int precision;
int scale;
- int maxweight;
+ int ddigits;
+ int maxdigits;
NumericVar var;
/*
PG_RETURN_NUMERIC(make_result(&const_nan));
/*
- * If the value isn't a valid type modifier, simply return a copy of
- * the input value
+ * If the value isn't a valid type modifier, simply return a copy of the
+ * input value
*/
if (typmod < (int32) (VARHDRSZ))
{
- new = (Numeric) palloc(num->varlen);
- memcpy(new, num, num->varlen);
+ new = (Numeric) palloc(VARSIZE(num));
+ memcpy(new, num, VARSIZE(num));
PG_RETURN_NUMERIC(new);
}
tmp_typmod = typmod - VARHDRSZ;
precision = (tmp_typmod >> 16) & 0xffff;
scale = tmp_typmod & 0xffff;
- maxweight = precision - scale;
+ maxdigits = precision - scale;
/*
- * If the number is in bounds and due to the present result scale no
- * rounding could be necessary, just make a copy of the input and
- * modify its scale fields.
+ * If the number is certainly in bounds and due to the target scale no
+ * rounding could be necessary, just make a copy of the input and modify
+ * its scale fields. (Note we assume the existing dscale is honest...)
*/
- if (num->n_weight < maxweight && scale >= num->n_rscale)
+ ddigits = (num->n_weight + 1) * DEC_DIGITS;
+ if (ddigits <= maxdigits && scale >= NUMERIC_DSCALE(num))
{
- new = (Numeric) palloc(num->varlen);
- memcpy(new, num, num->varlen);
- new->n_rscale = scale;
+ new = (Numeric) palloc(VARSIZE(num));
+ memcpy(new, num, VARSIZE(num));
new->n_sign_dscale = NUMERIC_SIGN(new) |
((uint16) scale & NUMERIC_DSCALE_MASK);
PG_RETURN_NUMERIC(new);
PG_RETURN_NUMERIC(new);
}
+Datum
+numerictypmodin(PG_FUNCTION_ARGS)
+{
+ ArrayType *ta = PG_GETARG_ARRAYTYPE_P(0);
+ int32 *tl;
+ int n;
+ int32 typmod;
+
+ tl = ArrayGetIntegerTypmods(ta, &n);
+
+ if (n == 2)
+ {
+ if (tl[0] < 1 || tl[0] > NUMERIC_MAX_PRECISION)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("NUMERIC precision %d must be between 1 and %d",
+ tl[0], NUMERIC_MAX_PRECISION)));
+ if (tl[1] < 0 || tl[1] > tl[0])
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("NUMERIC scale %d must be between 0 and precision %d",
+ tl[1], tl[0])));
+ typmod = ((tl[0] << 16) | tl[1]) + VARHDRSZ;
+ }
+ else if (n == 1)
+ {
+ if (tl[0] < 1 || tl[0] > NUMERIC_MAX_PRECISION)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("NUMERIC precision %d must be between 1 and %d",
+ tl[0], NUMERIC_MAX_PRECISION)));
+ /* scale defaults to zero */
+ typmod = (tl[0] << 16) + VARHDRSZ;
+ }
+ else
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid NUMERIC type modifier")));
+ typmod = 0; /* keep compiler quiet */
+ }
+
+ PG_RETURN_INT32(typmod);
+}
+
+Datum
+numerictypmodout(PG_FUNCTION_ARGS)
+{
+ int32 typmod = PG_GETARG_INT32(0);
+ char *res = (char *) palloc(64);
+
+ if (typmod >= 0)
+ snprintf(res, 64, "(%d,%d)",
+ ((typmod - VARHDRSZ) >> 16) & 0xffff,
+ (typmod - VARHDRSZ) & 0xffff);
+ else
+ *res = '\0';
+
+ PG_RETURN_CSTRING(res);
+}
+
/* ----------------------------------------------------------------------
*
/*
* Do it the easy way directly on the packed format
*/
- res = (Numeric) palloc(num->varlen);
- memcpy(res, num, num->varlen);
+ res = (Numeric) palloc(VARSIZE(num));
+ memcpy(res, num, VARSIZE(num));
res->n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num);
/*
* Do it the easy way directly on the packed format
*/
- res = (Numeric) palloc(num->varlen);
- memcpy(res, num, num->varlen);
+ res = (Numeric) palloc(VARSIZE(num));
+ memcpy(res, num, VARSIZE(num));
/*
- * The packed format is known to be totally zero digit trimmed always.
- * So we can identify a ZERO by the fact that there are no digits at
- * all. Do nothing to a zero.
+ * The packed format is known to be totally zero digit trimmed always. So
+ * we can identify a ZERO by the fact that there are no digits at all. Do
+ * nothing to a zero.
*/
- if (num->varlen != NUMERIC_HDRSZ)
+ if (VARSIZE(num) != NUMERIC_HDRSZ)
{
/* Else, flip the sign */
if (NUMERIC_SIGN(num) == NUMERIC_POS)
}
+Datum
+numeric_uplus(PG_FUNCTION_ARGS)
+{
+ Numeric num = PG_GETARG_NUMERIC(0);
+ Numeric res;
+
+ res = (Numeric) palloc(VARSIZE(num));
+ memcpy(res, num, VARSIZE(num));
+
+ PG_RETURN_NUMERIC(res);
+}
+
+/*
+ * numeric_sign() -
+ *
+ * returns -1 if the argument is less than 0, 0 if the argument is equal
+ * to 0, and 1 if the argument is greater than zero.
+ */
Datum
numeric_sign(PG_FUNCTION_ARGS)
{
init_var(&result);
/*
- * The packed format is known to be totally zero digit trimmed always.
- * So we can identify a ZERO by the fact that there are no digits at
- * all.
+ * The packed format is known to be totally zero digit trimmed always. So
+ * we can identify a ZERO by the fact that there are no digits at all.
*/
- if (num->varlen == NUMERIC_HDRSZ)
+ if (VARSIZE(num) == NUMERIC_HDRSZ)
set_var_from_var(&const_zero, &result);
else
{
-
/*
- * And if there are some, we return a copy of ONE with the sign of
- * our argument
+ * And if there are some, we return a copy of ONE with the sign of our
+ * argument
*/
set_var_from_var(&const_one, &result);
result.sign = NUMERIC_SIGN(num);
}
-/* ----------
+/*
* numeric_round() -
*
* Round a value to have 'scale' digits after the decimal point.
* We allow negative 'scale', implying rounding before the decimal
* point --- Oracle interprets rounding that way.
- * ----------
*/
Datum
numeric_round(PG_FUNCTION_ARGS)
int32 scale = PG_GETARG_INT32(1);
Numeric res;
NumericVar arg;
- int i;
/*
* Handle NaN
/*
* Limit the scale value to avoid possible overflow in calculations
- * below.
*/
- scale = MIN(NUMERIC_MAX_RESULT_SCALE,
- MAX(-NUMERIC_MAX_RESULT_SCALE, scale));
+ scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE);
+ scale = Min(scale, NUMERIC_MAX_RESULT_SCALE);
/*
* Unpack the argument and round it at the proper digit position
init_var(&arg);
set_var_from_num(num, &arg);
- i = arg.weight + scale + 1;
-
- if (i < arg.ndigits)
- {
-
- /*
- * If i = 0, the value loses all digits, but could round up if its
- * first digit is more than 4. If i < 0 the result must be 0.
- */
- if (i < 0)
- arg.ndigits = 0;
- else
- {
- int carry = (arg.digits[i] > 4) ? 1 : 0;
-
- arg.ndigits = i;
-
- while (carry)
- {
- carry += arg.digits[--i];
- arg.digits[i] = carry % 10;
- carry /= 10;
- }
-
- if (i < 0)
- {
- Assert(i == -1);/* better not have added more than 1 digit */
- Assert(arg.digits > arg.buf);
- arg.digits--;
- arg.ndigits++;
- arg.weight++;
- }
- }
- }
+ round_var(&arg, scale);
- /*
- * Set result's scale to something reasonable.
- */
- scale = MIN(NUMERIC_MAX_DISPLAY_SCALE, MAX(0, scale));
- arg.rscale = scale;
- arg.dscale = scale;
+ /* We don't allow negative output dscale */
+ if (scale < 0)
+ arg.dscale = 0;
/*
* Return the rounded result
}
-/* ----------
+/*
* numeric_trunc() -
*
* Truncate a value to have 'scale' digits after the decimal point.
* We allow negative 'scale', implying a truncation before the decimal
* point --- Oracle interprets truncation that way.
- * ----------
*/
Datum
numeric_trunc(PG_FUNCTION_ARGS)
/*
* Limit the scale value to avoid possible overflow in calculations
- * below.
*/
- scale = MIN(NUMERIC_MAX_RESULT_SCALE,
- MAX(-NUMERIC_MAX_RESULT_SCALE, scale));
+ scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE);
+ scale = Min(scale, NUMERIC_MAX_RESULT_SCALE);
/*
* Unpack the argument and truncate it at the proper digit position
init_var(&arg);
set_var_from_num(num, &arg);
- arg.ndigits = MIN(arg.ndigits, MAX(0, arg.weight + scale + 1));
+ trunc_var(&arg, scale);
- /*
- * Set result's scale to something reasonable.
- */
- scale = MIN(NUMERIC_MAX_DISPLAY_SCALE, MAX(0, scale));
- arg.rscale = scale;
- arg.dscale = scale;
+ /* We don't allow negative output dscale */
+ if (scale < 0)
+ arg.dscale = 0;
/*
* Return the truncated result
}
-/* ----------
+/*
* numeric_ceil() -
*
* Return the smallest integer greater than or equal to the argument
- * ----------
*/
Datum
numeric_ceil(PG_FUNCTION_ARGS)
set_var_from_num(num, &result);
ceil_var(&result, &result);
- result.dscale = 0;
-
res = make_result(&result);
free_var(&result);
}
-/* ----------
+/*
* numeric_floor() -
*
* Return the largest integer equal to or less than the argument
- * ----------
*/
Datum
numeric_floor(PG_FUNCTION_ARGS)
set_var_from_num(num, &result);
floor_var(&result, &result);
- result.dscale = 0;
-
res = make_result(&result);
free_var(&result);
PG_RETURN_NUMERIC(res);
}
+/*
+ * Implements the numeric version of the width_bucket() function
+ * defined by SQL2003. See also width_bucket_float8().
+ *
+ * 'bound1' and 'bound2' are the lower and upper bounds of the
+ * histogram's range, respectively. 'count' is the number of buckets
+ * in the histogram. width_bucket() returns an integer indicating the
+ * bucket number that 'operand' belongs to in an equiwidth histogram
+ * with the specified characteristics. An operand smaller than the
+ * lower bound is assigned to bucket 0. An operand greater than the
+ * upper bound is assigned to an additional bucket (with number
+ * count+1). We don't allow "NaN" for any of the numeric arguments.
+ */
+Datum
+width_bucket_numeric(PG_FUNCTION_ARGS)
+{
+ Numeric operand = PG_GETARG_NUMERIC(0);
+ Numeric bound1 = PG_GETARG_NUMERIC(1);
+ Numeric bound2 = PG_GETARG_NUMERIC(2);
+ int32 count = PG_GETARG_INT32(3);
+ NumericVar count_var;
+ NumericVar result_var;
+ int32 result;
+
+ if (count <= 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
+ errmsg("count must be greater than zero")));
+
+ if (NUMERIC_IS_NAN(operand) ||
+ NUMERIC_IS_NAN(bound1) ||
+ NUMERIC_IS_NAN(bound2))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
+ errmsg("operand, lower bound and upper bound cannot be NaN")));
+
+ init_var(&result_var);
+ init_var(&count_var);
+
+ /* Convert 'count' to a numeric, for ease of use later */
+ int8_to_numericvar((int64) count, &count_var);
+
+ switch (cmp_numerics(bound1, bound2))
+ {
+ case 0:
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
+ errmsg("lower bound cannot equal upper bound")));
+
+ /* bound1 < bound2 */
+ case -1:
+ if (cmp_numerics(operand, bound1) < 0)
+ set_var_from_var(&const_zero, &result_var);
+ else if (cmp_numerics(operand, bound2) >= 0)
+ add_var(&count_var, &const_one, &result_var);
+ else
+ compute_bucket(operand, bound1, bound2,
+ &count_var, &result_var);
+ break;
+
+ /* bound1 > bound2 */
+ case 1:
+ if (cmp_numerics(operand, bound1) > 0)
+ set_var_from_var(&const_zero, &result_var);
+ else if (cmp_numerics(operand, bound2) <= 0)
+ add_var(&count_var, &const_one, &result_var);
+ else
+ compute_bucket(operand, bound1, bound2,
+ &count_var, &result_var);
+ break;
+ }
+
+ /* if result exceeds the range of a legal int4, we ereport here */
+ result = numericvar_to_int4(&result_var);
+
+ free_var(&count_var);
+ free_var(&result_var);
+
+ PG_RETURN_INT32(result);
+}
+
+/*
+ * If 'operand' is not outside the bucket range, determine the correct
+ * bucket for it to go. The calculations performed by this function
+ * are derived directly from the SQL2003 spec.
+ */
+static void
+compute_bucket(Numeric operand, Numeric bound1, Numeric bound2,
+ NumericVar *count_var, NumericVar *result_var)
+{
+ NumericVar bound1_var;
+ NumericVar bound2_var;
+ NumericVar operand_var;
+
+ init_var(&bound1_var);
+ init_var(&bound2_var);
+ init_var(&operand_var);
+
+ set_var_from_num(bound1, &bound1_var);
+ set_var_from_num(bound2, &bound2_var);
+ set_var_from_num(operand, &operand_var);
+
+ if (cmp_var(&bound1_var, &bound2_var) < 0)
+ {
+ sub_var(&operand_var, &bound1_var, &operand_var);
+ sub_var(&bound2_var, &bound1_var, &bound2_var);
+ div_var(&operand_var, &bound2_var, result_var,
+ select_div_scale(&operand_var, &bound2_var), true);
+ }
+ else
+ {
+ sub_var(&bound1_var, &operand_var, &operand_var);
+ sub_var(&bound1_var, &bound2_var, &bound1_var);
+ div_var(&operand_var, &bound1_var, result_var,
+ select_div_scale(&operand_var, &bound1_var), true);
+ }
+
+ mul_var(result_var, count_var, result_var,
+ result_var->dscale + count_var->dscale);
+ add_var(result_var, &const_one, result_var);
+ floor_var(result_var, result_var);
+
+ free_var(&bound1_var);
+ free_var(&bound2_var);
+ free_var(&operand_var);
+}
/* ----------------------------------------------------------------------
*
int result;
/*
- * We consider all NANs to be equal and larger than any non-NAN.
- * This is somewhat arbitrary; the important thing is to have a
- * consistent sort order.
+ * We consider all NANs to be equal and larger than any non-NAN. This is
+ * somewhat arbitrary; the important thing is to have a consistent sort
+ * order.
*/
if (NUMERIC_IS_NAN(num1))
{
}
else
{
- NumericVar arg1;
- NumericVar arg2;
-
- init_var(&arg1);
- init_var(&arg2);
-
- set_var_from_num(num1, &arg1);
- set_var_from_num(num2, &arg2);
-
- result = cmp_var(&arg1, &arg2);
-
- free_var(&arg1);
- free_var(&arg2);
+ result = cmp_var_common(NUMERIC_DIGITS(num1), NUMERIC_NDIGITS(num1),
+ num1->n_weight, NUMERIC_SIGN(num1),
+ NUMERIC_DIGITS(num2), NUMERIC_NDIGITS(num2),
+ num2->n_weight, NUMERIC_SIGN(num2));
}
return result;
}
+Datum
+hash_numeric(PG_FUNCTION_ARGS)
+{
+ Numeric key = PG_GETARG_NUMERIC(0);
+ Datum digit_hash;
+ Datum result;
+ int weight;
+ int start_offset;
+ int end_offset;
+ int i;
+ int hash_len;
-/* ----------------------------------------------------------------------
- *
- * Arithmetic base functions
- *
+ /* If it's NaN, don't try to hash the rest of the fields */
+ if (NUMERIC_IS_NAN(key))
+ PG_RETURN_UINT32(0);
+
+ weight = key->n_weight;
+ start_offset = 0;
+ end_offset = 0;
+
+ /*
+ * Omit any leading or trailing zeros from the input to the hash. The
+ * numeric implementation *should* guarantee that leading and trailing
+ * zeros are suppressed, but we're paranoid. Note that we measure the
+ * starting and ending offsets in units of NumericDigits, not bytes.
+ */
+ for (i = 0; i < NUMERIC_NDIGITS(key); i++)
+ {
+ if (NUMERIC_DIGITS(key)[i] != (NumericDigit) 0)
+ break;
+
+ start_offset++;
+
+ /*
+ * The weight is effectively the # of digits before the decimal point,
+ * so decrement it for each leading zero we skip.
+ */
+ weight--;
+ }
+
+ /*
+ * If there are no non-zero digits, then the value of the number is zero,
+ * regardless of any other fields.
+ */
+ if (NUMERIC_NDIGITS(key) == start_offset)
+ PG_RETURN_UINT32(-1);
+
+ for (i = NUMERIC_NDIGITS(key) - 1; i >= 0; i--)
+ {
+ if (NUMERIC_DIGITS(key)[i] != (NumericDigit) 0)
+ break;
+
+ end_offset++;
+ }
+
+ /* If we get here, there should be at least one non-zero digit */
+ Assert(start_offset + end_offset < NUMERIC_NDIGITS(key));
+
+ /*
+ * Note that we don't hash on the Numeric's scale, since two numerics can
+ * compare equal but have different scales. We also don't hash on the
+ * sign, although we could: since a sign difference implies inequality,
+ * this shouldn't affect correctness.
+ */
+ hash_len = NUMERIC_NDIGITS(key) - start_offset - end_offset;
+ digit_hash = hash_any((unsigned char *) (NUMERIC_DIGITS(key) + start_offset),
+ hash_len * sizeof(NumericDigit));
+
+ /* Mix in the weight, via XOR */
+ result = digit_hash ^ weight;
+
+ PG_RETURN_DATUM(result);
+}
+
+
+/* ----------------------------------------------------------------------
+ *
+ * Basic arithmetic functions
+ *
* ----------------------------------------------------------------------
*/
-/* ----------
+/*
* numeric_add() -
*
* Add two numerics
- * ----------
*/
Datum
numeric_add(PG_FUNCTION_ARGS)
/*
* Unpack the values, let add_var() compute the result and return it.
- * The internals of add_var() will automatically set the correct
- * result and display scales in the result.
*/
init_var(&arg1);
init_var(&arg2);
set_var_from_num(num2, &arg2);
add_var(&arg1, &arg2, &result);
+
res = make_result(&result);
free_var(&arg1);
}
-/* ----------
+/*
* numeric_sub() -
*
* Subtract one numeric from another
- * ----------
*/
Datum
numeric_sub(PG_FUNCTION_ARGS)
PG_RETURN_NUMERIC(make_result(&const_nan));
/*
- * Unpack the two arguments, let sub_var() compute the result and
- * return it.
+ * Unpack the values, let sub_var() compute the result and return it.
*/
init_var(&arg1);
init_var(&arg2);
set_var_from_num(num2, &arg2);
sub_var(&arg1, &arg2, &result);
+
res = make_result(&result);
free_var(&arg1);
}
-/* ----------
+/*
* numeric_mul() -
*
* Calculate the product of two numerics
- * ----------
*/
Datum
numeric_mul(PG_FUNCTION_ARGS)
PG_RETURN_NUMERIC(make_result(&const_nan));
/*
- * Unpack the arguments, let mul_var() compute the result and return
- * it. Unlike add_var() and sub_var(), mul_var() will round the result
- * to the scale stored in global_rscale. In the case of numeric_mul(),
- * which is invoked for the * operator on numerics, we set it to the
- * exact representation for the product (rscale = sum(rscale of arg1,
- * rscale of arg2) and the same for the dscale).
+ * Unpack the values, let mul_var() compute the result and return it.
+ * Unlike add_var() and sub_var(), mul_var() will round its result. In the
+ * case of numeric_mul(), which is invoked for the * operator on numerics,
+ * we request exact representation for the product (rscale = sum(dscale of
+ * arg1, dscale of arg2)).
*/
init_var(&arg1);
init_var(&arg2);
set_var_from_num(num1, &arg1);
set_var_from_num(num2, &arg2);
- global_rscale = arg1.rscale + arg2.rscale;
-
- mul_var(&arg1, &arg2, &result);
-
- result.dscale = arg1.dscale + arg2.dscale;
+ mul_var(&arg1, &arg2, &result, arg1.dscale + arg2.dscale);
res = make_result(&result);
}
-/* ----------
+/*
* numeric_div() -
*
* Divide one numeric into another
- * ----------
*/
Datum
numeric_div(PG_FUNCTION_ARGS)
NumericVar arg2;
NumericVar result;
Numeric res;
- int res_dscale;
+ int rscale;
/*
* Handle NaN
set_var_from_num(num1, &arg1);
set_var_from_num(num2, &arg2);
- /* ----------
- * The result scale of a division isn't specified in any
- * SQL standard. For Postgres it is the following (where
- * SR, DR are the result- and display-scales of the returned
- * value, S1, D1, S2 and D2 are the scales of the two arguments,
- * The minimum and maximum scales are compile time options from
- * numeric.h):
- *
- * DR = MIN(MAX(D1 + D2, MIN_DISPLAY_SCALE), MAX_DISPLAY_SCALE)
- * SR = MIN(MAX(MAX(S1 + S2, MIN_RESULT_SCALE), DR + 4), MAX_RESULT_SCALE)
- *
- * By default, any result is computed with a minimum of 34 digits
- * after the decimal point or at least with 4 digits more than
- * displayed.
- * ----------
+ /*
+ * Select scale for division result
+ */
+ rscale = select_div_scale(&arg1, &arg2);
+
+ /*
+ * Do the divide and return the result
+ */
+ div_var(&arg1, &arg2, &result, rscale, true);
+
+ res = make_result(&result);
+
+ free_var(&arg1);
+ free_var(&arg2);
+ free_var(&result);
+
+ PG_RETURN_NUMERIC(res);
+}
+
+
+/*
+ * numeric_div_trunc() -
+ *
+ * Divide one numeric into another, truncating the result to an integer
+ */
+Datum
+numeric_div_trunc(PG_FUNCTION_ARGS)
+{
+ Numeric num1 = PG_GETARG_NUMERIC(0);
+ Numeric num2 = PG_GETARG_NUMERIC(1);
+ NumericVar arg1;
+ NumericVar arg2;
+ NumericVar result;
+ Numeric res;
+
+ /*
+ * Handle NaN
*/
- res_dscale = MAX(arg1.dscale + arg2.dscale, NUMERIC_MIN_DISPLAY_SCALE);
- res_dscale = MIN(res_dscale, NUMERIC_MAX_DISPLAY_SCALE);
- global_rscale = MAX(arg1.rscale + arg2.rscale,
- NUMERIC_MIN_RESULT_SCALE);
- global_rscale = MAX(global_rscale, res_dscale + 4);
- global_rscale = MIN(global_rscale, NUMERIC_MAX_RESULT_SCALE);
+ if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
+ PG_RETURN_NUMERIC(make_result(&const_nan));
/*
- * Do the divide, set the display scale and return the result
+ * Unpack the arguments
*/
- div_var(&arg1, &arg2, &result);
+ init_var(&arg1);
+ init_var(&arg2);
+ init_var(&result);
+
+ set_var_from_num(num1, &arg1);
+ set_var_from_num(num2, &arg2);
- result.dscale = res_dscale;
+ /*
+ * Do the divide and return the result
+ */
+ div_var(&arg1, &arg2, &result, 0, false);
res = make_result(&result);
}
-/* ----------
+/*
* numeric_mod() -
*
* Calculate the modulo of two numerics
- * ----------
*/
Datum
numeric_mod(PG_FUNCTION_ARGS)
}
-/* ----------
+/*
* numeric_inc() -
*
* Increment a number by one
- * ----------
*/
Datum
numeric_inc(PG_FUNCTION_ARGS)
set_var_from_num(num, &arg);
add_var(&arg, &const_one, &arg);
+
res = make_result(&arg);
free_var(&arg);
}
-/* ----------
+/*
* numeric_smaller() -
*
* Return the smaller of two numbers
- * ----------
*/
Datum
numeric_smaller(PG_FUNCTION_ARGS)
{
Numeric num1 = PG_GETARG_NUMERIC(0);
Numeric num2 = PG_GETARG_NUMERIC(1);
- NumericVar arg1;
- NumericVar arg2;
- Numeric res;
-
- /*
- * Handle NaN
- */
- if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
- PG_RETURN_NUMERIC(make_result(&const_nan));
/*
- * Unpack the values, and decide which is the smaller one
+ * Use cmp_numerics so that this will agree with the comparison operators,
+ * particularly as regards comparisons involving NaN.
*/
- init_var(&arg1);
- init_var(&arg2);
-
- set_var_from_num(num1, &arg1);
- set_var_from_num(num2, &arg2);
-
- if (cmp_var(&arg1, &arg2) <= 0)
- res = make_result(&arg1);
+ if (cmp_numerics(num1, num2) < 0)
+ PG_RETURN_NUMERIC(num1);
else
- res = make_result(&arg2);
-
- free_var(&arg1);
- free_var(&arg2);
-
- PG_RETURN_NUMERIC(res);
+ PG_RETURN_NUMERIC(num2);
}
-/* ----------
+/*
* numeric_larger() -
*
* Return the larger of two numbers
- * ----------
*/
Datum
numeric_larger(PG_FUNCTION_ARGS)
{
Numeric num1 = PG_GETARG_NUMERIC(0);
Numeric num2 = PG_GETARG_NUMERIC(1);
- NumericVar arg1;
- NumericVar arg2;
- Numeric res;
-
- /*
- * Handle NaN
- */
- if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
- PG_RETURN_NUMERIC(make_result(&const_nan));
/*
- * Unpack the values, and decide which is the larger one
+ * Use cmp_numerics so that this will agree with the comparison operators,
+ * particularly as regards comparisons involving NaN.
*/
- init_var(&arg1);
- init_var(&arg2);
-
- set_var_from_num(num1, &arg1);
- set_var_from_num(num2, &arg2);
-
- if (cmp_var(&arg1, &arg2) >= 0)
- res = make_result(&arg1);
+ if (cmp_numerics(num1, num2) > 0)
+ PG_RETURN_NUMERIC(num1);
else
- res = make_result(&arg2);
-
- free_var(&arg1);
- free_var(&arg2);
-
- PG_RETURN_NUMERIC(res);
+ PG_RETURN_NUMERIC(num2);
}
/* ----------------------------------------------------------------------
*
- * Complex math functions
+ * Advanced math functions
*
* ----------------------------------------------------------------------
*/
+/*
+ * numeric_fac()
+ *
+ * Compute factorial
+ */
+Datum
+numeric_fac(PG_FUNCTION_ARGS)
+{
+ int64 num = PG_GETARG_INT64(0);
+ Numeric res;
+ NumericVar fact;
+ NumericVar result;
+
+ if (num <= 1)
+ {
+ res = make_result(&const_one);
+ PG_RETURN_NUMERIC(res);
+ }
+ /* Fail immediately if the result would overflow */
+ if (num > 32177)
+ ereport(ERROR,
+ (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+ errmsg("value overflows numeric format")));
-/* ----------
+ init_var(&fact);
+ init_var(&result);
+
+ int8_to_numericvar(num, &result);
+
+ for (num = num - 1; num > 1; num--)
+ {
+ /* this loop can take awhile, so allow it to be interrupted */
+ CHECK_FOR_INTERRUPTS();
+
+ int8_to_numericvar(num, &fact);
+
+ mul_var(&result, &fact, &result, 0);
+ }
+
+ res = make_result(&result);
+
+ free_var(&fact);
+ free_var(&result);
+
+ PG_RETURN_NUMERIC(res);
+}
+
+
+/*
* numeric_sqrt() -
*
* Compute the square root of a numeric.
- * ----------
*/
Datum
numeric_sqrt(PG_FUNCTION_ARGS)
Numeric res;
NumericVar arg;
NumericVar result;
- int res_dscale;
+ int sweight;
+ int rscale;
/*
* Handle NaN
PG_RETURN_NUMERIC(make_result(&const_nan));
/*
- * Unpack the argument, determine the scales like for divide, let
- * sqrt_var() do the calculation and return the result.
+ * Unpack the argument and determine the result scale. We choose a scale
+ * to give at least NUMERIC_MIN_SIG_DIGITS significant digits; but in any
+ * case not less than the input's dscale.
*/
init_var(&arg);
init_var(&result);
set_var_from_num(num, &arg);
- res_dscale = MAX(arg.dscale, NUMERIC_MIN_DISPLAY_SCALE);
- res_dscale = MIN(res_dscale, NUMERIC_MAX_DISPLAY_SCALE);
- global_rscale = MAX(arg.rscale, NUMERIC_MIN_RESULT_SCALE);
- global_rscale = MAX(global_rscale, res_dscale + 4);
- global_rscale = MIN(global_rscale, NUMERIC_MAX_RESULT_SCALE);
+ /* Assume the input was normalized, so arg.weight is accurate */
+ sweight = (arg.weight + 1) * DEC_DIGITS / 2 - 1;
- sqrt_var(&arg, &result);
+ rscale = NUMERIC_MIN_SIG_DIGITS - sweight;
+ rscale = Max(rscale, arg.dscale);
+ rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
+ rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
- result.dscale = res_dscale;
+ /*
+ * Let sqrt_var() do the calculation and return the result.
+ */
+ sqrt_var(&arg, &result, rscale);
res = make_result(&result);
}
-/* ----------
+/*
* numeric_exp() -
*
* Raise e to the power of x
- * ----------
*/
Datum
numeric_exp(PG_FUNCTION_ARGS)
Numeric res;
NumericVar arg;
NumericVar result;
- int res_dscale;
+ int rscale;
+ double val;
/*
* Handle NaN
PG_RETURN_NUMERIC(make_result(&const_nan));
/*
- * Same procedure like for sqrt().
+ * Unpack the argument and determine the result scale. We choose a scale
+ * to give at least NUMERIC_MIN_SIG_DIGITS significant digits; but in any
+ * case not less than the input's dscale.
*/
init_var(&arg);
init_var(&result);
+
set_var_from_num(num, &arg);
- res_dscale = MAX(arg.dscale, NUMERIC_MIN_DISPLAY_SCALE);
- res_dscale = MIN(res_dscale, NUMERIC_MAX_DISPLAY_SCALE);
- global_rscale = MAX(arg.rscale, NUMERIC_MIN_RESULT_SCALE);
- global_rscale = MAX(global_rscale, res_dscale + 4);
- global_rscale = MIN(global_rscale, NUMERIC_MAX_RESULT_SCALE);
+ /* convert input to float8, ignoring overflow */
+ val = numericvar_to_double_no_overflow(&arg);
+
+ /*
+ * log10(result) = num * log10(e), so this is approximately the decimal
+ * weight of the result:
+ */
+ val *= 0.434294481903252;
- exp_var(&arg, &result);
+ /* limit to something that won't cause integer overflow */
+ val = Max(val, -NUMERIC_MAX_RESULT_SCALE);
+ val = Min(val, NUMERIC_MAX_RESULT_SCALE);
- result.dscale = res_dscale;
+ rscale = NUMERIC_MIN_SIG_DIGITS - (int) val;
+ rscale = Max(rscale, arg.dscale);
+ rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
+ rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
+
+ /*
+ * Let exp_var() do the calculation and return the result.
+ */
+ exp_var(&arg, &result, rscale);
res = make_result(&result);
}
-/* ----------
+/*
* numeric_ln() -
*
* Compute the natural logarithm of x
- * ----------
*/
Datum
numeric_ln(PG_FUNCTION_ARGS)
Numeric res;
NumericVar arg;
NumericVar result;
- int res_dscale;
+ int dec_digits;
+ int rscale;
/*
* Handle NaN
if (NUMERIC_IS_NAN(num))
PG_RETURN_NUMERIC(make_result(&const_nan));
- /*
- * Same procedure like for sqrt()
- */
init_var(&arg);
init_var(&result);
+
set_var_from_num(num, &arg);
- res_dscale = MAX(arg.dscale, NUMERIC_MIN_DISPLAY_SCALE);
- res_dscale = MIN(res_dscale, NUMERIC_MAX_DISPLAY_SCALE);
- global_rscale = MAX(arg.rscale, NUMERIC_MIN_RESULT_SCALE);
- global_rscale = MAX(global_rscale, res_dscale + 4);
- global_rscale = MIN(global_rscale, NUMERIC_MAX_RESULT_SCALE);
+ /* Approx decimal digits before decimal point */
+ dec_digits = (arg.weight + 1) * DEC_DIGITS;
- ln_var(&arg, &result);
+ if (dec_digits > 1)
+ rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(dec_digits - 1);
+ else if (dec_digits < 1)
+ rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(1 - dec_digits);
+ else
+ rscale = NUMERIC_MIN_SIG_DIGITS;
+
+ rscale = Max(rscale, arg.dscale);
+ rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
+ rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
- result.dscale = res_dscale;
+ ln_var(&arg, &result, rscale);
res = make_result(&result);
}
-/* ----------
+/*
* numeric_log() -
*
* Compute the logarithm of x in a given base
- * ----------
*/
Datum
numeric_log(PG_FUNCTION_ARGS)
NumericVar arg1;
NumericVar arg2;
NumericVar result;
- int res_dscale;
/*
* Handle NaN
PG_RETURN_NUMERIC(make_result(&const_nan));
/*
- * Initialize things and calculate scales
+ * Initialize things
*/
init_var(&arg1);
init_var(&arg2);
init_var(&result);
+
set_var_from_num(num1, &arg1);
set_var_from_num(num2, &arg2);
- res_dscale = MAX(arg1.dscale + arg2.dscale, NUMERIC_MIN_DISPLAY_SCALE);
- res_dscale = MIN(res_dscale, NUMERIC_MAX_DISPLAY_SCALE);
- global_rscale = MAX(arg1.rscale + arg2.rscale, NUMERIC_MIN_RESULT_SCALE);
- global_rscale = MAX(global_rscale, res_dscale + 4);
- global_rscale = MIN(global_rscale, NUMERIC_MAX_RESULT_SCALE);
-
/*
- * Call log_var() to compute and return the result
+ * Call log_var() to compute and return the result; note it handles scale
+ * selection itself.
*/
log_var(&arg1, &arg2, &result);
- result.dscale = res_dscale;
-
res = make_result(&result);
free_var(&result);
}
-/* ----------
+/*
* numeric_power() -
*
- * Raise m to the power of x
- * ----------
+ * Raise b to the power of x
*/
Datum
numeric_power(PG_FUNCTION_ARGS)
Numeric res;
NumericVar arg1;
NumericVar arg2;
+ NumericVar arg2_trunc;
NumericVar result;
- int res_dscale;
/*
* Handle NaN
PG_RETURN_NUMERIC(make_result(&const_nan));
/*
- * Initialize things and calculate scales
+ * Initialize things
*/
init_var(&arg1);
init_var(&arg2);
+ init_var(&arg2_trunc);
init_var(&result);
+
set_var_from_num(num1, &arg1);
set_var_from_num(num2, &arg2);
+ set_var_from_var(&arg2, &arg2_trunc);
- res_dscale = MAX(arg1.dscale + arg2.dscale, NUMERIC_MIN_DISPLAY_SCALE);
- res_dscale = MIN(res_dscale, NUMERIC_MAX_DISPLAY_SCALE);
- global_rscale = MAX(arg1.rscale + arg2.rscale, NUMERIC_MIN_RESULT_SCALE);
- global_rscale = MAX(global_rscale, res_dscale + 4);
- global_rscale = MIN(global_rscale, NUMERIC_MAX_RESULT_SCALE);
+ trunc_var(&arg2_trunc, 0);
/*
- * Call log_var() to compute and return the result
+ * The SQL spec requires that we emit a particular SQLSTATE error code for
+ * certain error conditions. Specifically, we don't return a
+ * divide-by-zero error code for 0 ^ -1.
*/
- power_var(&arg1, &arg2, &result);
+ if (cmp_var(&arg1, &const_zero) == 0 &&
+ cmp_var(&arg2, &const_zero) < 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION),
+ errmsg("zero raised to a negative power is undefined")));
+
+ if (cmp_var(&arg1, &const_zero) < 0 &&
+ cmp_var(&arg2, &arg2_trunc) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION),
+ errmsg("a negative number raised to a non-integer power yields a complex result")));
- result.dscale = res_dscale;
+ /*
+ * Call power_var() to compute and return the result; note it handles
+ * scale selection itself.
+ */
+ power_var(&arg1, &arg2, &result);
res = make_result(&result);
free_var(&result);
free_var(&arg2);
+ free_var(&arg2_trunc);
free_var(&arg1);
PG_RETURN_NUMERIC(res);
int32 val = PG_GETARG_INT32(0);
Numeric res;
NumericVar result;
- char *tmp;
init_var(&result);
- tmp = DatumGetCString(DirectFunctionCall1(int4out,
- Int32GetDatum(val)));
- set_var_from_str(tmp, &result);
+ int8_to_numericvar((int64) val, &result);
+
res = make_result(&result);
free_var(&result);
- pfree(tmp);
PG_RETURN_NUMERIC(res);
}
{
Numeric num = PG_GETARG_NUMERIC(0);
NumericVar x;
- char *str;
- Datum result;
+ int32 result;
/* XXX would it be better to return NULL? */
if (NUMERIC_IS_NAN(num))
- elog(ERROR, "Cannot convert NaN to int4");
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot convert NaN to integer")));
- /*
- * Get the number in the variable format so we can round to integer.
- */
+ /* Convert to variable format, then convert to int4 */
init_var(&x);
set_var_from_num(num, &x);
+ result = numericvar_to_int4(&x);
+ free_var(&x);
+ PG_RETURN_INT32(result);
+}
- str = get_str_from_var(&x, 0); /* dscale = 0 produces rounding */
+/*
+ * Given a NumericVar, convert it to an int32. If the NumericVar
+ * exceeds the range of an int32, raise the appropriate error via
+ * ereport(). The input NumericVar is *not* free'd.
+ */
+static int32
+numericvar_to_int4(NumericVar *var)
+{
+ int32 result;
+ int64 val;
- free_var(&x);
+ if (!numericvar_to_int8(var, &val))
+ ereport(ERROR,
+ (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+ errmsg("integer out of range")));
- result = DirectFunctionCall1(int4in, CStringGetDatum(str));
- pfree(str);
+ /* Down-convert to int4 */
+ result = (int32) val;
- PG_RETURN_DATUM(result);
-}
+ /* Test for overflow by reverse-conversion. */
+ if ((int64) result != val)
+ ereport(ERROR,
+ (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+ errmsg("integer out of range")));
+ return result;
+}
Datum
int8_numeric(PG_FUNCTION_ARGS)
{
- Datum val = PG_GETARG_DATUM(0);
+ int64 val = PG_GETARG_INT64(0);
Numeric res;
NumericVar result;
- char *tmp;
init_var(&result);
- tmp = DatumGetCString(DirectFunctionCall1(int8out, val));
- set_var_from_str(tmp, &result);
+ int8_to_numericvar(val, &result);
+
res = make_result(&result);
free_var(&result);
- pfree(tmp);
PG_RETURN_NUMERIC(res);
}
{
Numeric num = PG_GETARG_NUMERIC(0);
NumericVar x;
- char *str;
- Datum result;
+ int64 result;
/* XXX would it be better to return NULL? */
if (NUMERIC_IS_NAN(num))
- elog(ERROR, "Cannot convert NaN to int8");
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot convert NaN to bigint")));
- /*
- * Get the number in the variable format so we can round to integer.
- */
+ /* Convert to variable format and thence to int8 */
init_var(&x);
set_var_from_num(num, &x);
- str = get_str_from_var(&x, 0); /* dscale = 0 produces rounding */
+ if (!numericvar_to_int8(&x, &result))
+ ereport(ERROR,
+ (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+ errmsg("bigint out of range")));
free_var(&x);
- result = DirectFunctionCall1(int8in, CStringGetDatum(str));
- pfree(str);
-
- PG_RETURN_DATUM(result);
+ PG_RETURN_INT64(result);
}
int16 val = PG_GETARG_INT16(0);
Numeric res;
NumericVar result;
- char *tmp;
init_var(&result);
- tmp = DatumGetCString(DirectFunctionCall1(int2out,
- Int16GetDatum(val)));
- set_var_from_str(tmp, &result);
+ int8_to_numericvar((int64) val, &result);
+
res = make_result(&result);
free_var(&result);
- pfree(tmp);
PG_RETURN_NUMERIC(res);
}
{
Numeric num = PG_GETARG_NUMERIC(0);
NumericVar x;
- char *str;
- Datum result;
+ int64 val;
+ int16 result;
/* XXX would it be better to return NULL? */
if (NUMERIC_IS_NAN(num))
- elog(ERROR, "Cannot convert NaN to int2");
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot convert NaN to smallint")));
- /*
- * Get the number in the variable format so we can round to integer.
- */
+ /* Convert to variable format and thence to int8 */
init_var(&x);
set_var_from_num(num, &x);
- str = get_str_from_var(&x, 0); /* dscale = 0 produces rounding */
+ if (!numericvar_to_int8(&x, &val))
+ ereport(ERROR,
+ (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+ errmsg("smallint out of range")));
free_var(&x);
- result = DirectFunctionCall1(int2in, CStringGetDatum(str));
- pfree(str);
+ /* Down-convert to int2 */
+ result = (int16) val;
- return result;
+ /* Test for overflow by reverse-conversion. */
+ if ((int64) result != val)
+ ereport(ERROR,
+ (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+ errmsg("smallint out of range")));
+
+ PG_RETURN_INT16(result);
}
init_var(&result);
- set_var_from_str(buf, &result);
+ /* Assume we need not worry about leading/trailing spaces */
+ (void) set_var_from_str(buf, buf, &result);
+
res = make_result(&result);
free_var(&result);
Datum result;
if (NUMERIC_IS_NAN(num))
- PG_RETURN_FLOAT8(NAN);
+ PG_RETURN_FLOAT8(get_float8_nan());
tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
NumericGetDatum(num)));
}
+/* Convert numeric to float8; if out of range, return +/- HUGE_VAL */
Datum
-float4_numeric(PG_FUNCTION_ARGS)
+numeric_float8_no_overflow(PG_FUNCTION_ARGS)
+{
+ Numeric num = PG_GETARG_NUMERIC(0);
+ double val;
+
+ if (NUMERIC_IS_NAN(num))
+ PG_RETURN_FLOAT8(get_float8_nan());
+
+ val = numeric_to_double_no_overflow(num);
+
+ PG_RETURN_FLOAT8(val);
+}
+
+Datum
+float4_numeric(PG_FUNCTION_ARGS)
{
float4 val = PG_GETARG_FLOAT4(0);
Numeric res;
init_var(&result);
- set_var_from_str(buf, &result);
+ /* Assume we need not worry about leading/trailing spaces */
+ (void) set_var_from_str(buf, buf, &result);
+
res = make_result(&result);
free_var(&result);
Datum result;
if (NUMERIC_IS_NAN(num))
- PG_RETURN_FLOAT4((float4) NAN);
+ PG_RETURN_FLOAT4(get_float4_nan());
tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
NumericGetDatum(num)));
/* We assume the input is array of numeric */
deconstruct_array(transarray,
- false, -1, 'i',
- &transdatums, &ndatums);
+ NUMERICOID, -1, false, 'i',
+ &transdatums, NULL, &ndatums);
if (ndatums != 3)
- elog(ERROR, "do_numeric_accum: expected 3-element numeric array");
+ elog(ERROR, "expected 3-element numeric array");
N = transdatums[0];
sumX = transdatums[1];
sumX2 = transdatums[2];
NumericGetDatum(newval));
sumX2 = DirectFunctionCall2(numeric_add, sumX2,
DirectFunctionCall2(numeric_mul,
- NumericGetDatum(newval),
- NumericGetDatum(newval)));
+ NumericGetDatum(newval),
+ NumericGetDatum(newval)));
transdatums[0] = N;
transdatums[1] = sumX;
transdatums[2] = sumX2;
result = construct_array(transdatums, 3,
- false, -1, 'i');
+ NUMERICOID, -1, false, 'i');
+
+ return result;
+}
+
+/*
+ * Improve avg performance by not caclulating sum(X*X).
+ */
+static ArrayType *
+do_numeric_avg_accum(ArrayType *transarray, Numeric newval)
+{
+ Datum *transdatums;
+ int ndatums;
+ Datum N,
+ sumX;
+ ArrayType *result;
+
+ /* We assume the input is array of numeric */
+ deconstruct_array(transarray,
+ NUMERICOID, -1, false, 'i',
+ &transdatums, NULL, &ndatums);
+ if (ndatums != 2)
+ elog(ERROR, "expected 2-element numeric array");
+ N = transdatums[0];
+ sumX = transdatums[1];
+
+ N = DirectFunctionCall1(numeric_inc, N);
+ sumX = DirectFunctionCall2(numeric_add, sumX,
+ NumericGetDatum(newval));
+
+ transdatums[0] = N;
+ transdatums[1] = sumX;
+
+ result = construct_array(transdatums, 2,
+ NUMERICOID, -1, false, 'i');
return result;
}
PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
}
+/*
+ * Optimized case for average of numeric.
+ */
+Datum
+numeric_avg_accum(PG_FUNCTION_ARGS)
+{
+ ArrayType *transarray = PG_GETARG_ARRAYTYPE_P(0);
+ Numeric newval = PG_GETARG_NUMERIC(1);
+
+ PG_RETURN_ARRAYTYPE_P(do_numeric_avg_accum(transarray, newval));
+}
+
/*
* Integer data types all use Numeric accumulators to share code and
- * avoid risk of overflow.
+ * avoid risk of overflow. For int2 and int4 inputs, Numeric accumulation
+ * is overkill for the N and sum(X) values, but definitely not overkill
+ * for the sum(X*X) value. Hence, we use int2_accum and int4_accum only
+ * for stddev/variance --- there are faster special-purpose accumulator
+ * routines for SUM and AVG of these datatypes.
*/
Datum
PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
}
+/*
+ * Optimized case for average of int8.
+ */
+Datum
+int8_avg_accum(PG_FUNCTION_ARGS)
+{
+ ArrayType *transarray = PG_GETARG_ARRAYTYPE_P(0);
+ Datum newval8 = PG_GETARG_DATUM(1);
+ Numeric newval;
+
+ newval = DatumGetNumeric(DirectFunctionCall1(int8_numeric, newval8));
+
+ PG_RETURN_ARRAYTYPE_P(do_numeric_avg_accum(transarray, newval));
+}
+
+
Datum
numeric_avg(PG_FUNCTION_ARGS)
{
/* We assume the input is array of numeric */
deconstruct_array(transarray,
- false, -1, 'i',
- &transdatums, &ndatums);
- if (ndatums != 3)
- elog(ERROR, "numeric_avg: expected 3-element numeric array");
+ NUMERICOID, -1, false, 'i',
+ &transdatums, NULL, &ndatums);
+ if (ndatums != 2)
+ elog(ERROR, "expected 2-element numeric array");
N = DatumGetNumeric(transdatums[0]);
sumX = DatumGetNumeric(transdatums[1]);
- /* ignore sumX2 */
/* SQL92 defines AVG of no values to be NULL */
/* N is zero iff no digits (cf. numeric_uminus) */
- if (N->varlen == NUMERIC_HDRSZ)
+ if (VARSIZE(N) == NUMERIC_HDRSZ)
PG_RETURN_NULL();
PG_RETURN_DATUM(DirectFunctionCall2(numeric_div,
NumericGetDatum(N)));
}
-Datum
-numeric_variance(PG_FUNCTION_ARGS)
+/*
+ * Workhorse routine for the standard deviance and variance
+ * aggregates. 'transarray' is the aggregate's transition
+ * array. 'variance' specifies whether we should calculate the
+ * variance or the standard deviation. 'sample' indicates whether the
+ * caller is interested in the sample or the population
+ * variance/stddev.
+ *
+ * If appropriate variance statistic is undefined for the input,
+ * *is_null is set to true and NULL is returned.
+ */
+static Numeric
+numeric_stddev_internal(ArrayType *transarray,
+ bool variance, bool sample,
+ bool *is_null)
{
- ArrayType *transarray = PG_GETARG_ARRAYTYPE_P(0);
Datum *transdatums;
int ndatums;
Numeric N,
vsumX,
vsumX2,
vNminus1;
+ NumericVar *comp;
+ int rscale;
+
+ *is_null = false;
/* We assume the input is array of numeric */
deconstruct_array(transarray,
- false, -1, 'i',
- &transdatums, &ndatums);
+ NUMERICOID, -1, false, 'i',
+ &transdatums, NULL, &ndatums);
if (ndatums != 3)
- elog(ERROR, "numeric_variance: expected 3-element numeric array");
+ elog(ERROR, "expected 3-element numeric array");
N = DatumGetNumeric(transdatums[0]);
sumX = DatumGetNumeric(transdatums[1]);
sumX2 = DatumGetNumeric(transdatums[2]);
if (NUMERIC_IS_NAN(N) || NUMERIC_IS_NAN(sumX) || NUMERIC_IS_NAN(sumX2))
- PG_RETURN_NUMERIC(make_result(&const_nan));
-
- /* We define VARIANCE of no values to be NULL, of 1 value to be 0 */
- /* N is zero iff no digits (cf. numeric_uminus) */
- if (N->varlen == NUMERIC_HDRSZ)
- PG_RETURN_NULL();
+ return make_result(&const_nan);
init_var(&vN);
set_var_from_num(N, &vN);
- init_var(&vNminus1);
- sub_var(&vN, &const_one, &vNminus1);
+ /*
+ * Sample stddev and variance are undefined when N <= 1; population stddev
+ * is undefined when N == 0. Return NULL in either case.
+ */
+ if (sample)
+ comp = &const_one;
+ else
+ comp = &const_zero;
- if (cmp_var(&vNminus1, &const_zero) <= 0)
+ if (cmp_var(&vN, comp) <= 0)
{
free_var(&vN);
- free_var(&vNminus1);
- PG_RETURN_NUMERIC(make_result(&const_zero));
+ *is_null = true;
+ return NULL;
}
+ init_var(&vNminus1);
+ sub_var(&vN, &const_one, &vNminus1);
+
init_var(&vsumX);
set_var_from_num(sumX, &vsumX);
init_var(&vsumX2);
set_var_from_num(sumX2, &vsumX2);
- mul_var(&vsumX, &vsumX, &vsumX); /* now vsumX contains sumX * sumX */
- mul_var(&vN, &vsumX2, &vsumX2); /* now vsumX2 contains N * sumX2 */
+ /* compute rscale for mul_var calls */
+ rscale = vsumX.dscale * 2;
+
+ mul_var(&vsumX, &vsumX, &vsumX, rscale); /* vsumX = sumX * sumX */
+ mul_var(&vN, &vsumX2, &vsumX2, rscale); /* vsumX2 = N * sumX2 */
sub_var(&vsumX2, &vsumX, &vsumX2); /* N * sumX2 - sumX * sumX */
- mul_var(&vN, &vNminus1, &vNminus1); /* N * (N - 1) */
- div_var(&vsumX2, &vNminus1, &vsumX); /* variance */
- res = make_result(&vsumX);
+ if (cmp_var(&vsumX2, &const_zero) <= 0)
+ {
+ /* Watch out for roundoff error producing a negative numerator */
+ res = make_result(&const_zero);
+ }
+ else
+ {
+ if (sample)
+ mul_var(&vN, &vNminus1, &vNminus1, 0); /* N * (N - 1) */
+ else
+ mul_var(&vN, &vN, &vNminus1, 0); /* N * N */
+ rscale = select_div_scale(&vsumX2, &vNminus1);
+ div_var(&vsumX2, &vNminus1, &vsumX, rscale, true); /* variance */
+ if (!variance)
+ sqrt_var(&vsumX, &vsumX, rscale); /* stddev */
+
+ res = make_result(&vsumX);
+ }
free_var(&vN);
free_var(&vNminus1);
free_var(&vsumX);
free_var(&vsumX2);
- PG_RETURN_NUMERIC(res);
+ return res;
}
Datum
-numeric_stddev(PG_FUNCTION_ARGS)
+numeric_var_samp(PG_FUNCTION_ARGS)
{
- ArrayType *transarray = PG_GETARG_ARRAYTYPE_P(0);
- Datum *transdatums;
- int ndatums;
- Numeric N,
- sumX,
- sumX2,
- res;
- NumericVar vN,
- vsumX,
- vsumX2,
- vNminus1;
-
- /* We assume the input is array of numeric */
- deconstruct_array(transarray,
- false, -1, 'i',
- &transdatums, &ndatums);
- if (ndatums != 3)
- elog(ERROR, "numeric_stddev: expected 3-element numeric array");
- N = DatumGetNumeric(transdatums[0]);
- sumX = DatumGetNumeric(transdatums[1]);
- sumX2 = DatumGetNumeric(transdatums[2]);
+ Numeric res;
+ bool is_null;
- if (NUMERIC_IS_NAN(N) || NUMERIC_IS_NAN(sumX) || NUMERIC_IS_NAN(sumX2))
- PG_RETURN_NUMERIC(make_result(&const_nan));
+ res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
+ true, true, &is_null);
- /* We define STDDEV of no values to be NULL, of 1 value to be 0 */
- /* N is zero iff no digits (cf. numeric_uminus) */
- if (N->varlen == NUMERIC_HDRSZ)
+ if (is_null)
PG_RETURN_NULL();
+ else
+ PG_RETURN_NUMERIC(res);
+}
- init_var(&vN);
- set_var_from_num(N, &vN);
+Datum
+numeric_stddev_samp(PG_FUNCTION_ARGS)
+{
+ Numeric res;
+ bool is_null;
- init_var(&vNminus1);
- sub_var(&vN, &const_one, &vNminus1);
+ res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
+ false, true, &is_null);
- if (cmp_var(&vNminus1, &const_zero) <= 0)
- {
- free_var(&vN);
- free_var(&vNminus1);
- PG_RETURN_NUMERIC(make_result(&const_zero));
- }
+ if (is_null)
+ PG_RETURN_NULL();
+ else
+ PG_RETURN_NUMERIC(res);
+}
- init_var(&vsumX);
- set_var_from_num(sumX, &vsumX);
- init_var(&vsumX2);
- set_var_from_num(sumX2, &vsumX2);
+Datum
+numeric_var_pop(PG_FUNCTION_ARGS)
+{
+ Numeric res;
+ bool is_null;
- mul_var(&vsumX, &vsumX, &vsumX); /* now vsumX contains sumX * sumX */
- mul_var(&vN, &vsumX2, &vsumX2); /* now vsumX2 contains N * sumX2 */
- sub_var(&vsumX2, &vsumX, &vsumX2); /* N * sumX2 - sumX * sumX */
- mul_var(&vN, &vNminus1, &vNminus1); /* N * (N - 1) */
- div_var(&vsumX2, &vNminus1, &vsumX); /* variance */
- sqrt_var(&vsumX, &vsumX); /* stddev */
+ res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
+ true, false, &is_null);
- res = make_result(&vsumX);
+ if (is_null)
+ PG_RETURN_NULL();
+ else
+ PG_RETURN_NUMERIC(res);
+}
- free_var(&vN);
- free_var(&vNminus1);
- free_var(&vsumX);
- free_var(&vsumX2);
+Datum
+numeric_stddev_pop(PG_FUNCTION_ARGS)
+{
+ Numeric res;
+ bool is_null;
- PG_RETURN_NUMERIC(res);
-}
+ res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
+ false, false, &is_null);
+ if (is_null)
+ PG_RETURN_NULL();
+ else
+ PG_RETURN_NUMERIC(res);
+}
/*
* SUM transition functions for integer datatypes.
*
- * We use a Numeric accumulator to avoid overflow. Because SQL92 defines
- * the SUM() of no values to be NULL, not zero, the initial condition of
- * the transition data value needs to be NULL. This means we can't rely
- * on ExecAgg to automatically insert the first non-null data value into
- * the transition data: it doesn't know how to do the type conversion.
- * The upshot is that these routines have to be marked non-strict and
- * handle substitution of the first non-null input themselves.
+ * To avoid overflow, we use accumulators wider than the input datatype.
+ * A Numeric accumulator is needed for int8 input; for int4 and int2
+ * inputs, we use int8 accumulators which should be sufficient for practical
+ * purposes. (The latter two therefore don't really belong in this file,
+ * but we keep them here anyway.)
+ *
+ * Because SQL92 defines the SUM() of no values to be NULL, not zero,
+ * the initial condition of the transition data value needs to be NULL. This
+ * means we can't rely on ExecAgg to automatically insert the first non-null
+ * data value into the transition data: it doesn't know how to do the type
+ * conversion. The upshot is that these routines have to be marked non-strict
+ * and handle substitution of the first non-null input themselves.
*/
Datum
int2_sum(PG_FUNCTION_ARGS)
{
- Numeric oldsum;
- Datum newval;
+ int64 newval;
if (PG_ARGISNULL(0))
{
if (PG_ARGISNULL(1))
PG_RETURN_NULL(); /* still no non-null */
/* This is the first non-null input. */
- newval = DirectFunctionCall1(int2_numeric, PG_GETARG_DATUM(1));
- PG_RETURN_DATUM(newval);
+ newval = (int64) PG_GETARG_INT16(1);
+ PG_RETURN_INT64(newval);
}
- oldsum = PG_GETARG_NUMERIC(0);
+ /*
+ * If we're invoked by nodeAgg, we can cheat and modify our first
+ * parameter in-place to avoid palloc overhead. If not, we need to return
+ * the new value of the transition variable. (If int8 is pass-by-value,
+ * then of course this is useless as well as incorrect, so just ifdef it
+ * out.)
+ */
+#ifndef USE_FLOAT8_BYVAL /* controls int8 too */
+ if (fcinfo->context &&
+ (IsA(fcinfo->context, AggState) ||
+ IsA(fcinfo->context, WindowAggState)))
+ {
+ int64 *oldsum = (int64 *) PG_GETARG_POINTER(0);
- /* Leave sum unchanged if new input is null. */
- if (PG_ARGISNULL(1))
- PG_RETURN_NUMERIC(oldsum);
+ /* Leave the running sum unchanged in the new input is null */
+ if (!PG_ARGISNULL(1))
+ *oldsum = *oldsum + (int64) PG_GETARG_INT16(1);
- /* OK to do the addition. */
- newval = DirectFunctionCall1(int2_numeric, PG_GETARG_DATUM(1));
+ PG_RETURN_POINTER(oldsum);
+ }
+ else
+#endif
+ {
+ int64 oldsum = PG_GETARG_INT64(0);
- PG_RETURN_DATUM(DirectFunctionCall2(numeric_add,
- NumericGetDatum(oldsum), newval));
+ /* Leave sum unchanged if new input is null. */
+ if (PG_ARGISNULL(1))
+ PG_RETURN_INT64(oldsum);
+
+ /* OK to do the addition. */
+ newval = oldsum + (int64) PG_GETARG_INT16(1);
+
+ PG_RETURN_INT64(newval);
+ }
}
Datum
int4_sum(PG_FUNCTION_ARGS)
{
- Numeric oldsum;
- Datum newval;
+ int64 newval;
if (PG_ARGISNULL(0))
{
if (PG_ARGISNULL(1))
PG_RETURN_NULL(); /* still no non-null */
/* This is the first non-null input. */
- newval = DirectFunctionCall1(int4_numeric, PG_GETARG_DATUM(1));
- PG_RETURN_DATUM(newval);
+ newval = (int64) PG_GETARG_INT32(1);
+ PG_RETURN_INT64(newval);
}
- oldsum = PG_GETARG_NUMERIC(0);
+ /*
+ * If we're invoked by nodeAgg, we can cheat and modify our first
+ * parameter in-place to avoid palloc overhead. If not, we need to return
+ * the new value of the transition variable. (If int8 is pass-by-value,
+ * then of course this is useless as well as incorrect, so just ifdef it
+ * out.)
+ */
+#ifndef USE_FLOAT8_BYVAL /* controls int8 too */
+ if (fcinfo->context &&
+ (IsA(fcinfo->context, AggState) ||
+ IsA(fcinfo->context, WindowAggState)))
+ {
+ int64 *oldsum = (int64 *) PG_GETARG_POINTER(0);
- /* Leave sum unchanged if new input is null. */
- if (PG_ARGISNULL(1))
- PG_RETURN_NUMERIC(oldsum);
+ /* Leave the running sum unchanged in the new input is null */
+ if (!PG_ARGISNULL(1))
+ *oldsum = *oldsum + (int64) PG_GETARG_INT32(1);
- /* OK to do the addition. */
- newval = DirectFunctionCall1(int4_numeric, PG_GETARG_DATUM(1));
+ PG_RETURN_POINTER(oldsum);
+ }
+ else
+#endif
+ {
+ int64 oldsum = PG_GETARG_INT64(0);
- PG_RETURN_DATUM(DirectFunctionCall2(numeric_add,
- NumericGetDatum(oldsum), newval));
+ /* Leave sum unchanged if new input is null. */
+ if (PG_ARGISNULL(1))
+ PG_RETURN_INT64(oldsum);
+
+ /* OK to do the addition. */
+ newval = oldsum + (int64) PG_GETARG_INT32(1);
+
+ PG_RETURN_INT64(newval);
+ }
}
Datum
PG_RETURN_DATUM(newval);
}
+ /*
+ * Note that we cannot special-case the nodeAgg case here, as we do for
+ * int2_sum and int4_sum: numeric is of variable size, so we cannot modify
+ * our first parameter in-place.
+ */
+
oldsum = PG_GETARG_NUMERIC(0);
/* Leave sum unchanged if new input is null. */
}
+/*
+ * Routines for avg(int2) and avg(int4). The transition datatype
+ * is a two-element int8 array, holding count and sum.
+ */
+
+typedef struct Int8TransTypeData
+{
+ int64 count;
+ int64 sum;
+} Int8TransTypeData;
+
+Datum
+int2_avg_accum(PG_FUNCTION_ARGS)
+{
+ ArrayType *transarray;
+ int16 newval = PG_GETARG_INT16(1);
+ Int8TransTypeData *transdata;
+
+ /*
+ * If we're invoked by nodeAgg, we can cheat and modify our first
+ * parameter in-place to reduce palloc overhead. Otherwise we need to make
+ * a copy of it before scribbling on it.
+ */
+ if (fcinfo->context &&
+ (IsA(fcinfo->context, AggState) ||
+ IsA(fcinfo->context, WindowAggState)))
+ transarray = PG_GETARG_ARRAYTYPE_P(0);
+ else
+ transarray = PG_GETARG_ARRAYTYPE_P_COPY(0);
+
+ if (ARR_HASNULL(transarray) ||
+ ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
+ elog(ERROR, "expected 2-element int8 array");
+
+ transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
+ transdata->count++;
+ transdata->sum += newval;
+
+ PG_RETURN_ARRAYTYPE_P(transarray);
+}
+
+Datum
+int4_avg_accum(PG_FUNCTION_ARGS)
+{
+ ArrayType *transarray;
+ int32 newval = PG_GETARG_INT32(1);
+ Int8TransTypeData *transdata;
+
+ /*
+ * If we're invoked by nodeAgg, we can cheat and modify our first
+ * parameter in-place to reduce palloc overhead. Otherwise we need to make
+ * a copy of it before scribbling on it.
+ */
+ if (fcinfo->context &&
+ (IsA(fcinfo->context, AggState) ||
+ IsA(fcinfo->context, WindowAggState)))
+ transarray = PG_GETARG_ARRAYTYPE_P(0);
+ else
+ transarray = PG_GETARG_ARRAYTYPE_P_COPY(0);
+
+ if (ARR_HASNULL(transarray) ||
+ ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
+ elog(ERROR, "expected 2-element int8 array");
+
+ transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
+ transdata->count++;
+ transdata->sum += newval;
+
+ PG_RETURN_ARRAYTYPE_P(transarray);
+}
+
+Datum
+int8_avg(PG_FUNCTION_ARGS)
+{
+ ArrayType *transarray = PG_GETARG_ARRAYTYPE_P(0);
+ Int8TransTypeData *transdata;
+ Datum countd,
+ sumd;
+
+ if (ARR_HASNULL(transarray) ||
+ ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
+ elog(ERROR, "expected 2-element int8 array");
+ transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
+
+ /* SQL92 defines AVG of no values to be NULL */
+ if (transdata->count == 0)
+ PG_RETURN_NULL();
+
+ countd = DirectFunctionCall1(int8_numeric,
+ Int64GetDatumFast(transdata->count));
+ sumd = DirectFunctionCall1(int8_numeric,
+ Int64GetDatumFast(transdata->sum));
+
+ PG_RETURN_DATUM(DirectFunctionCall2(numeric_div, sumd, countd));
+}
+
+
/* ----------------------------------------------------------------------
*
- * Local functions follow
+ * Debug support
*
* ----------------------------------------------------------------------
*/
-
#ifdef NUMERIC_DEBUG
-/* ----------
+/*
* dump_numeric() - Dump a value in the db storage format for debugging
- * ----------
*/
static void
-dump_numeric(char *str, Numeric num)
+dump_numeric(const char *str, Numeric num)
{
+ NumericDigit *digits = NUMERIC_DIGITS(num);
+ int ndigits;
int i;
- printf("%s: NUMERIC w=%d r=%d d=%d ", str, num->n_weight, num->n_rscale,
- NUMERIC_DSCALE(num));
+ ndigits = NUMERIC_NDIGITS(num);
+
+ printf("%s: NUMERIC w=%d d=%d ", str, num->n_weight, NUMERIC_DSCALE(num));
switch (NUMERIC_SIGN(num))
{
case NUMERIC_POS:
break;
}
- for (i = 0; i < num->varlen - NUMERIC_HDRSZ; i++)
- printf(" %d %d", (num->n_data[i] >> 4) & 0x0f, num->n_data[i] & 0x0f);
+ for (i = 0; i < ndigits; i++)
+ printf(" %0*d", DEC_DIGITS, digits[i]);
printf("\n");
}
-/* ----------
+/*
* dump_var() - Dump a value in the variable format for debugging
- * ----------
*/
static void
-dump_var(char *str, NumericVar *var)
+dump_var(const char *str, NumericVar *var)
{
int i;
- printf("%s: VAR w=%d r=%d d=%d ", str, var->weight, var->rscale,
- var->dscale);
+ printf("%s: VAR w=%d d=%d ", str, var->weight, var->dscale);
switch (var->sign)
{
case NUMERIC_POS:
}
for (i = 0; i < var->ndigits; i++)
- printf(" %d", var->digits[i]);
+ printf(" %0*d", DEC_DIGITS, var->digits[i]);
printf("\n");
}
+#endif /* NUMERIC_DEBUG */
-#endif /* NUMERIC_DEBUG */
+
+/* ----------------------------------------------------------------------
+ *
+ * Local functions follow
+ *
+ * In general, these do not support NaNs --- callers must eliminate
+ * the possibility of NaN first. (make_result() is an exception.)
+ *
+ * ----------------------------------------------------------------------
+ */
-/* ----------
+/*
* alloc_var() -
*
* Allocate a digit buffer of ndigits digits (plus a spare digit for rounding)
- * ----------
*/
static void
alloc_var(NumericVar *var, int ndigits)
{
digitbuf_free(var->buf);
var->buf = digitbuf_alloc(ndigits + 1);
- var->buf[0] = 0;
+ var->buf[0] = 0; /* spare digit for rounding */
var->digits = var->buf + 1;
var->ndigits = ndigits;
}
-/* ----------
+/*
* free_var() -
*
* Return the digit buffer of a variable to the free pool
- * ----------
*/
static void
free_var(NumericVar *var)
}
-/* ----------
+/*
* zero_var() -
*
* Set a variable to ZERO.
- * Note: rscale and dscale are not touched.
- * ----------
+ * Note: its dscale is not touched.
*/
static void
zero_var(NumericVar *var)
}
-/* ----------
+/*
* set_var_from_str()
*
* Parse a string and put the number into a variable
- * ----------
+ *
+ * This function does not handle leading or trailing spaces, and it doesn't
+ * accept "NaN" either. It returns the end+1 position so that caller can
+ * check for trailing spaces/garbage if deemed necessary.
+ *
+ * cp is the place to actually start parsing; str is what to use in error
+ * reports. (Typically cp would be the same except advanced over spaces.)
*/
-static void
-set_var_from_str(char *str, NumericVar *dest)
+static const char *
+set_var_from_str(const char *str, const char *cp, NumericVar *dest)
{
- char *cp = str;
bool have_dp = FALSE;
- int i = 0;
-
- while (*cp)
- {
- if (!isspace((unsigned char) *cp))
- break;
- cp++;
- }
-
- alloc_var(dest, strlen(cp));
- dest->weight = -1;
- dest->dscale = 0;
- dest->sign = NUMERIC_POS;
+ int i;
+ unsigned char *decdigits;
+ int sign = NUMERIC_POS;
+ int dweight = -1;
+ int ddigits;
+ int dscale = 0;
+ int weight;
+ int ndigits;
+ int offset;
+ NumericDigit *digits;
+ /*
+ * We first parse the string to extract decimal digits and determine the
+ * correct decimal weight. Then convert to NBASE representation.
+ */
switch (*cp)
{
case '+':
- dest->sign = NUMERIC_POS;
+ sign = NUMERIC_POS;
cp++;
break;
case '-':
- dest->sign = NUMERIC_NEG;
+ sign = NUMERIC_NEG;
cp++;
break;
}
}
if (!isdigit((unsigned char) *cp))
- elog(ERROR, "Bad numeric input format '%s'", str);
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid input syntax for type numeric: \"%s\"", str)));
+
+ decdigits = (unsigned char *) palloc(strlen(cp) + DEC_DIGITS * 2);
+
+ /* leading padding for digit alignment later */
+ memset(decdigits, 0, DEC_DIGITS);
+ i = DEC_DIGITS;
while (*cp)
{
if (isdigit((unsigned char) *cp))
{
- dest->digits[i++] = *cp++ - '0';
+ decdigits[i++] = *cp++ - '0';
if (!have_dp)
- dest->weight++;
+ dweight++;
else
- dest->dscale++;
+ dscale++;
}
else if (*cp == '.')
{
if (have_dp)
- elog(ERROR, "Bad numeric input format '%s'", str);
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid input syntax for type numeric: \"%s\"",
+ str)));
have_dp = TRUE;
cp++;
}
else
break;
}
- dest->ndigits = i;
+
+ ddigits = i - DEC_DIGITS;
+ /* trailing padding for digit alignment later */
+ memset(decdigits + i, 0, DEC_DIGITS - 1);
/* Handle exponent, if any */
if (*cp == 'e' || *cp == 'E')
cp++;
exponent = strtol(cp, &endptr, 10);
if (endptr == cp)
- elog(ERROR, "Bad numeric input format '%s'", str);
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid input syntax for type numeric: \"%s\"",
+ str)));
cp = endptr;
if (exponent > NUMERIC_MAX_PRECISION ||
exponent < -NUMERIC_MAX_PRECISION)
- elog(ERROR, "Bad numeric input format '%s'", str);
- dest->weight += (int) exponent;
- dest->dscale -= (int) exponent;
- if (dest->dscale < 0)
- dest->dscale = 0;
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid input syntax for type numeric: \"%s\"",
+ str)));
+ dweight += (int) exponent;
+ dscale -= (int) exponent;
+ if (dscale < 0)
+ dscale = 0;
}
- /* Should be nothing left but spaces */
- while (*cp)
- {
- if (!isspace((unsigned char) *cp))
- elog(ERROR, "Bad numeric input format '%s'", str);
- cp++;
- }
+ /*
+ * Okay, convert pure-decimal representation to base NBASE. First we need
+ * to determine the converted weight and ndigits. offset is the number of
+ * decimal zeroes to insert before the first given digit to have a
+ * correctly aligned first NBASE digit.
+ */
+ if (dweight >= 0)
+ weight = (dweight + 1 + DEC_DIGITS - 1) / DEC_DIGITS - 1;
+ else
+ weight = -((-dweight - 1) / DEC_DIGITS + 1);
+ offset = (weight + 1) * DEC_DIGITS - (dweight + 1);
+ ndigits = (ddigits + offset + DEC_DIGITS - 1) / DEC_DIGITS;
+
+ alloc_var(dest, ndigits);
+ dest->sign = sign;
+ dest->weight = weight;
+ dest->dscale = dscale;
- /* Strip any leading zeroes */
- while (dest->ndigits > 0 && *(dest->digits) == 0)
+ i = DEC_DIGITS - offset;
+ digits = dest->digits;
+
+ while (ndigits-- > 0)
{
- (dest->digits)++;
- (dest->weight)--;
- (dest->ndigits)--;
+#if DEC_DIGITS == 4
+ *digits++ = ((decdigits[i] * 10 + decdigits[i + 1]) * 10 +
+ decdigits[i + 2]) * 10 + decdigits[i + 3];
+#elif DEC_DIGITS == 2
+ *digits++ = decdigits[i] * 10 + decdigits[i + 1];
+#elif DEC_DIGITS == 1
+ *digits++ = decdigits[i];
+#else
+#error unsupported NBASE
+#endif
+ i += DEC_DIGITS;
}
- if (dest->ndigits == 0)
- dest->weight = 0;
- dest->rscale = dest->dscale;
+ pfree(decdigits);
+
+ /* Strip any leading/trailing zeroes, and normalize weight if zero */
+ strip_var(dest);
+
+ /* Return end+1 position for caller */
+ return cp;
}
/*
* set_var_from_num() -
*
- * Parse back the packed db format into a variable
- *
+ * Convert the packed db format into a variable
*/
static void
set_var_from_num(Numeric num, NumericVar *dest)
{
- NumericDigit *digit;
- int i;
- int n;
+ int ndigits;
- n = num->varlen - NUMERIC_HDRSZ; /* number of digit-pairs in packed
- * fmt */
+ ndigits = NUMERIC_NDIGITS(num);
- alloc_var(dest, n * 2);
+ alloc_var(dest, ndigits);
dest->weight = num->n_weight;
- dest->rscale = num->n_rscale;
- dest->dscale = NUMERIC_DSCALE(num);
dest->sign = NUMERIC_SIGN(num);
+ dest->dscale = NUMERIC_DSCALE(num);
- digit = dest->digits;
-
- for (i = 0; i < n; i++)
- {
- unsigned char digitpair = num->n_data[i];
-
- *digit++ = (digitpair >> 4) & 0x0f;
- *digit++ = digitpair & 0x0f;
- }
+ memcpy(dest->digits, num->n_data, ndigits * sizeof(NumericDigit));
}
-/* ----------
+/*
* set_var_from_var() -
*
* Copy one variable into another
- * ----------
*/
static void
set_var_from_var(NumericVar *value, NumericVar *dest)
newbuf = digitbuf_alloc(value->ndigits + 1);
newbuf[0] = 0; /* spare digit for rounding */
- memcpy(newbuf + 1, value->digits, value->ndigits);
+ memcpy(newbuf + 1, value->digits, value->ndigits * sizeof(NumericDigit));
digitbuf_free(dest->buf);
- memcpy(dest, value, sizeof(NumericVar));
+ memmove(dest, value, sizeof(NumericVar));
dest->buf = newbuf;
dest->digits = newbuf + 1;
}
-/* ----------
+/*
* get_str_from_var() -
*
* Convert a var to text representation (guts of numeric_out).
* CAUTION: var's contents may be modified by rounding!
- * Caller must have checked for NaN case.
* Returns a palloc'd string.
- * ----------
*/
static char *
get_str_from_var(NumericVar *var, int dscale)
{
char *str;
char *cp;
+ char *endcp;
int i;
int d;
+ NumericDigit dig;
+
+#if DEC_DIGITS > 1
+ NumericDigit d1;
+#endif
+
+ if (dscale < 0)
+ dscale = 0;
/*
* Check if we must round up before printing the value and do so.
*/
- i = dscale + var->weight + 1;
- if (i >= 0 && var->ndigits > i)
- {
- int carry = (var->digits[i] > 4) ? 1 : 0;
-
- var->ndigits = i;
-
- while (carry)
- {
- carry += var->digits[--i];
- var->digits[i] = carry % 10;
- carry /= 10;
- }
-
- if (i < 0)
- {
- Assert(i == -1); /* better not have added more than 1 digit */
- Assert(var->digits > var->buf);
- var->digits--;
- var->ndigits++;
- var->weight++;
- }
- }
- else
- var->ndigits = MAX(0, MIN(i, var->ndigits));
+ round_var(var, dscale);
/*
- * Allocate space for the result
+ * Allocate space for the result.
+ *
+ * i is set to to # of decimal digits before decimal point. dscale is the
+ * # of decimal digits we will print after decimal point. We may generate
+ * as many as DEC_DIGITS-1 excess digits at the end, and in addition we
+ * need room for sign, decimal point, null terminator.
*/
- str = palloc(MAX(0, dscale) + MAX(0, var->weight) + 4);
+ i = (var->weight + 1) * DEC_DIGITS;
+ if (i <= 0)
+ i = 1;
+
+ str = palloc(i + dscale + DEC_DIGITS + 2);
cp = str;
/*
/*
* Output all digits before the decimal point
*/
- i = MAX(var->weight, 0);
- d = 0;
-
- while (i >= 0)
+ if (var->weight < 0)
{
- if (i <= var->weight && d < var->ndigits)
- *cp++ = var->digits[d++] + '0';
- else
- *cp++ = '0';
- i--;
+ d = var->weight + 1;
+ *cp++ = '0';
+ }
+ else
+ {
+ for (d = 0; d <= var->weight; d++)
+ {
+ dig = (d < var->ndigits) ? var->digits[d] : 0;
+ /* In the first digit, suppress extra leading decimal zeroes */
+#if DEC_DIGITS == 4
+ {
+ bool putit = (d > 0);
+
+ d1 = dig / 1000;
+ dig -= d1 * 1000;
+ putit |= (d1 > 0);
+ if (putit)
+ *cp++ = d1 + '0';
+ d1 = dig / 100;
+ dig -= d1 * 100;
+ putit |= (d1 > 0);
+ if (putit)
+ *cp++ = d1 + '0';
+ d1 = dig / 10;
+ dig -= d1 * 10;
+ putit |= (d1 > 0);
+ if (putit)
+ *cp++ = d1 + '0';
+ *cp++ = dig + '0';
+ }
+#elif DEC_DIGITS == 2
+ d1 = dig / 10;
+ dig -= d1 * 10;
+ if (d1 > 0 || d > 0)
+ *cp++ = d1 + '0';
+ *cp++ = dig + '0';
+#elif DEC_DIGITS == 1
+ *cp++ = dig + '0';
+#else
+#error unsupported NBASE
+#endif
+ }
}
/*
- * If requested, output a decimal point and all the digits that follow
- * it.
+ * If requested, output a decimal point and all the digits that follow it.
+ * We initially put out a multiple of DEC_DIGITS digits, then truncate if
+ * needed.
*/
if (dscale > 0)
{
*cp++ = '.';
- while (i >= -dscale)
+ endcp = cp + dscale;
+ for (i = 0; i < dscale; d++, i += DEC_DIGITS)
{
- if (i <= var->weight && d < var->ndigits)
- *cp++ = var->digits[d++] + '0';
- else
- *cp++ = '0';
- i--;
+ dig = (d >= 0 && d < var->ndigits) ? var->digits[d] : 0;
+#if DEC_DIGITS == 4
+ d1 = dig / 1000;
+ dig -= d1 * 1000;
+ *cp++ = d1 + '0';
+ d1 = dig / 100;
+ dig -= d1 * 100;
+ *cp++ = d1 + '0';
+ d1 = dig / 10;
+ dig -= d1 * 10;
+ *cp++ = d1 + '0';
+ *cp++ = dig + '0';
+#elif DEC_DIGITS == 2
+ d1 = dig / 10;
+ dig -= d1 * 10;
+ *cp++ = d1 + '0';
+ *cp++ = dig + '0';
+#elif DEC_DIGITS == 1
+ *cp++ = dig + '0';
+#else
+#error unsupported NBASE
+#endif
}
+ cp = endcp;
}
/*
return str;
}
+/*
+ * get_str_from_var_sci() -
+ *
+ * Convert a var to a normalised scientific notation text representation.
+ * This function does the heavy lifting for numeric_out_sci().
+ *
+ * This notation has the general form a * 10^b, where a is known as the
+ * "significand" and b is known as the "exponent".
+ *
+ * Because we can't do superscript in ASCII (and because we want to copy
+ * printf's behaviour) we display the exponent using E notation, with a
+ * minimum of two exponent digits.
+ *
+ * For example, the value 1234 could be output as 1.2e+03.
+ *
+ * We assume that the exponent can fit into an int32.
+ *
+ * rscale is the number of decimal digits desired after the decimal point in
+ * the output, negative values will be treated as meaning zero.
+ *
+ * CAUTION: var's contents may be modified by rounding!
+ *
+ * Returns a palloc'd string.
+ */
+static char *
+get_str_from_var_sci(NumericVar *var, int rscale)
+{
+ int32 exponent;
+ NumericVar denominator;
+ NumericVar significand;
+ int denom_scale;
+ size_t len;
+ char *str;
+ char *sig_out;
+
+ if (rscale < 0)
+ rscale = 0;
-/* ----------
+ /*
+ * Determine the exponent of this number in normalised form.
+ *
+ * This is the exponent required to represent the number with only one
+ * significant digit before the decimal place.
+ */
+ if (var->ndigits > 0)
+ {
+ exponent = (var->weight + 1) * DEC_DIGITS;
+
+ /*
+ * Compensate for leading decimal zeroes in the first numeric digit by
+ * decrementing the exponent.
+ */
+ exponent -= DEC_DIGITS - (int) log10(var->digits[0]);
+ }
+ else
+ {
+ /*
+ * If var has no digits, then it must be zero.
+ *
+ * Zero doesn't technically have a meaningful exponent in normalised
+ * notation, but we just display the exponent as zero for consistency
+ * of output.
+ */
+ exponent = 0;
+ }
+
+ /*
+ * The denominator is set to 10 raised to the power of the exponent.
+ *
+ * We then divide var by the denominator to get the significand, rounding
+ * to rscale decimal digits in the process.
+ */
+ if (exponent < 0)
+ denom_scale = -exponent;
+ else
+ denom_scale = 0;
+
+ init_var(&denominator);
+ init_var(&significand);
+
+ int8_to_numericvar((int64) 10, &denominator);
+ power_var_int(&denominator, exponent, &denominator, denom_scale);
+ div_var(var, &denominator, &significand, rscale, true);
+ sig_out = get_str_from_var(&significand, rscale);
+
+ free_var(&denominator);
+ free_var(&significand);
+
+ /*
+ * Allocate space for the result.
+ *
+ * In addition to the significand, we need room for the exponent decoration
+ * ("e"), the sign of the exponent, up to 10 digits for the exponent
+ * itself, and of course the null terminator.
+ */
+ len = strlen(sig_out) + 13;
+ str = palloc(len);
+ snprintf(str, len, "%se%+03d", sig_out, exponent);
+
+ pfree(sig_out);
+
+ return str;
+}
+
+
+/*
* make_result() -
*
* Create the packed db numeric format in palloc()'d memory from
- * a variable. The var's rscale determines the number of digits kept.
- * ----------
+ * a variable.
*/
static Numeric
make_result(NumericVar *var)
{
Numeric result;
- NumericDigit *digit = var->digits;
+ NumericDigit *digits = var->digits;
int weight = var->weight;
int sign = var->sign;
int n;
- int i,
- j;
+ Size len;
if (sign == NUMERIC_NAN)
{
result = (Numeric) palloc(NUMERIC_HDRSZ);
- result->varlen = NUMERIC_HDRSZ;
+ SET_VARSIZE(result, NUMERIC_HDRSZ);
result->n_weight = 0;
- result->n_rscale = 0;
result->n_sign_dscale = NUMERIC_NAN;
dump_numeric("make_result()", result);
return result;
}
- n = MAX(0, MIN(var->ndigits, var->weight + var->rscale + 1));
+ n = var->ndigits;
/* truncate leading zeroes */
- while (n > 0 && *digit == 0)
+ while (n > 0 && *digits == 0)
{
- digit++;
+ digits++;
weight--;
n--;
}
/* truncate trailing zeroes */
- while (n > 0 && digit[n - 1] == 0)
+ while (n > 0 && digits[n - 1] == 0)
n--;
/* If zero result, force to weight=0 and positive sign */
sign = NUMERIC_POS;
}
- result = (Numeric) palloc(NUMERIC_HDRSZ + (n + 1) / 2);
- result->varlen = NUMERIC_HDRSZ + (n + 1) / 2;
+ /* Build the result */
+ len = NUMERIC_HDRSZ + n * sizeof(NumericDigit);
+ result = (Numeric) palloc(len);
+ SET_VARSIZE(result, len);
result->n_weight = weight;
- result->n_rscale = var->rscale;
- result->n_sign_dscale = sign |
- ((uint16) var->dscale & NUMERIC_DSCALE_MASK);
+ result->n_sign_dscale = sign | (var->dscale & NUMERIC_DSCALE_MASK);
- i = 0;
- j = 0;
- while (j < n)
- {
- unsigned char digitpair = digit[j++] << 4;
+ memcpy(result->n_data, digits, n * sizeof(NumericDigit));
- if (j < n)
- digitpair |= digit[j++];
- result->n_data[i++] = digitpair;
- }
+ /* Check for overflow of int16 fields */
+ if (result->n_weight != weight ||
+ NUMERIC_DSCALE(result) != var->dscale)
+ ereport(ERROR,
+ (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+ errmsg("value overflows numeric format")));
dump_numeric("make_result()", result);
return result;
}
-/* ----------
+/*
* apply_typmod() -
*
* Do bounds checking and rounding according to the attributes
* typmod field.
- * ----------
*/
static void
apply_typmod(NumericVar *var, int32 typmod)
{
int precision;
int scale;
- int maxweight;
+ int maxdigits;
+ int ddigits;
int i;
/* Do nothing if we have a default typmod (-1) */
typmod -= VARHDRSZ;
precision = (typmod >> 16) & 0xffff;
scale = typmod & 0xffff;
- maxweight = precision - scale;
-
- /* Round to target scale */
- i = scale + var->weight + 1;
- if (i >= 0 && var->ndigits > i)
- {
- int carry = (var->digits[i] > 4) ? 1 : 0;
+ maxdigits = precision - scale;
- var->ndigits = i;
+ /* Round to target scale (and set var->dscale) */
+ round_var(var, scale);
- while (carry)
+ /*
+ * Check for overflow - note we can't do this before rounding, because
+ * rounding could raise the weight. Also note that the var's weight could
+ * be inflated by leading zeroes, which will be stripped before storage
+ * but perhaps might not have been yet. In any case, we must recognize a
+ * true zero, whose weight doesn't mean anything.
+ */
+ ddigits = (var->weight + 1) * DEC_DIGITS;
+ if (ddigits > maxdigits)
+ {
+ /* Determine true weight; and check for all-zero result */
+ for (i = 0; i < var->ndigits; i++)
{
- carry += var->digits[--i];
- var->digits[i] = carry % 10;
- carry /= 10;
- }
+ NumericDigit dig = var->digits[i];
- if (i < 0)
- {
- Assert(i == -1); /* better not have added more than 1 digit */
- Assert(var->digits > var->buf);
- var->digits--;
- var->ndigits++;
- var->weight++;
+ if (dig)
+ {
+ /* Adjust for any high-order decimal zero digits */
+#if DEC_DIGITS == 4
+ if (dig < 10)
+ ddigits -= 3;
+ else if (dig < 100)
+ ddigits -= 2;
+ else if (dig < 1000)
+ ddigits -= 1;
+#elif DEC_DIGITS == 2
+ if (dig < 10)
+ ddigits -= 1;
+#elif DEC_DIGITS == 1
+ /* no adjustment */
+#else
+#error unsupported NBASE
+#endif
+ if (ddigits > maxdigits)
+ ereport(ERROR,
+ (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+ errmsg("numeric field overflow"),
+ errdetail("A field with precision %d, scale %d must round to an absolute value less than %s%d.",
+ precision, scale,
+ /* Display 10^0 as 1 */
+ maxdigits ? "10^" : "",
+ maxdigits ? maxdigits : 1
+ )));
+ break;
+ }
+ ddigits -= DEC_DIGITS;
}
}
- else
- var->ndigits = MAX(0, MIN(i, var->ndigits));
+}
+
+/*
+ * Convert numeric to int8, rounding if needed.
+ *
+ * If overflow, return FALSE (no error is raised). Return TRUE if okay.
+ *
+ * CAUTION: var's contents may be modified by rounding!
+ */
+static bool
+numericvar_to_int8(NumericVar *var, int64 *result)
+{
+ NumericDigit *digits;
+ int ndigits;
+ int weight;
+ int i;
+ int64 val,
+ oldval;
+ bool neg;
+
+ /* Round to nearest integer */
+ round_var(var, 0);
+
+ /* Check for zero input */
+ strip_var(var);
+ ndigits = var->ndigits;
+ if (ndigits == 0)
+ {
+ *result = 0;
+ return true;
+ }
/*
- * Check for overflow - note we can't do this before rounding, because
- * rounding could raise the weight. Also note that the var's weight
- * could be inflated by leading zeroes, which will be stripped before
- * storage but perhaps might not have been yet. In any case, we must
- * recognize a true zero, whose weight doesn't mean anything.
+ * For input like 10000000000, we must treat stripped digits as real. So
+ * the loop assumes there are weight+1 digits before the decimal point.
*/
- if (var->weight >= maxweight)
+ weight = var->weight;
+ Assert(weight >= 0 && ndigits <= weight + 1);
+
+ /* Construct the result */
+ digits = var->digits;
+ neg = (var->sign == NUMERIC_NEG);
+ val = digits[0];
+ for (i = 1; i <= weight; i++)
{
- /* Determine true weight; and check for all-zero result */
- int tweight = var->weight;
+ oldval = val;
+ val *= NBASE;
+ if (i < ndigits)
+ val += digits[i];
- for (i = 0; i < var->ndigits; i++)
+ /*
+ * The overflow check is a bit tricky because we want to accept
+ * INT64_MIN, which will overflow the positive accumulator. We can
+ * detect this case easily though because INT64_MIN is the only
+ * nonzero value for which -val == val (on a two's complement machine,
+ * anyway).
+ */
+ if ((val / NBASE) != oldval) /* possible overflow? */
{
- if (var->digits[i])
- break;
- tweight--;
+ if (!neg || (-val) != val || val == 0 || oldval < 0)
+ return false;
}
+ }
+
+ *result = neg ? -val : val;
+ return true;
+}
+
+/*
+ * Convert int8 value to numeric.
+ */
+static void
+int8_to_numericvar(int64 val, NumericVar *var)
+{
+ uint64 uval,
+ newuval;
+ NumericDigit *ptr;
+ int ndigits;
+
+ /* int8 can require at most 19 decimal digits; add one for safety */
+ alloc_var(var, 20 / DEC_DIGITS);
+ if (val < 0)
+ {
+ var->sign = NUMERIC_NEG;
+ uval = -val;
+ }
+ else
+ {
+ var->sign = NUMERIC_POS;
+ uval = val;
+ }
+ var->dscale = 0;
+ if (val == 0)
+ {
+ var->ndigits = 0;
+ var->weight = 0;
+ return;
+ }
+ ptr = var->digits + var->ndigits;
+ ndigits = 0;
+ do
+ {
+ ptr--;
+ ndigits++;
+ newuval = uval / NBASE;
+ *ptr = uval - newuval * NBASE;
+ uval = newuval;
+ } while (uval);
+ var->digits = ptr;
+ var->ndigits = ndigits;
+ var->weight = ndigits - 1;
+}
+
+/*
+ * Convert numeric to float8; if out of range, return +/- HUGE_VAL
+ */
+static double
+numeric_to_double_no_overflow(Numeric num)
+{
+ char *tmp;
+ double val;
+ char *endptr;
- if (tweight >= maxweight && i < var->ndigits)
- elog(ERROR, "overflow on numeric "
- "ABS(value) >= 10^%d for field with precision %d scale %d",
- tweight, precision, scale);
+ tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
+ NumericGetDatum(num)));
+
+ /* unlike float8in, we ignore ERANGE from strtod */
+ val = strtod(tmp, &endptr);
+ if (*endptr != '\0')
+ {
+ /* shouldn't happen ... */
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid input syntax for type double precision: \"%s\"",
+ tmp)));
}
- var->rscale = scale;
- var->dscale = scale;
+ pfree(tmp);
+
+ return val;
}
+/* As above, but work from a NumericVar */
+static double
+numericvar_to_double_no_overflow(NumericVar *var)
+{
+ char *tmp;
+ double val;
+ char *endptr;
+
+ tmp = get_str_from_var(var, var->dscale);
-/* ----------
+ /* unlike float8in, we ignore ERANGE from strtod */
+ val = strtod(tmp, &endptr);
+ if (*endptr != '\0')
+ {
+ /* shouldn't happen ... */
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid input syntax for type double precision: \"%s\"",
+ tmp)));
+ }
+
+ pfree(tmp);
+
+ return val;
+}
+
+
+/*
* cmp_var() -
*
- * Compare two values on variable level
- * ----------
+ * Compare two values on variable level. We assume zeroes have been
+ * truncated to no digits.
*/
static int
cmp_var(NumericVar *var1, NumericVar *var2)
{
- if (var1->ndigits == 0)
+ return cmp_var_common(var1->digits, var1->ndigits,
+ var1->weight, var1->sign,
+ var2->digits, var2->ndigits,
+ var2->weight, var2->sign);
+}
+
+/*
+ * cmp_var_common() -
+ *
+ * Main routine of cmp_var(). This function can be used by both
+ * NumericVar and Numeric.
+ */
+static int
+cmp_var_common(const NumericDigit *var1digits, int var1ndigits,
+ int var1weight, int var1sign,
+ const NumericDigit *var2digits, int var2ndigits,
+ int var2weight, int var2sign)
+{
+ if (var1ndigits == 0)
{
- if (var2->ndigits == 0)
+ if (var2ndigits == 0)
return 0;
- if (var2->sign == NUMERIC_NEG)
+ if (var2sign == NUMERIC_NEG)
return 1;
return -1;
}
- if (var2->ndigits == 0)
+ if (var2ndigits == 0)
{
- if (var1->sign == NUMERIC_POS)
+ if (var1sign == NUMERIC_POS)
return 1;
return -1;
}
- if (var1->sign == NUMERIC_POS)
+ if (var1sign == NUMERIC_POS)
{
- if (var2->sign == NUMERIC_NEG)
+ if (var2sign == NUMERIC_NEG)
return 1;
- return cmp_abs(var1, var2);
+ return cmp_abs_common(var1digits, var1ndigits, var1weight,
+ var2digits, var2ndigits, var2weight);
}
- if (var2->sign == NUMERIC_POS)
+ if (var2sign == NUMERIC_POS)
return -1;
- return cmp_abs(var2, var1);
+ return cmp_abs_common(var2digits, var2ndigits, var2weight,
+ var1digits, var1ndigits, var1weight);
}
-/* ----------
+/*
* add_var() -
*
* Full version of add functionality on variable level (handling signs).
* result might point to one of the operands too without danger.
- * ----------
*/
static void
add_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
{
-
/*
* Decide on the signs of the two variables what to do
*/
{
if (var2->sign == NUMERIC_POS)
{
-
/*
* Both are positive result = +(ABS(var1) + ABS(var2))
*/
}
else
{
-
/*
- * var1 is positive, var2 is negative Must compare absolute
- * values
+ * var1 is positive, var2 is negative Must compare absolute values
*/
switch (cmp_abs(var1, var2))
{
* ----------
*/
zero_var(result);
- result->rscale = MAX(var1->rscale, var2->rscale);
- result->dscale = MAX(var1->dscale, var2->dscale);
+ result->dscale = Max(var1->dscale, var2->dscale);
break;
case 1:
* ----------
*/
zero_var(result);
- result->rscale = MAX(var1->rscale, var2->rscale);
- result->dscale = MAX(var1->dscale, var2->dscale);
+ result->dscale = Max(var1->dscale, var2->dscale);
break;
case 1:
}
-/* ----------
+/*
* sub_var() -
*
* Full version of sub functionality on variable level (handling signs).
* result might point to one of the operands too without danger.
- * ----------
*/
static void
sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
{
-
/*
* Decide on the signs of the two variables what to do
*/
* ----------
*/
zero_var(result);
- result->rscale = MAX(var1->rscale, var2->rscale);
- result->dscale = MAX(var1->dscale, var2->dscale);
+ result->dscale = Max(var1->dscale, var2->dscale);
break;
case 1:
* ----------
*/
zero_var(result);
- result->rscale = MAX(var1->rscale, var2->rscale);
- result->dscale = MAX(var1->dscale, var2->dscale);
+ result->dscale = Max(var1->dscale, var2->dscale);
break;
case 1:
}
-/* ----------
+/*
* mul_var() -
*
* Multiplication on variable level. Product of var1 * var2 is stored
- * in result.
- * ----------
+ * in result. Result is rounded to no more than rscale fractional digits.
*/
static void
-mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
+mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
+ int rscale)
{
- NumericDigit *res_buf;
- NumericDigit *res_digits;
int res_ndigits;
- int res_weight;
int res_sign;
+ int res_weight;
+ int maxdigits;
+ int *dig;
+ int carry;
+ int maxdig;
+ int newdig;
+ NumericDigit *res_digits;
int i,
ri,
i1,
i2;
- long sum = 0;
- res_weight = var1->weight + var2->weight + 2;
- res_ndigits = var1->ndigits + var2->ndigits + 1;
+ /* copy these values into local vars for speed in inner loop */
+ int var1ndigits = var1->ndigits;
+ int var2ndigits = var2->ndigits;
+ NumericDigit *var1digits = var1->digits;
+ NumericDigit *var2digits = var2->digits;
+
+ if (var1ndigits == 0 || var2ndigits == 0)
+ {
+ /* one or both inputs is zero; so is result */
+ zero_var(result);
+ result->dscale = rscale;
+ return;
+ }
+
+ /* Determine result sign and (maximum possible) weight */
if (var1->sign == var2->sign)
res_sign = NUMERIC_POS;
else
res_sign = NUMERIC_NEG;
+ res_weight = var1->weight + var2->weight + 2;
- res_buf = digitbuf_alloc(res_ndigits);
- res_digits = res_buf;
- memset(res_digits, 0, res_ndigits);
-
- ri = res_ndigits;
- for (i1 = var1->ndigits - 1; i1 >= 0; i1--)
+ /*
+ * Determine number of result digits to compute. If the exact result
+ * would have more than rscale fractional digits, truncate the computation
+ * with MUL_GUARD_DIGITS guard digits. We do that by pretending that one
+ * or both inputs have fewer digits than they really do.
+ */
+ res_ndigits = var1ndigits + var2ndigits + 1;
+ maxdigits = res_weight + 1 + (rscale * DEC_DIGITS) + MUL_GUARD_DIGITS;
+ if (res_ndigits > maxdigits)
{
- sum = 0;
- i = --ri;
-
- for (i2 = var2->ndigits - 1; i2 >= 0; i2--)
+ if (maxdigits < 3)
+ {
+ /* no useful precision at all in the result... */
+ zero_var(result);
+ result->dscale = rscale;
+ return;
+ }
+ /* force maxdigits odd so that input ndigits can be equal */
+ if ((maxdigits & 1) == 0)
+ maxdigits++;
+ if (var1ndigits > var2ndigits)
+ {
+ var1ndigits -= res_ndigits - maxdigits;
+ if (var1ndigits < var2ndigits)
+ var1ndigits = var2ndigits = (var1ndigits + var2ndigits) / 2;
+ }
+ else
{
- sum += res_digits[i] + var1->digits[i1] * var2->digits[i2];
- res_digits[i--] = sum % 10;
- sum /= 10;
+ var2ndigits -= res_ndigits - maxdigits;
+ if (var2ndigits < var1ndigits)
+ var1ndigits = var2ndigits = (var1ndigits + var2ndigits) / 2;
}
- res_digits[i] = sum;
+ res_ndigits = maxdigits;
+ Assert(res_ndigits == var1ndigits + var2ndigits + 1);
}
- i = res_weight + global_rscale + 2;
- if (i >= 0 && i < res_ndigits)
+ /*
+ * We do the arithmetic in an array "dig[]" of signed int's. Since
+ * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom
+ * to avoid normalizing carries immediately.
+ *
+ * maxdig tracks the maximum possible value of any dig[] entry; when this
+ * threatens to exceed INT_MAX, we take the time to propagate carries. To
+ * avoid overflow in maxdig itself, it actually represents the max
+ * possible value divided by NBASE-1.
+ */
+ dig = (int *) palloc0(res_ndigits * sizeof(int));
+ maxdig = 0;
+
+ ri = res_ndigits - 1;
+ for (i1 = var1ndigits - 1; i1 >= 0; ri--, i1--)
{
- sum = (res_digits[i] > 4) ? 1 : 0;
- res_ndigits = i;
- i--;
- while (sum)
+ int var1digit = var1digits[i1];
+
+ if (var1digit == 0)
+ continue;
+
+ /* Time to normalize? */
+ maxdig += var1digit;
+ if (maxdig > INT_MAX / (NBASE - 1))
{
- sum += res_digits[i];
- res_digits[i--] = sum % 10;
- sum /= 10;
+ /* Yes, do it */
+ carry = 0;
+ for (i = res_ndigits - 1; i >= 0; i--)
+ {
+ newdig = dig[i] + carry;
+ if (newdig >= NBASE)
+ {
+ carry = newdig / NBASE;
+ newdig -= carry * NBASE;
+ }
+ else
+ carry = 0;
+ dig[i] = newdig;
+ }
+ Assert(carry == 0);
+ /* Reset maxdig to indicate new worst-case */
+ maxdig = 1 + var1digit;
}
- }
- while (res_ndigits > 0 && *res_digits == 0)
- {
- res_digits++;
- res_weight--;
- res_ndigits--;
+ /* Add appropriate multiple of var2 into the accumulator */
+ i = ri;
+ for (i2 = var2ndigits - 1; i2 >= 0; i2--)
+ dig[i--] += var1digit * var2digits[i2];
}
- while (res_ndigits > 0 && res_digits[res_ndigits - 1] == 0)
- res_ndigits--;
- if (res_ndigits == 0)
+ /*
+ * Now we do a final carry propagation pass to normalize the result, which
+ * we combine with storing the result digits into the output. Note that
+ * this is still done at full precision w/guard digits.
+ */
+ alloc_var(result, res_ndigits);
+ res_digits = result->digits;
+ carry = 0;
+ for (i = res_ndigits - 1; i >= 0; i--)
{
- res_sign = NUMERIC_POS;
- res_weight = 0;
+ newdig = dig[i] + carry;
+ if (newdig >= NBASE)
+ {
+ carry = newdig / NBASE;
+ newdig -= carry * NBASE;
+ }
+ else
+ carry = 0;
+ res_digits[i] = newdig;
}
+ Assert(carry == 0);
- digitbuf_free(result->buf);
- result->buf = res_buf;
- result->digits = res_digits;
- result->ndigits = res_ndigits;
+ pfree(dig);
+
+ /*
+ * Finally, round the result to the requested precision.
+ */
result->weight = res_weight;
- result->rscale = global_rscale;
result->sign = res_sign;
+
+ /* Round to target rscale (and set result->dscale) */
+ round_var(result, rscale);
+
+ /* Strip leading and trailing zeroes */
+ strip_var(result);
}
-/* ----------
+/*
* div_var() -
*
- * Division on variable level.
- * ----------
+ * Division on variable level. Quotient of var1 / var2 is stored in result.
+ * The quotient is figured to exactly rscale fractional digits.
+ * If round is true, it is rounded at the rscale'th digit; if false, it
+ * is truncated (towards zero) at that digit.
*/
static void
-div_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
+div_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
+ int rscale, bool round)
{
- NumericDigit *res_digits;
+ int div_ndigits;
int res_ndigits;
int res_sign;
int res_weight;
- NumericVar dividend;
- NumericVar divisor[10];
- int ndigits_tmp;
- int weight_tmp;
- int rscale_tmp;
- int ri;
+ int carry;
+ int borrow;
+ int divisor1;
+ int divisor2;
+ NumericDigit *dividend;
+ NumericDigit *divisor;
+ NumericDigit *res_digits;
int i;
- long guess;
- long first_have;
- long first_div;
- int first_nextdigit;
- int stat = 0;
+ int j;
- /*
- * First of all division by zero check
- */
- ndigits_tmp = var2->ndigits + 1;
- if (ndigits_tmp == 1)
- elog(ERROR, "division by zero on numeric");
+ /* copy these values into local vars for speed in inner loop */
+ int var1ndigits = var1->ndigits;
+ int var2ndigits = var2->ndigits;
/*
- * Determine the result sign, weight and number of digits to calculate
+ * First of all division by zero check; we must not be handed an
+ * unnormalized divisor.
*/
- if (var1->sign == var2->sign)
- res_sign = NUMERIC_POS;
- else
- res_sign = NUMERIC_NEG;
- res_weight = var1->weight - var2->weight + 1;
- res_ndigits = global_rscale + res_weight;
- if (res_ndigits <= 0)
- res_ndigits = 1;
+ if (var2ndigits == 0 || var2->digits[0] == 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_DIVISION_BY_ZERO),
+ errmsg("division by zero")));
/*
* Now result zero check
*/
- if (var1->ndigits == 0)
+ if (var1ndigits == 0)
{
zero_var(result);
- result->rscale = global_rscale;
+ result->dscale = rscale;
return;
}
/*
- * Initialize local variables
+ * Determine the result sign, weight and number of digits to calculate.
+ * The weight figured here is correct if the emitted quotient has no
+ * leading zero digits; otherwise strip_var() will fix things up.
*/
- init_var(÷nd);
- for (i = 1; i < 10; i++)
- init_var(&divisor[i]);
+ if (var1->sign == var2->sign)
+ res_sign = NUMERIC_POS;
+ else
+ res_sign = NUMERIC_NEG;
+ res_weight = var1->weight - var2->weight;
+ /* The number of accurate result digits we need to produce: */
+ res_ndigits = res_weight + 1 + (rscale + DEC_DIGITS - 1) / DEC_DIGITS;
+ /* ... but always at least 1 */
+ res_ndigits = Max(res_ndigits, 1);
+ /* If rounding needed, figure one more digit to ensure correct result */
+ if (round)
+ res_ndigits++;
/*
- * Make a copy of the divisor which has one leading zero digit
+ * The working dividend normally requires res_ndigits + var2ndigits
+ * digits, but make it at least var1ndigits so we can load all of var1
+ * into it. (There will be an additional digit dividend[0] in the
+ * dividend space, but for consistency with Knuth's notation we don't
+ * count that in div_ndigits.)
*/
- divisor[1].ndigits = ndigits_tmp;
- divisor[1].rscale = var2->ndigits;
- divisor[1].sign = NUMERIC_POS;
- divisor[1].buf = digitbuf_alloc(ndigits_tmp);
- divisor[1].digits = divisor[1].buf;
- divisor[1].digits[0] = 0;
- memcpy(&(divisor[1].digits[1]), var2->digits, ndigits_tmp - 1);
+ div_ndigits = res_ndigits + var2ndigits;
+ div_ndigits = Max(div_ndigits, var1ndigits);
/*
- * Make a copy of the dividend
+ * We need a workspace with room for the working dividend (div_ndigits+1
+ * digits) plus room for the possibly-normalized divisor (var2ndigits
+ * digits). It is convenient also to have a zero at divisor[0] with the
+ * actual divisor data in divisor[1 .. var2ndigits]. Transferring the
+ * digits into the workspace also allows us to realloc the result (which
+ * might be the same as either input var) before we begin the main loop.
+ * Note that we use palloc0 to ensure that divisor[0], dividend[0], and
+ * any additional dividend positions beyond var1ndigits, start out 0.
*/
- dividend.ndigits = var1->ndigits;
- dividend.weight = 0;
- dividend.rscale = var1->ndigits;
- dividend.sign = NUMERIC_POS;
- dividend.buf = digitbuf_alloc(var1->ndigits);
- dividend.digits = dividend.buf;
- memcpy(dividend.digits, var1->digits, var1->ndigits);
+ dividend = (NumericDigit *)
+ palloc0((div_ndigits + var2ndigits + 2) * sizeof(NumericDigit));
+ divisor = dividend + (div_ndigits + 1);
+ memcpy(dividend + 1, var1->digits, var1ndigits * sizeof(NumericDigit));
+ memcpy(divisor + 1, var2->digits, var2ndigits * sizeof(NumericDigit));
/*
- * Setup the result
+ * Now we can realloc the result to hold the generated quotient digits.
+ */
+ alloc_var(result, res_ndigits);
+ res_digits = result->digits;
+
+ if (var2ndigits == 1)
+ {
+ /*
+ * If there's only a single divisor digit, we can use a fast path (cf.
+ * Knuth section 4.3.1 exercise 16).
+ */
+ divisor1 = divisor[1];
+ carry = 0;
+ for (i = 0; i < res_ndigits; i++)
+ {
+ carry = carry * NBASE + dividend[i + 1];
+ res_digits[i] = carry / divisor1;
+ carry = carry % divisor1;
+ }
+ }
+ else
+ {
+ /*
+ * The full multiple-place algorithm is taken from Knuth volume 2,
+ * Algorithm 4.3.1D.
+ *
+ * We need the first divisor digit to be >= NBASE/2. If it isn't,
+ * make it so by scaling up both the divisor and dividend by the
+ * factor "d". (The reason for allocating dividend[0] above is to
+ * leave room for possible carry here.)
+ */
+ if (divisor[1] < HALF_NBASE)
+ {
+ int d = NBASE / (divisor[1] + 1);
+
+ carry = 0;
+ for (i = var2ndigits; i > 0; i--)
+ {
+ carry += divisor[i] * d;
+ divisor[i] = carry % NBASE;
+ carry = carry / NBASE;
+ }
+ Assert(carry == 0);
+ carry = 0;
+ /* at this point only var1ndigits of dividend can be nonzero */
+ for (i = var1ndigits; i >= 0; i--)
+ {
+ carry += dividend[i] * d;
+ dividend[i] = carry % NBASE;
+ carry = carry / NBASE;
+ }
+ Assert(carry == 0);
+ Assert(divisor[1] >= HALF_NBASE);
+ }
+ /* First 2 divisor digits are used repeatedly in main loop */
+ divisor1 = divisor[1];
+ divisor2 = divisor[2];
+
+ /*
+ * Begin the main loop. Each iteration of this loop produces the j'th
+ * quotient digit by dividing dividend[j .. j + var2ndigits] by the
+ * divisor; this is essentially the same as the common manual
+ * procedure for long division.
+ */
+ for (j = 0; j < res_ndigits; j++)
+ {
+ /* Estimate quotient digit from the first two dividend digits */
+ int next2digits = dividend[j] * NBASE + dividend[j + 1];
+ int qhat;
+
+ /*
+ * If next2digits are 0, then quotient digit must be 0 and there's
+ * no need to adjust the working dividend. It's worth testing
+ * here to fall out ASAP when processing trailing zeroes in a
+ * dividend.
+ */
+ if (next2digits == 0)
+ {
+ res_digits[j] = 0;
+ continue;
+ }
+
+ if (dividend[j] == divisor1)
+ qhat = NBASE - 1;
+ else
+ qhat = next2digits / divisor1;
+
+ /*
+ * Adjust quotient digit if it's too large. Knuth proves that
+ * after this step, the quotient digit will be either correct or
+ * just one too large. (Note: it's OK to use dividend[j+2] here
+ * because we know the divisor length is at least 2.)
+ */
+ while (divisor2 * qhat >
+ (next2digits - qhat * divisor1) * NBASE + dividend[j + 2])
+ qhat--;
+
+ /* As above, need do nothing more when quotient digit is 0 */
+ if (qhat > 0)
+ {
+ /*
+ * Multiply the divisor by qhat, and subtract that from the
+ * working dividend. "carry" tracks the multiplication,
+ * "borrow" the subtraction (could we fold these together?)
+ */
+ carry = 0;
+ borrow = 0;
+ for (i = var2ndigits; i >= 0; i--)
+ {
+ carry += divisor[i] * qhat;
+ borrow -= carry % NBASE;
+ carry = carry / NBASE;
+ borrow += dividend[j + i];
+ if (borrow < 0)
+ {
+ dividend[j + i] = borrow + NBASE;
+ borrow = -1;
+ }
+ else
+ {
+ dividend[j + i] = borrow;
+ borrow = 0;
+ }
+ }
+ Assert(carry == 0);
+
+ /*
+ * If we got a borrow out of the top dividend digit, then
+ * indeed qhat was one too large. Fix it, and add back the
+ * divisor to correct the working dividend. (Knuth proves
+ * that this will occur only about 3/NBASE of the time; hence,
+ * it's a good idea to test this code with small NBASE to be
+ * sure this section gets exercised.)
+ */
+ if (borrow)
+ {
+ qhat--;
+ carry = 0;
+ for (i = var2ndigits; i >= 0; i--)
+ {
+ carry += dividend[j + i] + divisor[i];
+ if (carry >= NBASE)
+ {
+ dividend[j + i] = carry - NBASE;
+ carry = 1;
+ }
+ else
+ {
+ dividend[j + i] = carry;
+ carry = 0;
+ }
+ }
+ /* A carry should occur here to cancel the borrow above */
+ Assert(carry == 1);
+ }
+ }
+
+ /* And we're done with this quotient digit */
+ res_digits[j] = qhat;
+ }
+ }
+
+ pfree(dividend);
+
+ /*
+ * Finally, round or truncate the result to the requested precision.
*/
- digitbuf_free(result->buf);
- result->buf = digitbuf_alloc(res_ndigits + 2);
- res_digits = result->buf;
- result->digits = res_digits;
- result->ndigits = res_ndigits;
result->weight = res_weight;
- result->rscale = global_rscale;
result->sign = res_sign;
- res_digits[0] = 0;
- first_div = divisor[1].digits[1] * 10;
- if (ndigits_tmp > 2)
- first_div += divisor[1].digits[2];
+ /* Round or truncate to target rscale (and set result->dscale) */
+ if (round)
+ round_var(result, rscale);
+ else
+ trunc_var(result, rscale);
+
+ /* Strip leading and trailing zeroes */
+ strip_var(result);
+}
+
+
+/*
+ * div_var_fast() -
+ *
+ * This has the same API as div_var, but is implemented using the division
+ * algorithm from the "FM" library, rather than Knuth's schoolbook-division
+ * approach. This is significantly faster but can produce inaccurate
+ * results, because it sometimes has to propagate rounding to the left,
+ * and so we can never be entirely sure that we know the requested digits
+ * exactly. We compute DIV_GUARD_DIGITS extra digits, but there is
+ * no certainty that that's enough. We use this only in the transcendental
+ * function calculation routines, where everything is approximate anyway.
+ */
+static void
+div_var_fast(NumericVar *var1, NumericVar *var2, NumericVar *result,
+ int rscale, bool round)
+{
+ int div_ndigits;
+ int res_sign;
+ int res_weight;
+ int *div;
+ int qdigit;
+ int carry;
+ int maxdiv;
+ int newdig;
+ NumericDigit *res_digits;
+ double fdividend,
+ fdivisor,
+ fdivisorinverse,
+ fquotient;
+ int qi;
+ int i;
- first_have = 0;
- first_nextdigit = 0;
+ /* copy these values into local vars for speed in inner loop */
+ int var1ndigits = var1->ndigits;
+ int var2ndigits = var2->ndigits;
+ NumericDigit *var1digits = var1->digits;
+ NumericDigit *var2digits = var2->digits;
- weight_tmp = 1;
- rscale_tmp = divisor[1].rscale;
+ /*
+ * First of all division by zero check; we must not be handed an
+ * unnormalized divisor.
+ */
+ if (var2ndigits == 0 || var2digits[0] == 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_DIVISION_BY_ZERO),
+ errmsg("division by zero")));
- for (ri = 0; ri <= res_ndigits; ri++)
+ /*
+ * Now result zero check
+ */
+ if (var1ndigits == 0)
{
- first_have = first_have * 10;
- if (first_nextdigit >= 0 && first_nextdigit < dividend.ndigits)
- first_have += dividend.digits[first_nextdigit];
- first_nextdigit++;
+ zero_var(result);
+ result->dscale = rscale;
+ return;
+ }
+
+ /*
+ * Determine the result sign, weight and number of digits to calculate
+ */
+ if (var1->sign == var2->sign)
+ res_sign = NUMERIC_POS;
+ else
+ res_sign = NUMERIC_NEG;
+ res_weight = var1->weight - var2->weight + 1;
+ /* The number of accurate result digits we need to produce: */
+ div_ndigits = res_weight + 1 + (rscale + DEC_DIGITS - 1) / DEC_DIGITS;
+ /* Add guard digits for roundoff error */
+ div_ndigits += DIV_GUARD_DIGITS;
+ if (div_ndigits < DIV_GUARD_DIGITS)
+ div_ndigits = DIV_GUARD_DIGITS;
+ /* Must be at least var1ndigits, too, to simplify data-loading loop */
+ if (div_ndigits < var1ndigits)
+ div_ndigits = var1ndigits;
- guess = (first_have * 10) / first_div + 1;
- if (guess > 9)
- guess = 9;
+ /*
+ * We do the arithmetic in an array "div[]" of signed int's. Since
+ * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom
+ * to avoid normalizing carries immediately.
+ *
+ * We start with div[] containing one zero digit followed by the
+ * dividend's digits (plus appended zeroes to reach the desired precision
+ * including guard digits). Each step of the main loop computes an
+ * (approximate) quotient digit and stores it into div[], removing one
+ * position of dividend space. A final pass of carry propagation takes
+ * care of any mistaken quotient digits.
+ */
+ div = (int *) palloc0((div_ndigits + 1) * sizeof(int));
+ for (i = 0; i < var1ndigits; i++)
+ div[i + 1] = var1digits[i];
- while (guess > 0)
+ /*
+ * We estimate each quotient digit using floating-point arithmetic, taking
+ * the first four digits of the (current) dividend and divisor. This must
+ * be float to avoid overflow.
+ */
+ fdivisor = (double) var2digits[0];
+ for (i = 1; i < 4; i++)
+ {
+ fdivisor *= NBASE;
+ if (i < var2ndigits)
+ fdivisor += (double) var2digits[i];
+ }
+ fdivisorinverse = 1.0 / fdivisor;
+
+ /*
+ * maxdiv tracks the maximum possible absolute value of any div[] entry;
+ * when this threatens to exceed INT_MAX, we take the time to propagate
+ * carries. To avoid overflow in maxdiv itself, it actually represents
+ * the max possible abs. value divided by NBASE-1.
+ */
+ maxdiv = 1;
+
+ /*
+ * Outer loop computes next quotient digit, which will go into div[qi]
+ */
+ for (qi = 0; qi < div_ndigits; qi++)
+ {
+ /* Approximate the current dividend value */
+ fdividend = (double) div[qi];
+ for (i = 1; i < 4; i++)
{
- if (divisor[guess].buf == NULL)
- {
- int i;
- long sum = 0;
+ fdividend *= NBASE;
+ if (qi + i <= div_ndigits)
+ fdividend += (double) div[qi + i];
+ }
+ /* Compute the (approximate) quotient digit */
+ fquotient = fdividend * fdivisorinverse;
+ qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
+ (((int) fquotient) - 1); /* truncate towards -infinity */
- memcpy(&divisor[guess], &divisor[1], sizeof(NumericVar));
- divisor[guess].buf = digitbuf_alloc(divisor[guess].ndigits);
- divisor[guess].digits = divisor[guess].buf;
- for (i = divisor[1].ndigits - 1; i >= 0; i--)
+ if (qdigit != 0)
+ {
+ /* Do we need to normalize now? */
+ maxdiv += Abs(qdigit);
+ if (maxdiv > INT_MAX / (NBASE - 1))
+ {
+ /* Yes, do it */
+ carry = 0;
+ for (i = div_ndigits; i > qi; i--)
+ {
+ newdig = div[i] + carry;
+ if (newdig < 0)
+ {
+ carry = -((-newdig - 1) / NBASE) - 1;
+ newdig -= carry * NBASE;
+ }
+ else if (newdig >= NBASE)
+ {
+ carry = newdig / NBASE;
+ newdig -= carry * NBASE;
+ }
+ else
+ carry = 0;
+ div[i] = newdig;
+ }
+ newdig = div[qi] + carry;
+ div[qi] = newdig;
+
+ /*
+ * All the div[] digits except possibly div[qi] are now in the
+ * range 0..NBASE-1.
+ */
+ maxdiv = Abs(newdig) / (NBASE - 1);
+ maxdiv = Max(maxdiv, 1);
+
+ /*
+ * Recompute the quotient digit since new info may have
+ * propagated into the top four dividend digits
+ */
+ fdividend = (double) div[qi];
+ for (i = 1; i < 4; i++)
{
- sum += divisor[1].digits[i] * guess;
- divisor[guess].digits[i] = sum % 10;
- sum /= 10;
+ fdividend *= NBASE;
+ if (qi + i <= div_ndigits)
+ fdividend += (double) div[qi + i];
}
+ /* Compute the (approximate) quotient digit */
+ fquotient = fdividend * fdivisorinverse;
+ qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
+ (((int) fquotient) - 1); /* truncate towards -infinity */
+ maxdiv += Abs(qdigit);
}
- divisor[guess].weight = weight_tmp;
- divisor[guess].rscale = rscale_tmp;
-
- stat = cmp_abs(÷nd, &divisor[guess]);
- if (stat >= 0)
- break;
+ /* Subtract off the appropriate multiple of the divisor */
+ if (qdigit != 0)
+ {
+ int istop = Min(var2ndigits, div_ndigits - qi + 1);
- guess--;
+ for (i = 0; i < istop; i++)
+ div[qi + i] -= qdigit * var2digits[i];
+ }
}
- res_digits[ri + 1] = guess;
- if (stat == 0)
+ /*
+ * The dividend digit we are about to replace might still be nonzero.
+ * Fold it into the next digit position. We don't need to worry about
+ * overflow here since this should nearly cancel with the subtraction
+ * of the divisor.
+ */
+ div[qi + 1] += div[qi] * NBASE;
+
+ div[qi] = qdigit;
+ }
+
+ /*
+ * Approximate and store the last quotient digit (div[div_ndigits])
+ */
+ fdividend = (double) div[qi];
+ for (i = 1; i < 4; i++)
+ fdividend *= NBASE;
+ fquotient = fdividend * fdivisorinverse;
+ qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
+ (((int) fquotient) - 1); /* truncate towards -infinity */
+ div[qi] = qdigit;
+
+ /*
+ * Now we do a final carry propagation pass to normalize the result, which
+ * we combine with storing the result digits into the output. Note that
+ * this is still done at full precision w/guard digits.
+ */
+ alloc_var(result, div_ndigits + 1);
+ res_digits = result->digits;
+ carry = 0;
+ for (i = div_ndigits; i >= 0; i--)
+ {
+ newdig = div[i] + carry;
+ if (newdig < 0)
{
- ri++;
- break;
+ carry = -((-newdig - 1) / NBASE) - 1;
+ newdig -= carry * NBASE;
+ }
+ else if (newdig >= NBASE)
+ {
+ carry = newdig / NBASE;
+ newdig -= carry * NBASE;
}
+ else
+ carry = 0;
+ res_digits[i] = newdig;
+ }
+ Assert(carry == 0);
- weight_tmp--;
- rscale_tmp++;
+ pfree(div);
- if (guess == 0)
- continue;
+ /*
+ * Finally, round the result to the requested precision.
+ */
+ result->weight = res_weight;
+ result->sign = res_sign;
- sub_abs(÷nd, &divisor[guess], ÷nd);
+ /* Round to target rscale (and set result->dscale) */
+ if (round)
+ round_var(result, rscale);
+ else
+ trunc_var(result, rscale);
- first_nextdigit = dividend.weight - weight_tmp;
- first_have = 0;
- if (first_nextdigit >= 0 && first_nextdigit < dividend.ndigits)
- first_have = dividend.digits[first_nextdigit];
- first_nextdigit++;
- }
+ /* Strip leading and trailing zeroes */
+ strip_var(result);
+}
- result->ndigits = ri + 1;
- if (ri == res_ndigits + 1)
- {
- int carry = (res_digits[ri] > 4) ? 1 : 0;
- result->ndigits = ri;
- res_digits[ri] = 0;
+/*
+ * Default scale selection for division
+ *
+ * Returns the appropriate result scale for the division result.
+ */
+static int
+select_div_scale(NumericVar *var1, NumericVar *var2)
+{
+ int weight1,
+ weight2,
+ qweight,
+ i;
+ NumericDigit firstdigit1,
+ firstdigit2;
+ int rscale;
+
+ /*
+ * The result scale of a division isn't specified in any SQL standard. For
+ * PostgreSQL we select a result scale that will give at least
+ * NUMERIC_MIN_SIG_DIGITS significant digits, so that numeric gives a
+ * result no less accurate than float8; but use a scale not less than
+ * either input's display scale.
+ */
+
+ /* Get the actual (normalized) weight and first digit of each input */
- while (carry && ri > 0)
+ weight1 = 0; /* values to use if var1 is zero */
+ firstdigit1 = 0;
+ for (i = 0; i < var1->ndigits; i++)
+ {
+ firstdigit1 = var1->digits[i];
+ if (firstdigit1 != 0)
{
- carry += res_digits[--ri];
- res_digits[ri] = carry % 10;
- carry /= 10;
+ weight1 = var1->weight - i;
+ break;
}
}
- while (result->ndigits > 0 && *(result->digits) == 0)
+ weight2 = 0; /* values to use if var2 is zero */
+ firstdigit2 = 0;
+ for (i = 0; i < var2->ndigits; i++)
{
- (result->digits)++;
- (result->weight)--;
- (result->ndigits)--;
+ firstdigit2 = var2->digits[i];
+ if (firstdigit2 != 0)
+ {
+ weight2 = var2->weight - i;
+ break;
+ }
}
- while (result->ndigits > 0 && result->digits[result->ndigits - 1] == 0)
- (result->ndigits)--;
- if (result->ndigits == 0)
- result->sign = NUMERIC_POS;
/*
- * Tidy up
+ * Estimate weight of quotient. If the two first digits are equal, we
+ * can't be sure, but assume that var1 is less than var2.
*/
- digitbuf_free(dividend.buf);
- for (i = 1; i < 10; i++)
- digitbuf_free(divisor[i].buf);
+ qweight = weight1 - weight2;
+ if (firstdigit1 <= firstdigit2)
+ qweight--;
+
+ /* Select result scale */
+ rscale = NUMERIC_MIN_SIG_DIGITS - qweight * DEC_DIGITS;
+ rscale = Max(rscale, var1->dscale);
+ rscale = Max(rscale, var2->dscale);
+ rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
+ rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
+
+ return rscale;
}
-/* ----------
+/*
* mod_var() -
*
* Calculate the modulo of two numerics at variable level
- * ----------
*/
static void
mod_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
{
NumericVar tmp;
- int save_global_rscale;
- int div_dscale;
init_var(&tmp);
/* ---------
* We do this using the equation
* mod(x,y) = x - trunc(x/y)*y
- * We set global_rscale the same way numeric_div and numeric_mul do
- * to get the right answer from the equation. The final result,
- * however, need not be displayed to more precision than the inputs.
+ * div_var can be persuaded to give us trunc(x/y) directly.
* ----------
*/
- save_global_rscale = global_rscale;
+ div_var(var1, var2, &tmp, 0, false);
- div_dscale = MAX(var1->dscale + var2->dscale, NUMERIC_MIN_DISPLAY_SCALE);
- div_dscale = MIN(div_dscale, NUMERIC_MAX_DISPLAY_SCALE);
- global_rscale = MAX(var1->rscale + var2->rscale,
- NUMERIC_MIN_RESULT_SCALE);
- global_rscale = MAX(global_rscale, div_dscale + 4);
- global_rscale = MIN(global_rscale, NUMERIC_MAX_RESULT_SCALE);
-
- div_var(var1, var2, &tmp);
-
- tmp.dscale = div_dscale;
-
- /* do trunc() by forgetting digits to the right of the decimal point */
- tmp.ndigits = MAX(0, MIN(tmp.ndigits, tmp.weight + 1));
-
- global_rscale = var2->rscale + tmp.rscale;
-
- mul_var(var2, &tmp, &tmp);
+ mul_var(var2, &tmp, &tmp, var2->dscale);
sub_var(var1, &tmp, result);
- result->dscale = MAX(var1->dscale, var2->dscale);
-
- global_rscale = save_global_rscale;
free_var(&tmp);
}
-/* ----------
+/*
* ceil_var() -
*
* Return the smallest integer greater than or equal to the argument
* on variable level
- * ----------
*/
static void
ceil_var(NumericVar *var, NumericVar *result)
init_var(&tmp);
set_var_from_var(var, &tmp);
- tmp.rscale = 0;
- tmp.ndigits = MIN(tmp.ndigits, MAX(0, tmp.weight + 1));
- if (tmp.sign == NUMERIC_POS && cmp_var(var, &tmp) != 0)
+ trunc_var(&tmp, 0);
+
+ if (var->sign == NUMERIC_POS && cmp_var(var, &tmp) != 0)
add_var(&tmp, &const_one, &tmp);
set_var_from_var(&tmp, result);
}
-/* ----------
+/*
* floor_var() -
*
* Return the largest integer equal to or less than the argument
* on variable level
- * ----------
*/
static void
floor_var(NumericVar *var, NumericVar *result)
init_var(&tmp);
set_var_from_var(var, &tmp);
- tmp.rscale = 0;
- tmp.ndigits = MIN(tmp.ndigits, MAX(0, tmp.weight + 1));
- if (tmp.sign == NUMERIC_NEG && cmp_var(var, &tmp) != 0)
+ trunc_var(&tmp, 0);
+
+ if (var->sign == NUMERIC_NEG && cmp_var(var, &tmp) != 0)
sub_var(&tmp, &const_one, &tmp);
set_var_from_var(&tmp, result);
}
-/* ----------
+/*
* sqrt_var() -
*
- * Compute the square root of x using Newtons algorithm
- * ----------
+ * Compute the square root of x using Newton's algorithm
*/
static void
-sqrt_var(NumericVar *arg, NumericVar *result)
+sqrt_var(NumericVar *arg, NumericVar *result, int rscale)
{
NumericVar tmp_arg;
NumericVar tmp_val;
NumericVar last_val;
- int res_rscale;
- int save_global_rscale;
+ int local_rscale;
int stat;
- save_global_rscale = global_rscale;
- global_rscale += 8;
- res_rscale = global_rscale;
+ local_rscale = rscale + 8;
stat = cmp_var(arg, &const_zero);
if (stat == 0)
{
- set_var_from_var(&const_zero, result);
- result->rscale = res_rscale;
- result->sign = NUMERIC_POS;
+ zero_var(result);
+ result->dscale = rscale;
return;
}
+ /*
+ * SQL2003 defines sqrt() in terms of power, so we need to emit the right
+ * SQLSTATE error code if the operand is negative.
+ */
if (stat < 0)
- elog(ERROR, "math error on numeric - cannot compute SQRT of negative value");
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION),
+ errmsg("cannot take square root of a negative number")));
init_var(&tmp_arg);
init_var(&tmp_val);
init_var(&last_val);
+ /* Copy arg in case it is the same var as result */
set_var_from_var(arg, &tmp_arg);
- set_var_from_var(result, &last_val);
/*
* Initialize the result to the first guess
*/
- digitbuf_free(result->buf);
- result->buf = digitbuf_alloc(1);
- result->digits = result->buf;
+ alloc_var(result, 1);
result->digits[0] = tmp_arg.digits[0] / 2;
if (result->digits[0] == 0)
result->digits[0] = 1;
- result->ndigits = 1;
result->weight = tmp_arg.weight / 2;
- result->rscale = res_rscale;
result->sign = NUMERIC_POS;
- for (;;)
+ set_var_from_var(result, &last_val);
+
+ for (;;)
+ {
+ div_var_fast(&tmp_arg, result, &tmp_val, local_rscale, true);
+
+ add_var(result, &tmp_val, result);
+ mul_var(result, &const_zero_point_five, result, local_rscale);
+
+ if (cmp_var(&last_val, result) == 0)
+ break;
+ set_var_from_var(result, &last_val);
+ }
+
+ free_var(&last_val);
+ free_var(&tmp_val);
+ free_var(&tmp_arg);
+
+ /* Round to requested precision */
+ round_var(result, rscale);
+}
+
+
+/*
+ * exp_var() -
+ *
+ * Raise e to the power of x
+ */
+static void
+exp_var(NumericVar *arg, NumericVar *result, int rscale)
+{
+ NumericVar x;
+ int xintval;
+ bool xneg = FALSE;
+ int local_rscale;
+
+ /*----------
+ * We separate the integral and fraction parts of x, then compute
+ * e^x = e^xint * e^xfrac
+ * where e = exp(1) and e^xfrac = exp(xfrac) are computed by
+ * exp_var_internal; the limited range of inputs allows that routine
+ * to do a good job with a simple Taylor series. Raising e^xint is
+ * done by repeated multiplications in power_var_int.
+ *----------
+ */
+ init_var(&x);
+
+ set_var_from_var(arg, &x);
+
+ if (x.sign == NUMERIC_NEG)
+ {
+ xneg = TRUE;
+ x.sign = NUMERIC_POS;
+ }
+
+ /* Extract the integer part, remove it from x */
+ xintval = 0;
+ while (x.weight >= 0)
{
- div_var(&tmp_arg, result, &tmp_val);
+ xintval *= NBASE;
+ if (x.ndigits > 0)
+ {
+ xintval += x.digits[0];
+ x.digits++;
+ x.ndigits--;
+ }
+ x.weight--;
+ /* Guard against overflow */
+ if (xintval >= NUMERIC_MAX_RESULT_SCALE * 3)
+ ereport(ERROR,
+ (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+ errmsg("argument for function \"exp\" too big")));
+ }
- add_var(result, &tmp_val, result);
- div_var(result, &const_two, result);
+ /* Select an appropriate scale for internal calculation */
+ local_rscale = rscale + MUL_GUARD_DIGITS * 2;
- if (cmp_var(&last_val, result) == 0)
- break;
- set_var_from_var(result, &last_val);
+ /* Compute e^xfrac */
+ exp_var_internal(&x, result, local_rscale);
+
+ /* If there's an integer part, multiply by e^xint */
+ if (xintval > 0)
+ {
+ NumericVar e;
+
+ init_var(&e);
+ exp_var_internal(&const_one, &e, local_rscale);
+ power_var_int(&e, xintval, &e, local_rscale);
+ mul_var(&e, result, result, local_rscale);
+ free_var(&e);
}
- free_var(&last_val);
- free_var(&tmp_val);
- free_var(&tmp_arg);
+ /* Compensate for input sign, and round to requested rscale */
+ if (xneg)
+ div_var_fast(&const_one, result, result, rscale, true);
+ else
+ round_var(result, rscale);
- global_rscale = save_global_rscale;
- div_var(result, &const_one, result);
+ free_var(&x);
}
-/* ----------
- * exp_var() -
+/*
+ * exp_var_internal() -
*
- * Raise e to the power of x
- * ----------
+ * Raise e to the power of x, where 0 <= x <= 1
+ *
+ * NB: the result should be good to at least rscale digits, but it has
+ * *not* been rounded off; the caller must do that if wanted.
*/
static void
-exp_var(NumericVar *arg, NumericVar *result)
+exp_var_internal(NumericVar *arg, NumericVar *result, int rscale)
{
NumericVar x;
NumericVar xpow;
NumericVar ifac;
NumericVar elem;
NumericVar ni;
- int d;
- int i;
int ndiv2 = 0;
- bool xneg = FALSE;
- int save_global_rscale;
+ int local_rscale;
init_var(&x);
init_var(&xpow);
set_var_from_var(arg, &x);
- if (x.sign == NUMERIC_NEG)
- {
- xneg = TRUE;
- x.sign = NUMERIC_POS;
- }
-
- save_global_rscale = global_rscale;
- global_rscale = 0;
- for (i = x.weight, d = 0; i >= 0; i--, d++)
- {
- global_rscale *= 10;
- if (d < x.ndigits)
- global_rscale += x.digits[d];
- if (global_rscale >= 1000)
- elog(ERROR, "argument for EXP() too big");
- }
+ Assert(x.sign == NUMERIC_POS);
- global_rscale = global_rscale / 2 + save_global_rscale + 8;
+ local_rscale = rscale + 8;
- while (cmp_var(&x, &const_one) > 0)
+ /* Reduce input into range 0 <= x <= 0.01 */
+ while (cmp_var(&x, &const_zero_point_01) > 0)
{
ndiv2++;
- global_rscale++;
- div_var(&x, &const_two, &x);
+ local_rscale++;
+ mul_var(&x, &const_zero_point_five, &x, x.dscale + 1);
}
+ /*
+ * Use the Taylor series
+ *
+ * exp(x) = 1 + x + x^2/2! + x^3/3! + ...
+ *
+ * Given the limited range of x, this should converge reasonably quickly.
+ * We run the series until the terms fall below the local_rscale limit.
+ */
add_var(&const_one, &x, result);
set_var_from_var(&x, &xpow);
set_var_from_var(&const_one, &ifac);
set_var_from_var(&const_one, &ni);
- for (i = 2;; i++)
+ for (;;)
{
add_var(&ni, &const_one, &ni);
- mul_var(&xpow, &x, &xpow);
- mul_var(&ifac, &ni, &ifac);
- div_var(&xpow, &ifac, &elem);
+ mul_var(&xpow, &x, &xpow, local_rscale);
+ mul_var(&ifac, &ni, &ifac, 0);
+ div_var_fast(&xpow, &ifac, &elem, local_rscale, true);
if (elem.ndigits == 0)
break;
add_var(result, &elem, result);
}
+ /* Compensate for argument range reduction */
while (ndiv2-- > 0)
- mul_var(result, result, result);
-
- global_rscale = save_global_rscale;
- if (xneg)
- div_var(&const_one, result, result);
- else
- div_var(result, &const_one, result);
-
- result->sign = NUMERIC_POS;
+ mul_var(result, result, result, local_rscale);
free_var(&x);
free_var(&xpow);
}
-/* ----------
+/*
* ln_var() -
*
* Compute the natural log of x
- * ----------
*/
static void
-ln_var(NumericVar *arg, NumericVar *result)
+ln_var(NumericVar *arg, NumericVar *result, int rscale)
{
NumericVar x;
NumericVar xx;
NumericVar ni;
NumericVar elem;
NumericVar fact;
- int i;
- int save_global_rscale;
+ int local_rscale;
+ int cmp;
- if (cmp_var(arg, &const_zero) <= 0)
- elog(ERROR, "math error on numeric - cannot compute LN of value <= zero");
+ cmp = cmp_var(arg, &const_zero);
+ if (cmp == 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_ARGUMENT_FOR_LOG),
+ errmsg("cannot take logarithm of zero")));
+ else if (cmp < 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_ARGUMENT_FOR_LOG),
+ errmsg("cannot take logarithm of a negative number")));
- save_global_rscale = global_rscale;
- global_rscale += 8;
+ local_rscale = rscale + 8;
init_var(&x);
init_var(&xx);
init_var(&elem);
init_var(&fact);
- set_var_from_var(&const_two, &fact);
set_var_from_var(arg, &x);
+ set_var_from_var(&const_two, &fact);
- while (cmp_var(&x, &const_two) >= 0)
+ /* Reduce input into range 0.9 < x < 1.1 */
+ while (cmp_var(&x, &const_zero_point_nine) <= 0)
{
- sqrt_var(&x, &x);
- mul_var(&fact, &const_two, &fact);
+ local_rscale++;
+ sqrt_var(&x, &x, local_rscale);
+ mul_var(&fact, &const_two, &fact, 0);
}
- set_var_from_str("0.5", &elem);
- while (cmp_var(&x, &elem) <= 0)
+ while (cmp_var(&x, &const_one_point_one) >= 0)
{
- sqrt_var(&x, &x);
- mul_var(&fact, &const_two, &fact);
+ local_rscale++;
+ sqrt_var(&x, &x, local_rscale);
+ mul_var(&fact, &const_two, &fact, 0);
}
+ /*
+ * We use the Taylor series for 0.5 * ln((1+z)/(1-z)),
+ *
+ * z + z^3/3 + z^5/5 + ...
+ *
+ * where z = (x-1)/(x+1) is in the range (approximately) -0.053 .. 0.048
+ * due to the above range-reduction of x.
+ *
+ * The convergence of this is not as fast as one would like, but is
+ * tolerable given that z is small.
+ */
sub_var(&x, &const_one, result);
add_var(&x, &const_one, &elem);
- div_var(result, &elem, result);
+ div_var_fast(result, &elem, result, local_rscale, true);
set_var_from_var(result, &xx);
- mul_var(result, result, &x);
+ mul_var(result, result, &x, local_rscale);
set_var_from_var(&const_one, &ni);
- for (i = 2;; i++)
+ for (;;)
{
add_var(&ni, &const_two, &ni);
- mul_var(&xx, &x, &xx);
- div_var(&xx, &ni, &elem);
+ mul_var(&xx, &x, &xx, local_rscale);
+ div_var_fast(&xx, &ni, &elem, local_rscale, true);
- if (cmp_var(&elem, &const_zero) == 0)
+ if (elem.ndigits == 0)
break;
add_var(result, &elem, result);
+
+ if (elem.weight < (result->weight - local_rscale * 2 / DEC_DIGITS))
+ break;
}
- global_rscale = save_global_rscale;
- mul_var(result, &fact, result);
+ /* Compensate for argument range reduction, round to requested rscale */
+ mul_var(result, &fact, result, rscale);
free_var(&x);
free_var(&xx);
}
-/* ----------
+/*
* log_var() -
*
- * Compute the logarithm of x in a given base
- * ----------
+ * Compute the logarithm of num in a given base.
+ *
+ * Note: this routine chooses dscale of the result.
*/
static void
log_var(NumericVar *base, NumericVar *num, NumericVar *result)
{
NumericVar ln_base;
NumericVar ln_num;
-
- global_rscale += 8;
+ int dec_digits;
+ int rscale;
+ int local_rscale;
init_var(&ln_base);
init_var(&ln_num);
- ln_var(base, &ln_base);
- ln_var(num, &ln_num);
+ /* Set scale for ln() calculations --- compare numeric_ln() */
+
+ /* Approx decimal digits before decimal point */
+ dec_digits = (num->weight + 1) * DEC_DIGITS;
+
+ if (dec_digits > 1)
+ rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(dec_digits - 1);
+ else if (dec_digits < 1)
+ rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(1 - dec_digits);
+ else
+ rscale = NUMERIC_MIN_SIG_DIGITS;
+
+ rscale = Max(rscale, base->dscale);
+ rscale = Max(rscale, num->dscale);
+ rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
+ rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
+
+ local_rscale = rscale + 8;
- global_rscale -= 8;
+ /* Form natural logarithms */
+ ln_var(base, &ln_base, local_rscale);
+ ln_var(num, &ln_num, local_rscale);
- div_var(&ln_num, &ln_base, result);
+ ln_base.dscale = rscale;
+ ln_num.dscale = rscale;
+
+ /* Select scale for division result */
+ rscale = select_div_scale(&ln_num, &ln_base);
+
+ div_var_fast(&ln_num, &ln_base, result, rscale, true);
free_var(&ln_num);
free_var(&ln_base);
}
-/* ----------
+/*
* power_var() -
*
* Raise base to the power of exp
- * ----------
+ *
+ * Note: this routine chooses dscale of the result.
*/
static void
power_var(NumericVar *base, NumericVar *exp, NumericVar *result)
{
NumericVar ln_base;
NumericVar ln_num;
- int save_global_rscale;
+ int dec_digits;
+ int rscale;
+ int local_rscale;
+ double val;
+
+ /* If exp can be represented as an integer, use power_var_int */
+ if (exp->ndigits == 0 || exp->ndigits <= exp->weight + 1)
+ {
+ /* exact integer, but does it fit in int? */
+ NumericVar x;
+ int64 expval64;
+
+ /* must copy because numericvar_to_int8() scribbles on input */
+ init_var(&x);
+ set_var_from_var(exp, &x);
+ if (numericvar_to_int8(&x, &expval64))
+ {
+ int expval = (int) expval64;
+
+ /* Test for overflow by reverse-conversion. */
+ if ((int64) expval == expval64)
+ {
+ /* Okay, select rscale */
+ rscale = NUMERIC_MIN_SIG_DIGITS;
+ rscale = Max(rscale, base->dscale);
+ rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
+ rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
+
+ power_var_int(base, expval, result, rscale);
+
+ free_var(&x);
+ return;
+ }
+ }
+ free_var(&x);
+ }
- save_global_rscale = global_rscale;
- global_rscale += global_rscale / 3 + 8;
+ /*
+ * This avoids log(0) for cases of 0 raised to a non-integer. 0 ^ 0
+ * handled by power_var_int().
+ */
+ if (cmp_var(base, &const_zero) == 0)
+ {
+ set_var_from_var(&const_zero, result);
+ result->dscale = NUMERIC_MIN_SIG_DIGITS; /* no need to round */
+ return;
+ }
init_var(&ln_base);
init_var(&ln_num);
- ln_var(base, &ln_base);
- mul_var(&ln_base, exp, &ln_num);
+ /* Set scale for ln() calculation --- need extra accuracy here */
+
+ /* Approx decimal digits before decimal point */
+ dec_digits = (base->weight + 1) * DEC_DIGITS;
+
+ if (dec_digits > 1)
+ rscale = NUMERIC_MIN_SIG_DIGITS * 2 - (int) log10(dec_digits - 1);
+ else if (dec_digits < 1)
+ rscale = NUMERIC_MIN_SIG_DIGITS * 2 - (int) log10(1 - dec_digits);
+ else
+ rscale = NUMERIC_MIN_SIG_DIGITS * 2;
+
+ rscale = Max(rscale, base->dscale * 2);
+ rscale = Max(rscale, exp->dscale * 2);
+ rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE * 2);
+ rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE * 2);
+
+ local_rscale = rscale + 8;
+
+ ln_var(base, &ln_base, local_rscale);
+
+ mul_var(&ln_base, exp, &ln_num, local_rscale);
+
+ /* Set scale for exp() -- compare numeric_exp() */
+
+ /* convert input to float8, ignoring overflow */
+ val = numericvar_to_double_no_overflow(&ln_num);
+
+ /*
+ * log10(result) = num * log10(e), so this is approximately the weight:
+ */
+ val *= 0.434294481903252;
- global_rscale = save_global_rscale;
+ /* limit to something that won't cause integer overflow */
+ val = Max(val, -NUMERIC_MAX_RESULT_SCALE);
+ val = Min(val, NUMERIC_MAX_RESULT_SCALE);
- exp_var(&ln_num, result);
+ rscale = NUMERIC_MIN_SIG_DIGITS - (int) val;
+ rscale = Max(rscale, base->dscale);
+ rscale = Max(rscale, exp->dscale);
+ rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
+ rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
+
+ exp_var(&ln_num, result, rscale);
free_var(&ln_num);
free_var(&ln_base);
+}
+
+/*
+ * power_var_int() -
+ *
+ * Raise base to the power of exp, where exp is an integer.
+ */
+static void
+power_var_int(NumericVar *base, int exp, NumericVar *result, int rscale)
+{
+ bool neg;
+ NumericVar base_prod;
+ int local_rscale;
+
+ switch (exp)
+ {
+ case 0:
+
+ /*
+ * While 0 ^ 0 can be either 1 or indeterminate (error), we treat
+ * it as 1 because most programming languages do this. SQL:2003
+ * also requires a return value of 1.
+ * http://en.wikipedia.org/wiki/Exponentiation#Zero_to_the_zero_pow
+ * er
+ */
+ set_var_from_var(&const_one, result);
+ result->dscale = rscale; /* no need to round */
+ return;
+ case 1:
+ set_var_from_var(base, result);
+ round_var(result, rscale);
+ return;
+ case -1:
+ div_var(&const_one, base, result, rscale, true);
+ return;
+ case 2:
+ mul_var(base, base, result, rscale);
+ return;
+ default:
+ break;
+ }
+
+ /*
+ * The general case repeatedly multiplies base according to the bit
+ * pattern of exp. We do the multiplications with some extra precision.
+ */
+ neg = (exp < 0);
+ exp = Abs(exp);
+
+ local_rscale = rscale + MUL_GUARD_DIGITS * 2;
+
+ init_var(&base_prod);
+ set_var_from_var(base, &base_prod);
+
+ if (exp & 1)
+ set_var_from_var(base, result);
+ else
+ set_var_from_var(&const_one, result);
+
+ while ((exp >>= 1) > 0)
+ {
+ mul_var(&base_prod, &base_prod, &base_prod, local_rscale);
+ if (exp & 1)
+ mul_var(&base_prod, result, result, local_rscale);
+ }
+ free_var(&base_prod);
+
+ /* Compensate for input sign, and round to requested rscale */
+ if (neg)
+ div_var_fast(&const_one, result, result, rscale, true);
+ else
+ round_var(result, rscale);
}
*/
static int
cmp_abs(NumericVar *var1, NumericVar *var2)
+{
+ return cmp_abs_common(var1->digits, var1->ndigits, var1->weight,
+ var2->digits, var2->ndigits, var2->weight);
+}
+
+/* ----------
+ * cmp_abs_common() -
+ *
+ * Main routine of cmp_abs(). This function can be used by both
+ * NumericVar and Numeric.
+ * ----------
+ */
+static int
+cmp_abs_common(const NumericDigit *var1digits, int var1ndigits, int var1weight,
+ const NumericDigit *var2digits, int var2ndigits, int var2weight)
{
int i1 = 0;
int i2 = 0;
- int w1 = var1->weight;
- int w2 = var2->weight;
- int stat;
- while (w1 > w2 && i1 < var1->ndigits)
+ /* Check any digits before the first common digit */
+
+ while (var1weight > var2weight && i1 < var1ndigits)
{
- if (var1->digits[i1++] != 0)
+ if (var1digits[i1++] != 0)
return 1;
- w1--;
+ var1weight--;
}
- while (w2 > w1 && i2 < var2->ndigits)
+ while (var2weight > var1weight && i2 < var2ndigits)
{
- if (var2->digits[i2++] != 0)
+ if (var2digits[i2++] != 0)
return -1;
- w2--;
+ var2weight--;
}
- if (w1 == w2)
+ /* At this point, either w1 == w2 or we've run out of digits */
+
+ if (var1weight == var2weight)
{
- while (i1 < var1->ndigits && i2 < var2->ndigits)
+ while (i1 < var1ndigits && i2 < var2ndigits)
{
- stat = var1->digits[i1++] - var2->digits[i2++];
+ int stat = var1digits[i1++] - var2digits[i2++];
+
if (stat)
{
if (stat > 0)
}
}
- while (i1 < var1->ndigits)
+ /*
+ * At this point, we've run out of digits on one side or the other; so any
+ * remaining nonzero digits imply that side is larger
+ */
+ while (i1 < var1ndigits)
{
- if (var1->digits[i1++] != 0)
+ if (var1digits[i1++] != 0)
return 1;
}
- while (i2 < var2->ndigits)
+ while (i2 < var2ndigits)
{
- if (var2->digits[i2++] != 0)
+ if (var2digits[i2++] != 0)
return -1;
}
}
-/* ----------
+/*
* add_abs() -
*
* Add the absolute values of two variables into result.
* result might point to one of the operands without danger.
- * ----------
*/
static void
add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
NumericDigit *res_digits;
int res_ndigits;
int res_weight;
- int res_rscale;
+ int res_rscale,
+ rscale1,
+ rscale2;
int res_dscale;
int i,
i1,
NumericDigit *var1digits = var1->digits;
NumericDigit *var2digits = var2->digits;
- res_weight = MAX(var1->weight, var2->weight) + 1;
- res_rscale = MAX(var1->rscale, var2->rscale);
- res_dscale = MAX(var1->dscale, var2->dscale);
+ res_weight = Max(var1->weight, var2->weight) + 1;
+
+ res_dscale = Max(var1->dscale, var2->dscale);
+
+ /* Note: here we are figuring rscale in base-NBASE digits */
+ rscale1 = var1->ndigits - var1->weight - 1;
+ rscale2 = var2->ndigits - var2->weight - 1;
+ res_rscale = Max(rscale1, rscale2);
+
res_ndigits = res_rscale + res_weight + 1;
if (res_ndigits <= 0)
res_ndigits = 1;
- res_buf = digitbuf_alloc(res_ndigits);
- res_digits = res_buf;
+ res_buf = digitbuf_alloc(res_ndigits + 1);
+ res_buf[0] = 0; /* spare digit for later rounding */
+ res_digits = res_buf + 1;
i1 = res_rscale + var1->weight + 1;
i2 = res_rscale + var2->weight + 1;
if (i2 >= 0 && i2 < var2ndigits)
carry += var2digits[i2];
- if (carry >= 10)
+ if (carry >= NBASE)
{
- res_digits[i] = carry - 10;
+ res_digits[i] = carry - NBASE;
carry = 1;
}
else
Assert(carry == 0); /* else we failed to allow for carry out */
- while (res_ndigits > 0 && *res_digits == 0)
- {
- res_digits++;
- res_weight--;
- res_ndigits--;
- }
- while (res_ndigits > 0 && res_digits[res_ndigits - 1] == 0)
- res_ndigits--;
-
- if (res_ndigits == 0)
- res_weight = 0;
-
digitbuf_free(result->buf);
result->ndigits = res_ndigits;
result->buf = res_buf;
result->digits = res_digits;
result->weight = res_weight;
- result->rscale = res_rscale;
result->dscale = res_dscale;
+
+ /* Remove leading/trailing zeroes */
+ strip_var(result);
}
-/* ----------
- * sub_abs() -
+/*
+ * sub_abs()
*
* Subtract the absolute value of var2 from the absolute value of var1
* and store in result. result might point to one of the operands
* without danger.
*
* ABS(var1) MUST BE GREATER OR EQUAL ABS(var2) !!!
- * ----------
*/
static void
sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
NumericDigit *res_digits;
int res_ndigits;
int res_weight;
- int res_rscale;
+ int res_rscale,
+ rscale1,
+ rscale2;
int res_dscale;
int i,
i1,
NumericDigit *var2digits = var2->digits;
res_weight = var1->weight;
- res_rscale = MAX(var1->rscale, var2->rscale);
- res_dscale = MAX(var1->dscale, var2->dscale);
+
+ res_dscale = Max(var1->dscale, var2->dscale);
+
+ /* Note: here we are figuring rscale in base-NBASE digits */
+ rscale1 = var1->ndigits - var1->weight - 1;
+ rscale2 = var2->ndigits - var2->weight - 1;
+ res_rscale = Max(rscale1, rscale2);
+
res_ndigits = res_rscale + res_weight + 1;
if (res_ndigits <= 0)
res_ndigits = 1;
- res_buf = digitbuf_alloc(res_ndigits);
- res_digits = res_buf;
+ res_buf = digitbuf_alloc(res_ndigits + 1);
+ res_buf[0] = 0; /* spare digit for later rounding */
+ res_digits = res_buf + 1;
i1 = res_rscale + var1->weight + 1;
i2 = res_rscale + var2->weight + 1;
if (borrow < 0)
{
- res_digits[i] = borrow + 10;
+ res_digits[i] = borrow + NBASE;
borrow = -1;
}
else
Assert(borrow == 0); /* else caller gave us var1 < var2 */
- while (res_ndigits > 0 && *res_digits == 0)
- {
- res_digits++;
- res_weight--;
- res_ndigits--;
- }
- while (res_ndigits > 0 && res_digits[res_ndigits - 1] == 0)
- res_ndigits--;
-
- if (res_ndigits == 0)
- res_weight = 0;
-
digitbuf_free(result->buf);
result->ndigits = res_ndigits;
result->buf = res_buf;
result->digits = res_digits;
result->weight = res_weight;
- result->rscale = res_rscale;
result->dscale = res_dscale;
+
+ /* Remove leading/trailing zeroes */
+ strip_var(result);
+}
+
+/*
+ * round_var
+ *
+ * Round the value of a variable to no more than rscale decimal digits
+ * after the decimal point. NOTE: we allow rscale < 0 here, implying
+ * rounding before the decimal point.
+ */
+static void
+round_var(NumericVar *var, int rscale)
+{
+ NumericDigit *digits = var->digits;
+ int di;
+ int ndigits;
+ int carry;
+
+ var->dscale = rscale;
+
+ /* decimal digits wanted */
+ di = (var->weight + 1) * DEC_DIGITS + rscale;
+
+ /*
+ * If di = 0, the value loses all digits, but could round up to 1 if its
+ * first extra digit is >= 5. If di < 0 the result must be 0.
+ */
+ if (di < 0)
+ {
+ var->ndigits = 0;
+ var->weight = 0;
+ var->sign = NUMERIC_POS;
+ }
+ else
+ {
+ /* NBASE digits wanted */
+ ndigits = (di + DEC_DIGITS - 1) / DEC_DIGITS;
+
+ /* 0, or number of decimal digits to keep in last NBASE digit */
+ di %= DEC_DIGITS;
+
+ if (ndigits < var->ndigits ||
+ (ndigits == var->ndigits && di > 0))
+ {
+ var->ndigits = ndigits;
+
+#if DEC_DIGITS == 1
+ /* di must be zero */
+ carry = (digits[ndigits] >= HALF_NBASE) ? 1 : 0;
+#else
+ if (di == 0)
+ carry = (digits[ndigits] >= HALF_NBASE) ? 1 : 0;
+ else
+ {
+ /* Must round within last NBASE digit */
+ int extra,
+ pow10;
+
+#if DEC_DIGITS == 4
+ pow10 = round_powers[di];
+#elif DEC_DIGITS == 2
+ pow10 = 10;
+#else
+#error unsupported NBASE
+#endif
+ extra = digits[--ndigits] % pow10;
+ digits[ndigits] -= extra;
+ carry = 0;
+ if (extra >= pow10 / 2)
+ {
+ pow10 += digits[ndigits];
+ if (pow10 >= NBASE)
+ {
+ pow10 -= NBASE;
+ carry = 1;
+ }
+ digits[ndigits] = pow10;
+ }
+ }
+#endif
+
+ /* Propagate carry if needed */
+ while (carry)
+ {
+ carry += digits[--ndigits];
+ if (carry >= NBASE)
+ {
+ digits[ndigits] = carry - NBASE;
+ carry = 1;
+ }
+ else
+ {
+ digits[ndigits] = carry;
+ carry = 0;
+ }
+ }
+
+ if (ndigits < 0)
+ {
+ Assert(ndigits == -1); /* better not have added > 1 digit */
+ Assert(var->digits > var->buf);
+ var->digits--;
+ var->ndigits++;
+ var->weight++;
+ }
+ }
+ }
+}
+
+/*
+ * trunc_var
+ *
+ * Truncate (towards zero) the value of a variable at rscale decimal digits
+ * after the decimal point. NOTE: we allow rscale < 0 here, implying
+ * truncation before the decimal point.
+ */
+static void
+trunc_var(NumericVar *var, int rscale)
+{
+ int di;
+ int ndigits;
+
+ var->dscale = rscale;
+
+ /* decimal digits wanted */
+ di = (var->weight + 1) * DEC_DIGITS + rscale;
+
+ /*
+ * If di <= 0, the value loses all digits.
+ */
+ if (di <= 0)
+ {
+ var->ndigits = 0;
+ var->weight = 0;
+ var->sign = NUMERIC_POS;
+ }
+ else
+ {
+ /* NBASE digits wanted */
+ ndigits = (di + DEC_DIGITS - 1) / DEC_DIGITS;
+
+ if (ndigits <= var->ndigits)
+ {
+ var->ndigits = ndigits;
+
+#if DEC_DIGITS == 1
+ /* no within-digit stuff to worry about */
+#else
+ /* 0, or number of decimal digits to keep in last NBASE digit */
+ di %= DEC_DIGITS;
+
+ if (di > 0)
+ {
+ /* Must truncate within last NBASE digit */
+ NumericDigit *digits = var->digits;
+ int extra,
+ pow10;
+
+#if DEC_DIGITS == 4
+ pow10 = round_powers[di];
+#elif DEC_DIGITS == 2
+ pow10 = 10;
+#else
+#error unsupported NBASE
+#endif
+ extra = digits[--ndigits] % pow10;
+ digits[ndigits] -= extra;
+ }
+#endif
+ }
+ }
+}
+
+/*
+ * strip_var
+ *
+ * Strip any leading and trailing zeroes from a numeric variable
+ */
+static void
+strip_var(NumericVar *var)
+{
+ NumericDigit *digits = var->digits;
+ int ndigits = var->ndigits;
+
+ /* Strip leading zeroes */
+ while (ndigits > 0 && *digits == 0)
+ {
+ digits++;
+ var->weight--;
+ ndigits--;
+ }
+
+ /* Strip trailing zeroes */
+ while (ndigits > 0 && digits[ndigits - 1] == 0)
+ ndigits--;
+
+ /* If it's zero, normalize the sign and weight */
+ if (ndigits == 0)
+ {
+ var->sign = NUMERIC_POS;
+ var->weight = 0;
+ }
+
+ var->digits = digits;
+ var->ndigits = ndigits;
}