]> granicus.if.org Git - postgresql/commitdiff
Speed up operations on numeric, mostly by avoiding palloc() overhead.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 21 Nov 2012 13:53:35 +0000 (15:53 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 21 Nov 2012 13:53:35 +0000 (15:53 +0200)
In many functions, a NumericVar was initialized from an input Numeric, to be
passed as input to a calculation function. When the NumericVar is not
modified, the digits array of the NumericVar can point directly to the digits
array in the original Numeric, and we can avoid a palloc() and memcpy(). Add
init_var_from_num() function to initialize a var like that.

Remove dscale argument from get_str_from_var(), as all the callers just
passed the dscale of the variable. That means that the rounding it used to
do was not actually necessary, and get_str_from_var() no longer scribbles on
its input. That makes it safer in general, and allows us to use the new
init_var_from_num() function in e.g numeric_out().

Also modified numericvar_to_int8() to no scribble on its input either. It
creates a temporary copy to avoid that. To compensate, the callers no longer
need to create a temporary copy, so the net # of pallocs is the same, but this
is nicer.

In the passing, use a constant for the number 10 in get_str_from_var_sci(),
when calculating 10^exponent. Saves a palloc() and some cycles to convert
integer 10 to numeric.

Original patch by Kyotaro HORIGUCHI, with further changes by me. Reviewed
by Pavel Stehule.

src/backend/utils/adt/numeric.c

index 68c1f1de3b77ecf5562fed710e4cff8492f27663..b408df7faf4676e81a0a53d705c05887b5462d04 100644 (file)
@@ -276,6 +276,16 @@ static NumericDigit const_two_data[1] = {2};
 static NumericVar const_two =
 {1, 0, NUMERIC_POS, 0, NULL, const_two_data};
 
+#if DEC_DIGITS == 4 || DEC_DIGITS == 2
+static NumericDigit const_ten_data[1] = {10};
+static NumericVar const_ten =
+{1, 0, NUMERIC_POS, 0, NULL, const_ten_data};
+#elif DEC_DIGITS == 1
+static NumericDigit const_ten_data[1] = {1};
+static NumericVar const_ten =
+{1, 1, NUMERIC_POS, 0, NULL, const_ten_data};
+#endif
+
 #if DEC_DIGITS == 4
 static NumericDigit const_zero_point_five_data[1] = {5000};
 #elif DEC_DIGITS == 2
@@ -367,8 +377,9 @@ static void zero_var(NumericVar *var);
 static const char *set_var_from_str(const char *str, const char *cp,
                                 NumericVar *dest);
 static void set_var_from_num(Numeric value, NumericVar *dest);
+static void init_var_from_num(Numeric num, NumericVar *dest);
 static void set_var_from_var(NumericVar *value, NumericVar *dest);
-static char *get_str_from_var(NumericVar *var, int dscale);
+static char *get_str_from_var(NumericVar *var);
 static char *get_str_from_var_sci(NumericVar *var, int rscale);
 
 static Numeric make_result(NumericVar *var);
@@ -533,18 +544,10 @@ numeric_out(PG_FUNCTION_ARGS)
 
        /*
         * Get the number in the variable format.
-        *
-        * Even if we didn't need to change format, we'd still need to copy the
-        * value to have a modifiable copy for rounding.  set_var_from_num() also
-        * guarantees there is extra digit space in case we produce a carry out
-        * from rounding.
         */
-       init_var(&x);
-       set_var_from_num(num, &x);
+       init_var_from_num(num, &x);
 
-       str = get_str_from_var(&x, x.dscale);
-
-       free_var(&x);
+       str = get_str_from_var(&x);
 
        PG_RETURN_CSTRING(str);
 }
@@ -616,12 +619,10 @@ numeric_out_sci(Numeric num, int scale)
        if (NUMERIC_IS_NAN(num))
                return pstrdup("NaN");
 
-       init_var(&x);
-       set_var_from_num(num, &x);
+       init_var_from_num(num, &x);
 
        str = get_str_from_var_sci(&x, scale);
 
-       free_var(&x);
        return str;
 }
 
