]> granicus.if.org Git - postgresql/blob - src/backend/utils/adt/numeric.c
Detect overflow in integer arithmetic operators (integer, smallint, and
[postgresql] / src / backend / utils / adt / numeric.c
1 /*-------------------------------------------------------------------------
2  *
3  * numeric.c
4  *        An exact numeric data type for the Postgres database system
5  *
6  * Original coding 1998, Jan Wieck.  Heavily revised 2003, Tom Lane.
7  *
8  * Many of the algorithmic ideas are borrowed from David M. Smith's "FM"
9  * multiple-precision math library, most recently published as Algorithm
10  * 786: Multiple-Precision Complex Arithmetic and Functions, ACM
11  * Transactions on Mathematical Software, Vol. 24, No. 4, December 1998,
12  * pages 359-367.
13  *
14  * Copyright (c) 1998-2004, PostgreSQL Global Development Group
15  *
16  * IDENTIFICATION
17  *        $PostgreSQL: pgsql/src/backend/utils/adt/numeric.c,v 1.80 2004/10/04 14:42:46 tgl Exp $
18  *
19  *-------------------------------------------------------------------------
20  */
21
22 #include "postgres.h"
23
24 #include <ctype.h>
25 #include <float.h>
26 #include <limits.h>
27 #include <math.h>
28 #include <errno.h>
29
30 #include "catalog/pg_type.h"
31 #include "libpq/pqformat.h"
32 #include "utils/array.h"
33 #include "utils/builtins.h"
34 #include "utils/int8.h"
35 #include "utils/numeric.h"
36
37 /* ----------
38  * Uncomment the following to enable compilation of dump_numeric()
39  * and dump_var() and to get a dump of any result produced by make_result().
40  * ----------
41 #define NUMERIC_DEBUG
42  */
43
44
45 /* ----------
46  * Local data types
47  *
48  * Numeric values are represented in a base-NBASE floating point format.
49  * Each "digit" ranges from 0 to NBASE-1.  The type NumericDigit is signed
50  * and wide enough to store a digit.  We assume that NBASE*NBASE can fit in
51  * an int.      Although the purely calculational routines could handle any even
52  * NBASE that's less than sqrt(INT_MAX), in practice we are only interested
53  * in NBASE a power of ten, so that I/O conversions and decimal rounding
54  * are easy.  Also, it's actually more efficient if NBASE is rather less than
55  * sqrt(INT_MAX), so that there is "headroom" for mul_var and div_var to
56  * postpone processing carries.
57  * ----------
58  */
59
60 #if 0
61 #define NBASE           10
62 #define HALF_NBASE      5
63 #define DEC_DIGITS      1                       /* decimal digits per NBASE digit */
64 #define MUL_GUARD_DIGITS        4       /* these are measured in NBASE digits */
65 #define DIV_GUARD_DIGITS        8
66
67 typedef signed char NumericDigit;
68 #endif
69
70 #if 0
71 #define NBASE           100
72 #define HALF_NBASE      50
73 #define DEC_DIGITS      2                       /* decimal digits per NBASE digit */
74 #define MUL_GUARD_DIGITS        3       /* these are measured in NBASE digits */
75 #define DIV_GUARD_DIGITS        6
76
77 typedef signed char NumericDigit;
78 #endif
79
80 #if 1
81 #define NBASE           10000
82 #define HALF_NBASE      5000
83 #define DEC_DIGITS      4                       /* decimal digits per NBASE digit */
84 #define MUL_GUARD_DIGITS        2       /* these are measured in NBASE digits */
85 #define DIV_GUARD_DIGITS        4
86
87 typedef int16 NumericDigit;
88 #endif
89
90
91 /* ----------
92  * The value represented by a NumericVar is determined by the sign, weight,
93  * ndigits, and digits[] array.
94  * Note: the first digit of a NumericVar's value is assumed to be multiplied
95  * by NBASE ** weight.  Another way to say it is that there are weight+1
96  * digits before the decimal point.  It is possible to have weight < 0.
97  *
98  * buf points at the physical start of the palloc'd digit buffer for the
99  * NumericVar.  digits points at the first digit in actual use (the one
100  * with the specified weight).  We normally leave an unused digit or two
101  * (preset to zeroes) between buf and digits, so that there is room to store
102  * a carry out of the top digit without special pushups.  We just need to
103  * decrement digits (and increment weight) to make room for the carry digit.
104  * (There is no such extra space in a numeric value stored in the database,
105  * only in a NumericVar in memory.)
106  *
107  * If buf is NULL then the digit buffer isn't actually palloc'd and should
108  * not be freed --- see the constants below for an example.
109  *
110  * dscale, or display scale, is the nominal precision expressed as number
111  * of digits after the decimal point (it must always be >= 0 at present).
112  * dscale may be more than the number of physically stored fractional digits,
113  * implying that we have suppressed storage of significant trailing zeroes.
114  * It should never be less than the number of stored digits, since that would
115  * imply hiding digits that are present.  NOTE that dscale is always expressed
116  * in *decimal* digits, and so it may correspond to a fractional number of
117  * base-NBASE digits --- divide by DEC_DIGITS to convert to NBASE digits.
118  *
119  * rscale, or result scale, is the target precision for a computation.
120  * Like dscale it is expressed as number of *decimal* digits after the decimal
121  * point, and is always >= 0 at present.
122  * Note that rscale is not stored in variables --- it's figured on-the-fly
123  * from the dscales of the inputs.
124  *
125  * NB: All the variable-level functions are written in a style that makes it
126  * possible to give one and the same variable as argument and destination.
127  * This is feasible because the digit buffer is separate from the variable.
128  * ----------
129  */
130 typedef struct NumericVar
131 {
132         int                     ndigits;                /* # of digits in digits[] - can be 0! */
133         int                     weight;                 /* weight of first digit */
134         int                     sign;                   /* NUMERIC_POS, NUMERIC_NEG, or
135                                                                  * NUMERIC_NAN */
136         int                     dscale;                 /* display scale */
137         NumericDigit *buf;                      /* start of palloc'd space for digits[] */
138         NumericDigit *digits;           /* base-NBASE digits */
139 } NumericVar;
140
141
142 /* ----------
143  * Some preinitialized constants
144  * ----------
145  */
146 static NumericDigit const_zero_data[1] = {0};
147 static NumericVar const_zero =
148 {0, 0, NUMERIC_POS, 0, NULL, const_zero_data};
149
150 static NumericDigit const_one_data[1] = {1};
151 static NumericVar const_one =
152 {1, 0, NUMERIC_POS, 0, NULL, const_one_data};
153
154 static NumericDigit const_two_data[1] = {2};
155 static NumericVar const_two =
156 {1, 0, NUMERIC_POS, 0, NULL, const_two_data};
157
158 #if DEC_DIGITS == 4
159 static NumericDigit const_zero_point_five_data[1] = {5000};
160
161 #elif DEC_DIGITS == 2
162 static NumericDigit const_zero_point_five_data[1] = {50};
163
164 #elif DEC_DIGITS == 1
165 static NumericDigit const_zero_point_five_data[1] = {5};
166 #endif
167 static NumericVar const_zero_point_five =
168 {1, -1, NUMERIC_POS, 1, NULL, const_zero_point_five_data};
169
170 #if DEC_DIGITS == 4
171 static NumericDigit const_zero_point_nine_data[1] = {9000};
172
173 #elif DEC_DIGITS == 2
174 static NumericDigit const_zero_point_nine_data[1] = {90};
175
176 #elif DEC_DIGITS == 1
177 static NumericDigit const_zero_point_nine_data[1] = {9};
178 #endif
179 static NumericVar const_zero_point_nine =
180 {1, -1, NUMERIC_POS, 1, NULL, const_zero_point_nine_data};
181
182 #if DEC_DIGITS == 4
183 static NumericDigit const_zero_point_01_data[1] = {100};
184 static NumericVar const_zero_point_01 =
185 {1, -1, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
186
187 #elif DEC_DIGITS == 2
188 static NumericDigit const_zero_point_01_data[1] = {1};
189 static NumericVar const_zero_point_01 =
190 {1, -1, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
191
192 #elif DEC_DIGITS == 1
193 static NumericDigit const_zero_point_01_data[1] = {1};
194 static NumericVar const_zero_point_01 =
195 {1, -2, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
196 #endif
197
198 #if DEC_DIGITS == 4
199 static NumericDigit const_one_point_one_data[2] = {1, 1000};
200
201 #elif DEC_DIGITS == 2
202 static NumericDigit const_one_point_one_data[2] = {1, 10};
203
204 #elif DEC_DIGITS == 1
205 static NumericDigit const_one_point_one_data[2] = {1, 1};
206 #endif
207 static NumericVar const_one_point_one =
208 {2, 0, NUMERIC_POS, 1, NULL, const_one_point_one_data};
209
210 static NumericVar const_nan =
211 {0, 0, NUMERIC_NAN, 0, NULL, NULL};
212
213 #if DEC_DIGITS == 4
214 static const int round_powers[4] = {0, 1000, 100, 10};
215 #endif
216
217
218 /* ----------
219  * Local functions
220  * ----------
221  */
222
223 #ifdef NUMERIC_DEBUG
224 static void dump_numeric(const char *str, Numeric num);
225 static void dump_var(const char *str, NumericVar *var);
226
227 #else
228 #define dump_numeric(s,n)
229 #define dump_var(s,v)
230 #endif
231
232 #define digitbuf_alloc(ndigits)  \
233         ((NumericDigit *) palloc((ndigits) * sizeof(NumericDigit)))
234 #define digitbuf_free(buf)      \
235         do { \
236                  if ((buf) != NULL) \
237                          pfree(buf); \
238         } while (0)
239
240 #define init_var(v)             MemSetAligned(v, 0, sizeof(NumericVar))
241
242 static void alloc_var(NumericVar *var, int ndigits);
243 static void free_var(NumericVar *var);
244 static void zero_var(NumericVar *var);
245
246 static void set_var_from_str(const char *str, NumericVar *dest);
247 static void set_var_from_num(Numeric value, NumericVar *dest);
248 static void set_var_from_var(NumericVar *value, NumericVar *dest);
249 static char *get_str_from_var(NumericVar *var, int dscale);
250
251 static Numeric make_result(NumericVar *var);
252
253 static void apply_typmod(NumericVar *var, int32 typmod);
254
255 static int32 numericvar_to_int4(NumericVar *var);
256 static bool numericvar_to_int8(NumericVar *var, int64 *result);
257 static void int8_to_numericvar(int64 val, NumericVar *var);
258 static double numeric_to_double_no_overflow(Numeric num);
259 static double numericvar_to_double_no_overflow(NumericVar *var);
260
261 static int      cmp_numerics(Numeric num1, Numeric num2);
262 static int      cmp_var(NumericVar *var1, NumericVar *var2);
263 static void add_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
264 static void sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
265 static void mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
266                 int rscale);
267 static void div_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
268                 int rscale);
269 static int      select_div_scale(NumericVar *var1, NumericVar *var2);
270 static void mod_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
271 static void ceil_var(NumericVar *var, NumericVar *result);
272 static void floor_var(NumericVar *var, NumericVar *result);
273
274 static void sqrt_var(NumericVar *arg, NumericVar *result, int rscale);
275 static void exp_var(NumericVar *arg, NumericVar *result, int rscale);
276 static void exp_var_internal(NumericVar *arg, NumericVar *result, int rscale);
277 static void ln_var(NumericVar *arg, NumericVar *result, int rscale);
278 static void log_var(NumericVar *base, NumericVar *num, NumericVar *result);
279 static void power_var(NumericVar *base, NumericVar *exp, NumericVar *result);
280 static void power_var_int(NumericVar *base, int exp, NumericVar *result,
281                           int rscale);
282
283 static int      cmp_abs(NumericVar *var1, NumericVar *var2);
284 static void add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result);
285 static void sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result);
286 static void round_var(NumericVar *var, int rscale);
287 static void trunc_var(NumericVar *var, int rscale);
288 static void strip_var(NumericVar *var);
289 static void compute_bucket(Numeric operand, Numeric bound1, Numeric bound2,
290                            NumericVar *count_var, NumericVar *result_var);
291
292
293 /* ----------------------------------------------------------------------
294  *
295  * Input-, output- and rounding-functions
296  *
297  * ----------------------------------------------------------------------
298  */
299
300
301 /*
302  * numeric_in() -
303  *
304  *      Input function for numeric data type
305  */
306 Datum
307 numeric_in(PG_FUNCTION_ARGS)
308 {
309         char       *str = PG_GETARG_CSTRING(0);
310
311 #ifdef NOT_USED
312         Oid                     typelem = PG_GETARG_OID(1);
313 #endif
314         int32           typmod = PG_GETARG_INT32(2);
315         NumericVar      value;
316         Numeric         res;
317
318         /*
319          * Check for NaN
320          */
321         if (pg_strcasecmp(str, "NaN") == 0)
322                 PG_RETURN_NUMERIC(make_result(&const_nan));
323
324         /*
325          * Use set_var_from_str() to parse the input string and return it in
326          * the packed DB storage format
327          */
328         init_var(&value);
329         set_var_from_str(str, &value);
330
331         apply_typmod(&value, typmod);
332
333         res = make_result(&value);
334         free_var(&value);
335
336         PG_RETURN_NUMERIC(res);
337 }
338
339
340 /*
341  * numeric_out() -
342  *
343  *      Output function for numeric data type
344  */
345 Datum
346 numeric_out(PG_FUNCTION_ARGS)
347 {
348         Numeric         num = PG_GETARG_NUMERIC(0);
349         NumericVar      x;
350         char       *str;
351
352         /*
353          * Handle NaN
354          */
355         if (NUMERIC_IS_NAN(num))
356                 PG_RETURN_CSTRING(pstrdup("NaN"));
357
358         /*
359          * Get the number in the variable format.
360          *
361          * Even if we didn't need to change format, we'd still need to copy the
362          * value to have a modifiable copy for rounding.  set_var_from_num()
363          * also guarantees there is extra digit space in case we produce a
364          * carry out from rounding.
365          */
366         init_var(&x);
367         set_var_from_num(num, &x);
368
369         str = get_str_from_var(&x, x.dscale);
370
371         free_var(&x);
372
373         PG_RETURN_CSTRING(str);
374 }
375
376 /*
377  *              numeric_recv                    - converts external binary format to numeric
378  *
379  * External format is a sequence of int16's:
380  * ndigits, weight, sign, dscale, NumericDigits.
381  */
382 Datum
383 numeric_recv(PG_FUNCTION_ARGS)
384 {
385         StringInfo      buf = (StringInfo) PG_GETARG_POINTER(0);
386         NumericVar      value;
387         Numeric         res;
388         int                     len,
389                                 i;
390
391         init_var(&value);
392
393         len = (uint16) pq_getmsgint(buf, sizeof(uint16));
394         if (len < 0 || len > NUMERIC_MAX_PRECISION + NUMERIC_MAX_RESULT_SCALE)
395                 ereport(ERROR,
396                                 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
397                                  errmsg("invalid length in external \"numeric\" value")));
398
399         alloc_var(&value, len);
400
401         value.weight = (int16) pq_getmsgint(buf, sizeof(int16));
402         value.sign = (uint16) pq_getmsgint(buf, sizeof(uint16));
403         if (!(value.sign == NUMERIC_POS ||
404                   value.sign == NUMERIC_NEG ||
405                   value.sign == NUMERIC_NAN))
406                 ereport(ERROR,
407                                 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
408                                  errmsg("invalid sign in external \"numeric\" value")));
409
410         value.dscale = (uint16) pq_getmsgint(buf, sizeof(uint16));
411         for (i = 0; i < len; i++)
412         {
413                 NumericDigit d = pq_getmsgint(buf, sizeof(NumericDigit));
414
415                 if (d < 0 || d >= NBASE)
416                         ereport(ERROR,
417                                         (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
418                                  errmsg("invalid digit in external \"numeric\" value")));
419                 value.digits[i] = d;
420         }
421
422         res = make_result(&value);
423         free_var(&value);
424
425         PG_RETURN_NUMERIC(res);
426 }
427
428 /*
429  *              numeric_send                    - converts numeric to binary format
430  */
431 Datum
432 numeric_send(PG_FUNCTION_ARGS)
433 {
434         Numeric         num = PG_GETARG_NUMERIC(0);
435         NumericVar      x;
436         StringInfoData buf;
437         int                     i;
438
439         init_var(&x);
440         set_var_from_num(num, &x);
441
442         pq_begintypsend(&buf);
443
444         pq_sendint(&buf, x.ndigits, sizeof(int16));
445         pq_sendint(&buf, x.weight, sizeof(int16));
446         pq_sendint(&buf, x.sign, sizeof(int16));
447         pq_sendint(&buf, x.dscale, sizeof(int16));
448         for (i = 0; i < x.ndigits; i++)
449                 pq_sendint(&buf, x.digits[i], sizeof(NumericDigit));
450
451         free_var(&x);
452
453         PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
454 }
455
456
457 /*
458  * numeric() -
459  *
460  *      This is a special function called by the Postgres database system
461  *      before a value is stored in a tuple's attribute. The precision and
462  *      scale of the attribute have to be applied on the value.
463  */
464 Datum
465 numeric         (PG_FUNCTION_ARGS)
466 {
467         Numeric         num = PG_GETARG_NUMERIC(0);
468         int32           typmod = PG_GETARG_INT32(1);
469         Numeric         new;
470         int32           tmp_typmod;
471         int                     precision;
472         int                     scale;
473         int                     ddigits;
474         int                     maxdigits;
475         NumericVar      var;
476
477         /*
478          * Handle NaN
479          */
480         if (NUMERIC_IS_NAN(num))
481                 PG_RETURN_NUMERIC(make_result(&const_nan));
482
483         /*
484          * If the value isn't a valid type modifier, simply return a copy of
485          * the input value
486          */
487         if (typmod < (int32) (VARHDRSZ))
488         {
489                 new = (Numeric) palloc(num->varlen);
490                 memcpy(new, num, num->varlen);
491                 PG_RETURN_NUMERIC(new);
492         }
493
494         /*
495          * Get the precision and scale out of the typmod value
496          */
497         tmp_typmod = typmod - VARHDRSZ;
498         precision = (tmp_typmod >> 16) & 0xffff;
499         scale = tmp_typmod & 0xffff;
500         maxdigits = precision - scale;
501
502         /*
503          * If the number is certainly in bounds and due to the target scale no
504          * rounding could be necessary, just make a copy of the input and
505          * modify its scale fields.  (Note we assume the existing dscale is
506          * honest...)
507          */
508         ddigits = (num->n_weight + 1) * DEC_DIGITS;
509         if (ddigits <= maxdigits && scale >= NUMERIC_DSCALE(num))
510         {
511                 new = (Numeric) palloc(num->varlen);
512                 memcpy(new, num, num->varlen);
513                 new->n_sign_dscale = NUMERIC_SIGN(new) |
514                         ((uint16) scale & NUMERIC_DSCALE_MASK);
515                 PG_RETURN_NUMERIC(new);
516         }
517
518         /*
519          * We really need to fiddle with things - unpack the number into a
520          * variable and let apply_typmod() do it.
521          */
522         init_var(&var);
523
524         set_var_from_num(num, &var);
525         apply_typmod(&var, typmod);
526         new = make_result(&var);
527
528         free_var(&var);
529
530         PG_RETURN_NUMERIC(new);
531 }
532
533
534 /* ----------------------------------------------------------------------
535  *
536  * Sign manipulation, rounding and the like
537  *
538  * ----------------------------------------------------------------------
539  */
540
541 Datum
542 numeric_abs(PG_FUNCTION_ARGS)
543 {
544         Numeric         num = PG_GETARG_NUMERIC(0);
545         Numeric         res;
546
547         /*
548          * Handle NaN
549          */
550         if (NUMERIC_IS_NAN(num))
551                 PG_RETURN_NUMERIC(make_result(&const_nan));
552
553         /*
554          * Do it the easy way directly on the packed format
555          */
556         res = (Numeric) palloc(num->varlen);
557         memcpy(res, num, num->varlen);
558
559         res->n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num);
560
561         PG_RETURN_NUMERIC(res);
562 }
563
564
565 Datum
566 numeric_uminus(PG_FUNCTION_ARGS)
567 {
568         Numeric         num = PG_GETARG_NUMERIC(0);
569         Numeric         res;
570
571         /*
572          * Handle NaN
573          */
574         if (NUMERIC_IS_NAN(num))
575                 PG_RETURN_NUMERIC(make_result(&const_nan));
576
577         /*
578          * Do it the easy way directly on the packed format
579          */
580         res = (Numeric) palloc(num->varlen);
581         memcpy(res, num, num->varlen);
582
583         /*
584          * The packed format is known to be totally zero digit trimmed always.
585          * So we can identify a ZERO by the fact that there are no digits at
586          * all.  Do nothing to a zero.
587          */
588         if (num->varlen != NUMERIC_HDRSZ)
589         {
590                 /* Else, flip the sign */
591                 if (NUMERIC_SIGN(num) == NUMERIC_POS)
592                         res->n_sign_dscale = NUMERIC_NEG | NUMERIC_DSCALE(num);
593                 else
594                         res->n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num);
595         }
596
597         PG_RETURN_NUMERIC(res);
598 }
599
600
601 Datum
602 numeric_uplus(PG_FUNCTION_ARGS)
603 {
604         Numeric         num = PG_GETARG_NUMERIC(0);
605         Numeric         res;
606
607         res = (Numeric) palloc(num->varlen);
608         memcpy(res, num, num->varlen);
609
610         PG_RETURN_NUMERIC(res);
611 }
612
613 /*
614  * numeric_sign() -
615  *
616  * returns -1 if the argument is less than 0, 0 if the argument is equal
617  * to 0, and 1 if the argument is greater than zero.
618  */
619 Datum
620 numeric_sign(PG_FUNCTION_ARGS)
621 {
622         Numeric         num = PG_GETARG_NUMERIC(0);
623         Numeric         res;
624         NumericVar      result;
625
626         /*
627          * Handle NaN
628          */
629         if (NUMERIC_IS_NAN(num))
630                 PG_RETURN_NUMERIC(make_result(&const_nan));
631
632         init_var(&result);
633
634         /*
635          * The packed format is known to be totally zero digit trimmed always.
636          * So we can identify a ZERO by the fact that there are no digits at
637          * all.
638          */
639         if (num->varlen == NUMERIC_HDRSZ)
640                 set_var_from_var(&const_zero, &result);
641         else
642         {
643                 /*
644                  * And if there are some, we return a copy of ONE with the sign of
645                  * our argument
646                  */
647                 set_var_from_var(&const_one, &result);
648                 result.sign = NUMERIC_SIGN(num);
649         }
650
651         res = make_result(&result);
652         free_var(&result);
653
654         PG_RETURN_NUMERIC(res);
655 }
656
657
658 /*
659  * numeric_round() -
660  *
661  *      Round a value to have 'scale' digits after the decimal point.
662  *      We allow negative 'scale', implying rounding before the decimal
663  *      point --- Oracle interprets rounding that way.
664  */
665 Datum
666 numeric_round(PG_FUNCTION_ARGS)
667 {
668         Numeric         num = PG_GETARG_NUMERIC(0);
669         int32           scale = PG_GETARG_INT32(1);
670         Numeric         res;
671         NumericVar      arg;
672
673         /*
674          * Handle NaN
675          */
676         if (NUMERIC_IS_NAN(num))
677                 PG_RETURN_NUMERIC(make_result(&const_nan));
678
679         /*
680          * Limit the scale value to avoid possible overflow in calculations
681          */
682         scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE);
683         scale = Min(scale, NUMERIC_MAX_RESULT_SCALE);
684
685         /*
686          * Unpack the argument and round it at the proper digit position
687          */
688         init_var(&arg);
689         set_var_from_num(num, &arg);
690
691         round_var(&arg, scale);
692
693         /* We don't allow negative output dscale */
694         if (scale < 0)
695                 arg.dscale = 0;
696
697         /*
698          * Return the rounded result
699          */
700         res = make_result(&arg);
701
702         free_var(&arg);
703         PG_RETURN_NUMERIC(res);
704 }
705
706
707 /*
708  * numeric_trunc() -
709  *
710  *      Truncate a value to have 'scale' digits after the decimal point.
711  *      We allow negative 'scale', implying a truncation before the decimal
712  *      point --- Oracle interprets truncation that way.
713  */
714 Datum
715 numeric_trunc(PG_FUNCTION_ARGS)
716 {
717         Numeric         num = PG_GETARG_NUMERIC(0);
718         int32           scale = PG_GETARG_INT32(1);
719         Numeric         res;
720         NumericVar      arg;
721
722         /*
723          * Handle NaN
724          */
725         if (NUMERIC_IS_NAN(num))
726                 PG_RETURN_NUMERIC(make_result(&const_nan));
727
728         /*
729          * Limit the scale value to avoid possible overflow in calculations
730          */
731         scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE);
732         scale = Min(scale, NUMERIC_MAX_RESULT_SCALE);
733
734         /*
735          * Unpack the argument and truncate it at the proper digit position
736          */
737         init_var(&arg);
738         set_var_from_num(num, &arg);
739
740         trunc_var(&arg, scale);
741
742         /* We don't allow negative output dscale */
743         if (scale < 0)
744                 arg.dscale = 0;
745
746         /*
747          * Return the truncated result
748          */
749         res = make_result(&arg);
750
751         free_var(&arg);
752         PG_RETURN_NUMERIC(res);
753 }
754
755
756 /*
757  * numeric_ceil() -
758  *
759  *      Return the smallest integer greater than or equal to the argument
760  */
761 Datum
762 numeric_ceil(PG_FUNCTION_ARGS)
763 {
764         Numeric         num = PG_GETARG_NUMERIC(0);
765         Numeric         res;
766         NumericVar      result;
767
768         if (NUMERIC_IS_NAN(num))
769                 PG_RETURN_NUMERIC(make_result(&const_nan));
770
771         init_var(&result);
772
773         set_var_from_num(num, &result);
774         ceil_var(&result, &result);
775
776         res = make_result(&result);
777         free_var(&result);
778
779         PG_RETURN_NUMERIC(res);
780 }
781
782
783 /*
784  * numeric_floor() -
785  *
786  *      Return the largest integer equal to or less than the argument
787  */
788 Datum
789 numeric_floor(PG_FUNCTION_ARGS)
790 {
791         Numeric         num = PG_GETARG_NUMERIC(0);
792         Numeric         res;
793         NumericVar      result;
794
795         if (NUMERIC_IS_NAN(num))
796                 PG_RETURN_NUMERIC(make_result(&const_nan));
797
798         init_var(&result);
799
800         set_var_from_num(num, &result);
801         floor_var(&result, &result);
802
803         res = make_result(&result);
804         free_var(&result);
805
806         PG_RETURN_NUMERIC(res);
807 }
808
809 /*
810  * width_bucket_numeric() -
811  *
812  * 'bound1' and 'bound2' are the lower and upper bounds of the
813  * histogram's range, respectively. 'count' is the number of buckets
814  * in the histogram. width_bucket() returns an integer indicating the
815  * bucket number that 'operand' belongs in for an equiwidth histogram
816  * with the specified characteristics. An operand smaller than the
817  * lower bound is assigned to bucket 0. An operand greater than the
818  * upper bound is assigned to an additional bucket (with number
819  * count+1).
820  */
821 Datum
822 width_bucket_numeric(PG_FUNCTION_ARGS)
823 {
824         Numeric         operand = PG_GETARG_NUMERIC(0);
825         Numeric         bound1 = PG_GETARG_NUMERIC(1);
826         Numeric         bound2 = PG_GETARG_NUMERIC(2);
827         int32           count = PG_GETARG_INT32(3);
828         NumericVar      count_var;
829         NumericVar      result_var;
830         int32           result;
831
832         if (count <= 0)
833                 ereport(ERROR,
834                         (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
835                          errmsg("count must be greater than zero")));
836
837         init_var(&result_var);
838         init_var(&count_var);
839
840         /* Convert 'count' to a numeric, for ease of use later */
841         int8_to_numericvar((int64) count, &count_var);
842
843         switch (cmp_numerics(bound1, bound2))
844         {
845                 case 0:
846                         ereport(ERROR,
847                         (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
848                          errmsg("lower bound cannot equal upper bound")));
849
850                         /* bound1 < bound2 */
851                 case -1:
852                         if (cmp_numerics(operand, bound1) < 0)
853                                 set_var_from_var(&const_zero, &result_var);
854                         else if (cmp_numerics(operand, bound2) >= 0)
855                                 add_var(&count_var, &const_one, &result_var);
856                         else
857                                 compute_bucket(operand, bound1, bound2,
858                                                            &count_var, &result_var);
859                         break;
860
861                         /* bound1 > bound2 */
862                 case 1:
863                         if (cmp_numerics(operand, bound1) > 0)
864                                 set_var_from_var(&const_zero, &result_var);
865                         else if (cmp_numerics(operand, bound2) <= 0)
866                                 add_var(&count_var, &const_one, &result_var);
867                         else
868                                 compute_bucket(operand, bound1, bound2,
869                                                            &count_var, &result_var);
870                         break;
871         }
872
873         result = numericvar_to_int4(&result_var);
874
875         free_var(&count_var);
876         free_var(&result_var);
877
878         PG_RETURN_INT32(result);
879 }
880
881 /*
882  * compute_bucket() -
883  *
884  * If 'operand' is not outside the bucket range, determine the correct
885  * bucket for it to go. The calculations performed by this function
886  * are derived directly from the SQL2003 spec.
887  */
888 static void
889 compute_bucket(Numeric operand, Numeric bound1, Numeric bound2,
890                            NumericVar *count_var, NumericVar *result_var)
891 {
892         NumericVar      bound1_var;
893         NumericVar      bound2_var;
894         NumericVar      operand_var;
895
896         init_var(&bound1_var);
897         init_var(&bound2_var);
898         init_var(&operand_var);
899
900         set_var_from_num(bound1, &bound1_var);
901         set_var_from_num(bound2, &bound2_var);
902         set_var_from_num(operand, &operand_var);
903
904         if (cmp_var(&bound1_var, &bound2_var) < 0)
905         {
906                 sub_var(&operand_var, &bound1_var, &operand_var);
907                 sub_var(&bound2_var, &bound1_var, &bound2_var);
908                 div_var(&operand_var, &bound2_var, result_var,
909                                 select_div_scale(&operand_var, &bound2_var));
910         }
911         else
912         {
913                 sub_var(&bound1_var, &operand_var, &operand_var);
914                 sub_var(&bound1_var, &bound2_var, &bound1_var);
915                 div_var(&operand_var, &bound1_var, result_var,
916                                 select_div_scale(&operand_var, &bound1_var));
917         }
918
919         mul_var(result_var, count_var, result_var,
920                         result_var->dscale + count_var->dscale);
921         add_var(result_var, &const_one, result_var);
922         floor_var(result_var, result_var);
923
924         free_var(&bound1_var);
925         free_var(&bound2_var);
926         free_var(&operand_var);
927 }
928
929 /* ----------------------------------------------------------------------
930  *
931  * Comparison functions
932  *
933  * Note: btree indexes need these routines not to leak memory; therefore,
934  * be careful to free working copies of toasted datums.  Most places don't
935  * need to be so careful.
936  * ----------------------------------------------------------------------
937  */
938
939
940 Datum
941 numeric_cmp(PG_FUNCTION_ARGS)
942 {
943         Numeric         num1 = PG_GETARG_NUMERIC(0);
944         Numeric         num2 = PG_GETARG_NUMERIC(1);
945         int                     result;
946
947         result = cmp_numerics(num1, num2);
948
949         PG_FREE_IF_COPY(num1, 0);
950         PG_FREE_IF_COPY(num2, 1);
951
952         PG_RETURN_INT32(result);
953 }
954
955
956 Datum
957 numeric_eq(PG_FUNCTION_ARGS)
958 {
959         Numeric         num1 = PG_GETARG_NUMERIC(0);
960         Numeric         num2 = PG_GETARG_NUMERIC(1);
961         bool            result;
962
963         result = cmp_numerics(num1, num2) == 0;
964
965         PG_FREE_IF_COPY(num1, 0);
966         PG_FREE_IF_COPY(num2, 1);
967
968         PG_RETURN_BOOL(result);
969 }
970
971 Datum
972 numeric_ne(PG_FUNCTION_ARGS)
973 {
974         Numeric         num1 = PG_GETARG_NUMERIC(0);
975         Numeric         num2 = PG_GETARG_NUMERIC(1);
976         bool            result;
977
978         result = cmp_numerics(num1, num2) != 0;
979
980         PG_FREE_IF_COPY(num1, 0);
981         PG_FREE_IF_COPY(num2, 1);
982
983         PG_RETURN_BOOL(result);
984 }
985
986 Datum
987 numeric_gt(PG_FUNCTION_ARGS)
988 {
989         Numeric         num1 = PG_GETARG_NUMERIC(0);
990         Numeric         num2 = PG_GETARG_NUMERIC(1);
991         bool            result;
992
993         result = cmp_numerics(num1, num2) > 0;
994
995         PG_FREE_IF_COPY(num1, 0);
996         PG_FREE_IF_COPY(num2, 1);
997
998         PG_RETURN_BOOL(result);
999 }
1000
1001 Datum
1002 numeric_ge(PG_FUNCTION_ARGS)
1003 {
1004         Numeric         num1 = PG_GETARG_NUMERIC(0);
1005         Numeric         num2 = PG_GETARG_NUMERIC(1);
1006         bool            result;
1007
1008         result = cmp_numerics(num1, num2) >= 0;
1009
1010         PG_FREE_IF_COPY(num1, 0);
1011         PG_FREE_IF_COPY(num2, 1);
1012
1013         PG_RETURN_BOOL(result);
1014 }
1015
1016 Datum
1017 numeric_lt(PG_FUNCTION_ARGS)
1018 {
1019         Numeric         num1 = PG_GETARG_NUMERIC(0);
1020         Numeric         num2 = PG_GETARG_NUMERIC(1);
1021         bool            result;
1022
1023         result = cmp_numerics(num1, num2) < 0;
1024
1025         PG_FREE_IF_COPY(num1, 0);
1026         PG_FREE_IF_COPY(num2, 1);
1027
1028         PG_RETURN_BOOL(result);
1029 }
1030
1031 Datum
1032 numeric_le(PG_FUNCTION_ARGS)
1033 {
1034         Numeric         num1 = PG_GETARG_NUMERIC(0);
1035         Numeric         num2 = PG_GETARG_NUMERIC(1);
1036         bool            result;
1037
1038         result = cmp_numerics(num1, num2) <= 0;
1039
1040         PG_FREE_IF_COPY(num1, 0);
1041         PG_FREE_IF_COPY(num2, 1);
1042
1043         PG_RETURN_BOOL(result);
1044 }
1045
1046 static int
1047 cmp_numerics(Numeric num1, Numeric num2)
1048 {
1049         int                     result;
1050
1051         /*
1052          * We consider all NANs to be equal and larger than any non-NAN. This
1053          * is somewhat arbitrary; the important thing is to have a consistent
1054          * sort order.
1055          */
1056         if (NUMERIC_IS_NAN(num1))
1057         {
1058                 if (NUMERIC_IS_NAN(num2))
1059                         result = 0;                     /* NAN = NAN */
1060                 else
1061                         result = 1;                     /* NAN > non-NAN */
1062         }
1063         else if (NUMERIC_IS_NAN(num2))
1064         {
1065                 result = -1;                    /* non-NAN < NAN */
1066         }
1067         else
1068         {
1069                 NumericVar      arg1;
1070                 NumericVar      arg2;
1071
1072                 init_var(&arg1);
1073                 init_var(&arg2);
1074
1075                 set_var_from_num(num1, &arg1);
1076                 set_var_from_num(num2, &arg2);
1077
1078                 result = cmp_var(&arg1, &arg2);
1079
1080                 free_var(&arg1);
1081                 free_var(&arg2);
1082         }
1083
1084         return result;
1085 }
1086
1087
1088 /* ----------------------------------------------------------------------
1089  *
1090  * Basic arithmetic functions
1091  *
1092  * ----------------------------------------------------------------------
1093  */
1094
1095
1096 /*
1097  * numeric_add() -
1098  *
1099  *      Add two numerics
1100  */
1101 Datum
1102 numeric_add(PG_FUNCTION_ARGS)
1103 {
1104         Numeric         num1 = PG_GETARG_NUMERIC(0);
1105         Numeric         num2 = PG_GETARG_NUMERIC(1);
1106         NumericVar      arg1;
1107         NumericVar      arg2;
1108         NumericVar      result;
1109         Numeric         res;
1110
1111         /*
1112          * Handle NaN
1113          */
1114         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1115                 PG_RETURN_NUMERIC(make_result(&const_nan));
1116
1117         /*
1118          * Unpack the values, let add_var() compute the result and return it.
1119          */
1120         init_var(&arg1);
1121         init_var(&arg2);
1122         init_var(&result);
1123
1124         set_var_from_num(num1, &arg1);
1125         set_var_from_num(num2, &arg2);
1126
1127         add_var(&arg1, &arg2, &result);
1128
1129         res = make_result(&result);
1130
1131         free_var(&arg1);
1132         free_var(&arg2);
1133         free_var(&result);
1134
1135         PG_RETURN_NUMERIC(res);
1136 }
1137
1138
1139 /*
1140  * numeric_sub() -
1141  *
1142  *      Subtract one numeric from another
1143  */
1144 Datum
1145 numeric_sub(PG_FUNCTION_ARGS)
1146 {
1147         Numeric         num1 = PG_GETARG_NUMERIC(0);
1148         Numeric         num2 = PG_GETARG_NUMERIC(1);
1149         NumericVar      arg1;
1150         NumericVar      arg2;
1151         NumericVar      result;
1152         Numeric         res;
1153
1154         /*
1155          * Handle NaN
1156          */
1157         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1158                 PG_RETURN_NUMERIC(make_result(&const_nan));
1159
1160         /*
1161          * Unpack the values, let sub_var() compute the result and return it.
1162          */
1163         init_var(&arg1);
1164         init_var(&arg2);
1165         init_var(&result);
1166
1167         set_var_from_num(num1, &arg1);
1168         set_var_from_num(num2, &arg2);
1169
1170         sub_var(&arg1, &arg2, &result);
1171
1172         res = make_result(&result);
1173
1174         free_var(&arg1);
1175         free_var(&arg2);
1176         free_var(&result);
1177
1178         PG_RETURN_NUMERIC(res);
1179 }
1180
1181
1182 /*
1183  * numeric_mul() -
1184  *
1185  *      Calculate the product of two numerics
1186  */
1187 Datum
1188 numeric_mul(PG_FUNCTION_ARGS)
1189 {
1190         Numeric         num1 = PG_GETARG_NUMERIC(0);
1191         Numeric         num2 = PG_GETARG_NUMERIC(1);
1192         NumericVar      arg1;
1193         NumericVar      arg2;
1194         NumericVar      result;
1195         Numeric         res;
1196
1197         /*
1198          * Handle NaN
1199          */
1200         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1201                 PG_RETURN_NUMERIC(make_result(&const_nan));
1202
1203         /*
1204          * Unpack the values, let mul_var() compute the result and return it.
1205          * Unlike add_var() and sub_var(), mul_var() will round its result. In
1206          * the case of numeric_mul(), which is invoked for the * operator on
1207          * numerics, we request exact representation for the product (rscale =
1208          * sum(dscale of arg1, dscale of arg2)).
1209          */
1210         init_var(&arg1);
1211         init_var(&arg2);
1212         init_var(&result);
1213
1214         set_var_from_num(num1, &arg1);
1215         set_var_from_num(num2, &arg2);
1216
1217         mul_var(&arg1, &arg2, &result, arg1.dscale + arg2.dscale);
1218
1219         res = make_result(&result);
1220
1221         free_var(&arg1);
1222         free_var(&arg2);
1223         free_var(&result);
1224
1225         PG_RETURN_NUMERIC(res);
1226 }
1227
1228
1229 /*
1230  * numeric_div() -
1231  *
1232  *      Divide one numeric into another
1233  */
1234 Datum
1235 numeric_div(PG_FUNCTION_ARGS)
1236 {
1237         Numeric         num1 = PG_GETARG_NUMERIC(0);
1238         Numeric         num2 = PG_GETARG_NUMERIC(1);
1239         NumericVar      arg1;
1240         NumericVar      arg2;
1241         NumericVar      result;
1242         Numeric         res;
1243         int                     rscale;
1244
1245         /*
1246          * Handle NaN
1247          */
1248         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1249                 PG_RETURN_NUMERIC(make_result(&const_nan));
1250
1251         /*
1252          * Unpack the arguments
1253          */
1254         init_var(&arg1);
1255         init_var(&arg2);
1256         init_var(&result);
1257
1258         set_var_from_num(num1, &arg1);
1259         set_var_from_num(num2, &arg2);
1260
1261         /*
1262          * Select scale for division result
1263          */
1264         rscale = select_div_scale(&arg1, &arg2);
1265
1266         /*
1267          * Do the divide and return the result
1268          */
1269         div_var(&arg1, &arg2, &result, rscale);
1270
1271         res = make_result(&result);
1272
1273         free_var(&arg1);
1274         free_var(&arg2);
1275         free_var(&result);
1276
1277         PG_RETURN_NUMERIC(res);
1278 }
1279
1280
1281 /*
1282  * numeric_mod() -
1283  *
1284  *      Calculate the modulo of two numerics
1285  */
1286 Datum
1287 numeric_mod(PG_FUNCTION_ARGS)
1288 {
1289         Numeric         num1 = PG_GETARG_NUMERIC(0);
1290         Numeric         num2 = PG_GETARG_NUMERIC(1);
1291         Numeric         res;
1292         NumericVar      arg1;
1293         NumericVar      arg2;
1294         NumericVar      result;
1295
1296         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1297                 PG_RETURN_NUMERIC(make_result(&const_nan));
1298
1299         init_var(&arg1);
1300         init_var(&arg2);
1301         init_var(&result);
1302
1303         set_var_from_num(num1, &arg1);
1304         set_var_from_num(num2, &arg2);
1305
1306         mod_var(&arg1, &arg2, &result);
1307
1308         res = make_result(&result);
1309
1310         free_var(&result);
1311         free_var(&arg2);
1312         free_var(&arg1);
1313
1314         PG_RETURN_NUMERIC(res);
1315 }
1316
1317
1318 /*
1319  * numeric_inc() -
1320  *
1321  *      Increment a number by one
1322  */
1323 Datum
1324 numeric_inc(PG_FUNCTION_ARGS)
1325 {
1326         Numeric         num = PG_GETARG_NUMERIC(0);
1327         NumericVar      arg;
1328         Numeric         res;
1329
1330         /*
1331          * Handle NaN
1332          */
1333         if (NUMERIC_IS_NAN(num))
1334                 PG_RETURN_NUMERIC(make_result(&const_nan));
1335
1336         /*
1337          * Compute the result and return it
1338          */
1339         init_var(&arg);
1340
1341         set_var_from_num(num, &arg);
1342
1343         add_var(&arg, &const_one, &arg);
1344
1345         res = make_result(&arg);
1346
1347         free_var(&arg);
1348
1349         PG_RETURN_NUMERIC(res);
1350 }
1351
1352
1353 /*
1354  * numeric_smaller() -
1355  *
1356  *      Return the smaller of two numbers
1357  */
1358 Datum
1359 numeric_smaller(PG_FUNCTION_ARGS)
1360 {
1361         Numeric         num1 = PG_GETARG_NUMERIC(0);
1362         Numeric         num2 = PG_GETARG_NUMERIC(1);
1363
1364         /*
1365          * Use cmp_numerics so that this will agree with the comparison
1366          * operators, particularly as regards comparisons involving NaN.
1367          */
1368         if (cmp_numerics(num1, num2) < 0)
1369                 PG_RETURN_NUMERIC(num1);
1370         else
1371                 PG_RETURN_NUMERIC(num2);
1372 }
1373
1374
1375 /*
1376  * numeric_larger() -
1377  *
1378  *      Return the larger of two numbers
1379  */
1380 Datum
1381 numeric_larger(PG_FUNCTION_ARGS)
1382 {
1383         Numeric         num1 = PG_GETARG_NUMERIC(0);
1384         Numeric         num2 = PG_GETARG_NUMERIC(1);
1385
1386         /*
1387          * Use cmp_numerics so that this will agree with the comparison
1388          * operators, particularly as regards comparisons involving NaN.
1389          */
1390         if (cmp_numerics(num1, num2) > 0)
1391                 PG_RETURN_NUMERIC(num1);
1392         else
1393                 PG_RETURN_NUMERIC(num2);
1394 }
1395
1396
1397 /* ----------------------------------------------------------------------
1398  *
1399  * Advanced math functions
1400  *
1401  * ----------------------------------------------------------------------
1402  */
1403
1404 /*
1405  * numeric_fac()
1406  *
1407  * Compute factorial
1408  */
1409 Datum
1410 numeric_fac(PG_FUNCTION_ARGS)
1411 {
1412         int64           num = PG_GETARG_INT64(0);
1413         Numeric         res;
1414         NumericVar      fact;
1415         NumericVar      result;
1416
1417         if (num <= 1)
1418         {
1419                 res = make_result(&const_one);
1420                 PG_RETURN_NUMERIC(res);
1421         }
1422
1423         init_var(&fact);
1424         init_var(&result);
1425
1426         int8_to_numericvar(num, &result);
1427
1428         for (num = num - 1; num > 1; num--)
1429         {
1430                 int8_to_numericvar(num, &fact);
1431
1432                 mul_var(&result, &fact, &result, 0);
1433         }
1434
1435         res = make_result(&result);
1436
1437         free_var(&fact);
1438         free_var(&result);
1439
1440         PG_RETURN_NUMERIC(res);
1441 }
1442
1443
1444 /*
1445  * numeric_sqrt() -
1446  *
1447  *      Compute the square root of a numeric.
1448  */
1449 Datum
1450 numeric_sqrt(PG_FUNCTION_ARGS)
1451 {
1452         Numeric         num = PG_GETARG_NUMERIC(0);
1453         Numeric         res;
1454         NumericVar      arg;
1455         NumericVar      result;
1456         int                     sweight;
1457         int                     rscale;
1458
1459         /*
1460          * Handle NaN
1461          */
1462         if (NUMERIC_IS_NAN(num))
1463                 PG_RETURN_NUMERIC(make_result(&const_nan));
1464
1465         /*
1466          * Unpack the argument and determine the result scale.  We choose a
1467          * scale to give at least NUMERIC_MIN_SIG_DIGITS significant digits;
1468          * but in any case not less than the input's dscale.
1469          */
1470         init_var(&arg);
1471         init_var(&result);
1472
1473         set_var_from_num(num, &arg);
1474
1475         /* Assume the input was normalized, so arg.weight is accurate */
1476         sweight = (arg.weight + 1) * DEC_DIGITS / 2 - 1;
1477
1478         rscale = NUMERIC_MIN_SIG_DIGITS - sweight;
1479         rscale = Max(rscale, arg.dscale);
1480         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
1481         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
1482
1483         /*
1484          * Let sqrt_var() do the calculation and return the result.
1485          */
1486         sqrt_var(&arg, &result, rscale);
1487
1488         res = make_result(&result);
1489
1490         free_var(&result);
1491         free_var(&arg);
1492
1493         PG_RETURN_NUMERIC(res);
1494 }
1495
1496
1497 /*
1498  * numeric_exp() -
1499  *
1500  *      Raise e to the power of x
1501  */
1502 Datum
1503 numeric_exp(PG_FUNCTION_ARGS)
1504 {
1505         Numeric         num = PG_GETARG_NUMERIC(0);
1506         Numeric         res;
1507         NumericVar      arg;
1508         NumericVar      result;
1509         int                     rscale;
1510         double          val;
1511
1512         /*
1513          * Handle NaN
1514          */
1515         if (NUMERIC_IS_NAN(num))
1516                 PG_RETURN_NUMERIC(make_result(&const_nan));
1517
1518         /*
1519          * Unpack the argument and determine the result scale.  We choose a
1520          * scale to give at least NUMERIC_MIN_SIG_DIGITS significant digits;
1521          * but in any case not less than the input's dscale.
1522          */
1523         init_var(&arg);
1524         init_var(&result);
1525
1526         set_var_from_num(num, &arg);
1527
1528         /* convert input to float8, ignoring overflow */
1529         val = numericvar_to_double_no_overflow(&arg);
1530
1531         /*
1532          * log10(result) = num * log10(e), so this is approximately the
1533          * decimal weight of the result:
1534          */
1535         val *= 0.434294481903252;
1536
1537         /* limit to something that won't cause integer overflow */
1538         val = Max(val, -NUMERIC_MAX_RESULT_SCALE);
1539         val = Min(val, NUMERIC_MAX_RESULT_SCALE);
1540
1541         rscale = NUMERIC_MIN_SIG_DIGITS - (int) val;
1542         rscale = Max(rscale, arg.dscale);
1543         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
1544         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
1545
1546         /*
1547          * Let exp_var() do the calculation and return the result.
1548          */
1549         exp_var(&arg, &result, rscale);
1550
1551         res = make_result(&result);
1552
1553         free_var(&result);
1554         free_var(&arg);
1555
1556         PG_RETURN_NUMERIC(res);
1557 }
1558
1559
1560 /*
1561  * numeric_ln() -
1562  *
1563  *      Compute the natural logarithm of x
1564  */
1565 Datum
1566 numeric_ln(PG_FUNCTION_ARGS)
1567 {
1568         Numeric         num = PG_GETARG_NUMERIC(0);
1569         Numeric         res;
1570         NumericVar      arg;
1571         NumericVar      result;
1572         int                     dec_digits;
1573         int                     rscale;
1574
1575         /*
1576          * Handle NaN
1577          */
1578         if (NUMERIC_IS_NAN(num))
1579                 PG_RETURN_NUMERIC(make_result(&const_nan));
1580
1581         init_var(&arg);
1582         init_var(&result);
1583
1584         set_var_from_num(num, &arg);
1585
1586         /* Approx decimal digits before decimal point */
1587         dec_digits = (arg.weight + 1) * DEC_DIGITS;
1588
1589         if (dec_digits > 1)
1590                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(dec_digits - 1);
1591         else if (dec_digits < 1)
1592                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(1 - dec_digits);
1593         else
1594                 rscale = NUMERIC_MIN_SIG_DIGITS;
1595
1596         rscale = Max(rscale, arg.dscale);
1597         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
1598         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
1599
1600         ln_var(&arg, &result, rscale);
1601
1602         res = make_result(&result);
1603
1604         free_var(&result);
1605         free_var(&arg);
1606
1607         PG_RETURN_NUMERIC(res);
1608 }
1609
1610
1611 /*
1612  * numeric_log() -
1613  *
1614  *      Compute the logarithm of x in a given base
1615  */
1616 Datum
1617 numeric_log(PG_FUNCTION_ARGS)
1618 {
1619         Numeric         num1 = PG_GETARG_NUMERIC(0);
1620         Numeric         num2 = PG_GETARG_NUMERIC(1);
1621         Numeric         res;
1622         NumericVar      arg1;
1623         NumericVar      arg2;
1624         NumericVar      result;
1625
1626         /*
1627          * Handle NaN
1628          */
1629         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1630                 PG_RETURN_NUMERIC(make_result(&const_nan));
1631
1632         /*
1633          * Initialize things
1634          */
1635         init_var(&arg1);
1636         init_var(&arg2);
1637         init_var(&result);
1638
1639         set_var_from_num(num1, &arg1);
1640         set_var_from_num(num2, &arg2);
1641
1642         /*
1643          * Call log_var() to compute and return the result; note it handles
1644          * scale selection itself.
1645          */
1646         log_var(&arg1, &arg2, &result);
1647
1648         res = make_result(&result);
1649
1650         free_var(&result);
1651         free_var(&arg2);
1652         free_var(&arg1);
1653
1654         PG_RETURN_NUMERIC(res);
1655 }
1656
1657
1658 /*
1659  * numeric_power() -
1660  *
1661  *      Raise b to the power of x
1662  */
1663 Datum
1664 numeric_power(PG_FUNCTION_ARGS)
1665 {
1666         Numeric         num1 = PG_GETARG_NUMERIC(0);
1667         Numeric         num2 = PG_GETARG_NUMERIC(1);
1668         Numeric         res;
1669         NumericVar      arg1;
1670         NumericVar      arg2;
1671         NumericVar      arg2_trunc;
1672         NumericVar      result;
1673
1674         /*
1675          * Handle NaN
1676          */
1677         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1678                 PG_RETURN_NUMERIC(make_result(&const_nan));
1679
1680         /*
1681          * Initialize things
1682          */
1683         init_var(&arg1);
1684         init_var(&arg2);
1685         init_var(&arg2_trunc);
1686         init_var(&result);
1687
1688         set_var_from_num(num1, &arg1);
1689         set_var_from_num(num2, &arg2);
1690         set_var_from_var(&arg2, &arg2_trunc);
1691
1692         trunc_var(&arg2_trunc, 0);
1693
1694         /*
1695          * Return special SQLSTATE error codes for a few conditions mandated
1696          * by the standard.
1697          */
1698         if ((cmp_var(&arg1, &const_zero) == 0 &&
1699                  cmp_var(&arg2, &const_zero) < 0) ||
1700                 (cmp_var(&arg1, &const_zero) < 0 &&
1701                  cmp_var(&arg2, &arg2_trunc) != 0))
1702                 ereport(ERROR,
1703                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION),
1704                                  errmsg("invalid argument for power function")));
1705
1706         /*
1707          * Call power_var() to compute and return the result; note it handles
1708          * scale selection itself.
1709          */
1710         power_var(&arg1, &arg2, &result);
1711
1712         res = make_result(&result);
1713
1714         free_var(&result);
1715         free_var(&arg2);
1716         free_var(&arg2_trunc);
1717         free_var(&arg1);
1718
1719         PG_RETURN_NUMERIC(res);
1720 }
1721
1722
1723 /* ----------------------------------------------------------------------
1724  *
1725  * Type conversion functions
1726  *
1727  * ----------------------------------------------------------------------
1728  */
1729
1730
1731 Datum
1732 int4_numeric(PG_FUNCTION_ARGS)
1733 {
1734         int32           val = PG_GETARG_INT32(0);
1735         Numeric         res;
1736         NumericVar      result;
1737
1738         init_var(&result);
1739
1740         int8_to_numericvar((int64) val, &result);
1741
1742         res = make_result(&result);
1743
1744         free_var(&result);
1745
1746         PG_RETURN_NUMERIC(res);
1747 }
1748
1749
1750 Datum
1751 numeric_int4(PG_FUNCTION_ARGS)
1752 {
1753         Numeric         num = PG_GETARG_NUMERIC(0);
1754         NumericVar      x;
1755         int32           result;
1756
1757         /* XXX would it be better to return NULL? */
1758         if (NUMERIC_IS_NAN(num))
1759                 ereport(ERROR,
1760                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1761                                  errmsg("cannot convert NaN to integer")));
1762
1763         /* Convert to variable format, then convert to int4 */
1764         init_var(&x);
1765         set_var_from_num(num, &x);
1766         result = numericvar_to_int4(&x);
1767         free_var(&x);
1768         PG_RETURN_INT32(result);
1769 }
1770
1771 /*
1772  * Given a NumericVar, convert it to an int32. If the NumericVar
1773  * exceeds the range of an int32, raise the appropriate error via
1774  * ereport(). The input NumericVar is *not* free'd.
1775  */
1776 static int32
1777 numericvar_to_int4(NumericVar *var)
1778 {
1779         int32           result;
1780         int64           val;
1781
1782         if (!numericvar_to_int8(var, &val))
1783                 ereport(ERROR,
1784                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1785                                  errmsg("integer out of range")));
1786
1787         /* Down-convert to int4 */
1788         result = (int32) val;
1789
1790         /* Test for overflow by reverse-conversion. */
1791         if ((int64) result != val)
1792                 ereport(ERROR,
1793                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1794                                  errmsg("integer out of range")));
1795
1796         return result;
1797 }
1798
1799 Datum
1800 int8_numeric(PG_FUNCTION_ARGS)
1801 {
1802         int64           val = PG_GETARG_INT64(0);
1803         Numeric         res;
1804         NumericVar      result;
1805
1806         init_var(&result);
1807
1808         int8_to_numericvar(val, &result);
1809
1810         res = make_result(&result);
1811
1812         free_var(&result);
1813
1814         PG_RETURN_NUMERIC(res);
1815 }
1816
1817
1818 Datum
1819 numeric_int8(PG_FUNCTION_ARGS)
1820 {
1821         Numeric         num = PG_GETARG_NUMERIC(0);
1822         NumericVar      x;
1823         int64           result;
1824
1825         /* XXX would it be better to return NULL? */
1826         if (NUMERIC_IS_NAN(num))
1827                 ereport(ERROR,
1828                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1829                                  errmsg("cannot convert NaN to bigint")));
1830
1831         /* Convert to variable format and thence to int8 */
1832         init_var(&x);
1833         set_var_from_num(num, &x);
1834
1835         if (!numericvar_to_int8(&x, &result))
1836                 ereport(ERROR,
1837                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1838                                  errmsg("bigint out of range")));
1839
1840         free_var(&x);
1841
1842         PG_RETURN_INT64(result);
1843 }
1844
1845
1846 Datum
1847 int2_numeric(PG_FUNCTION_ARGS)
1848 {
1849         int16           val = PG_GETARG_INT16(0);
1850         Numeric         res;
1851         NumericVar      result;
1852
1853         init_var(&result);
1854
1855         int8_to_numericvar((int64) val, &result);
1856
1857         res = make_result(&result);
1858
1859         free_var(&result);
1860
1861         PG_RETURN_NUMERIC(res);
1862 }
1863
1864
1865 Datum
1866 numeric_int2(PG_FUNCTION_ARGS)
1867 {
1868         Numeric         num = PG_GETARG_NUMERIC(0);
1869         NumericVar      x;
1870         int64           val;
1871         int16           result;
1872
1873         /* XXX would it be better to return NULL? */
1874         if (NUMERIC_IS_NAN(num))
1875                 ereport(ERROR,
1876                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1877                                  errmsg("cannot convert NaN to smallint")));
1878
1879         /* Convert to variable format and thence to int8 */
1880         init_var(&x);
1881         set_var_from_num(num, &x);
1882
1883         if (!numericvar_to_int8(&x, &val))
1884                 ereport(ERROR,
1885                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1886                                  errmsg("smallint out of range")));
1887
1888         free_var(&x);
1889
1890         /* Down-convert to int2 */
1891         result = (int16) val;
1892
1893         /* Test for overflow by reverse-conversion. */
1894         if ((int64) result != val)
1895                 ereport(ERROR,
1896                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1897                                  errmsg("smallint out of range")));
1898
1899         PG_RETURN_INT16(result);
1900 }
1901
1902
1903 Datum
1904 float8_numeric(PG_FUNCTION_ARGS)
1905 {
1906         float8          val = PG_GETARG_FLOAT8(0);
1907         Numeric         res;
1908         NumericVar      result;
1909         char            buf[DBL_DIG + 100];
1910
1911         if (isnan(val))
1912                 PG_RETURN_NUMERIC(make_result(&const_nan));
1913
1914         sprintf(buf, "%.*g", DBL_DIG, val);
1915
1916         init_var(&result);
1917
1918         set_var_from_str(buf, &result);
1919         res = make_result(&result);
1920
1921         free_var(&result);
1922
1923         PG_RETURN_NUMERIC(res);
1924 }
1925
1926
1927 Datum
1928 numeric_float8(PG_FUNCTION_ARGS)
1929 {
1930         Numeric         num = PG_GETARG_NUMERIC(0);
1931         char       *tmp;
1932         Datum           result;
1933
1934         if (NUMERIC_IS_NAN(num))
1935                 PG_RETURN_FLOAT8(get_float8_nan());
1936
1937         tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
1938                                                                                           NumericGetDatum(num)));
1939
1940         result = DirectFunctionCall1(float8in, CStringGetDatum(tmp));
1941
1942         pfree(tmp);
1943
1944         PG_RETURN_DATUM(result);
1945 }
1946
1947
1948 /* Convert numeric to float8; if out of range, return +/- HUGE_VAL */
1949 Datum
1950 numeric_float8_no_overflow(PG_FUNCTION_ARGS)
1951 {
1952         Numeric         num = PG_GETARG_NUMERIC(0);
1953         double          val;
1954
1955         if (NUMERIC_IS_NAN(num))
1956                 PG_RETURN_FLOAT8(get_float8_nan());
1957
1958         val = numeric_to_double_no_overflow(num);
1959
1960         PG_RETURN_FLOAT8(val);
1961 }
1962
1963 Datum
1964 float4_numeric(PG_FUNCTION_ARGS)
1965 {
1966         float4          val = PG_GETARG_FLOAT4(0);
1967         Numeric         res;
1968         NumericVar      result;
1969         char            buf[FLT_DIG + 100];
1970
1971         if (isnan(val))
1972                 PG_RETURN_NUMERIC(make_result(&const_nan));
1973
1974         sprintf(buf, "%.*g", FLT_DIG, val);
1975
1976         init_var(&result);
1977
1978         set_var_from_str(buf, &result);
1979         res = make_result(&result);
1980
1981         free_var(&result);
1982
1983         PG_RETURN_NUMERIC(res);
1984 }
1985
1986
1987 Datum
1988 numeric_float4(PG_FUNCTION_ARGS)
1989 {
1990         Numeric         num = PG_GETARG_NUMERIC(0);
1991         char       *tmp;
1992         Datum           result;
1993
1994         if (NUMERIC_IS_NAN(num))
1995                 PG_RETURN_FLOAT4(get_float4_nan());
1996
1997         tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
1998                                                                                           NumericGetDatum(num)));
1999
2000         result = DirectFunctionCall1(float4in, CStringGetDatum(tmp));
2001
2002         pfree(tmp);
2003
2004         PG_RETURN_DATUM(result);
2005 }
2006
2007
2008 Datum
2009 text_numeric(PG_FUNCTION_ARGS)
2010 {
2011         text       *str = PG_GETARG_TEXT_P(0);
2012         int                     len;
2013         char       *s;
2014         Datum           result;
2015
2016         len = (VARSIZE(str) - VARHDRSZ);
2017         s = palloc(len + 1);
2018         memcpy(s, VARDATA(str), len);
2019         *(s + len) = '\0';
2020
2021         result = DirectFunctionCall3(numeric_in, CStringGetDatum(s),
2022                                                                  ObjectIdGetDatum(0), Int32GetDatum(-1));
2023
2024         pfree(s);
2025
2026         return result;
2027 }
2028
2029 Datum
2030 numeric_text(PG_FUNCTION_ARGS)
2031 {
2032         /* val is numeric, but easier to leave it as Datum */
2033         Datum           val = PG_GETARG_DATUM(0);
2034         char       *s;
2035         int                     len;
2036         text       *result;
2037
2038         s = DatumGetCString(DirectFunctionCall1(numeric_out, val));
2039         len = strlen(s);
2040
2041         result = (text *) palloc(VARHDRSZ + len);
2042
2043         VARATT_SIZEP(result) = len + VARHDRSZ;
2044         memcpy(VARDATA(result), s, len);
2045
2046         pfree(s);
2047
2048         PG_RETURN_TEXT_P(result);
2049 }
2050
2051
2052 /* ----------------------------------------------------------------------
2053  *
2054  * Aggregate functions
2055  *
2056  * The transition datatype for all these aggregates is a 3-element array
2057  * of Numeric, holding the values N, sum(X), sum(X*X) in that order.
2058  *
2059  * We represent N as a numeric mainly to avoid having to build a special
2060  * datatype; it's unlikely it'd overflow an int4, but ...
2061  *
2062  * ----------------------------------------------------------------------
2063  */
2064
2065 static ArrayType *
2066 do_numeric_accum(ArrayType *transarray, Numeric newval)
2067 {
2068         Datum      *transdatums;
2069         int                     ndatums;
2070         Datum           N,
2071                                 sumX,
2072                                 sumX2;
2073         ArrayType  *result;
2074
2075         /* We assume the input is array of numeric */
2076         deconstruct_array(transarray,
2077                                           NUMERICOID, -1, false, 'i',
2078                                           &transdatums, &ndatums);
2079         if (ndatums != 3)
2080                 elog(ERROR, "expected 3-element numeric array");
2081         N = transdatums[0];
2082         sumX = transdatums[1];
2083         sumX2 = transdatums[2];
2084
2085         N = DirectFunctionCall1(numeric_inc, N);
2086         sumX = DirectFunctionCall2(numeric_add, sumX,
2087                                                            NumericGetDatum(newval));
2088         sumX2 = DirectFunctionCall2(numeric_add, sumX2,
2089                                                                 DirectFunctionCall2(numeric_mul,
2090                                                                                                  NumericGetDatum(newval),
2091                                                                                            NumericGetDatum(newval)));
2092
2093         transdatums[0] = N;
2094         transdatums[1] = sumX;
2095         transdatums[2] = sumX2;
2096
2097         result = construct_array(transdatums, 3,
2098                                                          NUMERICOID, -1, false, 'i');
2099
2100         return result;
2101 }
2102
2103 Datum
2104 numeric_accum(PG_FUNCTION_ARGS)
2105 {
2106         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2107         Numeric         newval = PG_GETARG_NUMERIC(1);
2108
2109         PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
2110 }
2111
2112 /*
2113  * Integer data types all use Numeric accumulators to share code and
2114  * avoid risk of overflow.      For int2 and int4 inputs, Numeric accumulation
2115  * is overkill for the N and sum(X) values, but definitely not overkill
2116  * for the sum(X*X) value.      Hence, we use int2_accum and int4_accum only
2117  * for stddev/variance --- there are faster special-purpose accumulator
2118  * routines for SUM and AVG of these datatypes.
2119  */
2120
2121 Datum
2122 int2_accum(PG_FUNCTION_ARGS)
2123 {
2124         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2125         Datum           newval2 = PG_GETARG_DATUM(1);
2126         Numeric         newval;
2127
2128         newval = DatumGetNumeric(DirectFunctionCall1(int2_numeric, newval2));
2129
2130         PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
2131 }
2132
2133 Datum
2134 int4_accum(PG_FUNCTION_ARGS)
2135 {
2136         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2137         Datum           newval4 = PG_GETARG_DATUM(1);
2138         Numeric         newval;
2139
2140         newval = DatumGetNumeric(DirectFunctionCall1(int4_numeric, newval4));
2141
2142         PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
2143 }
2144
2145 Datum
2146 int8_accum(PG_FUNCTION_ARGS)
2147 {
2148         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2149         Datum           newval8 = PG_GETARG_DATUM(1);
2150         Numeric         newval;
2151
2152         newval = DatumGetNumeric(DirectFunctionCall1(int8_numeric, newval8));
2153
2154         PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
2155 }
2156
2157 Datum
2158 numeric_avg(PG_FUNCTION_ARGS)
2159 {
2160         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2161         Datum      *transdatums;
2162         int                     ndatums;
2163         Numeric         N,
2164                                 sumX;
2165
2166         /* We assume the input is array of numeric */
2167         deconstruct_array(transarray,
2168                                           NUMERICOID, -1, false, 'i',
2169                                           &transdatums, &ndatums);
2170         if (ndatums != 3)
2171                 elog(ERROR, "expected 3-element numeric array");
2172         N = DatumGetNumeric(transdatums[0]);
2173         sumX = DatumGetNumeric(transdatums[1]);
2174         /* ignore sumX2 */
2175
2176         /* SQL92 defines AVG of no values to be NULL */
2177         /* N is zero iff no digits (cf. numeric_uminus) */
2178         if (N->varlen == NUMERIC_HDRSZ)
2179                 PG_RETURN_NULL();
2180
2181         PG_RETURN_DATUM(DirectFunctionCall2(numeric_div,
2182                                                                                 NumericGetDatum(sumX),
2183                                                                                 NumericGetDatum(N)));
2184 }
2185
2186 Datum
2187 numeric_variance(PG_FUNCTION_ARGS)
2188 {
2189         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2190         Datum      *transdatums;
2191         int                     ndatums;
2192         Numeric         N,
2193                                 sumX,
2194                                 sumX2,
2195                                 res;
2196         NumericVar      vN,
2197                                 vsumX,
2198                                 vsumX2,
2199                                 vNminus1;
2200         int                     rscale;
2201
2202         /* We assume the input is array of numeric */
2203         deconstruct_array(transarray,
2204                                           NUMERICOID, -1, false, 'i',
2205                                           &transdatums, &ndatums);
2206         if (ndatums != 3)
2207                 elog(ERROR, "expected 3-element numeric array");
2208         N = DatumGetNumeric(transdatums[0]);
2209         sumX = DatumGetNumeric(transdatums[1]);
2210         sumX2 = DatumGetNumeric(transdatums[2]);
2211
2212         if (NUMERIC_IS_NAN(N) || NUMERIC_IS_NAN(sumX) || NUMERIC_IS_NAN(sumX2))
2213                 PG_RETURN_NUMERIC(make_result(&const_nan));
2214
2215         /* Sample variance is undefined when N is 0 or 1, so return NULL */
2216         init_var(&vN);
2217         set_var_from_num(N, &vN);
2218
2219         if (cmp_var(&vN, &const_one) <= 0)
2220         {
2221                 free_var(&vN);
2222                 PG_RETURN_NULL();
2223         }
2224
2225         init_var(&vNminus1);
2226         sub_var(&vN, &const_one, &vNminus1);
2227
2228         init_var(&vsumX);
2229         set_var_from_num(sumX, &vsumX);
2230         init_var(&vsumX2);
2231         set_var_from_num(sumX2, &vsumX2);
2232
2233         /* compute rscale for mul_var calls */
2234         rscale = vsumX.dscale * 2;
2235
2236         mul_var(&vsumX, &vsumX, &vsumX, rscale);        /* vsumX = sumX * sumX */
2237         mul_var(&vN, &vsumX2, &vsumX2, rscale);         /* vsumX2 = N * sumX2 */
2238         sub_var(&vsumX2, &vsumX, &vsumX2);      /* N * sumX2 - sumX * sumX */
2239
2240         if (cmp_var(&vsumX2, &const_zero) <= 0)
2241         {
2242                 /* Watch out for roundoff error producing a negative numerator */
2243                 res = make_result(&const_zero);
2244         }
2245         else
2246         {
2247                 mul_var(&vN, &vNminus1, &vNminus1, 0);  /* N * (N - 1) */
2248                 rscale = select_div_scale(&vsumX2, &vNminus1);
2249                 div_var(&vsumX2, &vNminus1, &vsumX, rscale);    /* variance */
2250
2251                 res = make_result(&vsumX);
2252         }
2253
2254         free_var(&vN);
2255         free_var(&vNminus1);
2256         free_var(&vsumX);
2257         free_var(&vsumX2);
2258
2259         PG_RETURN_NUMERIC(res);
2260 }
2261
2262 Datum
2263 numeric_stddev(PG_FUNCTION_ARGS)
2264 {
2265         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2266         Datum      *transdatums;
2267         int                     ndatums;
2268         Numeric         N,
2269                                 sumX,
2270                                 sumX2,
2271                                 res;
2272         NumericVar      vN,
2273                                 vsumX,
2274                                 vsumX2,
2275                                 vNminus1;
2276         int                     rscale;
2277
2278         /* We assume the input is array of numeric */
2279         deconstruct_array(transarray,
2280                                           NUMERICOID, -1, false, 'i',
2281                                           &transdatums, &ndatums);
2282         if (ndatums != 3)
2283                 elog(ERROR, "expected 3-element numeric array");
2284         N = DatumGetNumeric(transdatums[0]);
2285         sumX = DatumGetNumeric(transdatums[1]);
2286         sumX2 = DatumGetNumeric(transdatums[2]);
2287
2288         if (NUMERIC_IS_NAN(N) || NUMERIC_IS_NAN(sumX) || NUMERIC_IS_NAN(sumX2))
2289                 PG_RETURN_NUMERIC(make_result(&const_nan));
2290
2291         /* Sample stddev is undefined when N is 0 or 1, so return NULL */
2292         init_var(&vN);
2293         set_var_from_num(N, &vN);
2294
2295         if (cmp_var(&vN, &const_one) <= 0)
2296         {
2297                 free_var(&vN);
2298                 PG_RETURN_NULL();
2299         }
2300
2301         init_var(&vNminus1);
2302         sub_var(&vN, &const_one, &vNminus1);
2303
2304         init_var(&vsumX);
2305         set_var_from_num(sumX, &vsumX);
2306         init_var(&vsumX2);
2307         set_var_from_num(sumX2, &vsumX2);
2308
2309         /* compute rscale for mul_var calls */
2310         rscale = vsumX.dscale * 2;
2311
2312         mul_var(&vsumX, &vsumX, &vsumX, rscale);        /* vsumX = sumX * sumX */
2313         mul_var(&vN, &vsumX2, &vsumX2, rscale);         /* vsumX2 = N * sumX2 */
2314         sub_var(&vsumX2, &vsumX, &vsumX2);      /* N * sumX2 - sumX * sumX */
2315
2316         if (cmp_var(&vsumX2, &const_zero) <= 0)
2317         {
2318                 /* Watch out for roundoff error producing a negative numerator */
2319                 res = make_result(&const_zero);
2320         }
2321         else
2322         {
2323                 mul_var(&vN, &vNminus1, &vNminus1, 0);  /* N * (N - 1) */
2324                 rscale = select_div_scale(&vsumX2, &vNminus1);
2325                 div_var(&vsumX2, &vNminus1, &vsumX, rscale);    /* variance */
2326                 sqrt_var(&vsumX, &vsumX, rscale);               /* stddev */
2327
2328                 res = make_result(&vsumX);
2329         }
2330
2331         free_var(&vN);
2332         free_var(&vNminus1);
2333         free_var(&vsumX);
2334         free_var(&vsumX2);
2335
2336         PG_RETURN_NUMERIC(res);
2337 }
2338
2339
2340 /*
2341  * SUM transition functions for integer datatypes.
2342  *
2343  * To avoid overflow, we use accumulators wider than the input datatype.
2344  * A Numeric accumulator is needed for int8 input; for int4 and int2
2345  * inputs, we use int8 accumulators which should be sufficient for practical
2346  * purposes.  (The latter two therefore don't really belong in this file,
2347  * but we keep them here anyway.)
2348  *
2349  * Because SQL92 defines the SUM() of no values to be NULL, not zero,
2350  * the initial condition of the transition data value needs to be NULL. This
2351  * means we can't rely on ExecAgg to automatically insert the first non-null
2352  * data value into the transition data: it doesn't know how to do the type
2353  * conversion.  The upshot is that these routines have to be marked non-strict
2354  * and handle substitution of the first non-null input themselves.
2355  */
2356
2357 Datum
2358 int2_sum(PG_FUNCTION_ARGS)
2359 {
2360         int64           oldsum;
2361         int64           newval;
2362
2363         if (PG_ARGISNULL(0))
2364         {
2365                 /* No non-null input seen so far... */
2366                 if (PG_ARGISNULL(1))
2367                         PG_RETURN_NULL();       /* still no non-null */
2368                 /* This is the first non-null input. */
2369                 newval = (int64) PG_GETARG_INT16(1);
2370                 PG_RETURN_INT64(newval);
2371         }
2372
2373         oldsum = PG_GETARG_INT64(0);
2374
2375         /* Leave sum unchanged if new input is null. */
2376         if (PG_ARGISNULL(1))
2377                 PG_RETURN_INT64(oldsum);
2378
2379         /* OK to do the addition. */
2380         newval = oldsum + (int64) PG_GETARG_INT16(1);
2381
2382         PG_RETURN_INT64(newval);
2383 }
2384
2385 Datum
2386 int4_sum(PG_FUNCTION_ARGS)
2387 {
2388         int64           oldsum;
2389         int64           newval;
2390
2391         if (PG_ARGISNULL(0))
2392         {
2393                 /* No non-null input seen so far... */
2394                 if (PG_ARGISNULL(1))
2395                         PG_RETURN_NULL();       /* still no non-null */
2396                 /* This is the first non-null input. */
2397                 newval = (int64) PG_GETARG_INT32(1);
2398                 PG_RETURN_INT64(newval);
2399         }
2400
2401         oldsum = PG_GETARG_INT64(0);
2402
2403         /* Leave sum unchanged if new input is null. */
2404         if (PG_ARGISNULL(1))
2405                 PG_RETURN_INT64(oldsum);
2406
2407         /* OK to do the addition. */
2408         newval = oldsum + (int64) PG_GETARG_INT32(1);
2409
2410         PG_RETURN_INT64(newval);
2411 }
2412
2413 Datum
2414 int8_sum(PG_FUNCTION_ARGS)
2415 {
2416         Numeric         oldsum;
2417         Datum           newval;
2418
2419         if (PG_ARGISNULL(0))
2420         {
2421                 /* No non-null input seen so far... */
2422                 if (PG_ARGISNULL(1))
2423                         PG_RETURN_NULL();       /* still no non-null */
2424                 /* This is the first non-null input. */
2425                 newval = DirectFunctionCall1(int8_numeric, PG_GETARG_DATUM(1));
2426                 PG_RETURN_DATUM(newval);
2427         }
2428
2429         oldsum = PG_GETARG_NUMERIC(0);
2430
2431         /* Leave sum unchanged if new input is null. */
2432         if (PG_ARGISNULL(1))
2433                 PG_RETURN_NUMERIC(oldsum);
2434
2435         /* OK to do the addition. */
2436         newval = DirectFunctionCall1(int8_numeric, PG_GETARG_DATUM(1));
2437
2438         PG_RETURN_DATUM(DirectFunctionCall2(numeric_add,
2439                                                                                 NumericGetDatum(oldsum), newval));
2440 }
2441
2442
2443 /*
2444  * Routines for avg(int2) and avg(int4).  The transition datatype
2445  * is a two-element int8 array, holding count and sum.
2446  */
2447
2448 typedef struct Int8TransTypeData
2449 {
2450 #ifndef INT64_IS_BUSTED
2451         int64           count;
2452         int64           sum;
2453 #else
2454         /* "int64" isn't really 64 bits, so fake up properly-aligned fields */
2455         int32           count;
2456         int32           pad1;
2457         int32           sum;
2458         int32           pad2;
2459 #endif
2460 } Int8TransTypeData;
2461
2462 Datum
2463 int2_avg_accum(PG_FUNCTION_ARGS)
2464 {
2465         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P_COPY(0);
2466         int16           newval = PG_GETARG_INT16(1);
2467         Int8TransTypeData *transdata;
2468
2469         /*
2470          * We copied the input array, so it's okay to scribble on it directly.
2471          */
2472         if (ARR_SIZE(transarray) != ARR_OVERHEAD(1) + sizeof(Int8TransTypeData))
2473                 elog(ERROR, "expected 2-element int8 array");
2474         transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
2475
2476         transdata->count++;
2477         transdata->sum += newval;
2478
2479         PG_RETURN_ARRAYTYPE_P(transarray);
2480 }
2481
2482 Datum
2483 int4_avg_accum(PG_FUNCTION_ARGS)
2484 {
2485         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P_COPY(0);
2486         int32           newval = PG_GETARG_INT32(1);
2487         Int8TransTypeData *transdata;
2488
2489         /*
2490          * We copied the input array, so it's okay to scribble on it directly.
2491          */
2492         if (ARR_SIZE(transarray) != ARR_OVERHEAD(1) + sizeof(Int8TransTypeData))
2493                 elog(ERROR, "expected 2-element int8 array");
2494         transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
2495
2496         transdata->count++;
2497         transdata->sum += newval;
2498
2499         PG_RETURN_ARRAYTYPE_P(transarray);
2500 }
2501
2502 Datum
2503 int8_avg(PG_FUNCTION_ARGS)
2504 {
2505         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2506         Int8TransTypeData *transdata;
2507         Datum           countd,
2508                                 sumd;
2509
2510         if (ARR_SIZE(transarray) != ARR_OVERHEAD(1) + sizeof(Int8TransTypeData))
2511                 elog(ERROR, "expected 2-element int8 array");
2512         transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
2513
2514         /* SQL92 defines AVG of no values to be NULL */
2515         if (transdata->count == 0)
2516                 PG_RETURN_NULL();
2517
2518         countd = DirectFunctionCall1(int8_numeric,
2519                                                                  Int64GetDatumFast(transdata->count));
2520         sumd = DirectFunctionCall1(int8_numeric,
2521                                                            Int64GetDatumFast(transdata->sum));
2522
2523         PG_RETURN_DATUM(DirectFunctionCall2(numeric_div, sumd, countd));
2524 }
2525
2526
2527 /* ----------------------------------------------------------------------
2528  *
2529  * Debug support
2530  *
2531  * ----------------------------------------------------------------------
2532  */
2533
2534 #ifdef NUMERIC_DEBUG
2535
2536 /*
2537  * dump_numeric() - Dump a value in the db storage format for debugging
2538  */
2539 static void
2540 dump_numeric(const char *str, Numeric num)
2541 {
2542         NumericDigit *digits = (NumericDigit *) num->n_data;
2543         int                     ndigits;
2544         int                     i;
2545
2546         ndigits = (num->varlen - NUMERIC_HDRSZ) / sizeof(NumericDigit);
2547
2548         printf("%s: NUMERIC w=%d d=%d ", str, num->n_weight, NUMERIC_DSCALE(num));
2549         switch (NUMERIC_SIGN(num))
2550         {
2551                 case NUMERIC_POS:
2552                         printf("POS");
2553                         break;
2554                 case NUMERIC_NEG:
2555                         printf("NEG");
2556                         break;
2557                 case NUMERIC_NAN:
2558                         printf("NaN");
2559                         break;
2560                 default:
2561                         printf("SIGN=0x%x", NUMERIC_SIGN(num));
2562                         break;
2563         }
2564
2565         for (i = 0; i < ndigits; i++)
2566                 printf(" %0*d", DEC_DIGITS, digits[i]);
2567         printf("\n");
2568 }
2569
2570
2571 /*
2572  * dump_var() - Dump a value in the variable format for debugging
2573  */
2574 static void
2575 dump_var(const char *str, NumericVar *var)
2576 {
2577         int                     i;
2578
2579         printf("%s: VAR w=%d d=%d ", str, var->weight, var->dscale);
2580         switch (var->sign)
2581         {
2582                 case NUMERIC_POS:
2583                         printf("POS");
2584                         break;
2585                 case NUMERIC_NEG:
2586                         printf("NEG");
2587                         break;
2588                 case NUMERIC_NAN:
2589                         printf("NaN");
2590                         break;
2591                 default:
2592                         printf("SIGN=0x%x", var->sign);
2593                         break;
2594         }
2595
2596         for (i = 0; i < var->ndigits; i++)
2597                 printf(" %0*d", DEC_DIGITS, var->digits[i]);
2598
2599         printf("\n");
2600 }
2601 #endif   /* NUMERIC_DEBUG */
2602
2603
2604 /* ----------------------------------------------------------------------
2605  *
2606  * Local functions follow
2607  *
2608  * In general, these do not support NaNs --- callers must eliminate
2609  * the possibility of NaN first.  (make_result() is an exception.)
2610  *
2611  * ----------------------------------------------------------------------
2612  */
2613
2614
2615 /*
2616  * alloc_var() -
2617  *
2618  *      Allocate a digit buffer of ndigits digits (plus a spare digit for rounding)
2619  */
2620 static void
2621 alloc_var(NumericVar *var, int ndigits)
2622 {
2623         digitbuf_free(var->buf);
2624         var->buf = digitbuf_alloc(ndigits + 1);
2625         var->buf[0] = 0;                        /* spare digit for rounding */
2626         var->digits = var->buf + 1;
2627         var->ndigits = ndigits;
2628 }
2629
2630
2631 /*
2632  * free_var() -
2633  *
2634  *      Return the digit buffer of a variable to the free pool
2635  */
2636 static void
2637 free_var(NumericVar *var)
2638 {
2639         digitbuf_free(var->buf);
2640         var->buf = NULL;
2641         var->digits = NULL;
2642         var->sign = NUMERIC_NAN;
2643 }
2644
2645
2646 /*
2647  * zero_var() -
2648  *
2649  *      Set a variable to ZERO.
2650  *      Note: its dscale is not touched.
2651  */
2652 static void
2653 zero_var(NumericVar *var)
2654 {
2655         digitbuf_free(var->buf);
2656         var->buf = NULL;
2657         var->digits = NULL;
2658         var->ndigits = 0;
2659         var->weight = 0;                        /* by convention; doesn't really matter */
2660         var->sign = NUMERIC_POS;        /* anything but NAN... */
2661 }
2662
2663
2664 /*
2665  * set_var_from_str()
2666  *
2667  *      Parse a string and put the number into a variable
2668  */
2669 static void
2670 set_var_from_str(const char *str, NumericVar *dest)
2671 {
2672         const char *cp = str;
2673         bool            have_dp = FALSE;
2674         int                     i;
2675         unsigned char *decdigits;
2676         int                     sign = NUMERIC_POS;
2677         int                     dweight = -1;
2678         int                     ddigits;
2679         int                     dscale = 0;
2680         int                     weight;
2681         int                     ndigits;
2682         int                     offset;
2683         NumericDigit *digits;
2684
2685         /*
2686          * We first parse the string to extract decimal digits and determine
2687          * the correct decimal weight.  Then convert to NBASE representation.
2688          */
2689
2690         /* skip leading spaces */
2691         while (*cp)
2692         {
2693                 if (!isspace((unsigned char) *cp))
2694                         break;
2695                 cp++;
2696         }
2697
2698         switch (*cp)
2699         {
2700                 case '+':
2701                         sign = NUMERIC_POS;
2702                         cp++;
2703                         break;
2704
2705                 case '-':
2706                         sign = NUMERIC_NEG;
2707                         cp++;
2708                         break;
2709         }
2710
2711         if (*cp == '.')
2712         {
2713                 have_dp = TRUE;
2714                 cp++;
2715         }
2716
2717         if (!isdigit((unsigned char) *cp))
2718                 ereport(ERROR,
2719                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
2720                   errmsg("invalid input syntax for type numeric: \"%s\"", str)));
2721
2722         decdigits = (unsigned char *) palloc(strlen(cp) + DEC_DIGITS * 2);
2723
2724         /* leading padding for digit alignment later */
2725         memset(decdigits, 0, DEC_DIGITS);
2726         i = DEC_DIGITS;
2727
2728         while (*cp)
2729         {
2730                 if (isdigit((unsigned char) *cp))
2731                 {
2732                         decdigits[i++] = *cp++ - '0';
2733                         if (!have_dp)
2734                                 dweight++;
2735                         else
2736                                 dscale++;
2737                 }
2738                 else if (*cp == '.')
2739                 {
2740                         if (have_dp)
2741                                 ereport(ERROR,
2742                                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
2743                                   errmsg("invalid input syntax for type numeric: \"%s\"",
2744                                                  str)));
2745                         have_dp = TRUE;
2746                         cp++;
2747                 }
2748                 else
2749                         break;
2750         }
2751
2752         ddigits = i - DEC_DIGITS;
2753         /* trailing padding for digit alignment later */
2754         memset(decdigits + i, 0, DEC_DIGITS - 1);
2755
2756         /* Handle exponent, if any */
2757         if (*cp == 'e' || *cp == 'E')
2758         {
2759                 long            exponent;
2760                 char       *endptr;
2761
2762                 cp++;
2763                 exponent = strtol(cp, &endptr, 10);
2764                 if (endptr == cp)
2765                         ereport(ERROR,
2766                                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
2767                                   errmsg("invalid input syntax for type numeric: \"%s\"",
2768                                                  str)));
2769                 cp = endptr;
2770                 if (exponent > NUMERIC_MAX_PRECISION ||
2771                         exponent < -NUMERIC_MAX_PRECISION)
2772                         ereport(ERROR,
2773                                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
2774                                   errmsg("invalid input syntax for type numeric: \"%s\"",
2775                                                  str)));
2776                 dweight += (int) exponent;
2777                 dscale -= (int) exponent;
2778                 if (dscale < 0)
2779                         dscale = 0;
2780         }
2781
2782         /* Should be nothing left but spaces */
2783         while (*cp)
2784         {
2785                 if (!isspace((unsigned char) *cp))
2786                         ereport(ERROR,
2787                                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
2788                                   errmsg("invalid input syntax for type numeric: \"%s\"",
2789                                                  str)));
2790                 cp++;
2791         }
2792
2793         /*
2794          * Okay, convert pure-decimal representation to base NBASE.  First we
2795          * need to determine the converted weight and ndigits.  offset is the
2796          * number of decimal zeroes to insert before the first given digit to
2797          * have a correctly aligned first NBASE digit.
2798          */
2799         if (dweight >= 0)
2800                 weight = (dweight + 1 + DEC_DIGITS - 1) / DEC_DIGITS - 1;
2801         else
2802                 weight = -((-dweight - 1) / DEC_DIGITS + 1);
2803         offset = (weight + 1) * DEC_DIGITS - (dweight + 1);
2804         ndigits = (ddigits + offset + DEC_DIGITS - 1) / DEC_DIGITS;
2805
2806         alloc_var(dest, ndigits);
2807         dest->sign = sign;
2808         dest->weight = weight;
2809         dest->dscale = dscale;
2810
2811         i = DEC_DIGITS - offset;
2812         digits = dest->digits;
2813
2814         while (ndigits-- > 0)
2815         {
2816 #if DEC_DIGITS == 4
2817                 *digits++ = ((decdigits[i] * 10 + decdigits[i + 1]) * 10 +
2818                                          decdigits[i + 2]) * 10 + decdigits[i + 3];
2819 #elif DEC_DIGITS == 2
2820                 *digits++ = decdigits[i] * 10 + decdigits[i + 1];
2821 #elif DEC_DIGITS == 1
2822                 *digits++ = decdigits[i];
2823 #else
2824 #error unsupported NBASE
2825 #endif
2826                 i += DEC_DIGITS;
2827         }
2828
2829         pfree(decdigits);
2830
2831         /* Strip any leading/trailing zeroes, and normalize weight if zero */
2832         strip_var(dest);
2833 }
2834
2835
2836 /*
2837  * set_var_from_num() -
2838  *
2839  *      Convert the packed db format into a variable
2840  */
2841 static void
2842 set_var_from_num(Numeric num, NumericVar *dest)
2843 {
2844         int                     ndigits;
2845
2846         ndigits = (num->varlen - NUMERIC_HDRSZ) / sizeof(NumericDigit);
2847
2848         alloc_var(dest, ndigits);
2849
2850         dest->weight = num->n_weight;
2851         dest->sign = NUMERIC_SIGN(num);
2852         dest->dscale = NUMERIC_DSCALE(num);
2853
2854         memcpy(dest->digits, num->n_data, ndigits * sizeof(NumericDigit));
2855 }
2856
2857
2858 /*
2859  * set_var_from_var() -
2860  *
2861  *      Copy one variable into another
2862  */
2863 static void
2864 set_var_from_var(NumericVar *value, NumericVar *dest)
2865 {
2866         NumericDigit *newbuf;
2867
2868         newbuf = digitbuf_alloc(value->ndigits + 1);
2869         newbuf[0] = 0;                          /* spare digit for rounding */
2870         memcpy(newbuf + 1, value->digits, value->ndigits * sizeof(NumericDigit));
2871
2872         digitbuf_free(dest->buf);
2873
2874         memmove(dest, value, sizeof(NumericVar));
2875         dest->buf = newbuf;
2876         dest->digits = newbuf + 1;
2877 }
2878
2879
2880 /*
2881  * get_str_from_var() -
2882  *
2883  *      Convert a var to text representation (guts of numeric_out).
2884  *      CAUTION: var's contents may be modified by rounding!
2885  *      Returns a palloc'd string.
2886  */
2887 static char *
2888 get_str_from_var(NumericVar *var, int dscale)
2889 {
2890         char       *str;
2891         char       *cp;
2892         char       *endcp;
2893         int                     i;
2894         int                     d;
2895         NumericDigit dig;
2896
2897 #if DEC_DIGITS > 1
2898         NumericDigit d1;
2899 #endif
2900
2901         if (dscale < 0)
2902                 dscale = 0;
2903
2904         /*
2905          * Check if we must round up before printing the value and do so.
2906          */
2907         round_var(var, dscale);
2908
2909         /*
2910          * Allocate space for the result.
2911          *
2912          * i is set to to # of decimal digits before decimal point. dscale is the
2913          * # of decimal digits we will print after decimal point. We may
2914          * generate as many as DEC_DIGITS-1 excess digits at the end, and in
2915          * addition we need room for sign, decimal point, null terminator.
2916          */
2917         i = (var->weight + 1) * DEC_DIGITS;
2918         if (i <= 0)
2919                 i = 1;
2920
2921         str = palloc(i + dscale + DEC_DIGITS + 2);
2922         cp = str;
2923
2924         /*
2925          * Output a dash for negative values
2926          */
2927         if (var->sign == NUMERIC_NEG)
2928                 *cp++ = '-';
2929
2930         /*
2931          * Output all digits before the decimal point
2932          */
2933         if (var->weight < 0)
2934         {
2935                 d = var->weight + 1;
2936                 *cp++ = '0';
2937         }
2938         else
2939         {
2940                 for (d = 0; d <= var->weight; d++)
2941                 {
2942                         dig = (d < var->ndigits) ? var->digits[d] : 0;
2943                         /* In the first digit, suppress extra leading decimal zeroes */
2944 #if DEC_DIGITS == 4
2945                         {
2946                                 bool            putit = (d > 0);
2947
2948                                 d1 = dig / 1000;
2949                                 dig -= d1 * 1000;
2950                                 putit |= (d1 > 0);
2951                                 if (putit)
2952                                         *cp++ = d1 + '0';
2953                                 d1 = dig / 100;
2954                                 dig -= d1 * 100;
2955                                 putit |= (d1 > 0);
2956                                 if (putit)
2957                                         *cp++ = d1 + '0';
2958                                 d1 = dig / 10;
2959                                 dig -= d1 * 10;
2960                                 putit |= (d1 > 0);
2961                                 if (putit)
2962                                         *cp++ = d1 + '0';
2963                                 *cp++ = dig + '0';
2964                         }
2965 #elif DEC_DIGITS == 2
2966                         d1 = dig / 10;
2967                         dig -= d1 * 10;
2968                         if (d1 > 0 || d > 0)
2969                                 *cp++ = d1 + '0';
2970                         *cp++ = dig + '0';
2971 #elif DEC_DIGITS == 1
2972                         *cp++ = dig + '0';
2973 #else
2974 #error unsupported NBASE
2975 #endif
2976                 }
2977         }
2978
2979         /*
2980          * If requested, output a decimal point and all the digits that follow
2981          * it.  We initially put out a multiple of DEC_DIGITS digits, then
2982          * truncate if needed.
2983          */
2984         if (dscale > 0)
2985         {
2986                 *cp++ = '.';
2987                 endcp = cp + dscale;
2988                 for (i = 0; i < dscale; d++, i += DEC_DIGITS)
2989                 {
2990                         dig = (d >= 0 && d < var->ndigits) ? var->digits[d] : 0;
2991 #if DEC_DIGITS == 4
2992                         d1 = dig / 1000;
2993                         dig -= d1 * 1000;
2994                         *cp++ = d1 + '0';
2995                         d1 = dig / 100;
2996                         dig -= d1 * 100;
2997                         *cp++ = d1 + '0';
2998                         d1 = dig / 10;
2999                         dig -= d1 * 10;
3000                         *cp++ = d1 + '0';
3001                         *cp++ = dig + '0';
3002 #elif DEC_DIGITS == 2
3003                         d1 = dig / 10;
3004                         dig -= d1 * 10;
3005                         *cp++ = d1 + '0';
3006                         *cp++ = dig + '0';
3007 #elif DEC_DIGITS == 1
3008                         *cp++ = dig + '0';
3009 #else
3010 #error unsupported NBASE
3011 #endif
3012                 }
3013                 cp = endcp;
3014         }
3015
3016         /*
3017          * terminate the string and return it
3018          */
3019         *cp = '\0';
3020         return str;
3021 }
3022
3023
3024 /*
3025  * make_result() -
3026  *
3027  *      Create the packed db numeric format in palloc()'d memory from
3028  *      a variable.
3029  */
3030 static Numeric
3031 make_result(NumericVar *var)
3032 {
3033         Numeric         result;
3034         NumericDigit *digits = var->digits;
3035         int                     weight = var->weight;
3036         int                     sign = var->sign;
3037         int                     n;
3038         Size            len;
3039
3040         if (sign == NUMERIC_NAN)
3041         {
3042                 result = (Numeric) palloc(NUMERIC_HDRSZ);
3043
3044                 result->varlen = NUMERIC_HDRSZ;
3045                 result->n_weight = 0;
3046                 result->n_sign_dscale = NUMERIC_NAN;
3047
3048                 dump_numeric("make_result()", result);
3049                 return result;
3050         }
3051
3052         n = var->ndigits;
3053
3054         /* truncate leading zeroes */
3055         while (n > 0 && *digits == 0)
3056         {
3057                 digits++;
3058                 weight--;
3059                 n--;
3060         }
3061         /* truncate trailing zeroes */
3062         while (n > 0 && digits[n - 1] == 0)
3063                 n--;
3064
3065         /* If zero result, force to weight=0 and positive sign */
3066         if (n == 0)
3067         {
3068                 weight = 0;
3069                 sign = NUMERIC_POS;
3070         }
3071
3072         /* Build the result */
3073         len = NUMERIC_HDRSZ + n * sizeof(NumericDigit);
3074         result = (Numeric) palloc(len);
3075         result->varlen = len;
3076         result->n_weight = weight;
3077         result->n_sign_dscale = sign | (var->dscale & NUMERIC_DSCALE_MASK);
3078
3079         memcpy(result->n_data, digits, n * sizeof(NumericDigit));
3080
3081         /* Check for overflow of int16 fields */
3082         if (result->n_weight != weight ||
3083                 NUMERIC_DSCALE(result) != var->dscale)
3084                 ereport(ERROR,
3085                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
3086                                  errmsg("value overflows numeric format")));
3087
3088         dump_numeric("make_result()", result);
3089         return result;
3090 }
3091
3092
3093 /*
3094  * apply_typmod() -
3095  *
3096  *      Do bounds checking and rounding according to the attributes
3097  *      typmod field.
3098  */
3099 static void
3100 apply_typmod(NumericVar *var, int32 typmod)
3101 {
3102         int                     precision;
3103         int                     scale;
3104         int                     maxdigits;
3105         int                     ddigits;
3106         int                     i;
3107
3108         /* Do nothing if we have a default typmod (-1) */
3109         if (typmod < (int32) (VARHDRSZ))
3110                 return;
3111
3112         typmod -= VARHDRSZ;
3113         precision = (typmod >> 16) & 0xffff;
3114         scale = typmod & 0xffff;
3115         maxdigits = precision - scale;
3116
3117         /* Round to target scale (and set var->dscale) */
3118         round_var(var, scale);
3119
3120         /*
3121          * Check for overflow - note we can't do this before rounding, because
3122          * rounding could raise the weight.  Also note that the var's weight
3123          * could be inflated by leading zeroes, which will be stripped before
3124          * storage but perhaps might not have been yet. In any case, we must
3125          * recognize a true zero, whose weight doesn't mean anything.
3126          */
3127         ddigits = (var->weight + 1) * DEC_DIGITS;
3128         if (ddigits > maxdigits)
3129         {
3130                 /* Determine true weight; and check for all-zero result */
3131                 for (i = 0; i < var->ndigits; i++)
3132                 {
3133                         NumericDigit dig = var->digits[i];
3134
3135                         if (dig)
3136                         {
3137                                 /* Adjust for any high-order decimal zero digits */
3138 #if DEC_DIGITS == 4
3139                                 if (dig < 10)
3140                                         ddigits -= 3;
3141                                 else if (dig < 100)
3142                                         ddigits -= 2;
3143                                 else if (dig < 1000)
3144                                         ddigits -= 1;
3145 #elif DEC_DIGITS == 2
3146                                 if (dig < 10)
3147                                         ddigits -= 1;
3148 #elif DEC_DIGITS == 1
3149                                 /* no adjustment */
3150 #else
3151 #error unsupported NBASE
3152 #endif
3153                                 if (ddigits > maxdigits)
3154                                         ereport(ERROR,
3155                                                         (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
3156                                                          errmsg("numeric field overflow"),
3157                                                          errdetail("The absolute value is greater than or equal to 10^%d for field with precision %d, scale %d.",
3158                                                                            ddigits - 1, precision, scale)));
3159                                 break;
3160                         }
3161                         ddigits -= DEC_DIGITS;
3162                 }
3163         }
3164 }
3165
3166 /*
3167  * Convert numeric to int8, rounding if needed.
3168  *
3169  * If overflow, return FALSE (no error is raised).      Return TRUE if okay.
3170  *
3171  *      CAUTION: var's contents may be modified by rounding!
3172  */
3173 static bool
3174 numericvar_to_int8(NumericVar *var, int64 *result)
3175 {
3176         NumericDigit *digits;
3177         int                     ndigits;
3178         int                     weight;
3179         int                     i;
3180         int64           val,
3181                                 oldval;
3182         bool            neg;
3183
3184         /* Round to nearest integer */
3185         round_var(var, 0);
3186
3187         /* Check for zero input */
3188         strip_var(var);
3189         ndigits = var->ndigits;
3190         if (ndigits == 0)
3191         {
3192                 *result = 0;
3193                 return true;
3194         }
3195
3196         /*
3197          * For input like 10000000000, we must treat stripped digits as real.
3198          * So the loop assumes there are weight+1 digits before the decimal
3199          * point.
3200          */
3201         weight = var->weight;
3202         Assert(weight >= 0 && ndigits <= weight + 1);
3203
3204         /* Construct the result */
3205         digits = var->digits;
3206         neg = (var->sign == NUMERIC_NEG);
3207         val = digits[0];
3208         for (i = 1; i <= weight; i++)
3209         {
3210                 oldval = val;
3211                 val *= NBASE;
3212                 if (i < ndigits)
3213                         val += digits[i];
3214
3215                 /*
3216                  * The overflow check is a bit tricky because we want to accept
3217                  * INT64_MIN, which will overflow the positive accumulator.  We
3218                  * can detect this case easily though because INT64_MIN is the
3219                  * only nonzero value for which -val == val (on a two's complement
3220                  * machine, anyway).
3221                  */
3222                 if ((val / NBASE) != oldval)    /* possible overflow? */
3223                 {
3224                         if (!neg || (-val) != val || val == 0 || oldval < 0)
3225                                 return false;
3226                 }
3227         }
3228
3229         *result = neg ? -val : val;
3230         return true;
3231 }
3232
3233 /*
3234  * Convert int8 value to numeric.
3235  */
3236 static void
3237 int8_to_numericvar(int64 val, NumericVar *var)
3238 {
3239         uint64          uval,
3240                                 newuval;
3241         NumericDigit *ptr;
3242         int                     ndigits;
3243
3244         /* int8 can require at most 19 decimal digits; add one for safety */
3245         alloc_var(var, 20 / DEC_DIGITS);
3246         if (val < 0)
3247         {
3248                 var->sign = NUMERIC_NEG;
3249                 uval = -val;
3250         }
3251         else
3252         {
3253                 var->sign = NUMERIC_POS;
3254                 uval = val;
3255         }
3256         var->dscale = 0;
3257         if (val == 0)
3258         {
3259                 var->ndigits = 0;
3260                 var->weight = 0;
3261                 return;
3262         }
3263         ptr = var->digits + var->ndigits;
3264         ndigits = 0;
3265         do
3266         {
3267                 ptr--;
3268                 ndigits++;
3269                 newuval = uval / NBASE;
3270                 *ptr = uval - newuval * NBASE;
3271                 uval = newuval;
3272         } while (uval);
3273         var->digits = ptr;
3274         var->ndigits = ndigits;
3275         var->weight = ndigits - 1;
3276 }
3277
3278 /*
3279  * Convert numeric to float8; if out of range, return +/- HUGE_VAL
3280  */
3281 static double
3282 numeric_to_double_no_overflow(Numeric num)
3283 {
3284         char       *tmp;
3285         double          val;
3286         char       *endptr;
3287
3288         tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
3289                                                                                           NumericGetDatum(num)));
3290
3291         /* unlike float8in, we ignore ERANGE from strtod */
3292         val = strtod(tmp, &endptr);
3293         if (*endptr != '\0')
3294         {
3295                 /* shouldn't happen ... */
3296                 ereport(ERROR,
3297                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3298                  errmsg("invalid input syntax for type double precision: \"%s\"",
3299                                 tmp)));
3300         }
3301
3302         pfree(tmp);
3303
3304         return val;
3305 }
3306
3307 /* As above, but work from a NumericVar */
3308 static double
3309 numericvar_to_double_no_overflow(NumericVar *var)
3310 {
3311         char       *tmp;
3312         double          val;
3313         char       *endptr;
3314
3315         tmp = get_str_from_var(var, var->dscale);
3316
3317         /* unlike float8in, we ignore ERANGE from strtod */
3318         val = strtod(tmp, &endptr);
3319         if (*endptr != '\0')
3320         {
3321                 /* shouldn't happen ... */
3322                 ereport(ERROR,
3323                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3324                  errmsg("invalid input syntax for type double precision: \"%s\"",
3325                                 tmp)));
3326         }
3327
3328         pfree(tmp);
3329
3330         return val;
3331 }
3332
3333
3334 /*
3335  * cmp_var() -
3336  *
3337  *      Compare two values on variable level.  We assume zeroes have been
3338  *      truncated to no digits.
3339  */
3340 static int
3341 cmp_var(NumericVar *var1, NumericVar *var2)
3342 {
3343         if (var1->ndigits == 0)
3344         {
3345                 if (var2->ndigits == 0)
3346                         return 0;
3347                 if (var2->sign == NUMERIC_NEG)
3348                         return 1;
3349                 return -1;
3350         }
3351         if (var2->ndigits == 0)
3352         {
3353                 if (var1->sign == NUMERIC_POS)
3354                         return 1;
3355                 return -1;
3356         }
3357
3358         if (var1->sign == NUMERIC_POS)
3359         {
3360                 if (var2->sign == NUMERIC_NEG)
3361                         return 1;
3362                 return cmp_abs(var1, var2);
3363         }
3364
3365         if (var2->sign == NUMERIC_POS)
3366                 return -1;
3367
3368         return cmp_abs(var2, var1);
3369 }
3370
3371
3372 /*
3373  * add_var() -
3374  *
3375  *      Full version of add functionality on variable level (handling signs).
3376  *      result might point to one of the operands too without danger.
3377  */
3378 static void
3379 add_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
3380 {
3381         /*
3382          * Decide on the signs of the two variables what to do
3383          */
3384         if (var1->sign == NUMERIC_POS)
3385         {
3386                 if (var2->sign == NUMERIC_POS)
3387                 {
3388                         /*
3389                          * Both are positive result = +(ABS(var1) + ABS(var2))
3390                          */
3391                         add_abs(var1, var2, result);
3392                         result->sign = NUMERIC_POS;
3393                 }
3394                 else
3395                 {
3396                         /*
3397                          * var1 is positive, var2 is negative Must compare absolute
3398                          * values
3399                          */
3400                         switch (cmp_abs(var1, var2))
3401                         {
3402                                 case 0:
3403                                         /* ----------
3404                                          * ABS(var1) == ABS(var2)
3405                                          * result = ZERO
3406                                          * ----------
3407                                          */
3408                                         zero_var(result);
3409                                         result->dscale = Max(var1->dscale, var2->dscale);
3410                                         break;
3411
3412                                 case 1:
3413                                         /* ----------
3414                                          * ABS(var1) > ABS(var2)
3415                                          * result = +(ABS(var1) - ABS(var2))
3416                                          * ----------
3417                                          */
3418                                         sub_abs(var1, var2, result);
3419                                         result->sign = NUMERIC_POS;
3420                                         break;
3421
3422                                 case -1:
3423                                         /* ----------
3424                                          * ABS(var1) < ABS(var2)
3425                                          * result = -(ABS(var2) - ABS(var1))
3426                                          * ----------
3427                                          */
3428                                         sub_abs(var2, var1, result);
3429                                         result->sign = NUMERIC_NEG;
3430                                         break;
3431                         }
3432                 }
3433         }
3434         else
3435         {
3436                 if (var2->sign == NUMERIC_POS)
3437                 {
3438                         /* ----------
3439                          * var1 is negative, var2 is positive
3440                          * Must compare absolute values
3441                          * ----------
3442                          */
3443                         switch (cmp_abs(var1, var2))
3444                         {
3445                                 case 0:
3446                                         /* ----------
3447                                          * ABS(var1) == ABS(var2)
3448                                          * result = ZERO
3449                                          * ----------
3450                                          */
3451                                         zero_var(result);
3452                                         result->dscale = Max(var1->dscale, var2->dscale);
3453                                         break;
3454
3455                                 case 1:
3456                                         /* ----------
3457                                          * ABS(var1) > ABS(var2)
3458                                          * result = -(ABS(var1) - ABS(var2))
3459                                          * ----------
3460                                          */
3461                                         sub_abs(var1, var2, result);
3462                                         result->sign = NUMERIC_NEG;
3463                                         break;
3464
3465                                 case -1:
3466                                         /* ----------
3467                                          * ABS(var1) < ABS(var2)
3468                                          * result = +(ABS(var2) - ABS(var1))
3469                                          * ----------
3470                                          */
3471                                         sub_abs(var2, var1, result);
3472                                         result->sign = NUMERIC_POS;
3473                                         break;
3474                         }
3475                 }
3476                 else
3477                 {
3478                         /* ----------
3479                          * Both are negative
3480                          * result = -(ABS(var1) + ABS(var2))
3481                          * ----------
3482                          */
3483                         add_abs(var1, var2, result);
3484                         result->sign = NUMERIC_NEG;
3485                 }
3486         }
3487 }
3488
3489
3490 /*
3491  * sub_var() -
3492  *
3493  *      Full version of sub functionality on variable level (handling signs).
3494  *      result might point to one of the operands too without danger.
3495  */
3496 static void
3497 sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
3498 {
3499         /*
3500          * Decide on the signs of the two variables what to do
3501          */
3502         if (var1->sign == NUMERIC_POS)
3503         {
3504                 if (var2->sign == NUMERIC_NEG)
3505                 {
3506                         /* ----------
3507                          * var1 is positive, var2 is negative
3508                          * result = +(ABS(var1) + ABS(var2))
3509                          * ----------
3510                          */
3511                         add_abs(var1, var2, result);
3512                         result->sign = NUMERIC_POS;
3513                 }
3514                 else
3515                 {
3516                         /* ----------
3517                          * Both are positive
3518                          * Must compare absolute values
3519                          * ----------
3520                          */
3521                         switch (cmp_abs(var1, var2))
3522                         {
3523                                 case 0:
3524                                         /* ----------
3525                                          * ABS(var1) == ABS(var2)
3526                                          * result = ZERO
3527                                          * ----------
3528                                          */
3529                                         zero_var(result);
3530                                         result->dscale = Max(var1->dscale, var2->dscale);
3531                                         break;
3532
3533                                 case 1:
3534                                         /* ----------
3535                                          * ABS(var1) > ABS(var2)
3536                                          * result = +(ABS(var1) - ABS(var2))
3537                                          * ----------
3538                                          */
3539                                         sub_abs(var1, var2, result);
3540                                         result->sign = NUMERIC_POS;
3541                                         break;
3542
3543                                 case -1:
3544                                         /* ----------
3545                                          * ABS(var1) < ABS(var2)
3546                                          * result = -(ABS(var2) - ABS(var1))
3547                                          * ----------
3548                                          */
3549                                         sub_abs(var2, var1, result);
3550                                         result->sign = NUMERIC_NEG;
3551                                         break;
3552                         }
3553                 }
3554         }
3555         else
3556         {
3557                 if (var2->sign == NUMERIC_NEG)
3558                 {
3559                         /* ----------
3560                          * Both are negative
3561                          * Must compare absolute values
3562                          * ----------
3563                          */
3564                         switch (cmp_abs(var1, var2))
3565                         {
3566                                 case 0:
3567                                         /* ----------
3568                                          * ABS(var1) == ABS(var2)
3569                                          * result = ZERO
3570                                          * ----------
3571                                          */
3572                                         zero_var(result);
3573                                         result->dscale = Max(var1->dscale, var2->dscale);
3574                                         break;
3575
3576                                 case 1:
3577                                         /* ----------
3578                                          * ABS(var1) > ABS(var2)
3579                                          * result = -(ABS(var1) - ABS(var2))
3580                                          * ----------
3581                                          */
3582                                         sub_abs(var1, var2, result);
3583                                         result->sign = NUMERIC_NEG;
3584                                         break;
3585
3586                                 case -1:
3587                                         /* ----------
3588                                          * ABS(var1) < ABS(var2)
3589                                          * result = +(ABS(var2) - ABS(var1))
3590                                          * ----------
3591                                          */
3592                                         sub_abs(var2, var1, result);
3593                                         result->sign = NUMERIC_POS;
3594                                         break;
3595                         }
3596                 }
3597                 else
3598                 {
3599                         /* ----------
3600                          * var1 is negative, var2 is positive
3601                          * result = -(ABS(var1) + ABS(var2))
3602                          * ----------
3603                          */
3604                         add_abs(var1, var2, result);
3605                         result->sign = NUMERIC_NEG;
3606                 }
3607         }
3608 }
3609
3610
3611 /*
3612  * mul_var() -
3613  *
3614  *      Multiplication on variable level. Product of var1 * var2 is stored
3615  *      in result.      Result is rounded to no more than rscale fractional digits.
3616  */
3617 static void
3618 mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
3619                 int rscale)
3620 {
3621         int                     res_ndigits;
3622         int                     res_sign;
3623         int                     res_weight;
3624         int                     maxdigits;
3625         int                *dig;
3626         int                     carry;
3627         int                     maxdig;
3628         int                     newdig;
3629         NumericDigit *res_digits;
3630         int                     i,
3631                                 ri,
3632                                 i1,
3633                                 i2;
3634
3635         /* copy these values into local vars for speed in inner loop */
3636         int                     var1ndigits = var1->ndigits;
3637         int                     var2ndigits = var2->ndigits;
3638         NumericDigit *var1digits = var1->digits;
3639         NumericDigit *var2digits = var2->digits;
3640
3641         if (var1ndigits == 0 || var2ndigits == 0)
3642         {
3643                 /* one or both inputs is zero; so is result */
3644                 zero_var(result);
3645                 result->dscale = rscale;
3646                 return;
3647         }
3648
3649         /* Determine result sign and (maximum possible) weight */
3650         if (var1->sign == var2->sign)
3651                 res_sign = NUMERIC_POS;
3652         else
3653                 res_sign = NUMERIC_NEG;
3654         res_weight = var1->weight + var2->weight + 2;
3655
3656         /*
3657          * Determine number of result digits to compute.  If the exact result
3658          * would have more than rscale fractional digits, truncate the
3659          * computation with MUL_GUARD_DIGITS guard digits.      We do that by
3660          * pretending that one or both inputs have fewer digits than they
3661          * really do.
3662          */
3663         res_ndigits = var1ndigits + var2ndigits + 1;
3664         maxdigits = res_weight + 1 + (rscale * DEC_DIGITS) + MUL_GUARD_DIGITS;
3665         if (res_ndigits > maxdigits)
3666         {
3667                 if (maxdigits < 3)
3668                 {
3669                         /* no useful precision at all in the result... */
3670                         zero_var(result);
3671                         result->dscale = rscale;
3672                         return;
3673                 }
3674                 /* force maxdigits odd so that input ndigits can be equal */
3675                 if ((maxdigits & 1) == 0)
3676                         maxdigits++;
3677                 if (var1ndigits > var2ndigits)
3678                 {
3679                         var1ndigits -= res_ndigits - maxdigits;
3680                         if (var1ndigits < var2ndigits)
3681                                 var1ndigits = var2ndigits = (var1ndigits + var2ndigits) / 2;
3682                 }
3683                 else
3684                 {
3685                         var2ndigits -= res_ndigits - maxdigits;
3686                         if (var2ndigits < var1ndigits)
3687                                 var1ndigits = var2ndigits = (var1ndigits + var2ndigits) / 2;
3688                 }
3689                 res_ndigits = maxdigits;
3690                 Assert(res_ndigits == var1ndigits + var2ndigits + 1);
3691         }
3692
3693         /*
3694          * We do the arithmetic in an array "dig[]" of signed int's.  Since
3695          * INT_MAX is noticeably larger than NBASE*NBASE, this gives us
3696          * headroom to avoid normalizing carries immediately.
3697          *
3698          * maxdig tracks the maximum possible value of any dig[] entry; when this
3699          * threatens to exceed INT_MAX, we take the time to propagate carries.
3700          * To avoid overflow in maxdig itself, it actually represents the max
3701          * possible value divided by NBASE-1.
3702          */
3703         dig = (int *) palloc0(res_ndigits * sizeof(int));
3704         maxdig = 0;
3705
3706         ri = res_ndigits - 1;
3707         for (i1 = var1ndigits - 1; i1 >= 0; ri--, i1--)
3708         {
3709                 int                     var1digit = var1digits[i1];
3710
3711                 if (var1digit == 0)
3712                         continue;
3713
3714                 /* Time to normalize? */
3715                 maxdig += var1digit;
3716                 if (maxdig > INT_MAX / (NBASE - 1))
3717                 {
3718                         /* Yes, do it */
3719                         carry = 0;
3720                         for (i = res_ndigits - 1; i >= 0; i--)
3721                         {
3722                                 newdig = dig[i] + carry;
3723                                 if (newdig >= NBASE)
3724                                 {
3725                                         carry = newdig / NBASE;
3726                                         newdig -= carry * NBASE;
3727                                 }
3728                                 else
3729                                         carry = 0;
3730                                 dig[i] = newdig;
3731                         }
3732                         Assert(carry == 0);
3733                         /* Reset maxdig to indicate new worst-case */
3734                         maxdig = 1 + var1digit;
3735                 }
3736
3737                 /* Add appropriate multiple of var2 into the accumulator */
3738                 i = ri;
3739                 for (i2 = var2ndigits - 1; i2 >= 0; i2--)
3740                         dig[i--] += var1digit * var2digits[i2];
3741         }
3742
3743         /*
3744          * Now we do a final carry propagation pass to normalize the result,
3745          * which we combine with storing the result digits into the output.
3746          * Note that this is still done at full precision w/guard digits.
3747          */
3748         alloc_var(result, res_ndigits);
3749         res_digits = result->digits;
3750         carry = 0;
3751         for (i = res_ndigits - 1; i >= 0; i--)
3752         {
3753                 newdig = dig[i] + carry;
3754                 if (newdig >= NBASE)
3755                 {
3756                         carry = newdig / NBASE;
3757                         newdig -= carry * NBASE;
3758                 }
3759                 else
3760                         carry = 0;
3761                 res_digits[i] = newdig;
3762         }
3763         Assert(carry == 0);
3764
3765         pfree(dig);
3766
3767         /*
3768          * Finally, round the result to the requested precision.
3769          */
3770         result->weight = res_weight;
3771         result->sign = res_sign;
3772
3773         /* Round to target rscale (and set result->dscale) */
3774         round_var(result, rscale);
3775
3776         /* Strip leading and trailing zeroes */
3777         strip_var(result);
3778 }
3779
3780
3781 /*
3782  * div_var() -
3783  *
3784  *      Division on variable level. Quotient of var1 / var2 is stored
3785  *      in result.      Result is rounded to no more than rscale fractional digits.
3786  */
3787 static void
3788 div_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
3789                 int rscale)
3790 {
3791         int                     div_ndigits;
3792         int                     res_sign;
3793         int                     res_weight;
3794         int                *div;
3795         int                     qdigit;
3796         int                     carry;
3797         int                     maxdiv;
3798         int                     newdig;
3799         NumericDigit *res_digits;
3800         double          fdividend,
3801                                 fdivisor,
3802                                 fdivisorinverse,
3803                                 fquotient;
3804         int                     qi;
3805         int                     i;
3806
3807         /* copy these values into local vars for speed in inner loop */
3808         int                     var1ndigits = var1->ndigits;
3809         int                     var2ndigits = var2->ndigits;
3810         NumericDigit *var1digits = var1->digits;
3811         NumericDigit *var2digits = var2->digits;
3812
3813         /*
3814          * First of all division by zero check; we must not be handed an
3815          * unnormalized divisor.
3816          */
3817         if (var2ndigits == 0 || var2digits[0] == 0)
3818                 ereport(ERROR,
3819                                 (errcode(ERRCODE_DIVISION_BY_ZERO),
3820                                  errmsg("division by zero")));
3821
3822         /*
3823          * Now result zero check
3824          */
3825         if (var1ndigits == 0)
3826         {
3827                 zero_var(result);
3828                 result->dscale = rscale;
3829                 return;
3830         }
3831
3832         /*
3833          * Determine the result sign, weight and number of digits to calculate
3834          */
3835         if (var1->sign == var2->sign)
3836                 res_sign = NUMERIC_POS;
3837         else
3838                 res_sign = NUMERIC_NEG;
3839         res_weight = var1->weight - var2->weight + 1;
3840         /* The number of accurate result digits we need to produce: */
3841         div_ndigits = res_weight + 1 + (rscale + DEC_DIGITS - 1) / DEC_DIGITS;
3842         /* Add guard digits for roundoff error */
3843         div_ndigits += DIV_GUARD_DIGITS;
3844         if (div_ndigits < DIV_GUARD_DIGITS)
3845                 div_ndigits = DIV_GUARD_DIGITS;
3846         /* Must be at least var1ndigits, too, to simplify data-loading loop */
3847         if (div_ndigits < var1ndigits)
3848                 div_ndigits = var1ndigits;
3849
3850         /*
3851          * We do the arithmetic in an array "div[]" of signed int's.  Since
3852          * INT_MAX is noticeably larger than NBASE*NBASE, this gives us
3853          * headroom to avoid normalizing carries immediately.
3854          *
3855          * We start with div[] containing one zero digit followed by the
3856          * dividend's digits (plus appended zeroes to reach the desired
3857          * precision including guard digits).  Each step of the main loop
3858          * computes an (approximate) quotient digit and stores it into div[],
3859          * removing one position of dividend space.  A final pass of carry
3860          * propagation takes care of any mistaken quotient digits.
3861          */
3862         div = (int *) palloc0((div_ndigits + 1) * sizeof(int));
3863         for (i = 0; i < var1ndigits; i++)
3864                 div[i + 1] = var1digits[i];
3865
3866         /*
3867          * We estimate each quotient digit using floating-point arithmetic,
3868          * taking the first four digits of the (current) dividend and divisor.
3869          * This must be float to avoid overflow.
3870          */
3871         fdivisor = (double) var2digits[0];
3872         for (i = 1; i < 4; i++)
3873         {
3874                 fdivisor *= NBASE;
3875                 if (i < var2ndigits)
3876                         fdivisor += (double) var2digits[i];
3877         }
3878         fdivisorinverse = 1.0 / fdivisor;
3879
3880         /*
3881          * maxdiv tracks the maximum possible absolute value of any div[]
3882          * entry; when this threatens to exceed INT_MAX, we take the time to
3883          * propagate carries.  To avoid overflow in maxdiv itself, it actually
3884          * represents the max possible abs. value divided by NBASE-1.
3885          */
3886         maxdiv = 1;
3887
3888         /*
3889          * Outer loop computes next quotient digit, which will go into div[qi]
3890          */
3891         for (qi = 0; qi < div_ndigits; qi++)
3892         {
3893                 /* Approximate the current dividend value */
3894                 fdividend = (double) div[qi];
3895                 for (i = 1; i < 4; i++)
3896                 {
3897                         fdividend *= NBASE;
3898                         if (qi + i <= div_ndigits)
3899                                 fdividend += (double) div[qi + i];
3900                 }
3901                 /* Compute the (approximate) quotient digit */
3902                 fquotient = fdividend * fdivisorinverse;
3903                 qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
3904                         (((int) fquotient) - 1);        /* truncate towards -infinity */
3905
3906                 if (qdigit != 0)
3907                 {
3908                         /* Do we need to normalize now? */
3909                         maxdiv += Abs(qdigit);
3910                         if (maxdiv > INT_MAX / (NBASE - 1))
3911                         {
3912                                 /* Yes, do it */
3913                                 carry = 0;
3914                                 for (i = div_ndigits; i > qi; i--)
3915                                 {
3916                                         newdig = div[i] + carry;
3917                                         if (newdig < 0)
3918                                         {
3919                                                 carry = -((-newdig - 1) / NBASE) - 1;
3920                                                 newdig -= carry * NBASE;
3921                                         }
3922                                         else if (newdig >= NBASE)
3923                                         {
3924                                                 carry = newdig / NBASE;
3925                                                 newdig -= carry * NBASE;
3926                                         }
3927                                         else
3928                                                 carry = 0;
3929                                         div[i] = newdig;
3930                                 }
3931                                 newdig = div[qi] + carry;
3932                                 div[qi] = newdig;
3933
3934                                 /*
3935                                  * All the div[] digits except possibly div[qi] are now in
3936                                  * the range 0..NBASE-1.
3937                                  */
3938                                 maxdiv = Abs(newdig) / (NBASE - 1);
3939                                 maxdiv = Max(maxdiv, 1);
3940
3941                                 /*
3942                                  * Recompute the quotient digit since new info may have
3943                                  * propagated into the top four dividend digits
3944                                  */
3945                                 fdividend = (double) div[qi];
3946                                 for (i = 1; i < 4; i++)
3947                                 {
3948                                         fdividend *= NBASE;
3949                                         if (qi + i <= div_ndigits)
3950                                                 fdividend += (double) div[qi + i];
3951                                 }
3952                                 /* Compute the (approximate) quotient digit */
3953                                 fquotient = fdividend * fdivisorinverse;
3954                                 qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
3955                                         (((int) fquotient) - 1);        /* truncate towards
3956                                                                                                  * -infinity */
3957                                 maxdiv += Abs(qdigit);
3958                         }
3959
3960                         /* Subtract off the appropriate multiple of the divisor */
3961                         if (qdigit != 0)
3962                         {
3963                                 int                     istop = Min(var2ndigits, div_ndigits - qi + 1);
3964
3965                                 for (i = 0; i < istop; i++)
3966                                         div[qi + i] -= qdigit * var2digits[i];
3967                         }
3968                 }
3969
3970                 /*
3971                  * The dividend digit we are about to replace might still be
3972                  * nonzero. Fold it into the next digit position.  We don't need
3973                  * to worry about overflow here since this should nearly cancel
3974                  * with the subtraction of the divisor.
3975                  */
3976                 div[qi + 1] += div[qi] * NBASE;
3977
3978                 div[qi] = qdigit;
3979         }
3980
3981         /*
3982          * Approximate and store the last quotient digit (div[div_ndigits])
3983          */
3984         fdividend = (double) div[qi];
3985         for (i = 1; i < 4; i++)
3986                 fdividend *= NBASE;
3987         fquotient = fdividend * fdivisorinverse;
3988         qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
3989                 (((int) fquotient) - 1);        /* truncate towards -infinity */
3990         div[qi] = qdigit;
3991
3992         /*
3993          * Now we do a final carry propagation pass to normalize the result,
3994          * which we combine with storing the result digits into the output.
3995          * Note that this is still done at full precision w/guard digits.
3996          */
3997         alloc_var(result, div_ndigits + 1);
3998         res_digits = result->digits;
3999         carry = 0;
4000         for (i = div_ndigits; i >= 0; i--)
4001         {
4002                 newdig = div[i] + carry;
4003                 if (newdig < 0)
4004                 {
4005                         carry = -((-newdig - 1) / NBASE) - 1;
4006                         newdig -= carry * NBASE;
4007                 }
4008                 else if (newdig >= NBASE)
4009                 {
4010                         carry = newdig / NBASE;
4011                         newdig -= carry * NBASE;
4012                 }
4013                 else
4014                         carry = 0;
4015                 res_digits[i] = newdig;
4016         }
4017         Assert(carry == 0);
4018
4019         pfree(div);
4020
4021         /*
4022          * Finally, round the result to the requested precision.
4023          */
4024         result->weight = res_weight;
4025         result->sign = res_sign;
4026
4027         /* Round to target rscale (and set result->dscale) */
4028         round_var(result, rscale);
4029
4030         /* Strip leading and trailing zeroes */
4031         strip_var(result);
4032 }
4033
4034
4035 /*
4036  * Default scale selection for division
4037  *
4038  * Returns the appropriate result scale for the division result.
4039  */
4040 static int
4041 select_div_scale(NumericVar *var1, NumericVar *var2)
4042 {
4043         int                     weight1,
4044                                 weight2,
4045                                 qweight,
4046                                 i;
4047         NumericDigit firstdigit1,
4048                                 firstdigit2;
4049         int                     rscale;
4050
4051         /*
4052          * The result scale of a division isn't specified in any SQL standard.
4053          * For PostgreSQL we select a result scale that will give at least
4054          * NUMERIC_MIN_SIG_DIGITS significant digits, so that numeric gives a
4055          * result no less accurate than float8; but use a scale not less than
4056          * either input's display scale.
4057          */
4058
4059         /* Get the actual (normalized) weight and first digit of each input */
4060
4061         weight1 = 0;                            /* values to use if var1 is zero */
4062         firstdigit1 = 0;
4063         for (i = 0; i < var1->ndigits; i++)
4064         {
4065                 firstdigit1 = var1->digits[i];
4066                 if (firstdigit1 != 0)
4067                 {
4068                         weight1 = var1->weight - i;
4069                         break;
4070                 }
4071         }
4072
4073         weight2 = 0;                            /* values to use if var2 is zero */
4074         firstdigit2 = 0;
4075         for (i = 0; i < var2->ndigits; i++)
4076         {
4077                 firstdigit2 = var2->digits[i];
4078                 if (firstdigit2 != 0)
4079                 {
4080                         weight2 = var2->weight - i;
4081                         break;
4082                 }
4083         }
4084
4085         /*
4086          * Estimate weight of quotient.  If the two first digits are equal, we
4087          * can't be sure, but assume that var1 is less than var2.
4088          */
4089         qweight = weight1 - weight2;
4090         if (firstdigit1 <= firstdigit2)
4091                 qweight--;
4092
4093         /* Select result scale */
4094         rscale = NUMERIC_MIN_SIG_DIGITS - qweight * DEC_DIGITS;
4095         rscale = Max(rscale, var1->dscale);
4096         rscale = Max(rscale, var2->dscale);
4097         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
4098         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
4099
4100         return rscale;
4101 }
4102
4103
4104 /*
4105  * mod_var() -
4106  *
4107  *      Calculate the modulo of two numerics at variable level
4108  */
4109 static void
4110 mod_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
4111 {
4112         NumericVar      tmp;
4113         int                     rscale;
4114
4115         init_var(&tmp);
4116
4117         /* ---------
4118          * We do this using the equation
4119          *              mod(x,y) = x - trunc(x/y)*y
4120          * We set rscale the same way numeric_div and numeric_mul do
4121          * to get the right answer from the equation.  The final result,
4122          * however, need not be displayed to more precision than the inputs.
4123          * ----------
4124          */
4125         rscale = select_div_scale(var1, var2);
4126
4127         div_var(var1, var2, &tmp, rscale);
4128
4129         trunc_var(&tmp, 0);
4130
4131         mul_var(var2, &tmp, &tmp, var2->dscale + tmp.dscale);
4132
4133         sub_var(var1, &tmp, result);
4134
4135         round_var(result, Max(var1->dscale, var2->dscale));
4136
4137         free_var(&tmp);
4138 }
4139
4140
4141 /*
4142  * ceil_var() -
4143  *
4144  *      Return the smallest integer greater than or equal to the argument
4145  *      on variable level
4146  */
4147 static void
4148 ceil_var(NumericVar *var, NumericVar *result)
4149 {
4150         NumericVar      tmp;
4151
4152         init_var(&tmp);
4153         set_var_from_var(var, &tmp);
4154
4155         trunc_var(&tmp, 0);
4156
4157         if (var->sign == NUMERIC_POS && cmp_var(var, &tmp) != 0)
4158                 add_var(&tmp, &const_one, &tmp);
4159
4160         set_var_from_var(&tmp, result);
4161         free_var(&tmp);
4162 }
4163
4164
4165 /*
4166  * floor_var() -
4167  *
4168  *      Return the largest integer equal to or less than the argument
4169  *      on variable level
4170  */
4171 static void
4172 floor_var(NumericVar *var, NumericVar *result)
4173 {
4174         NumericVar      tmp;
4175
4176         init_var(&tmp);
4177         set_var_from_var(var, &tmp);
4178
4179         trunc_var(&tmp, 0);
4180
4181         if (var->sign == NUMERIC_NEG && cmp_var(var, &tmp) != 0)
4182                 sub_var(&tmp, &const_one, &tmp);
4183
4184         set_var_from_var(&tmp, result);
4185         free_var(&tmp);
4186 }
4187
4188
4189 /*
4190  * sqrt_var() -
4191  *
4192  *      Compute the square root of x using Newton's algorithm
4193  */
4194 static void
4195 sqrt_var(NumericVar *arg, NumericVar *result, int rscale)
4196 {
4197         NumericVar      tmp_arg;
4198         NumericVar      tmp_val;
4199         NumericVar      last_val;
4200         int                     local_rscale;
4201         int                     stat;
4202
4203         local_rscale = rscale + 8;
4204
4205         stat = cmp_var(arg, &const_zero);
4206         if (stat == 0)
4207         {
4208                 zero_var(result);
4209                 result->dscale = rscale;
4210                 return;
4211         }
4212
4213         /*
4214          * SQL2003 defines sqrt() in terms of power, so we need to emit the
4215          * right SQLSTATE error code if the operand is negative.
4216          */
4217         if (stat < 0)
4218                 ereport(ERROR,
4219                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION),
4220                                  errmsg("cannot take square root of a negative number")));
4221
4222         init_var(&tmp_arg);
4223         init_var(&tmp_val);
4224         init_var(&last_val);
4225
4226         /* Copy arg in case it is the same var as result */
4227         set_var_from_var(arg, &tmp_arg);
4228
4229         /*
4230          * Initialize the result to the first guess
4231          */
4232         alloc_var(result, 1);
4233         result->digits[0] = tmp_arg.digits[0] / 2;
4234         if (result->digits[0] == 0)
4235                 result->digits[0] = 1;
4236         result->weight = tmp_arg.weight / 2;
4237         result->sign = NUMERIC_POS;
4238
4239         set_var_from_var(result, &last_val);
4240
4241         for (;;)
4242         {
4243                 div_var(&tmp_arg, result, &tmp_val, local_rscale);
4244
4245                 add_var(result, &tmp_val, result);
4246                 mul_var(result, &const_zero_point_five, result, local_rscale);
4247
4248                 if (cmp_var(&last_val, result) == 0)
4249                         break;
4250                 set_var_from_var(result, &last_val);
4251         }
4252
4253         free_var(&last_val);
4254         free_var(&tmp_val);
4255         free_var(&tmp_arg);
4256
4257         /* Round to requested precision */
4258         round_var(result, rscale);
4259 }
4260
4261
4262 /*
4263  * exp_var() -
4264  *
4265  *      Raise e to the power of x
4266  */
4267 static void
4268 exp_var(NumericVar *arg, NumericVar *result, int rscale)
4269 {
4270         NumericVar      x;
4271         int                     xintval;
4272         bool            xneg = FALSE;
4273         int                     local_rscale;
4274
4275         /*----------
4276          * We separate the integral and fraction parts of x, then compute
4277          *              e^x = e^xint * e^xfrac
4278          * where e = exp(1) and e^xfrac = exp(xfrac) are computed by
4279          * exp_var_internal; the limited range of inputs allows that routine
4280          * to do a good job with a simple Taylor series.  Raising e^xint is
4281          * done by repeated multiplications in power_var_int.
4282          *----------
4283          */
4284         init_var(&x);
4285
4286         set_var_from_var(arg, &x);
4287
4288         if (x.sign == NUMERIC_NEG)
4289         {
4290                 xneg = TRUE;
4291                 x.sign = NUMERIC_POS;
4292         }
4293
4294         /* Extract the integer part, remove it from x */
4295         xintval = 0;
4296         while (x.weight >= 0)
4297         {
4298                 xintval *= NBASE;
4299                 if (x.ndigits > 0)
4300                 {
4301                         xintval += x.digits[0];
4302                         x.digits++;
4303                         x.ndigits--;
4304                 }
4305                 x.weight--;
4306                 /* Guard against overflow */
4307                 if (xintval >= NUMERIC_MAX_RESULT_SCALE * 3)
4308                         ereport(ERROR,
4309                                         (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
4310                                          errmsg("argument for function \"exp\" too big")));
4311         }
4312
4313         /* Select an appropriate scale for internal calculation */
4314         local_rscale = rscale + MUL_GUARD_DIGITS * 2;
4315
4316         /* Compute e^xfrac */
4317         exp_var_internal(&x, result, local_rscale);
4318
4319         /* If there's an integer part, multiply by e^xint */
4320         if (xintval > 0)
4321         {
4322                 NumericVar      e;
4323
4324                 init_var(&e);
4325                 exp_var_internal(&const_one, &e, local_rscale);
4326                 power_var_int(&e, xintval, &e, local_rscale);
4327                 mul_var(&e, result, result, local_rscale);
4328                 free_var(&e);
4329         }
4330
4331         /* Compensate for input sign, and round to requested rscale */
4332         if (xneg)
4333                 div_var(&const_one, result, result, rscale);
4334         else
4335                 round_var(result, rscale);
4336
4337         free_var(&x);
4338 }
4339
4340
4341 /*
4342  * exp_var_internal() -
4343  *
4344  *      Raise e to the power of x, where 0 <= x <= 1
4345  *
4346  * NB: the result should be good to at least rscale digits, but it has
4347  * *not* been rounded off; the caller must do that if wanted.
4348  */
4349 static void
4350 exp_var_internal(NumericVar *arg, NumericVar *result, int rscale)
4351 {
4352         NumericVar      x;
4353         NumericVar      xpow;
4354         NumericVar      ifac;
4355         NumericVar      elem;
4356         NumericVar      ni;
4357         int                     ndiv2 = 0;
4358         int                     local_rscale;
4359
4360         init_var(&x);
4361         init_var(&xpow);
4362         init_var(&ifac);
4363         init_var(&elem);
4364         init_var(&ni);
4365
4366         set_var_from_var(arg, &x);
4367
4368         Assert(x.sign == NUMERIC_POS);
4369
4370         local_rscale = rscale + 8;
4371
4372         /* Reduce input into range 0 <= x <= 0.01 */
4373         while (cmp_var(&x, &const_zero_point_01) > 0)
4374         {
4375                 ndiv2++;
4376                 local_rscale++;
4377                 mul_var(&x, &const_zero_point_five, &x, x.dscale + 1);
4378         }
4379
4380         /*
4381          * Use the Taylor series
4382          *
4383          * exp(x) = 1 + x + x^2/2! + x^3/3! + ...
4384          *
4385          * Given the limited range of x, this should converge reasonably quickly.
4386          * We run the series until the terms fall below the local_rscale
4387          * limit.
4388          */
4389         add_var(&const_one, &x, result);
4390         set_var_from_var(&x, &xpow);
4391         set_var_from_var(&const_one, &ifac);
4392         set_var_from_var(&const_one, &ni);
4393
4394         for (;;)
4395         {
4396                 add_var(&ni, &const_one, &ni);
4397                 mul_var(&xpow, &x, &xpow, local_rscale);
4398                 mul_var(&ifac, &ni, &ifac, 0);
4399                 div_var(&xpow, &ifac, &elem, local_rscale);
4400
4401                 if (elem.ndigits == 0)
4402                         break;
4403
4404                 add_var(result, &elem, result);
4405         }
4406
4407         /* Compensate for argument range reduction */
4408         while (ndiv2-- > 0)
4409                 mul_var(result, result, result, local_rscale);
4410
4411         free_var(&x);
4412         free_var(&xpow);
4413         free_var(&ifac);
4414         free_var(&elem);
4415         free_var(&ni);
4416 }
4417
4418
4419 /*
4420  * ln_var() -
4421  *
4422  *      Compute the natural log of x
4423  */
4424 static void
4425 ln_var(NumericVar *arg, NumericVar *result, int rscale)
4426 {
4427         NumericVar      x;
4428         NumericVar      xx;
4429         NumericVar      ni;
4430         NumericVar      elem;
4431         NumericVar      fact;
4432         int                     local_rscale;
4433         int                     cmp;
4434
4435         cmp = cmp_var(arg, &const_zero);
4436         if (cmp == 0)
4437                 ereport(ERROR,
4438                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_LOG),
4439                                  errmsg("cannot take logarithm of zero")));
4440         else if (cmp < 0)
4441                 ereport(ERROR,
4442                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_LOG),
4443                                  errmsg("cannot take logarithm of a negative number")));
4444
4445         local_rscale = rscale + 8;
4446
4447         init_var(&x);
4448         init_var(&xx);
4449         init_var(&ni);
4450         init_var(&elem);
4451         init_var(&fact);
4452
4453         set_var_from_var(arg, &x);
4454         set_var_from_var(&const_two, &fact);
4455
4456         /* Reduce input into range 0.9 < x < 1.1 */
4457         while (cmp_var(&x, &const_zero_point_nine) <= 0)
4458         {
4459                 local_rscale++;
4460                 sqrt_var(&x, &x, local_rscale);
4461                 mul_var(&fact, &const_two, &fact, 0);
4462         }
4463         while (cmp_var(&x, &const_one_point_one) >= 0)
4464         {
4465                 local_rscale++;
4466                 sqrt_var(&x, &x, local_rscale);
4467                 mul_var(&fact, &const_two, &fact, 0);
4468         }
4469
4470         /*
4471          * We use the Taylor series for 0.5 * ln((1+z)/(1-z)),
4472          *
4473          * z + z^3/3 + z^5/5 + ...
4474          *
4475          * where z = (x-1)/(x+1) is in the range (approximately) -0.053 .. 0.048
4476          * due to the above range-reduction of x.
4477          *
4478          * The convergence of this is not as fast as one would like, but is
4479          * tolerable given that z is small.
4480          */
4481         sub_var(&x, &const_one, result);
4482         add_var(&x, &const_one, &elem);
4483         div_var(result, &elem, result, local_rscale);
4484         set_var_from_var(result, &xx);
4485         mul_var(result, result, &x, local_rscale);
4486
4487         set_var_from_var(&const_one, &ni);
4488
4489         for (;;)
4490         {
4491                 add_var(&ni, &const_two, &ni);
4492                 mul_var(&xx, &x, &xx, local_rscale);
4493                 div_var(&xx, &ni, &elem, local_rscale);
4494
4495                 if (elem.ndigits == 0)
4496                         break;
4497
4498                 add_var(result, &elem, result);
4499
4500                 if (elem.weight < (result->weight - local_rscale * 2 / DEC_DIGITS))
4501                         break;
4502         }
4503
4504         /* Compensate for argument range reduction, round to requested rscale */
4505         mul_var(result, &fact, result, rscale);
4506
4507         free_var(&x);
4508         free_var(&xx);
4509         free_var(&ni);
4510         free_var(&elem);
4511         free_var(&fact);
4512 }
4513
4514
4515 /*
4516  * log_var() -
4517  *
4518  *      Compute the logarithm of num in a given base.
4519  *
4520  *      Note: this routine chooses dscale of the result.
4521  */
4522 static void
4523 log_var(NumericVar *base, NumericVar *num, NumericVar *result)
4524 {
4525         NumericVar      ln_base;
4526         NumericVar      ln_num;
4527         int                     dec_digits;
4528         int                     rscale;
4529         int                     local_rscale;
4530
4531         init_var(&ln_base);
4532         init_var(&ln_num);
4533
4534         /* Set scale for ln() calculations --- compare numeric_ln() */
4535
4536         /* Approx decimal digits before decimal point */
4537         dec_digits = (num->weight + 1) * DEC_DIGITS;
4538
4539         if (dec_digits > 1)
4540                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(dec_digits - 1);
4541         else if (dec_digits < 1)
4542                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(1 - dec_digits);
4543         else
4544                 rscale = NUMERIC_MIN_SIG_DIGITS;
4545
4546         rscale = Max(rscale, base->dscale);
4547         rscale = Max(rscale, num->dscale);
4548         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
4549         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
4550
4551         local_rscale = rscale + 8;
4552
4553         /* Form natural logarithms */
4554         ln_var(base, &ln_base, local_rscale);
4555         ln_var(num, &ln_num, local_rscale);
4556
4557         ln_base.dscale = rscale;
4558         ln_num.dscale = rscale;
4559
4560         /* Select scale for division result */
4561         rscale = select_div_scale(&ln_num, &ln_base);
4562
4563         div_var(&ln_num, &ln_base, result, rscale);
4564
4565         free_var(&ln_num);
4566         free_var(&ln_base);
4567 }
4568
4569
4570 /*
4571  * power_var() -
4572  *
4573  *      Raise base to the power of exp
4574  *
4575  *      Note: this routine chooses dscale of the result.
4576  */
4577 static void
4578 power_var(NumericVar *base, NumericVar *exp, NumericVar *result)
4579 {
4580         NumericVar      ln_base;
4581         NumericVar      ln_num;
4582         int                     dec_digits;
4583         int                     rscale;
4584         int                     local_rscale;
4585         double          val;
4586
4587         /* If exp can be represented as an integer, use power_var_int */
4588         if (exp->ndigits == 0 || exp->ndigits <= exp->weight + 1)
4589         {
4590                 /* exact integer, but does it fit in int? */
4591                 NumericVar      x;
4592                 int64           expval64;
4593
4594                 /* must copy because numericvar_to_int8() scribbles on input */
4595                 init_var(&x);
4596                 set_var_from_var(exp, &x);
4597                 if (numericvar_to_int8(&x, &expval64))
4598                 {
4599                         int                     expval = (int) expval64;
4600
4601                         /* Test for overflow by reverse-conversion. */
4602                         if ((int64) expval == expval64)
4603                         {
4604                                 /* Okay, select rscale */
4605                                 rscale = NUMERIC_MIN_SIG_DIGITS;
4606                                 rscale = Max(rscale, base->dscale);
4607                                 rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
4608                                 rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
4609
4610                                 power_var_int(base, expval, result, rscale);
4611
4612                                 free_var(&x);
4613                                 return;
4614                         }
4615                 }
4616                 free_var(&x);
4617         }
4618
4619         init_var(&ln_base);
4620         init_var(&ln_num);
4621
4622         /* Set scale for ln() calculation --- need extra accuracy here */
4623
4624         /* Approx decimal digits before decimal point */
4625         dec_digits = (base->weight + 1) * DEC_DIGITS;
4626
4627         if (dec_digits > 1)
4628                 rscale = NUMERIC_MIN_SIG_DIGITS * 2 - (int) log10(dec_digits - 1);
4629         else if (dec_digits < 1)
4630                 rscale = NUMERIC_MIN_SIG_DIGITS * 2 - (int) log10(1 - dec_digits);
4631         else
4632                 rscale = NUMERIC_MIN_SIG_DIGITS * 2;
4633
4634         rscale = Max(rscale, base->dscale * 2);
4635         rscale = Max(rscale, exp->dscale * 2);
4636         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE * 2);
4637         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE * 2);
4638
4639         local_rscale = rscale + 8;
4640
4641         ln_var(base, &ln_base, local_rscale);
4642
4643         mul_var(&ln_base, exp, &ln_num, local_rscale);
4644
4645         /* Set scale for exp() -- compare numeric_exp() */
4646
4647         /* convert input to float8, ignoring overflow */
4648         val = numericvar_to_double_no_overflow(&ln_num);
4649
4650         /*
4651          * log10(result) = num * log10(e), so this is approximately the
4652          * weight:
4653          */
4654         val *= 0.434294481903252;
4655
4656         /* limit to something that won't cause integer overflow */
4657         val = Max(val, -NUMERIC_MAX_RESULT_SCALE);
4658         val = Min(val, NUMERIC_MAX_RESULT_SCALE);
4659
4660         rscale = NUMERIC_MIN_SIG_DIGITS - (int) val;
4661         rscale = Max(rscale, base->dscale);
4662         rscale = Max(rscale, exp->dscale);
4663         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
4664         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
4665
4666         exp_var(&ln_num, result, rscale);
4667
4668         free_var(&ln_num);
4669         free_var(&ln_base);
4670 }
4671
4672 /*
4673  * power_var_int() -
4674  *
4675  *      Raise base to the power of exp, where exp is an integer.
4676  */
4677 static void
4678 power_var_int(NumericVar *base, int exp, NumericVar *result, int rscale)
4679 {
4680         bool            neg;
4681         NumericVar      base_prod;
4682         int                     local_rscale;
4683
4684         /* Detect some special cases, particularly 0^0. */
4685
4686         switch (exp)
4687         {
4688                 case 0:
4689                         if (base->ndigits == 0)
4690                                 ereport(ERROR,
4691                                                 (errcode(ERRCODE_FLOATING_POINT_EXCEPTION),
4692                                                  errmsg("zero raised to zero is undefined")));
4693                         set_var_from_var(&const_one, result);
4694                         result->dscale = rscale;        /* no need to round */
4695                         return;
4696                 case 1:
4697                         set_var_from_var(base, result);
4698                         round_var(result, rscale);
4699                         return;
4700                 case -1:
4701                         div_var(&const_one, base, result, rscale);
4702                         return;
4703                 case 2:
4704                         mul_var(base, base, result, rscale);
4705                         return;
4706                 default:
4707                         break;
4708         }
4709
4710         /*
4711          * The general case repeatedly multiplies base according to the bit
4712          * pattern of exp.      We do the multiplications with some extra
4713          * precision.
4714          */
4715         neg = (exp < 0);
4716         exp = Abs(exp);
4717
4718         local_rscale = rscale + MUL_GUARD_DIGITS * 2;
4719
4720         init_var(&base_prod);
4721         set_var_from_var(base, &base_prod);
4722
4723         if (exp & 1)
4724                 set_var_from_var(base, result);
4725         else
4726                 set_var_from_var(&const_one, result);
4727
4728         while ((exp >>= 1) > 0)
4729         {
4730                 mul_var(&base_prod, &base_prod, &base_prod, local_rscale);
4731                 if (exp & 1)
4732                         mul_var(&base_prod, result, result, local_rscale);
4733         }
4734
4735         free_var(&base_prod);
4736
4737         /* Compensate for input sign, and round to requested rscale */
4738         if (neg)
4739                 div_var(&const_one, result, result, rscale);
4740         else
4741                 round_var(result, rscale);
4742 }
4743
4744
4745 /* ----------------------------------------------------------------------
4746  *
4747  * Following are the lowest level functions that operate unsigned
4748  * on the variable level
4749  *
4750  * ----------------------------------------------------------------------
4751  */
4752
4753
4754 /* ----------
4755  * cmp_abs() -
4756  *
4757  *      Compare the absolute values of var1 and var2
4758  *      Returns:        -1 for ABS(var1) < ABS(var2)
4759  *                              0  for ABS(var1) == ABS(var2)
4760  *                              1  for ABS(var1) > ABS(var2)
4761  * ----------
4762  */
4763 static int
4764 cmp_abs(NumericVar *var1, NumericVar *var2)
4765 {
4766         NumericDigit *var1digits = var1->digits;
4767         NumericDigit *var2digits = var2->digits;
4768         int                     i1 = 0;
4769         int                     i2 = 0;
4770         int                     w1 = var1->weight;
4771         int                     w2 = var2->weight;
4772
4773         /* Check any digits before the first common digit */
4774
4775         while (w1 > w2 && i1 < var1->ndigits)
4776         {
4777                 if (var1digits[i1++] != 0)
4778                         return 1;
4779                 w1--;
4780         }
4781         while (w2 > w1 && i2 < var2->ndigits)
4782         {
4783                 if (var2digits[i2++] != 0)
4784                         return -1;
4785                 w2--;
4786         }
4787
4788         /* At this point, either w1 == w2 or we've run out of digits */
4789
4790         if (w1 == w2)
4791         {
4792                 while (i1 < var1->ndigits && i2 < var2->ndigits)
4793                 {
4794                         int                     stat = var1digits[i1++] - var2digits[i2++];
4795
4796                         if (stat)
4797                         {
4798                                 if (stat > 0)
4799                                         return 1;
4800                                 return -1;
4801                         }
4802                 }
4803         }
4804
4805         /*
4806          * At this point, we've run out of digits on one side or the other; so
4807          * any remaining nonzero digits imply that side is larger
4808          */
4809         while (i1 < var1->ndigits)
4810         {
4811                 if (var1digits[i1++] != 0)
4812                         return 1;
4813         }
4814         while (i2 < var2->ndigits)
4815         {
4816                 if (var2digits[i2++] != 0)
4817                         return -1;
4818         }
4819
4820         return 0;
4821 }
4822
4823
4824 /*
4825  * add_abs() -
4826  *
4827  *      Add the absolute values of two variables into result.
4828  *      result might point to one of the operands without danger.
4829  */
4830 static void
4831 add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
4832 {
4833         NumericDigit *res_buf;
4834         NumericDigit *res_digits;
4835         int                     res_ndigits;
4836         int                     res_weight;
4837         int                     res_rscale,
4838                                 rscale1,
4839                                 rscale2;
4840         int                     res_dscale;
4841         int                     i,
4842                                 i1,
4843                                 i2;
4844         int                     carry = 0;
4845
4846         /* copy these values into local vars for speed in inner loop */
4847         int                     var1ndigits = var1->ndigits;
4848         int                     var2ndigits = var2->ndigits;
4849         NumericDigit *var1digits = var1->digits;
4850         NumericDigit *var2digits = var2->digits;
4851
4852         res_weight = Max(var1->weight, var2->weight) + 1;
4853
4854         res_dscale = Max(var1->dscale, var2->dscale);
4855
4856         /* Note: here we are figuring rscale in base-NBASE digits */
4857         rscale1 = var1->ndigits - var1->weight - 1;
4858         rscale2 = var2->ndigits - var2->weight - 1;
4859         res_rscale = Max(rscale1, rscale2);
4860
4861         res_ndigits = res_rscale + res_weight + 1;
4862         if (res_ndigits <= 0)
4863                 res_ndigits = 1;
4864
4865         res_buf = digitbuf_alloc(res_ndigits + 1);
4866         res_buf[0] = 0;                         /* spare digit for later rounding */
4867         res_digits = res_buf + 1;
4868
4869         i1 = res_rscale + var1->weight + 1;
4870         i2 = res_rscale + var2->weight + 1;
4871         for (i = res_ndigits - 1; i >= 0; i--)
4872         {
4873                 i1--;
4874                 i2--;
4875                 if (i1 >= 0 && i1 < var1ndigits)
4876                         carry += var1digits[i1];
4877                 if (i2 >= 0 && i2 < var2ndigits)
4878                         carry += var2digits[i2];
4879
4880                 if (carry >= NBASE)
4881                 {
4882                         res_digits[i] = carry - NBASE;
4883                         carry = 1;
4884                 }
4885                 else
4886                 {
4887                         res_digits[i] = carry;
4888                         carry = 0;
4889                 }
4890         }
4891
4892         Assert(carry == 0);                     /* else we failed to allow for carry out */
4893
4894         digitbuf_free(result->buf);
4895         result->ndigits = res_ndigits;
4896         result->buf = res_buf;
4897         result->digits = res_digits;
4898         result->weight = res_weight;
4899         result->dscale = res_dscale;
4900
4901         /* Remove leading/trailing zeroes */
4902         strip_var(result);
4903 }
4904
4905
4906 /*
4907  * sub_abs()
4908  *
4909  *      Subtract the absolute value of var2 from the absolute value of var1
4910  *      and store in result. result might point to one of the operands
4911  *      without danger.
4912  *
4913  *      ABS(var1) MUST BE GREATER OR EQUAL ABS(var2) !!!
4914  */
4915 static void
4916 sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
4917 {
4918         NumericDigit *res_buf;
4919         NumericDigit *res_digits;
4920         int                     res_ndigits;
4921         int                     res_weight;
4922         int                     res_rscale,
4923                                 rscale1,
4924                                 rscale2;
4925         int                     res_dscale;
4926         int                     i,
4927                                 i1,
4928                                 i2;
4929         int                     borrow = 0;
4930
4931         /* copy these values into local vars for speed in inner loop */
4932         int                     var1ndigits = var1->ndigits;
4933         int                     var2ndigits = var2->ndigits;
4934         NumericDigit *var1digits = var1->digits;
4935         NumericDigit *var2digits = var2->digits;
4936
4937         res_weight = var1->weight;
4938
4939         res_dscale = Max(var1->dscale, var2->dscale);
4940
4941         /* Note: here we are figuring rscale in base-NBASE digits */
4942         rscale1 = var1->ndigits - var1->weight - 1;
4943         rscale2 = var2->ndigits - var2->weight - 1;
4944         res_rscale = Max(rscale1, rscale2);
4945
4946         res_ndigits = res_rscale + res_weight + 1;
4947         if (res_ndigits <= 0)
4948                 res_ndigits = 1;
4949
4950         res_buf = digitbuf_alloc(res_ndigits + 1);
4951         res_buf[0] = 0;                         /* spare digit for later rounding */
4952         res_digits = res_buf + 1;
4953
4954         i1 = res_rscale + var1->weight + 1;
4955         i2 = res_rscale + var2->weight + 1;
4956         for (i = res_ndigits - 1; i >= 0; i--)
4957         {
4958                 i1--;
4959                 i2--;
4960                 if (i1 >= 0 && i1 < var1ndigits)
4961                         borrow += var1digits[i1];
4962                 if (i2 >= 0 && i2 < var2ndigits)
4963                         borrow -= var2digits[i2];
4964
4965                 if (borrow < 0)
4966                 {
4967                         res_digits[i] = borrow + NBASE;
4968                         borrow = -1;
4969                 }
4970                 else
4971                 {
4972                         res_digits[i] = borrow;
4973                         borrow = 0;
4974                 }
4975         }
4976
4977         Assert(borrow == 0);            /* else caller gave us var1 < var2 */
4978
4979         digitbuf_free(result->buf);
4980         result->ndigits = res_ndigits;
4981         result->buf = res_buf;
4982         result->digits = res_digits;
4983         result->weight = res_weight;
4984         result->dscale = res_dscale;
4985
4986         /* Remove leading/trailing zeroes */
4987         strip_var(result);
4988 }
4989
4990 /*
4991  * round_var
4992  *
4993  * Round the value of a variable to no more than rscale decimal digits
4994  * after the decimal point.  NOTE: we allow rscale < 0 here, implying
4995  * rounding before the decimal point.
4996  */
4997 static void
4998 round_var(NumericVar *var, int rscale)
4999 {
5000         NumericDigit *digits = var->digits;
5001         int                     di;
5002         int                     ndigits;
5003         int                     carry;
5004
5005         var->dscale = rscale;
5006
5007         /* decimal digits wanted */
5008         di = (var->weight + 1) * DEC_DIGITS + rscale;
5009
5010         /*
5011          * If di = 0, the value loses all digits, but could round up to 1 if
5012          * its first extra digit is >= 5.  If di < 0 the result must be 0.
5013          */
5014         if (di < 0)
5015         {
5016                 var->ndigits = 0;
5017                 var->weight = 0;
5018                 var->sign = NUMERIC_POS;
5019         }
5020         else
5021         {
5022                 /* NBASE digits wanted */
5023                 ndigits = (di + DEC_DIGITS - 1) / DEC_DIGITS;
5024
5025                 /* 0, or number of decimal digits to keep in last NBASE digit */
5026                 di %= DEC_DIGITS;
5027
5028                 if (ndigits < var->ndigits ||
5029                         (ndigits == var->ndigits && di > 0))
5030                 {
5031                         var->ndigits = ndigits;
5032
5033 #if DEC_DIGITS == 1
5034                         /* di must be zero */
5035                         carry = (digits[ndigits] >= HALF_NBASE) ? 1 : 0;
5036 #else
5037                         if (di == 0)
5038                                 carry = (digits[ndigits] >= HALF_NBASE) ? 1 : 0;
5039                         else
5040                         {
5041                                 /* Must round within last NBASE digit */
5042                                 int                     extra,
5043                                                         pow10;
5044
5045 #if DEC_DIGITS == 4
5046                                 pow10 = round_powers[di];
5047 #elif DEC_DIGITS == 2
5048                                 pow10 = 10;
5049 #else
5050 #error unsupported NBASE
5051 #endif
5052                                 extra = digits[--ndigits] % pow10;
5053                                 digits[ndigits] -= extra;
5054                                 carry = 0;
5055                                 if (extra >= pow10 / 2)
5056                                 {
5057                                         pow10 += digits[ndigits];
5058                                         if (pow10 >= NBASE)
5059                                         {
5060                                                 pow10 -= NBASE;
5061                                                 carry = 1;
5062                                         }
5063                                         digits[ndigits] = pow10;
5064                                 }
5065                         }
5066 #endif
5067
5068                         /* Propagate carry if needed */
5069                         while (carry)
5070                         {
5071                                 carry += digits[--ndigits];
5072                                 if (carry >= NBASE)
5073                                 {
5074                                         digits[ndigits] = carry - NBASE;
5075                                         carry = 1;
5076                                 }
5077                                 else
5078                                 {
5079                                         digits[ndigits] = carry;
5080                                         carry = 0;
5081                                 }
5082                         }
5083
5084                         if (ndigits < 0)
5085                         {
5086                                 Assert(ndigits == -1);  /* better not have added > 1 digit */
5087                                 Assert(var->digits > var->buf);
5088                                 var->digits--;
5089                                 var->ndigits++;
5090                                 var->weight++;
5091                         }
5092                 }
5093         }
5094 }
5095
5096 /*
5097  * trunc_var
5098  *
5099  * Truncate the value of a variable at rscale decimal digits after the
5100  * decimal point.  NOTE: we allow rscale < 0 here, implying
5101  * truncation before the decimal point.
5102  */
5103 static void
5104 trunc_var(NumericVar *var, int rscale)
5105 {
5106         int                     di;
5107         int                     ndigits;
5108
5109         var->dscale = rscale;
5110
5111         /* decimal digits wanted */
5112         di = (var->weight + 1) * DEC_DIGITS + rscale;
5113
5114         /*
5115          * If di <= 0, the value loses all digits.
5116          */
5117         if (di <= 0)
5118         {
5119                 var->ndigits = 0;
5120                 var->weight = 0;
5121                 var->sign = NUMERIC_POS;
5122         }
5123         else
5124         {
5125                 /* NBASE digits wanted */
5126                 ndigits = (di + DEC_DIGITS - 1) / DEC_DIGITS;
5127
5128                 if (ndigits <= var->ndigits)
5129                 {
5130                         var->ndigits = ndigits;
5131
5132 #if DEC_DIGITS == 1
5133                         /* no within-digit stuff to worry about */
5134 #else
5135                         /* 0, or number of decimal digits to keep in last NBASE digit */
5136                         di %= DEC_DIGITS;
5137
5138                         if (di > 0)
5139                         {
5140                                 /* Must truncate within last NBASE digit */
5141                                 NumericDigit *digits = var->digits;
5142                                 int                     extra,
5143                                                         pow10;
5144
5145 #if DEC_DIGITS == 4
5146                                 pow10 = round_powers[di];
5147 #elif DEC_DIGITS == 2
5148                                 pow10 = 10;
5149 #else
5150 #error unsupported NBASE
5151 #endif
5152                                 extra = digits[--ndigits] % pow10;
5153                                 digits[ndigits] -= extra;
5154                         }
5155 #endif
5156                 }
5157         }
5158 }
5159
5160 /*
5161  * strip_var
5162  *
5163  * Strip any leading and trailing zeroes from a numeric variable
5164  */
5165 static void
5166 strip_var(NumericVar *var)
5167 {
5168         NumericDigit *digits = var->digits;
5169         int                     ndigits = var->ndigits;
5170
5171         /* Strip leading zeroes */
5172         while (ndigits > 0 && *digits == 0)
5173         {
5174                 digits++;
5175                 var->weight--;
5176                 ndigits--;
5177         }
5178
5179         /* Strip trailing zeroes */
5180         while (ndigits > 0 && digits[ndigits - 1] == 0)
5181                 ndigits--;
5182
5183         /* If it's zero, normalize the sign and weight */
5184         if (ndigits == 0)
5185         {
5186                 var->sign = NUMERIC_POS;
5187                 var->weight = 0;
5188         }
5189
5190         var->digits = digits;
5191         var->ndigits = ndigits;
5192 }