@@ -695,8 +696,7 @@ numeric_send(PG_FUNCTION_ARGS)
        StringInfoData buf;
        int                     i;
 
-       init_var(&x);
-       set_var_from_num(num, &x);
+       init_var_from_num(num, &x);
 
        pq_begintypsend(&buf);
 
@@ -707,8 +707,6 @@ numeric_send(PG_FUNCTION_ARGS)
        for (i = 0; i < x.ndigits; i++)
                pq_sendint(&buf, x.digits[i], sizeof(NumericDigit));
 
-       free_var(&x);
-
        PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
 }
 
@@ -1150,9 +1148,7 @@ numeric_ceil(PG_FUNCTION_ARGS)
        if (NUMERIC_IS_NAN(num))
                PG_RETURN_NUMERIC(make_result(&const_nan));
 
-       init_var(&result);
-
-       set_var_from_num(num, &result);
+       init_var_from_num(num, &result);
        ceil_var(&result, &result);
 
        res = make_result(&result);
@@ -1177,9 +1173,7 @@ numeric_floor(PG_FUNCTION_ARGS)
        if (NUMERIC_IS_NAN(num))
                PG_RETURN_NUMERIC(make_result(&const_nan));
 
-       init_var(&result);
-
-       set_var_from_num(num, &result);
+       init_var_from_num(num, &result);
        floor_var(&result, &result);
 
        res = make_result(&result);
@@ -1282,13 +1276,9 @@ compute_bucket(Numeric operand, Numeric bound1, Numeric bound2,
        NumericVar      bound2_var;
        NumericVar      operand_var;
 
-       init_var(&bound1_var);
-       init_var(&bound2_var);
-       init_var(&operand_var);
-
-       set_var_from_num(bound1, &bound1_var);
-       set_var_from_num(bound2, &bound2_var);
-       set_var_from_num(operand, &operand_var);
+       init_var_from_num(bound1, &bound1_var);
+       init_var_from_num(bound2, &bound2_var);
+       init_var_from_num(operand, &operand_var);
 
        if (cmp_var(&bound1_var, &bound2_var) < 0)
        {
@@ -1573,19 +1563,14 @@ numeric_add(PG_FUNCTION_ARGS)
        /*
         * Unpack the values, let add_var() compute the result and return it.
         */
-       init_var(&arg1);
-       init_var(&arg2);
-       init_var(&result);
-
-       set_var_from_num(num1, &arg1);
-       set_var_from_num(num2, &arg2);
+       init_var_from_num(num1, &arg1);
+       init_var_from_num(num2, &arg2);
 
+       init_var(&result);
        add_var(&arg1, &arg2, &result);
 
        res = make_result(&result);
 
-       free_var(&arg1);
-       free_var(&arg2);
        free_var(&result);
 
        PG_RETURN_NUMERIC(res);
@@ -1616,19 +1601,14 @@ numeric_sub(PG_FUNCTION_ARGS)
        /*
         * Unpack the values, let sub_var() compute the result and return it.
         */
-       init_var(&arg1);
-       init_var(&arg2);
-       init_var(&result);
-
-       set_var_from_num(num1, &arg1);
-       set_var_from_num(num2, &arg2);
+       init_var_from_num(num1, &arg1);
+       init_var_from_num(num2, &arg2);
 
+       init_var(&result);
        sub_var(&arg1, &arg2, &result);
 
        res = make_result(&result);
 
-       free_var(&arg1);
-       free_var(&arg2);
        free_var(&result);
 
        PG_RETURN_NUMERIC(res);
@@ -1663,19 +1643,14 @@ numeric_mul(PG_FUNCTION_ARGS)
         * we request exact representation for the product (rscale = sum(dscale of
         * arg1, dscale of arg2)).
         */
-       init_var(&arg1);
-       init_var(&arg2);
-       init_var(&result);
-
-       set_var_from_num(num1, &arg1);
-       set_var_from_num(num2, &arg2);
+       init_var_from_num(num1, &arg1);
+       init_var_from_num(num2, &arg2);
 
+       init_var(&result);
        mul_var(&arg1, &arg2, &result, arg1.dscale + arg2.dscale);
 
        res = make_result(&result);
 
-       free_var(&arg1);
-       free_var(&arg2);
        free_var(&result);
 
        PG_RETURN_NUMERIC(res);
@@ -1707,12 +1682,10 @@ numeric_div(PG_FUNCTION_ARGS)
        /*
         * Unpack the arguments
         */
-       init_var(&arg1);
-       init_var(&arg2);
-       init_var(&result);
+       init_var_from_num(num1, &arg1);
+       init_var_from_num(num2, &arg2);
 
-       set_var_from_num(num1, &arg1);
-       set_var_from_num(num2, &arg2);
+       init_var(&result);
 
        /*
         * Select scale for division result
@@ -1726,8 +1699,6 @@ numeric_div(PG_FUNCTION_ARGS)
 
        res = make_result(&result);
 
-       free_var(&arg1);
-       free_var(&arg2);
        free_var(&result);
 
        PG_RETURN_NUMERIC(res);
@@ -1758,12 +1729,10 @@ numeric_div_trunc(PG_FUNCTION_ARGS)
        /*
         * Unpack the arguments
         */
-       init_var(&arg1);
-       init_var(&arg2);
-       init_var(&result);
+       init_var_from_num(num1, &arg1);
+       init_var_from_num(num2, &arg2);
 
-       set_var_from_num(num1, &arg1);
-       set_var_from_num(num2, &arg2);
+       init_var(&result);
 
        /*
         * Do the divide and return the result
@@ -1772,8 +1741,6 @@ numeric_div_trunc(PG_FUNCTION_ARGS)
 
        res = make_result(&result);
 
-       free_var(&arg1);
-       free_var(&arg2);
        free_var(&result);
 
        PG_RETURN_NUMERIC(res);
@@ -1798,20 +1765,16 @@ numeric_mod(PG_FUNCTION_ARGS)
        if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
                PG_RETURN_NUMERIC(make_result(&const_nan));
 
-       init_var(&arg1);
-       init_var(&arg2);
-       init_var(&result);
+       init_var_from_num(num1, &arg1);
+       init_var_from_num(num2, &arg2);
 
-       set_var_from_num(num1, &arg1);
-       set_var_from_num(num2, &arg2);
+       init_var(&result);
 
        mod_var(&arg1, &arg2, &result);
 
        res = make_result(&result);
 
        free_var(&result);
-       free_var(&arg2);
-       free_var(&arg1);
 
        PG_RETURN_NUMERIC(res);
 }
@@ -1838,9 +1801,7 @@ numeric_inc(PG_FUNCTION_ARGS)
        /*
         * Compute the result and return it
         */
-       init_var(&arg);
-
-       set_var_from_num(num, &arg);
+       init_var_from_num(num, &arg);
 
        add_var(&arg, &const_one, &arg);
 
@@ -1977,10 +1938,9 @@ numeric_sqrt(PG_FUNCTION_ARGS)
         * to give at least NUMERIC_MIN_SIG_DIGITS significant digits; but in any
         * case not less than the input's dscale.
         */
-       init_var(&arg);
-       init_var(&result);
+       init_var_from_num(num, &arg);
 
-       set_var_from_num(num, &arg);
+       init_var(&result);
 
        /* Assume the input was normalized, so arg.weight is accurate */
        sweight = (arg.weight + 1) * DEC_DIGITS / 2 - 1;
@@ -1998,7 +1958,6 @@ numeric_sqrt(PG_FUNCTION_ARGS)
        res = make_result(&result);
 
        free_var(&result);
-       free_var(&arg);
 
        PG_RETURN_NUMERIC(res);
 }
@@ -2030,10 +1989,9 @@ numeric_exp(PG_FUNCTION_ARGS)
         * to give at least NUMERIC_MIN_SIG_DIGITS significant digits; but in any
         * case not less than the input's dscale.
         */
-       init_var(&arg);
-       init_var(&result);
+       init_var_from_num(num, &arg);
 
-       set_var_from_num(num, &arg);
+       init_var(&result);
 
        /* convert input to float8, ignoring overflow */
        val = numericvar_to_double_no_overflow(&arg);
@@ -2061,7 +2019,6 @@ numeric_exp(PG_FUNCTION_ARGS)
        res = make_result(&result);
 
        free_var(&result);
-       free_var(&arg);
 
        PG_RETURN_NUMERIC(res);
 }
@@ -2088,11 +2045,9 @@ numeric_ln(PG_FUNCTION_ARGS)
        if (NUMERIC_IS_NAN(num))
                PG_RETURN_NUMERIC(make_result(&const_nan));
 
-       init_var(&arg);
+       init_var_from_num(num, &arg);
        init_var(&result);
 
-       set_var_from_num(num, &arg);
-
        /* Approx decimal digits before decimal point */
        dec_digits = (arg.weight + 1) * DEC_DIGITS;
 
@@ -2112,7 +2067,6 @@ numeric_ln(PG_FUNCTION_ARGS)
        res = make_result(&result);
 
        free_var(&result);
-       free_var(&arg);
 
        PG_RETURN_NUMERIC(res);
 }
@@ -2142,13 +2096,10 @@ numeric_log(PG_FUNCTION_ARGS)
        /*
         * Initialize things
         */
-       init_var(&arg1);
-       init_var(&arg2);
+       init_var_from_num(num1, &arg1);
+       init_var_from_num(num2, &arg2);
        init_var(&result);
 
-       set_var_from_num(num1, &arg1);
-       set_var_from_num(num2, &arg2);
-
        /*
         * Call log_var() to compute and return the result; note it handles scale
         * selection itself.
@@ -2158,8 +2109,6 @@ numeric_log(PG_FUNCTION_ARGS)
        res = make_result(&result);
 
        free_var(&result);
-       free_var(&arg2);
-       free_var(&arg1);
 
        PG_RETURN_NUMERIC(res);
 }
@@ -2190,15 +2139,12 @@ numeric_power(PG_FUNCTION_ARGS)
        /*
         * Initialize things
         */
-       init_var(&arg1);
-       init_var(&arg2);
        init_var(&arg2_trunc);
        init_var(&result);
+       init_var_from_num(num1, &arg1);
+       init_var_from_num(num2, &arg2);
 
-       set_var_from_num(num1, &arg1);
-       set_var_from_num(num2, &arg2);
        set_var_from_var(&arg2, &arg2_trunc);
-
        trunc_var(&arg2_trunc, 0);
 
        /*
@@ -2227,9 +2173,7 @@ numeric_power(PG_FUNCTION_ARGS)
        res = make_result(&result);
 
        free_var(&result);
-       free_var(&arg2);
        free_var(&arg2_trunc);
-       free_var(&arg1);
 
        PG_RETURN_NUMERIC(res);
 }
@@ -2276,10 +2220,8 @@ numeric_int4(PG_FUNCTION_ARGS)
                                 errmsg("cannot convert NaN to integer")));
 
        /* Convert to variable format, then convert to int4 */
-       init_var(&x);
-       set_var_from_num(num, &x);
+       init_var_from_num(num, &x);
        result = numericvar_to_int4(&x);
-       free_var(&x);
        PG_RETURN_INT32(result);
 }
 
@@ -2344,16 +2286,13 @@ numeric_int8(PG_FUNCTION_ARGS)
                                 errmsg("cannot convert NaN to bigint")));
 
        /* Convert to variable format and thence to int8 */
-       init_var(&x);
-       set_var_from_num(num, &x);
+       init_var_from_num(num, &x);
 
        if (!numericvar_to_int8(&x, &result))
                ereport(ERROR,
                                (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
                                 errmsg("bigint out of range")));
 
-       free_var(&x);
-
        PG_RETURN_INT64(result);
 }
 
@@ -2392,16 +2331,13 @@ numeric_int2(PG_FUNCTION_ARGS)
                                 errmsg("cannot convert NaN to smallint")));
 
        /* Convert to variable format and thence to int8 */
-       init_var(&x);
-       set_var_from_num(num, &x);
+       init_var_from_num(num, &x);
 
        if (!numericvar_to_int8(&x, &val))
                ereport(ERROR,
                                (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
                                 errmsg("smallint out of range")));
 
-       free_var(&x);
-
        /* Down-convert to int2 */
        result = (int16) val;
 
@@ -2763,8 +2699,7 @@ numeric_stddev_internal(ArrayType *transarray,
        if (NUMERIC_IS_NAN(N) || NUMERIC_IS_NAN(sumX) || NUMERIC_IS_NAN(sumX2))
                return make_result(&const_nan);
 
-       init_var(&vN);
-       set_var_from_num(N, &vN);
+       init_var_from_num(N, &vN);
 
        /*
         * Sample stddev and variance are undefined when N <= 1; population stddev
@@ -2777,7 +2712,6 @@ numeric_stddev_internal(ArrayType *transarray,
 
        if (cmp_var(&vN, comp) <= 0)
        {
-               free_var(&vN);
                *is_null = true;
                return NULL;
        }
@@ -2785,10 +2719,8 @@ numeric_stddev_internal(ArrayType *transarray,
        init_var(&vNminus1);
        sub_var(&vN, &const_one, &vNminus1);
 
-       init_var(&vsumX);
-       set_var_from_num(sumX, &vsumX);
-       init_var(&vsumX2);
-       set_var_from_num(sumX2, &vsumX2);
+       init_var_from_num(sumX, &vsumX);
+       init_var_from_num(sumX2, &vsumX2);
 
        /* compute rscale for mul_var calls */
        rscale = vsumX.dscale * 2;
@@ -2816,7 +2748,6 @@ numeric_stddev_internal(ArrayType *transarray,
                res = make_result(&vsumX);
        }
 
-       free_var(&vN);
        free_var(&vNminus1);
        free_var(&vsumX);
        free_var(&vsumX2);
@@ -3449,6 +3380,32 @@ set_var_from_num(Numeric num, NumericVar *dest)
 }
 
 
+/*
+ * init_var_from_num() -
+ *
+ *     Initialize a variable from packed db format. The digits array is not
+ *     copied, which saves some cycles when the resulting var is not modified.
+ *     Also, there's no need to call free_var(), as long as you don't assign any
+ *     other value to it (with set_var_* functions, or by using the var as the
+ *     destination of a function like add_var())
+ *
+ *     CAUTION: Do not modify the digits buffer of a var initialized with this
+ *     function, e.g by calling round_var() or trunc_var(), as the changes will
+ *     propagate to the original Numeric! It's OK to use it as the destination
+ *     argument of one of the calculational functions, though.
+ */
+static void
+init_var_from_num(Numeric num, NumericVar *dest)
+{
+       dest->ndigits = NUMERIC_NDIGITS(num);
+       dest->weight = NUMERIC_WEIGHT(num);
+       dest->sign = NUMERIC_SIGN(num);
+       dest->dscale = NUMERIC_DSCALE(num);
+       dest->digits = NUMERIC_DIGITS(num);
+       dest->buf = NULL;       /* digits array is not palloc'd */
+}
+
+
 /*
  * set_var_from_var() -
  *
@@ -3475,12 +3432,13 @@ set_var_from_var(NumericVar *value, NumericVar *dest)
  * get_str_from_var() -
  *
  *     Convert a var to text representation (guts of numeric_out).
- *     CAUTION: var's contents may be modified by rounding!
+ *     The var is displayed to the number of digits indicated by its dscale.
  *     Returns a palloc'd string.
  */
 static char *
-get_str_from_var(NumericVar *var, int dscale)
+get_str_from_var(NumericVar *var)
 {
+       int                     dscale;
        char       *str;
        char       *cp;
        char       *endcp;
@@ -3492,13 +3450,7 @@ get_str_from_var(NumericVar *var, int dscale)
        NumericDigit d1;
 #endif
 
-       if (dscale < 0)
-               dscale = 0;
-
-       /*
-        * Check if we must round up before printing the value and do so.
-        */
-       round_var(var, dscale);
+       dscale = var->dscale;
 
        /*
         * Allocate space for the result.
@@ -3634,8 +3586,6 @@ get_str_from_var(NumericVar *var, int dscale)
  *     rscale is the number of decimal digits desired after the decimal point in
  *     the output, negative values will be treated as meaning zero.
  *
- *     CAUTION: var's contents may be modified by rounding!
- *
  *     Returns a palloc'd string.
  */
 static char *
@@ -3694,10 +3644,9 @@ get_str_from_var_sci(NumericVar *var, int rscale)
        init_var(&denominator);
        init_var(&significand);
 
-       int8_to_numericvar((int64) 10, &denominator);
-       power_var_int(&denominator, exponent, &denominator, denom_scale);
+       power_var_int(&const_ten, exponent, &denominator, denom_scale);
        div_var(var, &denominator, &significand, rscale, true);
-       sig_out = get_str_from_var(&significand, rscale);
+       sig_out = get_str_from_var(&significand);
 
        free_var(&denominator);
        free_var(&significand);
@@ -3886,8 +3835,6 @@ apply_typmod(NumericVar *var, int32 typmod)
  * Convert numeric to int8, rounding if needed.
  *
  * If overflow, return FALSE (no error is raised).     Return TRUE if okay.
- *
- *     CAUTION: var's contents may be modified by rounding!
  */
 static bool
 numericvar_to_int8(NumericVar *var, int64 *result)
@@ -3899,16 +3846,20 @@ numericvar_to_int8(NumericVar *var, int64 *result)
        int64           val,
                                oldval;
        bool            neg;
+       NumericVar      rounded;
 
        /* Round to nearest integer */
-       round_var(var, 0);
+       init_var(&rounded);
+       set_var_from_var(var, &rounded);
+       round_var(&rounded, 0);
 
        /* Check for zero input */
-       strip_var(var);
-       ndigits = var->ndigits;
+       strip_var(&rounded);
+       ndigits = rounded.ndigits;
        if (ndigits == 0)
        {
                *result = 0;
+               free_var(&rounded);
                return true;
        }
 
@@ -3916,12 +3867,12 @@ numericvar_to_int8(NumericVar *var, int64 *result)
         * For input like 10000000000, we must treat stripped digits as real. So
         * the loop assumes there are weight+1 digits before the decimal point.
         */
-       weight = var->weight;
+       weight = rounded.weight;
        Assert(weight >= 0 && ndigits <= weight + 1);
 
        /* Construct the result */
-       digits = var->digits;
-       neg = (var->sign == NUMERIC_NEG);
+       digits = rounded.digits;
+       neg = (rounded.sign == NUMERIC_NEG);
        val = digits[0];
        for (i = 1; i <= weight; i++)
        {
@@ -3940,10 +3891,15 @@ numericvar_to_int8(NumericVar *var, int64 *result)
                if ((val / NBASE) != oldval)    /* possible overflow? */
                {
                        if (!neg || (-val) != val || val == 0 || oldval < 0)
+                       {
+                               free_var(&rounded);
                                return false;
+                       }
                }
        }
 
+       free_var(&rounded);
+
        *result = neg ? -val : val;
        return true;
 }
@@ -4030,7 +3986,7 @@ numericvar_to_double_no_overflow(NumericVar *var)
        double          val;
        char       *endptr;
 
-       tmp = get_str_from_var(var, var->dscale);
+       tmp = get_str_from_var(var);
 
        /* unlike float8in, we ignore ERANGE from strtod */
        val = strtod(tmp, &endptr);
@@ -5597,13 +5553,9 @@ power_var(NumericVar *base, NumericVar *exp, NumericVar *result)
        if (exp->ndigits == 0 || exp->ndigits <= exp->weight + 1)
        {
                /* exact integer, but does it fit in int? */
-               NumericVar      x;
                int64           expval64;
 
-               /* must copy because numericvar_to_int8() scribbles on input */
-               init_var(&x);
-               set_var_from_var(exp, &x);
-               if (numericvar_to_int8(&x, &expval64))
+               if (numericvar_to_int8(exp, &expval64))
                {
                        int                     expval = (int) expval64;
 
@@ -5617,12 +5569,9 @@ power_var(NumericVar *base, NumericVar *exp, NumericVar *result)
                                rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
 
                                power_var_int(base, expval, result, rscale);
-
-                               free_var(&x);
                                return;
                        }
                }
-               free_var(&x);
        }
 
        /*