]> granicus.if.org Git - postgresql/blob - src/backend/utils/adt/numeric.c
f173f30211c7a62d414e2d4f18bc2165223d7e44
[postgresql] / src / backend / utils / adt / numeric.c
1 /*-------------------------------------------------------------------------
2  *
3  * numeric.c
4  *        An exact numeric data type for the Postgres database system
5  *
6  * Original coding 1998, Jan Wieck.  Heavily revised 2003, Tom Lane.
7  *
8  * Many of the algorithmic ideas are borrowed from David M. Smith's "FM"
9  * multiple-precision math library, most recently published as Algorithm
10  * 786: Multiple-Precision Complex Arithmetic and Functions, ACM
11  * Transactions on Mathematical Software, Vol. 24, No. 4, December 1998,
12  * pages 359-367.
13  *
14  * Copyright (c) 1998-2007, PostgreSQL Global Development Group
15  *
16  * IDENTIFICATION
17  *        $PostgreSQL: pgsql/src/backend/utils/adt/numeric.c,v 1.101 2007/02/27 23:48:08 tgl Exp $
18  *
19  *-------------------------------------------------------------------------
20  */
21
22 #include "postgres.h"
23
24 #include <ctype.h>
25 #include <float.h>
26 #include <limits.h>
27 #include <math.h>
28
29 #include "catalog/pg_type.h"
30 #include "libpq/pqformat.h"
31 #include "utils/array.h"
32 #include "utils/builtins.h"
33 #include "utils/int8.h"
34 #include "utils/numeric.h"
35
36 /* ----------
37  * Uncomment the following to enable compilation of dump_numeric()
38  * and dump_var() and to get a dump of any result produced by make_result().
39  * ----------
40 #define NUMERIC_DEBUG
41  */
42
43
44 /* ----------
45  * Local data types
46  *
47  * Numeric values are represented in a base-NBASE floating point format.
48  * Each "digit" ranges from 0 to NBASE-1.  The type NumericDigit is signed
49  * and wide enough to store a digit.  We assume that NBASE*NBASE can fit in
50  * an int.      Although the purely calculational routines could handle any even
51  * NBASE that's less than sqrt(INT_MAX), in practice we are only interested
52  * in NBASE a power of ten, so that I/O conversions and decimal rounding
53  * are easy.  Also, it's actually more efficient if NBASE is rather less than
54  * sqrt(INT_MAX), so that there is "headroom" for mul_var and div_var to
55  * postpone processing carries.
56  * ----------
57  */
58
59 #if 0
60 #define NBASE           10
61 #define HALF_NBASE      5
62 #define DEC_DIGITS      1                       /* decimal digits per NBASE digit */
63 #define MUL_GUARD_DIGITS        4       /* these are measured in NBASE digits */
64 #define DIV_GUARD_DIGITS        8
65
66 typedef signed char NumericDigit;
67 #endif
68
69 #if 0
70 #define NBASE           100
71 #define HALF_NBASE      50
72 #define DEC_DIGITS      2                       /* decimal digits per NBASE digit */
73 #define MUL_GUARD_DIGITS        3       /* these are measured in NBASE digits */
74 #define DIV_GUARD_DIGITS        6
75
76 typedef signed char NumericDigit;
77 #endif
78
79 #if 1
80 #define NBASE           10000
81 #define HALF_NBASE      5000
82 #define DEC_DIGITS      4                       /* decimal digits per NBASE digit */
83 #define MUL_GUARD_DIGITS        2       /* these are measured in NBASE digits */
84 #define DIV_GUARD_DIGITS        4
85
86 typedef int16 NumericDigit;
87 #endif
88
89
90 /* ----------
91  * The value represented by a NumericVar is determined by the sign, weight,
92  * ndigits, and digits[] array.
93  * Note: the first digit of a NumericVar's value is assumed to be multiplied
94  * by NBASE ** weight.  Another way to say it is that there are weight+1
95  * digits before the decimal point.  It is possible to have weight < 0.
96  *
97  * buf points at the physical start of the palloc'd digit buffer for the
98  * NumericVar.  digits points at the first digit in actual use (the one
99  * with the specified weight).  We normally leave an unused digit or two
100  * (preset to zeroes) between buf and digits, so that there is room to store
101  * a carry out of the top digit without special pushups.  We just need to
102  * decrement digits (and increment weight) to make room for the carry digit.
103  * (There is no such extra space in a numeric value stored in the database,
104  * only in a NumericVar in memory.)
105  *
106  * If buf is NULL then the digit buffer isn't actually palloc'd and should
107  * not be freed --- see the constants below for an example.
108  *
109  * dscale, or display scale, is the nominal precision expressed as number
110  * of digits after the decimal point (it must always be >= 0 at present).
111  * dscale may be more than the number of physically stored fractional digits,
112  * implying that we have suppressed storage of significant trailing zeroes.
113  * It should never be less than the number of stored digits, since that would
114  * imply hiding digits that are present.  NOTE that dscale is always expressed
115  * in *decimal* digits, and so it may correspond to a fractional number of
116  * base-NBASE digits --- divide by DEC_DIGITS to convert to NBASE digits.
117  *
118  * rscale, or result scale, is the target precision for a computation.
119  * Like dscale it is expressed as number of *decimal* digits after the decimal
120  * point, and is always >= 0 at present.
121  * Note that rscale is not stored in variables --- it's figured on-the-fly
122  * from the dscales of the inputs.
123  *
124  * NB: All the variable-level functions are written in a style that makes it
125  * possible to give one and the same variable as argument and destination.
126  * This is feasible because the digit buffer is separate from the variable.
127  * ----------
128  */
129 typedef struct NumericVar
130 {
131         int                     ndigits;                /* # of digits in digits[] - can be 0! */
132         int                     weight;                 /* weight of first digit */
133         int                     sign;                   /* NUMERIC_POS, NUMERIC_NEG, or NUMERIC_NAN */
134         int                     dscale;                 /* display scale */
135         NumericDigit *buf;                      /* start of palloc'd space for digits[] */
136         NumericDigit *digits;           /* base-NBASE digits */
137 } NumericVar;
138
139
140 /* ----------
141  * Some preinitialized constants
142  * ----------
143  */
144 static NumericDigit const_zero_data[1] = {0};
145 static NumericVar const_zero =
146 {0, 0, NUMERIC_POS, 0, NULL, const_zero_data};
147
148 static NumericDigit const_one_data[1] = {1};
149 static NumericVar const_one =
150 {1, 0, NUMERIC_POS, 0, NULL, const_one_data};
151
152 static NumericDigit const_two_data[1] = {2};
153 static NumericVar const_two =
154 {1, 0, NUMERIC_POS, 0, NULL, const_two_data};
155
156 #if DEC_DIGITS == 4
157 static NumericDigit const_zero_point_five_data[1] = {5000};
158 #elif DEC_DIGITS == 2
159 static NumericDigit const_zero_point_five_data[1] = {50};
160 #elif DEC_DIGITS == 1
161 static NumericDigit const_zero_point_five_data[1] = {5};
162 #endif
163 static NumericVar const_zero_point_five =
164 {1, -1, NUMERIC_POS, 1, NULL, const_zero_point_five_data};
165
166 #if DEC_DIGITS == 4
167 static NumericDigit const_zero_point_nine_data[1] = {9000};
168 #elif DEC_DIGITS == 2
169 static NumericDigit const_zero_point_nine_data[1] = {90};
170 #elif DEC_DIGITS == 1
171 static NumericDigit const_zero_point_nine_data[1] = {9};
172 #endif
173 static NumericVar const_zero_point_nine =
174 {1, -1, NUMERIC_POS, 1, NULL, const_zero_point_nine_data};
175
176 #if DEC_DIGITS == 4
177 static NumericDigit const_zero_point_01_data[1] = {100};
178 static NumericVar const_zero_point_01 =
179 {1, -1, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
180 #elif DEC_DIGITS == 2
181 static NumericDigit const_zero_point_01_data[1] = {1};
182 static NumericVar const_zero_point_01 =
183 {1, -1, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
184 #elif DEC_DIGITS == 1
185 static NumericDigit const_zero_point_01_data[1] = {1};
186 static NumericVar const_zero_point_01 =
187 {1, -2, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
188 #endif
189
190 #if DEC_DIGITS == 4
191 static NumericDigit const_one_point_one_data[2] = {1, 1000};
192 #elif DEC_DIGITS == 2
193 static NumericDigit const_one_point_one_data[2] = {1, 10};
194 #elif DEC_DIGITS == 1
195 static NumericDigit const_one_point_one_data[2] = {1, 1};
196 #endif
197 static NumericVar const_one_point_one =
198 {2, 0, NUMERIC_POS, 1, NULL, const_one_point_one_data};
199
200 static NumericVar const_nan =
201 {0, 0, NUMERIC_NAN, 0, NULL, NULL};
202
203 #if DEC_DIGITS == 4
204 static const int round_powers[4] = {0, 1000, 100, 10};
205 #endif
206
207
208 /* ----------
209  * Local functions
210  * ----------
211  */
212
213 #ifdef NUMERIC_DEBUG
214 static void dump_numeric(const char *str, Numeric num);
215 static void dump_var(const char *str, NumericVar *var);
216 #else
217 #define dump_numeric(s,n)
218 #define dump_var(s,v)
219 #endif
220
221 #define digitbuf_alloc(ndigits)  \
222         ((NumericDigit *) palloc((ndigits) * sizeof(NumericDigit)))
223 #define digitbuf_free(buf)      \
224         do { \
225                  if ((buf) != NULL) \
226                          pfree(buf); \
227         } while (0)
228
229 #define init_var(v)             MemSetAligned(v, 0, sizeof(NumericVar))
230
231 #define NUMERIC_DIGITS(num) ((NumericDigit *)(num)->n_data)
232 #define NUMERIC_NDIGITS(num) \
233         ((VARSIZE(num) - NUMERIC_HDRSZ) / sizeof(NumericDigit))
234
235 static void alloc_var(NumericVar *var, int ndigits);
236 static void free_var(NumericVar *var);
237 static void zero_var(NumericVar *var);
238
239 static void set_var_from_str(const char *str, NumericVar *dest);
240 static void set_var_from_num(Numeric value, NumericVar *dest);
241 static void set_var_from_var(NumericVar *value, NumericVar *dest);
242 static char *get_str_from_var(NumericVar *var, int dscale);
243
244 static Numeric make_result(NumericVar *var);
245
246 static void apply_typmod(NumericVar *var, int32 typmod);
247
248 static int32 numericvar_to_int4(NumericVar *var);
249 static bool numericvar_to_int8(NumericVar *var, int64 *result);
250 static void int8_to_numericvar(int64 val, NumericVar *var);
251 static double numeric_to_double_no_overflow(Numeric num);
252 static double numericvar_to_double_no_overflow(NumericVar *var);
253
254 static int      cmp_numerics(Numeric num1, Numeric num2);
255 static int      cmp_var(NumericVar *var1, NumericVar *var2);
256 static int cmp_var_common(const NumericDigit *var1digits, int var1ndigits,
257                            int var1weight, int var1sign,
258                            const NumericDigit *var2digits, int var2ndigits,
259                            int var2weight, int var2sign);
260 static void add_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
261 static void sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
262 static void mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
263                 int rscale);
264 static void div_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
265                 int rscale, bool round);
266 static int      select_div_scale(NumericVar *var1, NumericVar *var2);
267 static void mod_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
268 static void ceil_var(NumericVar *var, NumericVar *result);
269 static void floor_var(NumericVar *var, NumericVar *result);
270
271 static void sqrt_var(NumericVar *arg, NumericVar *result, int rscale);
272 static void exp_var(NumericVar *arg, NumericVar *result, int rscale);
273 static void exp_var_internal(NumericVar *arg, NumericVar *result, int rscale);
274 static void ln_var(NumericVar *arg, NumericVar *result, int rscale);
275 static void log_var(NumericVar *base, NumericVar *num, NumericVar *result);
276 static void power_var(NumericVar *base, NumericVar *exp, NumericVar *result);
277 static void power_var_int(NumericVar *base, int exp, NumericVar *result,
278                           int rscale);
279
280 static int      cmp_abs(NumericVar *var1, NumericVar *var2);
281 static int cmp_abs_common(const NumericDigit *var1digits, int var1ndigits,
282                            int var1weight,
283                            const NumericDigit *var2digits, int var2ndigits,
284                            int var2weight);
285 static void add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result);
286 static void sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result);
287 static void round_var(NumericVar *var, int rscale);
288 static void trunc_var(NumericVar *var, int rscale);
289 static void strip_var(NumericVar *var);
290 static void compute_bucket(Numeric operand, Numeric bound1, Numeric bound2,
291                            NumericVar *count_var, NumericVar *result_var);
292
293
294 /* ----------------------------------------------------------------------
295  *
296  * Input-, output- and rounding-functions
297  *
298  * ----------------------------------------------------------------------
299  */
300
301
302 /*
303  * numeric_in() -
304  *
305  *      Input function for numeric data type
306  */
307 Datum
308 numeric_in(PG_FUNCTION_ARGS)
309 {
310         char       *str = PG_GETARG_CSTRING(0);
311
312 #ifdef NOT_USED
313         Oid                     typelem = PG_GETARG_OID(1);
314 #endif
315         int32           typmod = PG_GETARG_INT32(2);
316         NumericVar      value;
317         Numeric         res;
318
319         /*
320          * Check for NaN
321          */
322         if (pg_strcasecmp(str, "NaN") == 0)
323                 PG_RETURN_NUMERIC(make_result(&const_nan));
324
325         /*
326          * Use set_var_from_str() to parse the input string and return it in the
327          * packed DB storage format
328          */
329         init_var(&value);
330         set_var_from_str(str, &value);
331
332         apply_typmod(&value, typmod);
333
334         res = make_result(&value);
335         free_var(&value);
336
337         PG_RETURN_NUMERIC(res);
338 }
339
340
341 /*
342  * numeric_out() -
343  *
344  *      Output function for numeric data type
345  */
346 Datum
347 numeric_out(PG_FUNCTION_ARGS)
348 {
349         Numeric         num = PG_GETARG_NUMERIC(0);
350         NumericVar      x;
351         char       *str;
352
353         /*
354          * Handle NaN
355          */
356         if (NUMERIC_IS_NAN(num))
357                 PG_RETURN_CSTRING(pstrdup("NaN"));
358
359         /*
360          * Get the number in the variable format.
361          *
362          * Even if we didn't need to change format, we'd still need to copy the
363          * value to have a modifiable copy for rounding.  set_var_from_num() also
364          * guarantees there is extra digit space in case we produce a carry out
365          * from rounding.
366          */
367         init_var(&x);
368         set_var_from_num(num, &x);
369
370         str = get_str_from_var(&x, x.dscale);
371
372         free_var(&x);
373
374         PG_RETURN_CSTRING(str);
375 }
376
377 /*
378  *              numeric_recv                    - converts external binary format to numeric
379  *
380  * External format is a sequence of int16's:
381  * ndigits, weight, sign, dscale, NumericDigits.
382  */
383 Datum
384 numeric_recv(PG_FUNCTION_ARGS)
385 {
386         StringInfo      buf = (StringInfo) PG_GETARG_POINTER(0);
387
388 #ifdef NOT_USED
389         Oid                     typelem = PG_GETARG_OID(1);
390 #endif
391         int32           typmod = PG_GETARG_INT32(2);
392         NumericVar      value;
393         Numeric         res;
394         int                     len,
395                                 i;
396
397         init_var(&value);
398
399         len = (uint16) pq_getmsgint(buf, sizeof(uint16));
400         if (len < 0 || len > NUMERIC_MAX_PRECISION + NUMERIC_MAX_RESULT_SCALE)
401                 ereport(ERROR,
402                                 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
403                                  errmsg("invalid length in external \"numeric\" value")));
404
405         alloc_var(&value, len);
406
407         value.weight = (int16) pq_getmsgint(buf, sizeof(int16));
408         value.sign = (uint16) pq_getmsgint(buf, sizeof(uint16));
409         if (!(value.sign == NUMERIC_POS ||
410                   value.sign == NUMERIC_NEG ||
411                   value.sign == NUMERIC_NAN))
412                 ereport(ERROR,
413                                 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
414                                  errmsg("invalid sign in external \"numeric\" value")));
415
416         value.dscale = (uint16) pq_getmsgint(buf, sizeof(uint16));
417         for (i = 0; i < len; i++)
418         {
419                 NumericDigit d = pq_getmsgint(buf, sizeof(NumericDigit));
420
421                 if (d < 0 || d >= NBASE)
422                         ereport(ERROR,
423                                         (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
424                                          errmsg("invalid digit in external \"numeric\" value")));
425                 value.digits[i] = d;
426         }
427
428         apply_typmod(&value, typmod);
429
430         res = make_result(&value);
431         free_var(&value);
432
433         PG_RETURN_NUMERIC(res);
434 }
435
436 /*
437  *              numeric_send                    - converts numeric to binary format
438  */
439 Datum
440 numeric_send(PG_FUNCTION_ARGS)
441 {
442         Numeric         num = PG_GETARG_NUMERIC(0);
443         NumericVar      x;
444         StringInfoData buf;
445         int                     i;
446
447         init_var(&x);
448         set_var_from_num(num, &x);
449
450         pq_begintypsend(&buf);
451
452         pq_sendint(&buf, x.ndigits, sizeof(int16));
453         pq_sendint(&buf, x.weight, sizeof(int16));
454         pq_sendint(&buf, x.sign, sizeof(int16));
455         pq_sendint(&buf, x.dscale, sizeof(int16));
456         for (i = 0; i < x.ndigits; i++)
457                 pq_sendint(&buf, x.digits[i], sizeof(NumericDigit));
458
459         free_var(&x);
460
461         PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
462 }
463
464
465 /*
466  * numeric() -
467  *
468  *      This is a special function called by the Postgres database system
469  *      before a value is stored in a tuple's attribute. The precision and
470  *      scale of the attribute have to be applied on the value.
471  */
472 Datum
473 numeric(PG_FUNCTION_ARGS)
474 {
475         Numeric         num = PG_GETARG_NUMERIC(0);
476         int32           typmod = PG_GETARG_INT32(1);
477         Numeric         new;
478         int32           tmp_typmod;
479         int                     precision;
480         int                     scale;
481         int                     ddigits;
482         int                     maxdigits;
483         NumericVar      var;
484
485         /*
486          * Handle NaN
487          */
488         if (NUMERIC_IS_NAN(num))
489                 PG_RETURN_NUMERIC(make_result(&const_nan));
490
491         /*
492          * If the value isn't a valid type modifier, simply return a copy of the
493          * input value
494          */
495         if (typmod < (int32) (VARHDRSZ))
496         {
497                 new = (Numeric) palloc(VARSIZE(num));
498                 memcpy(new, num, VARSIZE(num));
499                 PG_RETURN_NUMERIC(new);
500         }
501
502         /*
503          * Get the precision and scale out of the typmod value
504          */
505         tmp_typmod = typmod - VARHDRSZ;
506         precision = (tmp_typmod >> 16) & 0xffff;
507         scale = tmp_typmod & 0xffff;
508         maxdigits = precision - scale;
509
510         /*
511          * If the number is certainly in bounds and due to the target scale no
512          * rounding could be necessary, just make a copy of the input and modify
513          * its scale fields.  (Note we assume the existing dscale is honest...)
514          */
515         ddigits = (num->n_weight + 1) * DEC_DIGITS;
516         if (ddigits <= maxdigits && scale >= NUMERIC_DSCALE(num))
517         {
518                 new = (Numeric) palloc(VARSIZE(num));
519                 memcpy(new, num, VARSIZE(num));
520                 new->n_sign_dscale = NUMERIC_SIGN(new) |
521                         ((uint16) scale & NUMERIC_DSCALE_MASK);
522                 PG_RETURN_NUMERIC(new);
523         }
524
525         /*
526          * We really need to fiddle with things - unpack the number into a
527          * variable and let apply_typmod() do it.
528          */
529         init_var(&var);
530
531         set_var_from_num(num, &var);
532         apply_typmod(&var, typmod);
533         new = make_result(&var);
534
535         free_var(&var);
536
537         PG_RETURN_NUMERIC(new);
538 }
539
540 Datum
541 numerictypmodin(PG_FUNCTION_ARGS)
542 {
543         ArrayType   *ta = PG_GETARG_ARRAYTYPE_P(0);
544         int32           *tl;
545         int                     n;
546         int32           typmod;
547
548         tl = ArrayGetTypmods(ta, &n);
549
550         if (n == 2)
551         {
552                 if (tl[0] < 1 || tl[0] > NUMERIC_MAX_PRECISION)
553                         ereport(ERROR,
554                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
555                                          errmsg("NUMERIC precision %d must be between 1 and %d",
556                                                         tl[0], NUMERIC_MAX_PRECISION)));
557                 if (tl[1] < 0 || tl[1] > tl[0])
558                         ereport(ERROR,
559                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
560                                          errmsg("NUMERIC scale %d must be between 0 and precision %d",
561                                                         tl[1], tl[0])));
562                 typmod = ((tl[0] << 16) | tl[1]) + VARHDRSZ;
563         }
564         else if (n == 1)
565         {
566                 if (tl[0] < 1 || tl[0] > NUMERIC_MAX_PRECISION)
567                         ereport(ERROR,
568                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
569                                          errmsg("NUMERIC precision %d must be between 1 and %d",
570                                                         tl[0], NUMERIC_MAX_PRECISION)));
571                 /* scale defaults to zero */
572                 typmod = (tl[0] << 16) + VARHDRSZ;
573         }
574         else
575         {
576                 ereport(ERROR,
577                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
578                                                 errmsg("invalid NUMERIC type modifier")));
579                 typmod = 0;                             /* keep compiler quiet */
580         }
581
582         PG_RETURN_INT32(typmod);
583 }
584
585 Datum
586 numerictypmodout(PG_FUNCTION_ARGS)
587 {
588         int32   typmod = PG_GETARG_INT32(0);
589         char   *res = (char *) palloc(64);
590
591         if (typmod >= 0)
592                 snprintf(res, 64, "(%d,%d)",
593                                  ((typmod - VARHDRSZ) >> 16) & 0xffff,
594                                  (typmod - VARHDRSZ) & 0xffff);
595         else
596                 *res = '\0';
597
598         PG_RETURN_CSTRING(res);
599 }
600
601
602 /* ----------------------------------------------------------------------
603  *
604  * Sign manipulation, rounding and the like
605  *
606  * ----------------------------------------------------------------------
607  */
608
609 Datum
610 numeric_abs(PG_FUNCTION_ARGS)
611 {
612         Numeric         num = PG_GETARG_NUMERIC(0);
613         Numeric         res;
614
615         /*
616          * Handle NaN
617          */
618         if (NUMERIC_IS_NAN(num))
619                 PG_RETURN_NUMERIC(make_result(&const_nan));
620
621         /*
622          * Do it the easy way directly on the packed format
623          */
624         res = (Numeric) palloc(VARSIZE(num));
625         memcpy(res, num, VARSIZE(num));
626
627         res->n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num);
628
629         PG_RETURN_NUMERIC(res);
630 }
631
632
633 Datum
634 numeric_uminus(PG_FUNCTION_ARGS)
635 {
636         Numeric         num = PG_GETARG_NUMERIC(0);
637         Numeric         res;
638
639         /*
640          * Handle NaN
641          */
642         if (NUMERIC_IS_NAN(num))
643                 PG_RETURN_NUMERIC(make_result(&const_nan));
644
645         /*
646          * Do it the easy way directly on the packed format
647          */
648         res = (Numeric) palloc(VARSIZE(num));
649         memcpy(res, num, VARSIZE(num));
650
651         /*
652          * The packed format is known to be totally zero digit trimmed always. So
653          * we can identify a ZERO by the fact that there are no digits at all.  Do
654          * nothing to a zero.
655          */
656         if (VARSIZE(num) != NUMERIC_HDRSZ)
657         {
658                 /* Else, flip the sign */
659                 if (NUMERIC_SIGN(num) == NUMERIC_POS)
660                         res->n_sign_dscale = NUMERIC_NEG | NUMERIC_DSCALE(num);
661                 else
662                         res->n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num);
663         }
664
665         PG_RETURN_NUMERIC(res);
666 }
667
668
669 Datum
670 numeric_uplus(PG_FUNCTION_ARGS)
671 {
672         Numeric         num = PG_GETARG_NUMERIC(0);
673         Numeric         res;
674
675         res = (Numeric) palloc(VARSIZE(num));
676         memcpy(res, num, VARSIZE(num));
677
678         PG_RETURN_NUMERIC(res);
679 }
680
681 /*
682  * numeric_sign() -
683  *
684  * returns -1 if the argument is less than 0, 0 if the argument is equal
685  * to 0, and 1 if the argument is greater than zero.
686  */
687 Datum
688 numeric_sign(PG_FUNCTION_ARGS)
689 {
690         Numeric         num = PG_GETARG_NUMERIC(0);
691         Numeric         res;
692         NumericVar      result;
693
694         /*
695          * Handle NaN
696          */
697         if (NUMERIC_IS_NAN(num))
698                 PG_RETURN_NUMERIC(make_result(&const_nan));
699
700         init_var(&result);
701
702         /*
703          * The packed format is known to be totally zero digit trimmed always. So
704          * we can identify a ZERO by the fact that there are no digits at all.
705          */
706         if (VARSIZE(num) == NUMERIC_HDRSZ)
707                 set_var_from_var(&const_zero, &result);
708         else
709         {
710                 /*
711                  * And if there are some, we return a copy of ONE with the sign of our
712                  * argument
713                  */
714                 set_var_from_var(&const_one, &result);
715                 result.sign = NUMERIC_SIGN(num);
716         }
717
718         res = make_result(&result);
719         free_var(&result);
720
721         PG_RETURN_NUMERIC(res);
722 }
723
724
725 /*
726  * numeric_round() -
727  *
728  *      Round a value to have 'scale' digits after the decimal point.
729  *      We allow negative 'scale', implying rounding before the decimal
730  *      point --- Oracle interprets rounding that way.
731  */
732 Datum
733 numeric_round(PG_FUNCTION_ARGS)
734 {
735         Numeric         num = PG_GETARG_NUMERIC(0);
736         int32           scale = PG_GETARG_INT32(1);
737         Numeric         res;
738         NumericVar      arg;
739
740         /*
741          * Handle NaN
742          */
743         if (NUMERIC_IS_NAN(num))
744                 PG_RETURN_NUMERIC(make_result(&const_nan));
745
746         /*
747          * Limit the scale value to avoid possible overflow in calculations
748          */
749         scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE);
750         scale = Min(scale, NUMERIC_MAX_RESULT_SCALE);
751
752         /*
753          * Unpack the argument and round it at the proper digit position
754          */
755         init_var(&arg);
756         set_var_from_num(num, &arg);
757
758         round_var(&arg, scale);
759
760         /* We don't allow negative output dscale */
761         if (scale < 0)
762                 arg.dscale = 0;
763
764         /*
765          * Return the rounded result
766          */
767         res = make_result(&arg);
768
769         free_var(&arg);
770         PG_RETURN_NUMERIC(res);
771 }
772
773
774 /*
775  * numeric_trunc() -
776  *
777  *      Truncate a value to have 'scale' digits after the decimal point.
778  *      We allow negative 'scale', implying a truncation before the decimal
779  *      point --- Oracle interprets truncation that way.
780  */
781 Datum
782 numeric_trunc(PG_FUNCTION_ARGS)
783 {
784         Numeric         num = PG_GETARG_NUMERIC(0);
785         int32           scale = PG_GETARG_INT32(1);
786         Numeric         res;
787         NumericVar      arg;
788
789         /*
790          * Handle NaN
791          */
792         if (NUMERIC_IS_NAN(num))
793                 PG_RETURN_NUMERIC(make_result(&const_nan));
794
795         /*
796          * Limit the scale value to avoid possible overflow in calculations
797          */
798         scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE);
799         scale = Min(scale, NUMERIC_MAX_RESULT_SCALE);
800
801         /*
802          * Unpack the argument and truncate it at the proper digit position
803          */
804         init_var(&arg);
805         set_var_from_num(num, &arg);
806
807         trunc_var(&arg, scale);
808
809         /* We don't allow negative output dscale */
810         if (scale < 0)
811                 arg.dscale = 0;
812
813         /*
814          * Return the truncated result
815          */
816         res = make_result(&arg);
817
818         free_var(&arg);
819         PG_RETURN_NUMERIC(res);
820 }
821
822
823 /*
824  * numeric_ceil() -
825  *
826  *      Return the smallest integer greater than or equal to the argument
827  */
828 Datum
829 numeric_ceil(PG_FUNCTION_ARGS)
830 {
831         Numeric         num = PG_GETARG_NUMERIC(0);
832         Numeric         res;
833         NumericVar      result;
834
835         if (NUMERIC_IS_NAN(num))
836                 PG_RETURN_NUMERIC(make_result(&const_nan));
837
838         init_var(&result);
839
840         set_var_from_num(num, &result);
841         ceil_var(&result, &result);
842
843         res = make_result(&result);
844         free_var(&result);
845
846         PG_RETURN_NUMERIC(res);
847 }
848
849
850 /*
851  * numeric_floor() -
852  *
853  *      Return the largest integer equal to or less than the argument
854  */
855 Datum
856 numeric_floor(PG_FUNCTION_ARGS)
857 {
858         Numeric         num = PG_GETARG_NUMERIC(0);
859         Numeric         res;
860         NumericVar      result;
861
862         if (NUMERIC_IS_NAN(num))
863                 PG_RETURN_NUMERIC(make_result(&const_nan));
864
865         init_var(&result);
866
867         set_var_from_num(num, &result);
868         floor_var(&result, &result);
869
870         res = make_result(&result);
871         free_var(&result);
872
873         PG_RETURN_NUMERIC(res);
874 }
875
876 /*
877  * Implements the numeric version of the width_bucket() function
878  * defined by SQL2003. See also width_bucket_float8().
879  *
880  * 'bound1' and 'bound2' are the lower and upper bounds of the
881  * histogram's range, respectively. 'count' is the number of buckets
882  * in the histogram. width_bucket() returns an integer indicating the
883  * bucket number that 'operand' belongs to in an equiwidth histogram
884  * with the specified characteristics. An operand smaller than the
885  * lower bound is assigned to bucket 0. An operand greater than the
886  * upper bound is assigned to an additional bucket (with number
887  * count+1). We don't allow "NaN" for any of the numeric arguments.
888  */
889 Datum
890 width_bucket_numeric(PG_FUNCTION_ARGS)
891 {
892         Numeric         operand = PG_GETARG_NUMERIC(0);
893         Numeric         bound1 = PG_GETARG_NUMERIC(1);
894         Numeric         bound2 = PG_GETARG_NUMERIC(2);
895         int32           count = PG_GETARG_INT32(3);
896         NumericVar      count_var;
897         NumericVar      result_var;
898         int32           result;
899
900         if (count <= 0)
901                 ereport(ERROR,
902                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
903                                  errmsg("count must be greater than zero")));
904
905         if (NUMERIC_IS_NAN(operand) ||
906                 NUMERIC_IS_NAN(bound1) ||
907                 NUMERIC_IS_NAN(bound2))
908                 ereport(ERROR,
909                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
910                                  errmsg("operand, lower bound and upper bound cannot be NaN")));
911
912         init_var(&result_var);
913         init_var(&count_var);
914
915         /* Convert 'count' to a numeric, for ease of use later */
916         int8_to_numericvar((int64) count, &count_var);
917
918         switch (cmp_numerics(bound1, bound2))
919         {
920                 case 0:
921                         ereport(ERROR,
922                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
923                                  errmsg("lower bound cannot equal upper bound")));
924
925                         /* bound1 < bound2 */
926                 case -1:
927                         if (cmp_numerics(operand, bound1) < 0)
928                                 set_var_from_var(&const_zero, &result_var);
929                         else if (cmp_numerics(operand, bound2) >= 0)
930                                 add_var(&count_var, &const_one, &result_var);
931                         else
932                                 compute_bucket(operand, bound1, bound2,
933                                                            &count_var, &result_var);
934                         break;
935
936                         /* bound1 > bound2 */
937                 case 1:
938                         if (cmp_numerics(operand, bound1) > 0)
939                                 set_var_from_var(&const_zero, &result_var);
940                         else if (cmp_numerics(operand, bound2) <= 0)
941                                 add_var(&count_var, &const_one, &result_var);
942                         else
943                                 compute_bucket(operand, bound1, bound2,
944                                                            &count_var, &result_var);
945                         break;
946         }
947
948         /* if result exceeds the range of a legal int4, we ereport here */
949         result = numericvar_to_int4(&result_var);
950
951         free_var(&count_var);
952         free_var(&result_var);
953
954         PG_RETURN_INT32(result);
955 }
956
957 /*
958  * If 'operand' is not outside the bucket range, determine the correct
959  * bucket for it to go. The calculations performed by this function
960  * are derived directly from the SQL2003 spec.
961  */
962 static void
963 compute_bucket(Numeric operand, Numeric bound1, Numeric bound2,
964                            NumericVar *count_var, NumericVar *result_var)
965 {
966         NumericVar      bound1_var;
967         NumericVar      bound2_var;
968         NumericVar      operand_var;
969
970         init_var(&bound1_var);
971         init_var(&bound2_var);
972         init_var(&operand_var);
973
974         set_var_from_num(bound1, &bound1_var);
975         set_var_from_num(bound2, &bound2_var);
976         set_var_from_num(operand, &operand_var);
977
978         if (cmp_var(&bound1_var, &bound2_var) < 0)
979         {
980                 sub_var(&operand_var, &bound1_var, &operand_var);
981                 sub_var(&bound2_var, &bound1_var, &bound2_var);
982                 div_var(&operand_var, &bound2_var, result_var,
983                                 select_div_scale(&operand_var, &bound2_var), true);
984         }
985         else
986         {
987                 sub_var(&bound1_var, &operand_var, &operand_var);
988                 sub_var(&bound1_var, &bound2_var, &bound1_var);
989                 div_var(&operand_var, &bound1_var, result_var,
990                                 select_div_scale(&operand_var, &bound1_var), true);
991         }
992
993         mul_var(result_var, count_var, result_var,
994                         result_var->dscale + count_var->dscale);
995         add_var(result_var, &const_one, result_var);
996         floor_var(result_var, result_var);
997
998         free_var(&bound1_var);
999         free_var(&bound2_var);
1000         free_var(&operand_var);
1001 }
1002
1003 /* ----------------------------------------------------------------------
1004  *
1005  * Comparison functions
1006  *
1007  * Note: btree indexes need these routines not to leak memory; therefore,
1008  * be careful to free working copies of toasted datums.  Most places don't
1009  * need to be so careful.
1010  * ----------------------------------------------------------------------
1011  */
1012
1013
1014 Datum
1015 numeric_cmp(PG_FUNCTION_ARGS)
1016 {
1017         Numeric         num1 = PG_GETARG_NUMERIC(0);
1018         Numeric         num2 = PG_GETARG_NUMERIC(1);
1019         int                     result;
1020
1021         result = cmp_numerics(num1, num2);
1022
1023         PG_FREE_IF_COPY(num1, 0);
1024         PG_FREE_IF_COPY(num2, 1);
1025
1026         PG_RETURN_INT32(result);
1027 }
1028
1029
1030 Datum
1031 numeric_eq(PG_FUNCTION_ARGS)
1032 {
1033         Numeric         num1 = PG_GETARG_NUMERIC(0);
1034         Numeric         num2 = PG_GETARG_NUMERIC(1);
1035         bool            result;
1036
1037         result = cmp_numerics(num1, num2) == 0;
1038
1039         PG_FREE_IF_COPY(num1, 0);
1040         PG_FREE_IF_COPY(num2, 1);
1041
1042         PG_RETURN_BOOL(result);
1043 }
1044
1045 Datum
1046 numeric_ne(PG_FUNCTION_ARGS)
1047 {
1048         Numeric         num1 = PG_GETARG_NUMERIC(0);
1049         Numeric         num2 = PG_GETARG_NUMERIC(1);
1050         bool            result;
1051
1052         result = cmp_numerics(num1, num2) != 0;
1053
1054         PG_FREE_IF_COPY(num1, 0);
1055         PG_FREE_IF_COPY(num2, 1);
1056
1057         PG_RETURN_BOOL(result);
1058 }
1059
1060 Datum
1061 numeric_gt(PG_FUNCTION_ARGS)
1062 {
1063         Numeric         num1 = PG_GETARG_NUMERIC(0);
1064         Numeric         num2 = PG_GETARG_NUMERIC(1);
1065         bool            result;
1066
1067         result = cmp_numerics(num1, num2) > 0;
1068
1069         PG_FREE_IF_COPY(num1, 0);
1070         PG_FREE_IF_COPY(num2, 1);
1071
1072         PG_RETURN_BOOL(result);
1073 }
1074
1075 Datum
1076 numeric_ge(PG_FUNCTION_ARGS)
1077 {
1078         Numeric         num1 = PG_GETARG_NUMERIC(0);
1079         Numeric         num2 = PG_GETARG_NUMERIC(1);
1080         bool            result;
1081
1082         result = cmp_numerics(num1, num2) >= 0;
1083
1084         PG_FREE_IF_COPY(num1, 0);
1085         PG_FREE_IF_COPY(num2, 1);
1086
1087         PG_RETURN_BOOL(result);
1088 }
1089
1090 Datum
1091 numeric_lt(PG_FUNCTION_ARGS)
1092 {
1093         Numeric         num1 = PG_GETARG_NUMERIC(0);
1094         Numeric         num2 = PG_GETARG_NUMERIC(1);
1095         bool            result;
1096
1097         result = cmp_numerics(num1, num2) < 0;
1098
1099         PG_FREE_IF_COPY(num1, 0);
1100         PG_FREE_IF_COPY(num2, 1);
1101
1102         PG_RETURN_BOOL(result);
1103 }
1104
1105 Datum
1106 numeric_le(PG_FUNCTION_ARGS)
1107 {
1108         Numeric         num1 = PG_GETARG_NUMERIC(0);
1109         Numeric         num2 = PG_GETARG_NUMERIC(1);
1110         bool            result;
1111
1112         result = cmp_numerics(num1, num2) <= 0;
1113
1114         PG_FREE_IF_COPY(num1, 0);
1115         PG_FREE_IF_COPY(num2, 1);
1116
1117         PG_RETURN_BOOL(result);
1118 }
1119
1120 static int
1121 cmp_numerics(Numeric num1, Numeric num2)
1122 {
1123         int                     result;
1124
1125         /*
1126          * We consider all NANs to be equal and larger than any non-NAN. This is
1127          * somewhat arbitrary; the important thing is to have a consistent sort
1128          * order.
1129          */
1130         if (NUMERIC_IS_NAN(num1))
1131         {
1132                 if (NUMERIC_IS_NAN(num2))
1133                         result = 0;                     /* NAN = NAN */
1134                 else
1135                         result = 1;                     /* NAN > non-NAN */
1136         }
1137         else if (NUMERIC_IS_NAN(num2))
1138         {
1139                 result = -1;                    /* non-NAN < NAN */
1140         }
1141         else
1142         {
1143                 result = cmp_var_common(NUMERIC_DIGITS(num1), NUMERIC_NDIGITS(num1),
1144                                                                 num1->n_weight, NUMERIC_SIGN(num1),
1145                                                                 NUMERIC_DIGITS(num2), NUMERIC_NDIGITS(num2),
1146                                                                 num2->n_weight, NUMERIC_SIGN(num2));
1147         }
1148
1149         return result;
1150 }
1151
1152
1153 /* ----------------------------------------------------------------------
1154  *
1155  * Basic arithmetic functions
1156  *
1157  * ----------------------------------------------------------------------
1158  */
1159
1160
1161 /*
1162  * numeric_add() -
1163  *
1164  *      Add two numerics
1165  */
1166 Datum
1167 numeric_add(PG_FUNCTION_ARGS)
1168 {
1169         Numeric         num1 = PG_GETARG_NUMERIC(0);
1170         Numeric         num2 = PG_GETARG_NUMERIC(1);
1171         NumericVar      arg1;
1172         NumericVar      arg2;
1173         NumericVar      result;
1174         Numeric         res;
1175
1176         /*
1177          * Handle NaN
1178          */
1179         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1180                 PG_RETURN_NUMERIC(make_result(&const_nan));
1181
1182         /*
1183          * Unpack the values, let add_var() compute the result and return it.
1184          */
1185         init_var(&arg1);
1186         init_var(&arg2);
1187         init_var(&result);
1188
1189         set_var_from_num(num1, &arg1);
1190         set_var_from_num(num2, &arg2);
1191
1192         add_var(&arg1, &arg2, &result);
1193
1194         res = make_result(&result);
1195
1196         free_var(&arg1);
1197         free_var(&arg2);
1198         free_var(&result);
1199
1200         PG_RETURN_NUMERIC(res);
1201 }
1202
1203
1204 /*
1205  * numeric_sub() -
1206  *
1207  *      Subtract one numeric from another
1208  */
1209 Datum
1210 numeric_sub(PG_FUNCTION_ARGS)
1211 {
1212         Numeric         num1 = PG_GETARG_NUMERIC(0);
1213         Numeric         num2 = PG_GETARG_NUMERIC(1);
1214         NumericVar      arg1;
1215         NumericVar      arg2;
1216         NumericVar      result;
1217         Numeric         res;
1218
1219         /*
1220          * Handle NaN
1221          */
1222         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1223                 PG_RETURN_NUMERIC(make_result(&const_nan));
1224
1225         /*
1226          * Unpack the values, let sub_var() compute the result and return it.
1227          */
1228         init_var(&arg1);
1229         init_var(&arg2);
1230         init_var(&result);
1231
1232         set_var_from_num(num1, &arg1);
1233         set_var_from_num(num2, &arg2);
1234
1235         sub_var(&arg1, &arg2, &result);
1236
1237         res = make_result(&result);
1238
1239         free_var(&arg1);
1240         free_var(&arg2);
1241         free_var(&result);
1242
1243         PG_RETURN_NUMERIC(res);
1244 }
1245
1246
1247 /*
1248  * numeric_mul() -
1249  *
1250  *      Calculate the product of two numerics
1251  */
1252 Datum
1253 numeric_mul(PG_FUNCTION_ARGS)
1254 {
1255         Numeric         num1 = PG_GETARG_NUMERIC(0);
1256         Numeric         num2 = PG_GETARG_NUMERIC(1);
1257         NumericVar      arg1;
1258         NumericVar      arg2;
1259         NumericVar      result;
1260         Numeric         res;
1261
1262         /*
1263          * Handle NaN
1264          */
1265         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1266                 PG_RETURN_NUMERIC(make_result(&const_nan));
1267
1268         /*
1269          * Unpack the values, let mul_var() compute the result and return it.
1270          * Unlike add_var() and sub_var(), mul_var() will round its result. In the
1271          * case of numeric_mul(), which is invoked for the * operator on numerics,
1272          * we request exact representation for the product (rscale = sum(dscale of
1273          * arg1, dscale of arg2)).
1274          */
1275         init_var(&arg1);
1276         init_var(&arg2);
1277         init_var(&result);
1278
1279         set_var_from_num(num1, &arg1);
1280         set_var_from_num(num2, &arg2);
1281
1282         mul_var(&arg1, &arg2, &result, arg1.dscale + arg2.dscale);
1283
1284         res = make_result(&result);
1285
1286         free_var(&arg1);
1287         free_var(&arg2);
1288         free_var(&result);
1289
1290         PG_RETURN_NUMERIC(res);
1291 }
1292
1293
1294 /*
1295  * numeric_div() -
1296  *
1297  *      Divide one numeric into another
1298  */
1299 Datum
1300 numeric_div(PG_FUNCTION_ARGS)
1301 {
1302         Numeric         num1 = PG_GETARG_NUMERIC(0);
1303         Numeric         num2 = PG_GETARG_NUMERIC(1);
1304         NumericVar      arg1;
1305         NumericVar      arg2;
1306         NumericVar      result;
1307         Numeric         res;
1308         int                     rscale;
1309
1310         /*
1311          * Handle NaN
1312          */
1313         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1314                 PG_RETURN_NUMERIC(make_result(&const_nan));
1315
1316         /*
1317          * Unpack the arguments
1318          */
1319         init_var(&arg1);
1320         init_var(&arg2);
1321         init_var(&result);
1322
1323         set_var_from_num(num1, &arg1);
1324         set_var_from_num(num2, &arg2);
1325
1326         /*
1327          * Select scale for division result
1328          */
1329         rscale = select_div_scale(&arg1, &arg2);
1330
1331         /*
1332          * Do the divide and return the result
1333          */
1334         div_var(&arg1, &arg2, &result, rscale, true);
1335
1336         res = make_result(&result);
1337
1338         free_var(&arg1);
1339         free_var(&arg2);
1340         free_var(&result);
1341
1342         PG_RETURN_NUMERIC(res);
1343 }
1344
1345
1346 /*
1347  * numeric_mod() -
1348  *
1349  *      Calculate the modulo of two numerics
1350  */
1351 Datum
1352 numeric_mod(PG_FUNCTION_ARGS)
1353 {
1354         Numeric         num1 = PG_GETARG_NUMERIC(0);
1355         Numeric         num2 = PG_GETARG_NUMERIC(1);
1356         Numeric         res;
1357         NumericVar      arg1;
1358         NumericVar      arg2;
1359         NumericVar      result;
1360
1361         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1362                 PG_RETURN_NUMERIC(make_result(&const_nan));
1363
1364         init_var(&arg1);
1365         init_var(&arg2);
1366         init_var(&result);
1367
1368         set_var_from_num(num1, &arg1);
1369         set_var_from_num(num2, &arg2);
1370
1371         mod_var(&arg1, &arg2, &result);
1372
1373         res = make_result(&result);
1374
1375         free_var(&result);
1376         free_var(&arg2);
1377         free_var(&arg1);
1378
1379         PG_RETURN_NUMERIC(res);
1380 }
1381
1382
1383 /*
1384  * numeric_inc() -
1385  *
1386  *      Increment a number by one
1387  */
1388 Datum
1389 numeric_inc(PG_FUNCTION_ARGS)
1390 {
1391         Numeric         num = PG_GETARG_NUMERIC(0);
1392         NumericVar      arg;
1393         Numeric         res;
1394
1395         /*
1396          * Handle NaN
1397          */
1398         if (NUMERIC_IS_NAN(num))
1399                 PG_RETURN_NUMERIC(make_result(&const_nan));
1400
1401         /*
1402          * Compute the result and return it
1403          */
1404         init_var(&arg);
1405
1406         set_var_from_num(num, &arg);
1407
1408         add_var(&arg, &const_one, &arg);
1409
1410         res = make_result(&arg);
1411
1412         free_var(&arg);
1413
1414         PG_RETURN_NUMERIC(res);
1415 }
1416
1417
1418 /*
1419  * numeric_smaller() -
1420  *
1421  *      Return the smaller of two numbers
1422  */
1423 Datum
1424 numeric_smaller(PG_FUNCTION_ARGS)
1425 {
1426         Numeric         num1 = PG_GETARG_NUMERIC(0);
1427         Numeric         num2 = PG_GETARG_NUMERIC(1);
1428
1429         /*
1430          * Use cmp_numerics so that this will agree with the comparison operators,
1431          * particularly as regards comparisons involving NaN.
1432          */
1433         if (cmp_numerics(num1, num2) < 0)
1434                 PG_RETURN_NUMERIC(num1);
1435         else
1436                 PG_RETURN_NUMERIC(num2);
1437 }
1438
1439
1440 /*
1441  * numeric_larger() -
1442  *
1443  *      Return the larger of two numbers
1444  */
1445 Datum
1446 numeric_larger(PG_FUNCTION_ARGS)
1447 {
1448         Numeric         num1 = PG_GETARG_NUMERIC(0);
1449         Numeric         num2 = PG_GETARG_NUMERIC(1);
1450
1451         /*
1452          * Use cmp_numerics so that this will agree with the comparison operators,
1453          * particularly as regards comparisons involving NaN.
1454          */
1455         if (cmp_numerics(num1, num2) > 0)
1456                 PG_RETURN_NUMERIC(num1);
1457         else
1458                 PG_RETURN_NUMERIC(num2);
1459 }
1460
1461
1462 /* ----------------------------------------------------------------------
1463  *
1464  * Advanced math functions
1465  *
1466  * ----------------------------------------------------------------------
1467  */
1468
1469 /*
1470  * numeric_fac()
1471  *
1472  * Compute factorial
1473  */
1474 Datum
1475 numeric_fac(PG_FUNCTION_ARGS)
1476 {
1477         int64           num = PG_GETARG_INT64(0);
1478         Numeric         res;
1479         NumericVar      fact;
1480         NumericVar      result;
1481
1482         if (num <= 1)
1483         {
1484                 res = make_result(&const_one);
1485                 PG_RETURN_NUMERIC(res);
1486         }
1487
1488         init_var(&fact);
1489         init_var(&result);
1490
1491         int8_to_numericvar(num, &result);
1492
1493         for (num = num - 1; num > 1; num--)
1494         {
1495                 int8_to_numericvar(num, &fact);
1496
1497                 mul_var(&result, &fact, &result, 0);
1498         }
1499
1500         res = make_result(&result);
1501
1502         free_var(&fact);
1503         free_var(&result);
1504
1505         PG_RETURN_NUMERIC(res);
1506 }
1507
1508
1509 /*
1510  * numeric_sqrt() -
1511  *
1512  *      Compute the square root of a numeric.
1513  */
1514 Datum
1515 numeric_sqrt(PG_FUNCTION_ARGS)
1516 {
1517         Numeric         num = PG_GETARG_NUMERIC(0);
1518         Numeric         res;
1519         NumericVar      arg;
1520         NumericVar      result;
1521         int                     sweight;
1522         int                     rscale;
1523
1524         /*
1525          * Handle NaN
1526          */
1527         if (NUMERIC_IS_NAN(num))
1528                 PG_RETURN_NUMERIC(make_result(&const_nan));
1529
1530         /*
1531          * Unpack the argument and determine the result scale.  We choose a scale
1532          * to give at least NUMERIC_MIN_SIG_DIGITS significant digits; but in any
1533          * case not less than the input's dscale.
1534          */
1535         init_var(&arg);
1536         init_var(&result);
1537
1538         set_var_from_num(num, &arg);
1539
1540         /* Assume the input was normalized, so arg.weight is accurate */
1541         sweight = (arg.weight + 1) * DEC_DIGITS / 2 - 1;
1542
1543         rscale = NUMERIC_MIN_SIG_DIGITS - sweight;
1544         rscale = Max(rscale, arg.dscale);
1545         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
1546         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
1547
1548         /*
1549          * Let sqrt_var() do the calculation and return the result.
1550          */
1551         sqrt_var(&arg, &result, rscale);
1552
1553         res = make_result(&result);
1554
1555         free_var(&result);
1556         free_var(&arg);
1557
1558         PG_RETURN_NUMERIC(res);
1559 }
1560
1561
1562 /*
1563  * numeric_exp() -
1564  *
1565  *      Raise e to the power of x
1566  */
1567 Datum
1568 numeric_exp(PG_FUNCTION_ARGS)
1569 {
1570         Numeric         num = PG_GETARG_NUMERIC(0);
1571         Numeric         res;
1572         NumericVar      arg;
1573         NumericVar      result;
1574         int                     rscale;
1575         double          val;
1576
1577         /*
1578          * Handle NaN
1579          */
1580         if (NUMERIC_IS_NAN(num))
1581                 PG_RETURN_NUMERIC(make_result(&const_nan));
1582
1583         /*
1584          * Unpack the argument and determine the result scale.  We choose a scale
1585          * to give at least NUMERIC_MIN_SIG_DIGITS significant digits; but in any
1586          * case not less than the input's dscale.
1587          */
1588         init_var(&arg);
1589         init_var(&result);
1590
1591         set_var_from_num(num, &arg);
1592
1593         /* convert input to float8, ignoring overflow */
1594         val = numericvar_to_double_no_overflow(&arg);
1595
1596         /*
1597          * log10(result) = num * log10(e), so this is approximately the decimal
1598          * weight of the result:
1599          */
1600         val *= 0.434294481903252;
1601
1602         /* limit to something that won't cause integer overflow */
1603         val = Max(val, -NUMERIC_MAX_RESULT_SCALE);
1604         val = Min(val, NUMERIC_MAX_RESULT_SCALE);
1605
1606         rscale = NUMERIC_MIN_SIG_DIGITS - (int) val;
1607         rscale = Max(rscale, arg.dscale);
1608         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
1609         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
1610
1611         /*
1612          * Let exp_var() do the calculation and return the result.
1613          */
1614         exp_var(&arg, &result, rscale);
1615
1616         res = make_result(&result);
1617
1618         free_var(&result);
1619         free_var(&arg);
1620
1621         PG_RETURN_NUMERIC(res);
1622 }
1623
1624
1625 /*
1626  * numeric_ln() -
1627  *
1628  *      Compute the natural logarithm of x
1629  */
1630 Datum
1631 numeric_ln(PG_FUNCTION_ARGS)
1632 {
1633         Numeric         num = PG_GETARG_NUMERIC(0);
1634         Numeric         res;
1635         NumericVar      arg;
1636         NumericVar      result;
1637         int                     dec_digits;
1638         int                     rscale;
1639
1640         /*
1641          * Handle NaN
1642          */
1643         if (NUMERIC_IS_NAN(num))
1644                 PG_RETURN_NUMERIC(make_result(&const_nan));
1645
1646         init_var(&arg);
1647         init_var(&result);
1648
1649         set_var_from_num(num, &arg);
1650
1651         /* Approx decimal digits before decimal point */
1652         dec_digits = (arg.weight + 1) * DEC_DIGITS;
1653
1654         if (dec_digits > 1)
1655                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(dec_digits - 1);
1656         else if (dec_digits < 1)
1657                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(1 - dec_digits);
1658         else
1659                 rscale = NUMERIC_MIN_SIG_DIGITS;
1660
1661         rscale = Max(rscale, arg.dscale);
1662         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
1663         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
1664
1665         ln_var(&arg, &result, rscale);
1666
1667         res = make_result(&result);
1668
1669         free_var(&result);
1670         free_var(&arg);
1671
1672         PG_RETURN_NUMERIC(res);
1673 }
1674
1675
1676 /*
1677  * numeric_log() -
1678  *
1679  *      Compute the logarithm of x in a given base
1680  */
1681 Datum
1682 numeric_log(PG_FUNCTION_ARGS)
1683 {
1684         Numeric         num1 = PG_GETARG_NUMERIC(0);
1685         Numeric         num2 = PG_GETARG_NUMERIC(1);
1686         Numeric         res;
1687         NumericVar      arg1;
1688         NumericVar      arg2;
1689         NumericVar      result;
1690
1691         /*
1692          * Handle NaN
1693          */
1694         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1695                 PG_RETURN_NUMERIC(make_result(&const_nan));
1696
1697         /*
1698          * Initialize things
1699          */
1700         init_var(&arg1);
1701         init_var(&arg2);
1702         init_var(&result);
1703
1704         set_var_from_num(num1, &arg1);
1705         set_var_from_num(num2, &arg2);
1706
1707         /*
1708          * Call log_var() to compute and return the result; note it handles scale
1709          * selection itself.
1710          */
1711         log_var(&arg1, &arg2, &result);
1712
1713         res = make_result(&result);
1714
1715         free_var(&result);
1716         free_var(&arg2);
1717         free_var(&arg1);
1718
1719         PG_RETURN_NUMERIC(res);
1720 }
1721
1722
1723 /*
1724  * numeric_power() -
1725  *
1726  *      Raise b to the power of x
1727  */
1728 Datum
1729 numeric_power(PG_FUNCTION_ARGS)
1730 {
1731         Numeric         num1 = PG_GETARG_NUMERIC(0);
1732         Numeric         num2 = PG_GETARG_NUMERIC(1);
1733         Numeric         res;
1734         NumericVar      arg1;
1735         NumericVar      arg2;
1736         NumericVar      arg2_trunc;
1737         NumericVar      result;
1738
1739         /*
1740          * Handle NaN
1741          */
1742         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1743                 PG_RETURN_NUMERIC(make_result(&const_nan));
1744
1745         /*
1746          * Initialize things
1747          */
1748         init_var(&arg1);
1749         init_var(&arg2);
1750         init_var(&arg2_trunc);
1751         init_var(&result);
1752
1753         set_var_from_num(num1, &arg1);
1754         set_var_from_num(num2, &arg2);
1755         set_var_from_var(&arg2, &arg2_trunc);
1756
1757         trunc_var(&arg2_trunc, 0);
1758
1759         /*
1760          * Return special SQLSTATE error codes for a few conditions mandated by
1761          * the standard.
1762          */
1763         if ((cmp_var(&arg1, &const_zero) == 0 &&
1764                  cmp_var(&arg2, &const_zero) < 0) ||
1765                 (cmp_var(&arg1, &const_zero) < 0 &&
1766                  cmp_var(&arg2, &arg2_trunc) != 0))
1767                 ereport(ERROR,
1768                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION),
1769                                  errmsg("invalid argument for power function")));
1770
1771         /*
1772          * Call power_var() to compute and return the result; note it handles
1773          * scale selection itself.
1774          */
1775         power_var(&arg1, &arg2, &result);
1776
1777         res = make_result(&result);
1778
1779         free_var(&result);
1780         free_var(&arg2);
1781         free_var(&arg2_trunc);
1782         free_var(&arg1);
1783
1784         PG_RETURN_NUMERIC(res);
1785 }
1786
1787
1788 /* ----------------------------------------------------------------------
1789  *
1790  * Type conversion functions
1791  *
1792  * ----------------------------------------------------------------------
1793  */
1794
1795
1796 Datum
1797 int4_numeric(PG_FUNCTION_ARGS)
1798 {
1799         int32           val = PG_GETARG_INT32(0);
1800         Numeric         res;
1801         NumericVar      result;
1802
1803         init_var(&result);
1804
1805         int8_to_numericvar((int64) val, &result);
1806
1807         res = make_result(&result);
1808
1809         free_var(&result);
1810
1811         PG_RETURN_NUMERIC(res);
1812 }
1813
1814
1815 Datum
1816 numeric_int4(PG_FUNCTION_ARGS)
1817 {
1818         Numeric         num = PG_GETARG_NUMERIC(0);
1819         NumericVar      x;
1820         int32           result;
1821
1822         /* XXX would it be better to return NULL? */
1823         if (NUMERIC_IS_NAN(num))
1824                 ereport(ERROR,
1825                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1826                                  errmsg("cannot convert NaN to integer")));
1827
1828         /* Convert to variable format, then convert to int4 */
1829         init_var(&x);
1830         set_var_from_num(num, &x);
1831         result = numericvar_to_int4(&x);
1832         free_var(&x);
1833         PG_RETURN_INT32(result);
1834 }
1835
1836 /*
1837  * Given a NumericVar, convert it to an int32. If the NumericVar
1838  * exceeds the range of an int32, raise the appropriate error via
1839  * ereport(). The input NumericVar is *not* free'd.
1840  */
1841 static int32
1842 numericvar_to_int4(NumericVar *var)
1843 {
1844         int32           result;
1845         int64           val;
1846
1847         if (!numericvar_to_int8(var, &val))
1848                 ereport(ERROR,
1849                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1850                                  errmsg("integer out of range")));
1851
1852         /* Down-convert to int4 */
1853         result = (int32) val;
1854
1855         /* Test for overflow by reverse-conversion. */
1856         if ((int64) result != val)
1857                 ereport(ERROR,
1858                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1859                                  errmsg("integer out of range")));
1860
1861         return result;
1862 }
1863
1864 Datum
1865 int8_numeric(PG_FUNCTION_ARGS)
1866 {
1867         int64           val = PG_GETARG_INT64(0);
1868         Numeric         res;
1869         NumericVar      result;
1870
1871         init_var(&result);
1872
1873         int8_to_numericvar(val, &result);
1874
1875         res = make_result(&result);
1876
1877         free_var(&result);
1878
1879         PG_RETURN_NUMERIC(res);
1880 }
1881
1882
1883 Datum
1884 numeric_int8(PG_FUNCTION_ARGS)
1885 {
1886         Numeric         num = PG_GETARG_NUMERIC(0);
1887         NumericVar      x;
1888         int64           result;
1889
1890         /* XXX would it be better to return NULL? */
1891         if (NUMERIC_IS_NAN(num))
1892                 ereport(ERROR,
1893                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1894                                  errmsg("cannot convert NaN to bigint")));
1895
1896         /* Convert to variable format and thence to int8 */
1897         init_var(&x);
1898         set_var_from_num(num, &x);
1899
1900         if (!numericvar_to_int8(&x, &result))
1901                 ereport(ERROR,
1902                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1903                                  errmsg("bigint out of range")));
1904
1905         free_var(&x);
1906
1907         PG_RETURN_INT64(result);
1908 }
1909
1910
1911 Datum
1912 int2_numeric(PG_FUNCTION_ARGS)
1913 {
1914         int16           val = PG_GETARG_INT16(0);
1915         Numeric         res;
1916         NumericVar      result;
1917
1918         init_var(&result);
1919
1920         int8_to_numericvar((int64) val, &result);
1921
1922         res = make_result(&result);
1923
1924         free_var(&result);
1925
1926         PG_RETURN_NUMERIC(res);
1927 }
1928
1929
1930 Datum
1931 numeric_int2(PG_FUNCTION_ARGS)
1932 {
1933         Numeric         num = PG_GETARG_NUMERIC(0);
1934         NumericVar      x;
1935         int64           val;
1936         int16           result;
1937
1938         /* XXX would it be better to return NULL? */
1939         if (NUMERIC_IS_NAN(num))
1940                 ereport(ERROR,
1941                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1942                                  errmsg("cannot convert NaN to smallint")));
1943
1944         /* Convert to variable format and thence to int8 */
1945         init_var(&x);
1946         set_var_from_num(num, &x);
1947
1948         if (!numericvar_to_int8(&x, &val))
1949                 ereport(ERROR,
1950                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1951                                  errmsg("smallint out of range")));
1952
1953         free_var(&x);
1954
1955         /* Down-convert to int2 */
1956         result = (int16) val;
1957
1958         /* Test for overflow by reverse-conversion. */
1959         if ((int64) result != val)
1960                 ereport(ERROR,
1961                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1962                                  errmsg("smallint out of range")));
1963
1964         PG_RETURN_INT16(result);
1965 }
1966
1967
1968 Datum
1969 float8_numeric(PG_FUNCTION_ARGS)
1970 {
1971         float8          val = PG_GETARG_FLOAT8(0);
1972         Numeric         res;
1973         NumericVar      result;
1974         char            buf[DBL_DIG + 100];
1975
1976         if (isnan(val))
1977                 PG_RETURN_NUMERIC(make_result(&const_nan));
1978
1979         sprintf(buf, "%.*g", DBL_DIG, val);
1980
1981         init_var(&result);
1982
1983         set_var_from_str(buf, &result);
1984         res = make_result(&result);
1985
1986         free_var(&result);
1987
1988         PG_RETURN_NUMERIC(res);
1989 }
1990
1991
1992 Datum
1993 numeric_float8(PG_FUNCTION_ARGS)
1994 {
1995         Numeric         num = PG_GETARG_NUMERIC(0);
1996         char       *tmp;
1997         Datum           result;
1998
1999         if (NUMERIC_IS_NAN(num))
2000                 PG_RETURN_FLOAT8(get_float8_nan());
2001
2002         tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
2003                                                                                           NumericGetDatum(num)));
2004
2005         result = DirectFunctionCall1(float8in, CStringGetDatum(tmp));
2006
2007         pfree(tmp);
2008
2009         PG_RETURN_DATUM(result);
2010 }
2011
2012
2013 /* Convert numeric to float8; if out of range, return +/- HUGE_VAL */
2014 Datum
2015 numeric_float8_no_overflow(PG_FUNCTION_ARGS)
2016 {
2017         Numeric         num = PG_GETARG_NUMERIC(0);
2018         double          val;
2019
2020         if (NUMERIC_IS_NAN(num))
2021                 PG_RETURN_FLOAT8(get_float8_nan());
2022
2023         val = numeric_to_double_no_overflow(num);
2024
2025         PG_RETURN_FLOAT8(val);
2026 }
2027
2028 Datum
2029 float4_numeric(PG_FUNCTION_ARGS)
2030 {
2031         float4          val = PG_GETARG_FLOAT4(0);
2032         Numeric         res;
2033         NumericVar      result;
2034         char            buf[FLT_DIG + 100];
2035
2036         if (isnan(val))
2037                 PG_RETURN_NUMERIC(make_result(&const_nan));
2038
2039         sprintf(buf, "%.*g", FLT_DIG, val);
2040
2041         init_var(&result);
2042
2043         set_var_from_str(buf, &result);
2044         res = make_result(&result);
2045
2046         free_var(&result);
2047
2048         PG_RETURN_NUMERIC(res);
2049 }
2050
2051
2052 Datum
2053 numeric_float4(PG_FUNCTION_ARGS)
2054 {
2055         Numeric         num = PG_GETARG_NUMERIC(0);
2056         char       *tmp;
2057         Datum           result;
2058
2059         if (NUMERIC_IS_NAN(num))
2060                 PG_RETURN_FLOAT4(get_float4_nan());
2061
2062         tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
2063                                                                                           NumericGetDatum(num)));
2064
2065         result = DirectFunctionCall1(float4in, CStringGetDatum(tmp));
2066
2067         pfree(tmp);
2068
2069         PG_RETURN_DATUM(result);
2070 }
2071
2072
2073 Datum
2074 text_numeric(PG_FUNCTION_ARGS)
2075 {
2076         text       *str = PG_GETARG_TEXT_P(0);
2077         int                     len;
2078         char       *s;
2079         Datum           result;
2080
2081         len = (VARSIZE(str) - VARHDRSZ);
2082         s = palloc(len + 1);
2083         memcpy(s, VARDATA(str), len);
2084         *(s + len) = '\0';
2085
2086         result = DirectFunctionCall3(numeric_in, CStringGetDatum(s),
2087                                                                  ObjectIdGetDatum(0), Int32GetDatum(-1));
2088
2089         pfree(s);
2090
2091         return result;
2092 }
2093
2094 Datum
2095 numeric_text(PG_FUNCTION_ARGS)
2096 {
2097         /* val is numeric, but easier to leave it as Datum */
2098         Datum           val = PG_GETARG_DATUM(0);
2099         char       *s;
2100         int                     len;
2101         text       *result;
2102
2103         s = DatumGetCString(DirectFunctionCall1(numeric_out, val));
2104         len = strlen(s);
2105
2106         result = (text *) palloc(VARHDRSZ + len);
2107
2108         SET_VARSIZE(result, VARHDRSZ + len);
2109         memcpy(VARDATA(result), s, len);
2110
2111         pfree(s);
2112
2113         PG_RETURN_TEXT_P(result);
2114 }
2115
2116
2117 /* ----------------------------------------------------------------------
2118  *
2119  * Aggregate functions
2120  *
2121  * The transition datatype for all these aggregates is a 3-element array
2122  * of Numeric, holding the values N, sum(X), sum(X*X) in that order.
2123  *
2124  * We represent N as a numeric mainly to avoid having to build a special
2125  * datatype; it's unlikely it'd overflow an int4, but ...
2126  *
2127  * ----------------------------------------------------------------------
2128  */
2129
2130 static ArrayType *
2131 do_numeric_accum(ArrayType *transarray, Numeric newval)
2132 {
2133         Datum      *transdatums;
2134         int                     ndatums;
2135         Datum           N,
2136                                 sumX,
2137                                 sumX2;
2138         ArrayType  *result;
2139
2140         /* We assume the input is array of numeric */
2141         deconstruct_array(transarray,
2142                                           NUMERICOID, -1, false, 'i',
2143                                           &transdatums, NULL, &ndatums);
2144         if (ndatums != 3)
2145                 elog(ERROR, "expected 3-element numeric array");
2146         N = transdatums[0];
2147         sumX = transdatums[1];
2148         sumX2 = transdatums[2];
2149
2150         N = DirectFunctionCall1(numeric_inc, N);
2151         sumX = DirectFunctionCall2(numeric_add, sumX,
2152                                                            NumericGetDatum(newval));
2153         sumX2 = DirectFunctionCall2(numeric_add, sumX2,
2154                                                                 DirectFunctionCall2(numeric_mul,
2155                                                                                                         NumericGetDatum(newval),
2156                                                                                                         NumericGetDatum(newval)));
2157
2158         transdatums[0] = N;
2159         transdatums[1] = sumX;
2160         transdatums[2] = sumX2;
2161
2162         result = construct_array(transdatums, 3,
2163                                                          NUMERICOID, -1, false, 'i');
2164
2165         return result;
2166 }
2167
2168 /*
2169  * Improve avg performance by not caclulating sum(X*X).
2170  */
2171 static ArrayType *
2172 do_numeric_avg_accum(ArrayType *transarray, Numeric newval)
2173 {
2174         Datum      *transdatums;
2175         int                     ndatums;
2176         Datum           N,
2177                                 sumX;
2178         ArrayType  *result;
2179
2180         /* We assume the input is array of numeric */
2181         deconstruct_array(transarray,
2182                                           NUMERICOID, -1, false, 'i',
2183                                           &transdatums, NULL, &ndatums);
2184         if (ndatums != 2)
2185                 elog(ERROR, "expected 2-element numeric array");
2186         N = transdatums[0];
2187         sumX = transdatums[1];
2188
2189         N = DirectFunctionCall1(numeric_inc, N);
2190         sumX = DirectFunctionCall2(numeric_add, sumX,
2191                                                            NumericGetDatum(newval));
2192
2193         transdatums[0] = N;
2194         transdatums[1] = sumX;
2195
2196         result = construct_array(transdatums, 2,
2197                                                          NUMERICOID, -1, false, 'i');
2198
2199         return result;
2200 }
2201
2202 Datum
2203 numeric_accum(PG_FUNCTION_ARGS)
2204 {
2205         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2206         Numeric         newval = PG_GETARG_NUMERIC(1);
2207
2208         PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
2209 }
2210
2211 /*
2212  * Optimized case for average of numeric.
2213  */
2214 Datum
2215 numeric_avg_accum(PG_FUNCTION_ARGS)
2216 {
2217         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2218         Numeric         newval = PG_GETARG_NUMERIC(1);
2219
2220         PG_RETURN_ARRAYTYPE_P(do_numeric_avg_accum(transarray, newval));
2221 }
2222
2223 /*
2224  * Integer data types all use Numeric accumulators to share code and
2225  * avoid risk of overflow.      For int2 and int4 inputs, Numeric accumulation
2226  * is overkill for the N and sum(X) values, but definitely not overkill
2227  * for the sum(X*X) value.      Hence, we use int2_accum and int4_accum only
2228  * for stddev/variance --- there are faster special-purpose accumulator
2229  * routines for SUM and AVG of these datatypes.
2230  */
2231
2232 Datum
2233 int2_accum(PG_FUNCTION_ARGS)
2234 {
2235         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2236         Datum           newval2 = PG_GETARG_DATUM(1);
2237         Numeric         newval;
2238
2239         newval = DatumGetNumeric(DirectFunctionCall1(int2_numeric, newval2));
2240
2241         PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
2242 }
2243
2244 Datum
2245 int4_accum(PG_FUNCTION_ARGS)
2246 {
2247         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2248         Datum           newval4 = PG_GETARG_DATUM(1);
2249         Numeric         newval;
2250
2251         newval = DatumGetNumeric(DirectFunctionCall1(int4_numeric, newval4));
2252
2253         PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
2254 }
2255
2256 Datum
2257 int8_accum(PG_FUNCTION_ARGS)
2258 {
2259         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2260         Datum           newval8 = PG_GETARG_DATUM(1);
2261         Numeric         newval;
2262
2263         newval = DatumGetNumeric(DirectFunctionCall1(int8_numeric, newval8));
2264
2265         PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
2266 }
2267
2268 /*
2269  * Optimized case for average of int8.
2270  */
2271 Datum
2272 int8_avg_accum(PG_FUNCTION_ARGS)
2273 {
2274         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2275         Datum           newval8 = PG_GETARG_DATUM(1);
2276         Numeric         newval;
2277
2278         newval = DatumGetNumeric(DirectFunctionCall1(int8_numeric, newval8));
2279
2280         PG_RETURN_ARRAYTYPE_P(do_numeric_avg_accum(transarray, newval));
2281 }
2282
2283
2284 Datum
2285 numeric_avg(PG_FUNCTION_ARGS)
2286 {
2287         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2288         Datum      *transdatums;
2289         int                     ndatums;
2290         Numeric         N,
2291                                 sumX;
2292
2293         /* We assume the input is array of numeric */
2294         deconstruct_array(transarray,
2295                                           NUMERICOID, -1, false, 'i',
2296                                           &transdatums, NULL, &ndatums);
2297         if (ndatums != 2)
2298                 elog(ERROR, "expected 2-element numeric array");
2299         N = DatumGetNumeric(transdatums[0]);
2300         sumX = DatumGetNumeric(transdatums[1]);
2301
2302         /* SQL92 defines AVG of no values to be NULL */
2303         /* N is zero iff no digits (cf. numeric_uminus) */
2304         if (VARSIZE(N) == NUMERIC_HDRSZ)
2305                 PG_RETURN_NULL();
2306
2307         PG_RETURN_DATUM(DirectFunctionCall2(numeric_div,
2308                                                                                 NumericGetDatum(sumX),
2309                                                                                 NumericGetDatum(N)));
2310 }
2311
2312 /*
2313  * Workhorse routine for the standard deviance and variance
2314  * aggregates. 'transarray' is the aggregate's transition
2315  * array. 'variance' specifies whether we should calculate the
2316  * variance or the standard deviation. 'sample' indicates whether the
2317  * caller is interested in the sample or the population
2318  * variance/stddev.
2319  *
2320  * If appropriate variance statistic is undefined for the input,
2321  * *is_null is set to true and NULL is returned.
2322  */
2323 static Numeric
2324 numeric_stddev_internal(ArrayType *transarray,
2325                                                 bool variance, bool sample,
2326                                                 bool *is_null)
2327 {
2328         Datum      *transdatums;
2329         int                     ndatums;
2330         Numeric         N,
2331                                 sumX,
2332                                 sumX2,
2333                                 res;
2334         NumericVar      vN,
2335                                 vsumX,
2336                                 vsumX2,
2337                                 vNminus1;
2338         NumericVar *comp;
2339         int                     rscale;
2340
2341         *is_null = false;
2342
2343         /* We assume the input is array of numeric */
2344         deconstruct_array(transarray,
2345                                           NUMERICOID, -1, false, 'i',
2346                                           &transdatums, NULL, &ndatums);
2347         if (ndatums != 3)
2348                 elog(ERROR, "expected 3-element numeric array");
2349         N = DatumGetNumeric(transdatums[0]);
2350         sumX = DatumGetNumeric(transdatums[1]);
2351         sumX2 = DatumGetNumeric(transdatums[2]);
2352
2353         if (NUMERIC_IS_NAN(N) || NUMERIC_IS_NAN(sumX) || NUMERIC_IS_NAN(sumX2))
2354                 return make_result(&const_nan);
2355
2356         init_var(&vN);
2357         set_var_from_num(N, &vN);
2358
2359         /*
2360          * Sample stddev and variance are undefined when N <= 1; population stddev
2361          * is undefined when N == 0. Return NULL in either case.
2362          */
2363         if (sample)
2364                 comp = &const_one;
2365         else
2366                 comp = &const_zero;
2367
2368         if (cmp_var(&vN, comp) <= 0)
2369         {
2370                 free_var(&vN);
2371                 *is_null = true;
2372                 return NULL;
2373         }
2374
2375         init_var(&vNminus1);
2376         sub_var(&vN, &const_one, &vNminus1);
2377
2378         init_var(&vsumX);
2379         set_var_from_num(sumX, &vsumX);
2380         init_var(&vsumX2);
2381         set_var_from_num(sumX2, &vsumX2);
2382
2383         /* compute rscale for mul_var calls */
2384         rscale = vsumX.dscale * 2;
2385
2386         mul_var(&vsumX, &vsumX, &vsumX, rscale);        /* vsumX = sumX * sumX */
2387         mul_var(&vN, &vsumX2, &vsumX2, rscale);         /* vsumX2 = N * sumX2 */
2388         sub_var(&vsumX2, &vsumX, &vsumX2);      /* N * sumX2 - sumX * sumX */
2389
2390         if (cmp_var(&vsumX2, &const_zero) <= 0)
2391         {
2392                 /* Watch out for roundoff error producing a negative numerator */
2393                 res = make_result(&const_zero);
2394         }
2395         else
2396         {
2397                 mul_var(&vN, &vNminus1, &vNminus1, 0);  /* N * (N - 1) */
2398                 rscale = select_div_scale(&vsumX2, &vNminus1);
2399                 div_var(&vsumX2, &vNminus1, &vsumX, rscale, true);              /* variance */
2400                 if (!variance)
2401                         sqrt_var(&vsumX, &vsumX, rscale);       /* stddev */
2402
2403                 res = make_result(&vsumX);
2404         }
2405
2406         free_var(&vN);
2407         free_var(&vNminus1);
2408         free_var(&vsumX);
2409         free_var(&vsumX2);
2410
2411         return res;
2412 }
2413
2414 Datum
2415 numeric_var_samp(PG_FUNCTION_ARGS)
2416 {
2417         Numeric         res;
2418         bool            is_null;
2419
2420         res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
2421                                                                   true, true, &is_null);
2422
2423         if (is_null)
2424                 PG_RETURN_NULL();
2425         else
2426                 PG_RETURN_NUMERIC(res);
2427 }
2428
2429 Datum
2430 numeric_stddev_samp(PG_FUNCTION_ARGS)
2431 {
2432         Numeric         res;
2433         bool            is_null;
2434
2435         res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
2436                                                                   false, true, &is_null);
2437
2438         if (is_null)
2439                 PG_RETURN_NULL();
2440         else
2441                 PG_RETURN_NUMERIC(res);
2442 }
2443
2444 Datum
2445 numeric_var_pop(PG_FUNCTION_ARGS)
2446 {
2447         Numeric         res;
2448         bool            is_null;
2449
2450         res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
2451                                                                   true, false, &is_null);
2452
2453         if (is_null)
2454                 PG_RETURN_NULL();
2455         else
2456                 PG_RETURN_NUMERIC(res);
2457 }
2458
2459 Datum
2460 numeric_stddev_pop(PG_FUNCTION_ARGS)
2461 {
2462         Numeric         res;
2463         bool            is_null;
2464
2465         res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
2466                                                                   false, false, &is_null);
2467
2468         if (is_null)
2469                 PG_RETURN_NULL();
2470         else
2471                 PG_RETURN_NUMERIC(res);
2472 }
2473
2474 /*
2475  * SUM transition functions for integer datatypes.
2476  *
2477  * To avoid overflow, we use accumulators wider than the input datatype.
2478  * A Numeric accumulator is needed for int8 input; for int4 and int2
2479  * inputs, we use int8 accumulators which should be sufficient for practical
2480  * purposes.  (The latter two therefore don't really belong in this file,
2481  * but we keep them here anyway.)
2482  *
2483  * Because SQL92 defines the SUM() of no values to be NULL, not zero,
2484  * the initial condition of the transition data value needs to be NULL. This
2485  * means we can't rely on ExecAgg to automatically insert the first non-null
2486  * data value into the transition data: it doesn't know how to do the type
2487  * conversion.  The upshot is that these routines have to be marked non-strict
2488  * and handle substitution of the first non-null input themselves.
2489  */
2490
2491 Datum
2492 int2_sum(PG_FUNCTION_ARGS)
2493 {
2494         int64           newval;
2495
2496         if (PG_ARGISNULL(0))
2497         {
2498                 /* No non-null input seen so far... */
2499                 if (PG_ARGISNULL(1))
2500                         PG_RETURN_NULL();       /* still no non-null */
2501                 /* This is the first non-null input. */
2502                 newval = (int64) PG_GETARG_INT16(1);
2503                 PG_RETURN_INT64(newval);
2504         }
2505
2506         /*
2507          * If we're invoked by nodeAgg, we can cheat and modify out first
2508          * parameter in-place to avoid palloc overhead. If not, we need to return
2509          * the new value of the transition variable.
2510          */
2511         if (fcinfo->context && IsA(fcinfo->context, AggState))
2512         {
2513                 int64      *oldsum = (int64 *) PG_GETARG_POINTER(0);
2514
2515                 /* Leave the running sum unchanged in the new input is null */
2516                 if (!PG_ARGISNULL(1))
2517                         *oldsum = *oldsum + (int64) PG_GETARG_INT16(1);
2518
2519                 PG_RETURN_POINTER(oldsum);
2520         }
2521         else
2522         {
2523                 int64           oldsum = PG_GETARG_INT64(0);
2524
2525                 /* Leave sum unchanged if new input is null. */
2526                 if (PG_ARGISNULL(1))
2527                         PG_RETURN_INT64(oldsum);
2528
2529                 /* OK to do the addition. */
2530                 newval = oldsum + (int64) PG_GETARG_INT16(1);
2531
2532                 PG_RETURN_INT64(newval);
2533         }
2534 }
2535
2536 Datum
2537 int4_sum(PG_FUNCTION_ARGS)
2538 {
2539         int64           newval;
2540
2541         if (PG_ARGISNULL(0))
2542         {
2543                 /* No non-null input seen so far... */
2544                 if (PG_ARGISNULL(1))
2545                         PG_RETURN_NULL();       /* still no non-null */
2546                 /* This is the first non-null input. */
2547                 newval = (int64) PG_GETARG_INT32(1);
2548                 PG_RETURN_INT64(newval);
2549         }
2550
2551         /*
2552          * If we're invoked by nodeAgg, we can cheat and modify out first
2553          * parameter in-place to avoid palloc overhead. If not, we need to return
2554          * the new value of the transition variable.
2555          */
2556         if (fcinfo->context && IsA(fcinfo->context, AggState))
2557         {
2558                 int64      *oldsum = (int64 *) PG_GETARG_POINTER(0);
2559
2560                 /* Leave the running sum unchanged in the new input is null */
2561                 if (!PG_ARGISNULL(1))
2562                         *oldsum = *oldsum + (int64) PG_GETARG_INT32(1);
2563
2564                 PG_RETURN_POINTER(oldsum);
2565         }
2566         else
2567         {
2568                 int64           oldsum = PG_GETARG_INT64(0);
2569
2570                 /* Leave sum unchanged if new input is null. */
2571                 if (PG_ARGISNULL(1))
2572                         PG_RETURN_INT64(oldsum);
2573
2574                 /* OK to do the addition. */
2575                 newval = oldsum + (int64) PG_GETARG_INT32(1);
2576
2577                 PG_RETURN_INT64(newval);
2578         }
2579 }
2580
2581 Datum
2582 int8_sum(PG_FUNCTION_ARGS)
2583 {
2584         Numeric         oldsum;
2585         Datum           newval;
2586
2587         if (PG_ARGISNULL(0))
2588         {
2589                 /* No non-null input seen so far... */
2590                 if (PG_ARGISNULL(1))
2591                         PG_RETURN_NULL();       /* still no non-null */
2592                 /* This is the first non-null input. */
2593                 newval = DirectFunctionCall1(int8_numeric, PG_GETARG_DATUM(1));
2594                 PG_RETURN_DATUM(newval);
2595         }
2596
2597         /*
2598          * Note that we cannot special-case the nodeAgg case here, as we do for
2599          * int2_sum and int4_sum: numeric is of variable size, so we cannot modify
2600          * our first parameter in-place.
2601          */
2602
2603         oldsum = PG_GETARG_NUMERIC(0);
2604
2605         /* Leave sum unchanged if new input is null. */
2606         if (PG_ARGISNULL(1))
2607                 PG_RETURN_NUMERIC(oldsum);
2608
2609         /* OK to do the addition. */
2610         newval = DirectFunctionCall1(int8_numeric, PG_GETARG_DATUM(1));
2611
2612         PG_RETURN_DATUM(DirectFunctionCall2(numeric_add,
2613                                                                                 NumericGetDatum(oldsum), newval));
2614 }
2615
2616
2617 /*
2618  * Routines for avg(int2) and avg(int4).  The transition datatype
2619  * is a two-element int8 array, holding count and sum.
2620  */
2621
2622 typedef struct Int8TransTypeData
2623 {
2624 #ifndef INT64_IS_BUSTED
2625         int64           count;
2626         int64           sum;
2627 #else
2628         /* "int64" isn't really 64 bits, so fake up properly-aligned fields */
2629         int32           count;
2630         int32           pad1;
2631         int32           sum;
2632         int32           pad2;
2633 #endif
2634 } Int8TransTypeData;
2635
2636 Datum
2637 int2_avg_accum(PG_FUNCTION_ARGS)
2638 {
2639         ArrayType  *transarray;
2640         int16           newval = PG_GETARG_INT16(1);
2641         Int8TransTypeData *transdata;
2642
2643         /*
2644          * If we're invoked by nodeAgg, we can cheat and modify our first
2645          * parameter in-place to reduce palloc overhead. Otherwise we need to make
2646          * a copy of it before scribbling on it.
2647          */
2648         if (fcinfo->context && IsA(fcinfo->context, AggState))
2649                 transarray = PG_GETARG_ARRAYTYPE_P(0);
2650         else
2651                 transarray = PG_GETARG_ARRAYTYPE_P_COPY(0);
2652
2653         if (ARR_HASNULL(transarray) ||
2654                 ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
2655                 elog(ERROR, "expected 2-element int8 array");
2656
2657         transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
2658         transdata->count++;
2659         transdata->sum += newval;
2660
2661         PG_RETURN_ARRAYTYPE_P(transarray);
2662 }
2663
2664 Datum
2665 int4_avg_accum(PG_FUNCTION_ARGS)
2666 {
2667         ArrayType  *transarray;
2668         int32           newval = PG_GETARG_INT32(1);
2669         Int8TransTypeData *transdata;
2670
2671         /*
2672          * If we're invoked by nodeAgg, we can cheat and modify our first
2673          * parameter in-place to reduce palloc overhead. Otherwise we need to make
2674          * a copy of it before scribbling on it.
2675          */
2676         if (fcinfo->context && IsA(fcinfo->context, AggState))
2677                 transarray = PG_GETARG_ARRAYTYPE_P(0);
2678         else
2679                 transarray = PG_GETARG_ARRAYTYPE_P_COPY(0);
2680
2681         if (ARR_HASNULL(transarray) ||
2682                 ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
2683                 elog(ERROR, "expected 2-element int8 array");
2684
2685         transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
2686         transdata->count++;
2687         transdata->sum += newval;
2688
2689         PG_RETURN_ARRAYTYPE_P(transarray);
2690 }
2691
2692 Datum
2693 int8_avg(PG_FUNCTION_ARGS)
2694 {
2695         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2696         Int8TransTypeData *transdata;
2697         Datum           countd,
2698                                 sumd;
2699
2700         if (ARR_HASNULL(transarray) ||
2701                 ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
2702                 elog(ERROR, "expected 2-element int8 array");
2703         transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
2704
2705         /* SQL92 defines AVG of no values to be NULL */
2706         if (transdata->count == 0)
2707                 PG_RETURN_NULL();
2708
2709         countd = DirectFunctionCall1(int8_numeric,
2710                                                                  Int64GetDatumFast(transdata->count));
2711         sumd = DirectFunctionCall1(int8_numeric,
2712                                                            Int64GetDatumFast(transdata->sum));
2713
2714         PG_RETURN_DATUM(DirectFunctionCall2(numeric_div, sumd, countd));
2715 }
2716
2717
2718 /* ----------------------------------------------------------------------
2719  *
2720  * Debug support
2721  *
2722  * ----------------------------------------------------------------------
2723  */
2724
2725 #ifdef NUMERIC_DEBUG
2726
2727 /*
2728  * dump_numeric() - Dump a value in the db storage format for debugging
2729  */
2730 static void
2731 dump_numeric(const char *str, Numeric num)
2732 {
2733         NumericDigit *digits = NUMERIC_DIGITS(num);
2734         int                     ndigits;
2735         int                     i;
2736
2737         ndigits = NUMERIC_NDIGITS(num);
2738
2739         printf("%s: NUMERIC w=%d d=%d ", str, num->n_weight, NUMERIC_DSCALE(num));
2740         switch (NUMERIC_SIGN(num))
2741         {
2742                 case NUMERIC_POS:
2743                         printf("POS");
2744                         break;
2745                 case NUMERIC_NEG:
2746                         printf("NEG");
2747                         break;
2748                 case NUMERIC_NAN:
2749                         printf("NaN");
2750                         break;
2751                 default:
2752                         printf("SIGN=0x%x", NUMERIC_SIGN(num));
2753                         break;
2754         }
2755
2756         for (i = 0; i < ndigits; i++)
2757                 printf(" %0*d", DEC_DIGITS, digits[i]);
2758         printf("\n");
2759 }
2760
2761
2762 /*
2763  * dump_var() - Dump a value in the variable format for debugging
2764  */
2765 static void
2766 dump_var(const char *str, NumericVar *var)
2767 {
2768         int                     i;
2769
2770         printf("%s: VAR w=%d d=%d ", str, var->weight, var->dscale);
2771         switch (var->sign)
2772         {
2773                 case NUMERIC_POS:
2774                         printf("POS");
2775                         break;
2776                 case NUMERIC_NEG:
2777                         printf("NEG");
2778                         break;
2779                 case NUMERIC_NAN:
2780                         printf("NaN");
2781                         break;
2782                 default:
2783                         printf("SIGN=0x%x", var->sign);
2784                         break;
2785         }
2786
2787         for (i = 0; i < var->ndigits; i++)
2788                 printf(" %0*d", DEC_DIGITS, var->digits[i]);
2789
2790         printf("\n");
2791 }
2792 #endif   /* NUMERIC_DEBUG */
2793
2794
2795 /* ----------------------------------------------------------------------
2796  *
2797  * Local functions follow
2798  *
2799  * In general, these do not support NaNs --- callers must eliminate
2800  * the possibility of NaN first.  (make_result() is an exception.)
2801  *
2802  * ----------------------------------------------------------------------
2803  */
2804
2805
2806 /*
2807  * alloc_var() -
2808  *
2809  *      Allocate a digit buffer of ndigits digits (plus a spare digit for rounding)
2810  */
2811 static void
2812 alloc_var(NumericVar *var, int ndigits)
2813 {
2814         digitbuf_free(var->buf);
2815         var->buf = digitbuf_alloc(ndigits + 1);
2816         var->buf[0] = 0;                        /* spare digit for rounding */
2817         var->digits = var->buf + 1;
2818         var->ndigits = ndigits;
2819 }
2820
2821
2822 /*
2823  * free_var() -
2824  *
2825  *      Return the digit buffer of a variable to the free pool
2826  */
2827 static void
2828 free_var(NumericVar *var)
2829 {
2830         digitbuf_free(var->buf);
2831         var->buf = NULL;
2832         var->digits = NULL;
2833         var->sign = NUMERIC_NAN;
2834 }
2835
2836
2837 /*
2838  * zero_var() -
2839  *
2840  *      Set a variable to ZERO.
2841  *      Note: its dscale is not touched.
2842  */
2843 static void
2844 zero_var(NumericVar *var)
2845 {
2846         digitbuf_free(var->buf);
2847         var->buf = NULL;
2848         var->digits = NULL;
2849         var->ndigits = 0;
2850         var->weight = 0;                        /* by convention; doesn't really matter */
2851         var->sign = NUMERIC_POS;        /* anything but NAN... */
2852 }
2853
2854
2855 /*
2856  * set_var_from_str()
2857  *
2858  *      Parse a string and put the number into a variable
2859  */
2860 static void
2861 set_var_from_str(const char *str, NumericVar *dest)
2862 {
2863         const char *cp = str;
2864         bool            have_dp = FALSE;
2865         int                     i;
2866         unsigned char *decdigits;
2867         int                     sign = NUMERIC_POS;
2868         int                     dweight = -1;
2869         int                     ddigits;
2870         int                     dscale = 0;
2871         int                     weight;
2872         int                     ndigits;
2873         int                     offset;
2874         NumericDigit *digits;
2875
2876         /*
2877          * We first parse the string to extract decimal digits and determine the
2878          * correct decimal weight.      Then convert to NBASE representation.
2879          */
2880
2881         /* skip leading spaces */
2882         while (*cp)
2883         {
2884                 if (!isspace((unsigned char) *cp))
2885                         break;
2886                 cp++;
2887         }
2888
2889         switch (*cp)
2890         {
2891                 case '+':
2892                         sign = NUMERIC_POS;
2893                         cp++;
2894                         break;
2895
2896                 case '-':
2897                         sign = NUMERIC_NEG;
2898                         cp++;
2899                         break;
2900         }
2901
2902         if (*cp == '.')
2903         {
2904                 have_dp = TRUE;
2905                 cp++;
2906         }
2907
2908         if (!isdigit((unsigned char) *cp))
2909                 ereport(ERROR,
2910                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
2911                           errmsg("invalid input syntax for type numeric: \"%s\"", str)));
2912
2913         decdigits = (unsigned char *) palloc(strlen(cp) + DEC_DIGITS * 2);
2914
2915         /* leading padding for digit alignment later */
2916         memset(decdigits, 0, DEC_DIGITS);
2917         i = DEC_DIGITS;
2918
2919         while (*cp)
2920         {
2921                 if (isdigit((unsigned char) *cp))
2922                 {
2923                         decdigits[i++] = *cp++ - '0';
2924                         if (!have_dp)
2925                                 dweight++;
2926                         else
2927                                 dscale++;
2928                 }
2929                 else if (*cp == '.')
2930                 {
2931                         if (have_dp)
2932                                 ereport(ERROR,
2933                                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
2934                                           errmsg("invalid input syntax for type numeric: \"%s\"",
2935                                                          str)));
2936                         have_dp = TRUE;
2937                         cp++;
2938                 }
2939                 else
2940                         break;
2941         }
2942
2943         ddigits = i - DEC_DIGITS;
2944         /* trailing padding for digit alignment later */
2945         memset(decdigits + i, 0, DEC_DIGITS - 1);
2946
2947         /* Handle exponent, if any */
2948         if (*cp == 'e' || *cp == 'E')
2949         {
2950                 long            exponent;
2951                 char       *endptr;
2952
2953                 cp++;
2954                 exponent = strtol(cp, &endptr, 10);
2955                 if (endptr == cp)
2956                         ereport(ERROR,
2957                                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
2958                                          errmsg("invalid input syntax for type numeric: \"%s\"",
2959                                                         str)));
2960                 cp = endptr;
2961                 if (exponent > NUMERIC_MAX_PRECISION ||
2962                         exponent < -NUMERIC_MAX_PRECISION)
2963                         ereport(ERROR,
2964                                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
2965                                          errmsg("invalid input syntax for type numeric: \"%s\"",
2966                                                         str)));
2967                 dweight += (int) exponent;
2968                 dscale -= (int) exponent;
2969                 if (dscale < 0)
2970                         dscale = 0;
2971         }
2972
2973         /* Should be nothing left but spaces */
2974         while (*cp)
2975         {
2976                 if (!isspace((unsigned char) *cp))
2977                         ereport(ERROR,
2978                                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
2979                                          errmsg("invalid input syntax for type numeric: \"%s\"",
2980                                                         str)));
2981                 cp++;
2982         }
2983
2984         /*
2985          * Okay, convert pure-decimal representation to base NBASE.  First we need
2986          * to determine the converted weight and ndigits.  offset is the number of
2987          * decimal zeroes to insert before the first given digit to have a
2988          * correctly aligned first NBASE digit.
2989          */
2990         if (dweight >= 0)
2991                 weight = (dweight + 1 + DEC_DIGITS - 1) / DEC_DIGITS - 1;
2992         else
2993                 weight = -((-dweight - 1) / DEC_DIGITS + 1);
2994         offset = (weight + 1) * DEC_DIGITS - (dweight + 1);
2995         ndigits = (ddigits + offset + DEC_DIGITS - 1) / DEC_DIGITS;
2996
2997         alloc_var(dest, ndigits);
2998         dest->sign = sign;
2999         dest->weight = weight;
3000         dest->dscale = dscale;
3001
3002         i = DEC_DIGITS - offset;
3003         digits = dest->digits;
3004
3005         while (ndigits-- > 0)
3006         {
3007 #if DEC_DIGITS == 4
3008                 *digits++ = ((decdigits[i] * 10 + decdigits[i + 1]) * 10 +
3009                                          decdigits[i + 2]) * 10 + decdigits[i + 3];
3010 #elif DEC_DIGITS == 2
3011                 *digits++ = decdigits[i] * 10 + decdigits[i + 1];
3012 #elif DEC_DIGITS == 1
3013                 *digits++ = decdigits[i];
3014 #else
3015 #error unsupported NBASE
3016 #endif
3017                 i += DEC_DIGITS;
3018         }
3019
3020         pfree(decdigits);
3021
3022         /* Strip any leading/trailing zeroes, and normalize weight if zero */
3023         strip_var(dest);
3024 }
3025
3026
3027 /*
3028  * set_var_from_num() -
3029  *
3030  *      Convert the packed db format into a variable
3031  */
3032 static void
3033 set_var_from_num(Numeric num, NumericVar *dest)
3034 {
3035         int                     ndigits;
3036
3037         ndigits = NUMERIC_NDIGITS(num);
3038
3039         alloc_var(dest, ndigits);
3040
3041         dest->weight = num->n_weight;
3042         dest->sign = NUMERIC_SIGN(num);
3043         dest->dscale = NUMERIC_DSCALE(num);
3044
3045         memcpy(dest->digits, num->n_data, ndigits * sizeof(NumericDigit));
3046 }
3047
3048
3049 /*
3050  * set_var_from_var() -
3051  *
3052  *      Copy one variable into another
3053  */
3054 static void
3055 set_var_from_var(NumericVar *value, NumericVar *dest)
3056 {
3057         NumericDigit *newbuf;
3058
3059         newbuf = digitbuf_alloc(value->ndigits + 1);
3060         newbuf[0] = 0;                          /* spare digit for rounding */
3061         memcpy(newbuf + 1, value->digits, value->ndigits * sizeof(NumericDigit));
3062
3063         digitbuf_free(dest->buf);
3064
3065         memmove(dest, value, sizeof(NumericVar));
3066         dest->buf = newbuf;
3067         dest->digits = newbuf + 1;
3068 }
3069
3070
3071 /*
3072  * get_str_from_var() -
3073  *
3074  *      Convert a var to text representation (guts of numeric_out).
3075  *      CAUTION: var's contents may be modified by rounding!
3076  *      Returns a palloc'd string.
3077  */
3078 static char *
3079 get_str_from_var(NumericVar *var, int dscale)
3080 {
3081         char       *str;
3082         char       *cp;
3083         char       *endcp;
3084         int                     i;
3085         int                     d;
3086         NumericDigit dig;
3087
3088 #if DEC_DIGITS > 1
3089         NumericDigit d1;
3090 #endif
3091
3092         if (dscale < 0)
3093                 dscale = 0;
3094
3095         /*
3096          * Check if we must round up before printing the value and do so.
3097          */
3098         round_var(var, dscale);
3099
3100         /*
3101          * Allocate space for the result.
3102          *
3103          * i is set to to # of decimal digits before decimal point. dscale is the
3104          * # of decimal digits we will print after decimal point. We may generate
3105          * as many as DEC_DIGITS-1 excess digits at the end, and in addition we
3106          * need room for sign, decimal point, null terminator.
3107          */
3108         i = (var->weight + 1) * DEC_DIGITS;
3109         if (i <= 0)
3110                 i = 1;
3111
3112         str = palloc(i + dscale + DEC_DIGITS + 2);
3113         cp = str;
3114
3115         /*
3116          * Output a dash for negative values
3117          */
3118         if (var->sign == NUMERIC_NEG)
3119                 *cp++ = '-';
3120
3121         /*
3122          * Output all digits before the decimal point
3123          */
3124         if (var->weight < 0)
3125         {
3126                 d = var->weight + 1;
3127                 *cp++ = '0';
3128         }
3129         else
3130         {
3131                 for (d = 0; d <= var->weight; d++)
3132                 {
3133                         dig = (d < var->ndigits) ? var->digits[d] : 0;
3134                         /* In the first digit, suppress extra leading decimal zeroes */
3135 #if DEC_DIGITS == 4
3136                         {
3137                                 bool            putit = (d > 0);
3138
3139                                 d1 = dig / 1000;
3140                                 dig -= d1 * 1000;
3141                                 putit |= (d1 > 0);
3142                                 if (putit)
3143                                         *cp++ = d1 + '0';
3144                                 d1 = dig / 100;
3145                                 dig -= d1 * 100;
3146                                 putit |= (d1 > 0);
3147                                 if (putit)
3148                                         *cp++ = d1 + '0';
3149                                 d1 = dig / 10;
3150                                 dig -= d1 * 10;
3151                                 putit |= (d1 > 0);
3152                                 if (putit)
3153                                         *cp++ = d1 + '0';
3154                                 *cp++ = dig + '0';
3155                         }
3156 #elif DEC_DIGITS == 2
3157                         d1 = dig / 10;
3158                         dig -= d1 * 10;
3159                         if (d1 > 0 || d > 0)
3160                                 *cp++ = d1 + '0';
3161                         *cp++ = dig + '0';
3162 #elif DEC_DIGITS == 1
3163                         *cp++ = dig + '0';
3164 #else
3165 #error unsupported NBASE
3166 #endif
3167                 }
3168         }
3169
3170         /*
3171          * If requested, output a decimal point and all the digits that follow it.
3172          * We initially put out a multiple of DEC_DIGITS digits, then truncate if
3173          * needed.
3174          */
3175         if (dscale > 0)
3176         {
3177                 *cp++ = '.';
3178                 endcp = cp + dscale;
3179                 for (i = 0; i < dscale; d++, i += DEC_DIGITS)
3180                 {
3181                         dig = (d >= 0 && d < var->ndigits) ? var->digits[d] : 0;
3182 #if DEC_DIGITS == 4
3183                         d1 = dig / 1000;
3184                         dig -= d1 * 1000;
3185                         *cp++ = d1 + '0';
3186                         d1 = dig / 100;
3187                         dig -= d1 * 100;
3188                         *cp++ = d1 + '0';
3189                         d1 = dig / 10;
3190                         dig -= d1 * 10;
3191                         *cp++ = d1 + '0';
3192                         *cp++ = dig + '0';
3193 #elif DEC_DIGITS == 2
3194                         d1 = dig / 10;
3195                         dig -= d1 * 10;
3196                         *cp++ = d1 + '0';
3197                         *cp++ = dig + '0';
3198 #elif DEC_DIGITS == 1
3199                         *cp++ = dig + '0';
3200 #else
3201 #error unsupported NBASE
3202 #endif
3203                 }
3204                 cp = endcp;
3205         }
3206
3207         /*
3208          * terminate the string and return it
3209          */
3210         *cp = '\0';
3211         return str;
3212 }
3213
3214
3215 /*
3216  * make_result() -
3217  *
3218  *      Create the packed db numeric format in palloc()'d memory from
3219  *      a variable.
3220  */
3221 static Numeric
3222 make_result(NumericVar *var)
3223 {
3224         Numeric         result;
3225         NumericDigit *digits = var->digits;
3226         int                     weight = var->weight;
3227         int                     sign = var->sign;
3228         int                     n;
3229         Size            len;
3230
3231         if (sign == NUMERIC_NAN)
3232         {
3233                 result = (Numeric) palloc(NUMERIC_HDRSZ);
3234
3235                 SET_VARSIZE(result, NUMERIC_HDRSZ);
3236                 result->n_weight = 0;
3237                 result->n_sign_dscale = NUMERIC_NAN;
3238
3239                 dump_numeric("make_result()", result);
3240                 return result;
3241         }
3242
3243         n = var->ndigits;
3244
3245         /* truncate leading zeroes */
3246         while (n > 0 && *digits == 0)
3247         {
3248                 digits++;
3249                 weight--;
3250                 n--;
3251         }
3252         /* truncate trailing zeroes */
3253         while (n > 0 && digits[n - 1] == 0)
3254                 n--;
3255
3256         /* If zero result, force to weight=0 and positive sign */
3257         if (n == 0)
3258         {
3259                 weight = 0;
3260                 sign = NUMERIC_POS;
3261         }
3262
3263         /* Build the result */
3264         len = NUMERIC_HDRSZ + n * sizeof(NumericDigit);
3265         result = (Numeric) palloc(len);
3266         SET_VARSIZE(result, len);
3267         result->n_weight = weight;
3268         result->n_sign_dscale = sign | (var->dscale & NUMERIC_DSCALE_MASK);
3269
3270         memcpy(result->n_data, digits, n * sizeof(NumericDigit));
3271
3272         /* Check for overflow of int16 fields */
3273         if (result->n_weight != weight ||
3274                 NUMERIC_DSCALE(result) != var->dscale)
3275                 ereport(ERROR,
3276                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
3277                                  errmsg("value overflows numeric format")));
3278
3279         dump_numeric("make_result()", result);
3280         return result;
3281 }
3282
3283
3284 /*
3285  * apply_typmod() -
3286  *
3287  *      Do bounds checking and rounding according to the attributes
3288  *      typmod field.
3289  */
3290 static void
3291 apply_typmod(NumericVar *var, int32 typmod)
3292 {
3293         int                     precision;
3294         int                     scale;
3295         int                     maxdigits;
3296         int                     ddigits;
3297         int                     i;
3298
3299         /* Do nothing if we have a default typmod (-1) */
3300         if (typmod < (int32) (VARHDRSZ))
3301                 return;
3302
3303         typmod -= VARHDRSZ;
3304         precision = (typmod >> 16) & 0xffff;
3305         scale = typmod & 0xffff;
3306         maxdigits = precision - scale;
3307
3308         /* Round to target scale (and set var->dscale) */
3309         round_var(var, scale);
3310
3311         /*
3312          * Check for overflow - note we can't do this before rounding, because
3313          * rounding could raise the weight.  Also note that the var's weight could
3314          * be inflated by leading zeroes, which will be stripped before storage
3315          * but perhaps might not have been yet. In any case, we must recognize a
3316          * true zero, whose weight doesn't mean anything.
3317          */
3318         ddigits = (var->weight + 1) * DEC_DIGITS;
3319         if (ddigits > maxdigits)
3320         {
3321                 /* Determine true weight; and check for all-zero result */
3322                 for (i = 0; i < var->ndigits; i++)
3323                 {
3324                         NumericDigit dig = var->digits[i];
3325
3326                         if (dig)
3327                         {
3328                                 /* Adjust for any high-order decimal zero digits */
3329 #if DEC_DIGITS == 4
3330                                 if (dig < 10)
3331                                         ddigits -= 3;
3332                                 else if (dig < 100)
3333                                         ddigits -= 2;
3334                                 else if (dig < 1000)
3335                                         ddigits -= 1;
3336 #elif DEC_DIGITS == 2
3337                                 if (dig < 10)
3338                                         ddigits -= 1;
3339 #elif DEC_DIGITS == 1
3340                                 /* no adjustment */
3341 #else
3342 #error unsupported NBASE
3343 #endif
3344                                 if (ddigits > maxdigits)
3345                                         ereport(ERROR,
3346                                                         (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
3347                                                          errmsg("numeric field overflow"),
3348                                                          errdetail("A field with precision %d, scale %d must round to an absolute value less than %s%d.",
3349                                                                            precision, scale,
3350                                         /* Display 10^0 as 1 */
3351                                                                            maxdigits ? "10^" : "",
3352                                                                            maxdigits ? maxdigits : 1
3353                                                                            )));
3354                                 break;
3355                         }
3356                         ddigits -= DEC_DIGITS;
3357                 }
3358         }
3359 }
3360
3361 /*
3362  * Convert numeric to int8, rounding if needed.
3363  *
3364  * If overflow, return FALSE (no error is raised).      Return TRUE if okay.
3365  *
3366  *      CAUTION: var's contents may be modified by rounding!
3367  */
3368 static bool
3369 numericvar_to_int8(NumericVar *var, int64 *result)
3370 {
3371         NumericDigit *digits;
3372         int                     ndigits;
3373         int                     weight;
3374         int                     i;
3375         int64           val,
3376                                 oldval;
3377         bool            neg;
3378
3379         /* Round to nearest integer */
3380         round_var(var, 0);
3381
3382         /* Check for zero input */
3383         strip_var(var);
3384         ndigits = var->ndigits;
3385         if (ndigits == 0)
3386         {
3387                 *result = 0;
3388                 return true;
3389         }
3390
3391         /*
3392          * For input like 10000000000, we must treat stripped digits as real. So
3393          * the loop assumes there are weight+1 digits before the decimal point.
3394          */
3395         weight = var->weight;
3396         Assert(weight >= 0 && ndigits <= weight + 1);
3397
3398         /* Construct the result */
3399         digits = var->digits;
3400         neg = (var->sign == NUMERIC_NEG);
3401         val = digits[0];
3402         for (i = 1; i <= weight; i++)
3403         {
3404                 oldval = val;
3405                 val *= NBASE;
3406                 if (i < ndigits)
3407                         val += digits[i];
3408
3409                 /*
3410                  * The overflow check is a bit tricky because we want to accept
3411                  * INT64_MIN, which will overflow the positive accumulator.  We can
3412                  * detect this case easily though because INT64_MIN is the only
3413                  * nonzero value for which -val == val (on a two's complement machine,
3414                  * anyway).
3415                  */
3416                 if ((val / NBASE) != oldval)    /* possible overflow? */
3417                 {
3418                         if (!neg || (-val) != val || val == 0 || oldval < 0)
3419                                 return false;
3420                 }
3421         }
3422
3423         *result = neg ? -val : val;
3424         return true;
3425 }
3426
3427 /*
3428  * Convert int8 value to numeric.
3429  */
3430 static void
3431 int8_to_numericvar(int64 val, NumericVar *var)
3432 {
3433         uint64          uval,
3434                                 newuval;
3435         NumericDigit *ptr;
3436         int                     ndigits;
3437
3438         /* int8 can require at most 19 decimal digits; add one for safety */
3439         alloc_var(var, 20 / DEC_DIGITS);
3440         if (val < 0)
3441         {
3442                 var->sign = NUMERIC_NEG;
3443                 uval = -val;
3444         }
3445         else
3446         {
3447                 var->sign = NUMERIC_POS;
3448                 uval = val;
3449         }
3450         var->dscale = 0;
3451         if (val == 0)
3452         {
3453                 var->ndigits = 0;
3454                 var->weight = 0;
3455                 return;
3456         }
3457         ptr = var->digits + var->ndigits;
3458         ndigits = 0;
3459         do
3460         {
3461                 ptr--;
3462                 ndigits++;
3463                 newuval = uval / NBASE;
3464                 *ptr = uval - newuval * NBASE;
3465                 uval = newuval;
3466         } while (uval);
3467         var->digits = ptr;
3468         var->ndigits = ndigits;
3469         var->weight = ndigits - 1;
3470 }
3471
3472 /*
3473  * Convert numeric to float8; if out of range, return +/- HUGE_VAL
3474  */
3475 static double
3476 numeric_to_double_no_overflow(Numeric num)
3477 {
3478         char       *tmp;
3479         double          val;
3480         char       *endptr;
3481
3482         tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
3483                                                                                           NumericGetDatum(num)));
3484
3485         /* unlike float8in, we ignore ERANGE from strtod */
3486         val = strtod(tmp, &endptr);
3487         if (*endptr != '\0')
3488         {
3489                 /* shouldn't happen ... */
3490                 ereport(ERROR,
3491                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3492                          errmsg("invalid input syntax for type double precision: \"%s\"",
3493                                         tmp)));
3494         }
3495
3496         pfree(tmp);
3497
3498         return val;
3499 }
3500
3501 /* As above, but work from a NumericVar */
3502 static double
3503 numericvar_to_double_no_overflow(NumericVar *var)
3504 {
3505         char       *tmp;
3506         double          val;
3507         char       *endptr;
3508
3509         tmp = get_str_from_var(var, var->dscale);
3510
3511         /* unlike float8in, we ignore ERANGE from strtod */
3512         val = strtod(tmp, &endptr);
3513         if (*endptr != '\0')
3514         {
3515                 /* shouldn't happen ... */
3516                 ereport(ERROR,
3517                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3518                          errmsg("invalid input syntax for type double precision: \"%s\"",
3519                                         tmp)));
3520         }
3521
3522         pfree(tmp);
3523
3524         return val;
3525 }
3526
3527
3528 /*
3529  * cmp_var() -
3530  *
3531  *      Compare two values on variable level.  We assume zeroes have been
3532  *      truncated to no digits.
3533  */
3534 static int
3535 cmp_var(NumericVar *var1, NumericVar *var2)
3536 {
3537         return cmp_var_common(var1->digits, var1->ndigits,
3538                                                   var1->weight, var1->sign,
3539                                                   var2->digits, var2->ndigits,
3540                                                   var2->weight, var2->sign);
3541 }
3542
3543 /*
3544  * cmp_var_common() -
3545  *
3546  *      Main routine of cmp_var(). This function can be used by both
3547  *      NumericVar and Numeric.
3548  */
3549 static int
3550 cmp_var_common(const NumericDigit *var1digits, int var1ndigits,
3551                            int var1weight, int var1sign,
3552                            const NumericDigit *var2digits, int var2ndigits,
3553                            int var2weight, int var2sign)
3554 {
3555         if (var1ndigits == 0)
3556         {
3557                 if (var2ndigits == 0)
3558                         return 0;
3559                 if (var2sign == NUMERIC_NEG)
3560                         return 1;
3561                 return -1;
3562         }
3563         if (var2ndigits == 0)
3564         {
3565                 if (var1sign == NUMERIC_POS)
3566                         return 1;
3567                 return -1;
3568         }
3569
3570         if (var1sign == NUMERIC_POS)
3571         {
3572                 if (var2sign == NUMERIC_NEG)
3573                         return 1;
3574                 return cmp_abs_common(var1digits, var1ndigits, var1weight,
3575                                                           var2digits, var2ndigits, var2weight);
3576         }
3577
3578         if (var2sign == NUMERIC_POS)
3579                 return -1;
3580
3581         return cmp_abs_common(var2digits, var2ndigits, var2weight,
3582                                                   var1digits, var1ndigits, var1weight);
3583 }
3584
3585
3586 /*
3587  * add_var() -
3588  *
3589  *      Full version of add functionality on variable level (handling signs).
3590  *      result might point to one of the operands too without danger.
3591  */
3592 static void
3593 add_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
3594 {
3595         /*
3596          * Decide on the signs of the two variables what to do
3597          */
3598         if (var1->sign == NUMERIC_POS)
3599         {
3600                 if (var2->sign == NUMERIC_POS)
3601                 {
3602                         /*
3603                          * Both are positive result = +(ABS(var1) + ABS(var2))
3604                          */
3605                         add_abs(var1, var2, result);
3606                         result->sign = NUMERIC_POS;
3607                 }
3608                 else
3609                 {
3610                         /*
3611                          * var1 is positive, var2 is negative Must compare absolute values
3612                          */
3613                         switch (cmp_abs(var1, var2))
3614                         {
3615                                 case 0:
3616                                         /* ----------
3617                                          * ABS(var1) == ABS(var2)
3618                                          * result = ZERO
3619                                          * ----------
3620                                          */
3621                                         zero_var(result);
3622                                         result->dscale = Max(var1->dscale, var2->dscale);
3623                                         break;
3624
3625                                 case 1:
3626                                         /* ----------
3627                                          * ABS(var1) > ABS(var2)
3628                                          * result = +(ABS(var1) - ABS(var2))
3629                                          * ----------
3630                                          */
3631                                         sub_abs(var1, var2, result);
3632                                         result->sign = NUMERIC_POS;
3633                                         break;
3634
3635                                 case -1:
3636                                         /* ----------
3637                                          * ABS(var1) < ABS(var2)
3638                                          * result = -(ABS(var2) - ABS(var1))
3639                                          * ----------
3640                                          */
3641                                         sub_abs(var2, var1, result);
3642                                         result->sign = NUMERIC_NEG;
3643                                         break;
3644                         }
3645                 }
3646         }
3647         else
3648         {
3649                 if (var2->sign == NUMERIC_POS)
3650                 {
3651                         /* ----------
3652                          * var1 is negative, var2 is positive
3653                          * Must compare absolute values
3654                          * ----------
3655                          */
3656                         switch (cmp_abs(var1, var2))
3657                         {
3658                                 case 0:
3659                                         /* ----------
3660                                          * ABS(var1) == ABS(var2)
3661                                          * result = ZERO
3662                                          * ----------
3663                                          */
3664                                         zero_var(result);
3665                                         result->dscale = Max(var1->dscale, var2->dscale);
3666                                         break;
3667
3668                                 case 1:
3669                                         /* ----------
3670                                          * ABS(var1) > ABS(var2)
3671                                          * result = -(ABS(var1) - ABS(var2))
3672                                          * ----------
3673                                          */
3674                                         sub_abs(var1, var2, result);
3675                                         result->sign = NUMERIC_NEG;
3676                                         break;
3677
3678                                 case -1:
3679                                         /* ----------
3680                                          * ABS(var1) < ABS(var2)
3681                                          * result = +(ABS(var2) - ABS(var1))
3682                                          * ----------
3683                                          */
3684                                         sub_abs(var2, var1, result);
3685                                         result->sign = NUMERIC_POS;
3686                                         break;
3687                         }
3688                 }
3689                 else
3690                 {
3691                         /* ----------
3692                          * Both are negative
3693                          * result = -(ABS(var1) + ABS(var2))
3694                          * ----------
3695                          */
3696                         add_abs(var1, var2, result);
3697                         result->sign = NUMERIC_NEG;
3698                 }
3699         }
3700 }
3701
3702
3703 /*
3704  * sub_var() -
3705  *
3706  *      Full version of sub functionality on variable level (handling signs).
3707  *      result might point to one of the operands too without danger.
3708  */
3709 static void
3710 sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
3711 {
3712         /*
3713          * Decide on the signs of the two variables what to do
3714          */
3715         if (var1->sign == NUMERIC_POS)
3716         {
3717                 if (var2->sign == NUMERIC_NEG)
3718                 {
3719                         /* ----------
3720                          * var1 is positive, var2 is negative
3721                          * result = +(ABS(var1) + ABS(var2))
3722                          * ----------
3723                          */
3724                         add_abs(var1, var2, result);
3725                         result->sign = NUMERIC_POS;
3726                 }
3727                 else
3728                 {
3729                         /* ----------
3730                          * Both are positive
3731                          * Must compare absolute values
3732                          * ----------
3733                          */
3734                         switch (cmp_abs(var1, var2))
3735                         {
3736                                 case 0:
3737                                         /* ----------
3738                                          * ABS(var1) == ABS(var2)
3739                                          * result = ZERO
3740                                          * ----------
3741                                          */
3742                                         zero_var(result);
3743                                         result->dscale = Max(var1->dscale, var2->dscale);
3744                                         break;
3745
3746                                 case 1:
3747                                         /* ----------
3748                                          * ABS(var1) > ABS(var2)
3749                                          * result = +(ABS(var1) - ABS(var2))
3750                                          * ----------
3751                                          */
3752                                         sub_abs(var1, var2, result);
3753                                         result->sign = NUMERIC_POS;
3754                                         break;
3755
3756                                 case -1:
3757                                         /* ----------
3758                                          * ABS(var1) < ABS(var2)
3759                                          * result = -(ABS(var2) - ABS(var1))
3760                                          * ----------
3761                                          */
3762                                         sub_abs(var2, var1, result);
3763                                         result->sign = NUMERIC_NEG;
3764                                         break;
3765                         }
3766                 }
3767         }
3768         else
3769         {
3770                 if (var2->sign == NUMERIC_NEG)
3771                 {
3772                         /* ----------
3773                          * Both are negative
3774                          * Must compare absolute values
3775                          * ----------
3776                          */
3777                         switch (cmp_abs(var1, var2))
3778                         {
3779                                 case 0:
3780                                         /* ----------
3781                                          * ABS(var1) == ABS(var2)
3782                                          * result = ZERO
3783                                          * ----------
3784                                          */
3785                                         zero_var(result);
3786                                         result->dscale = Max(var1->dscale, var2->dscale);
3787                                         break;
3788
3789                                 case 1:
3790                                         /* ----------
3791                                          * ABS(var1) > ABS(var2)
3792                                          * result = -(ABS(var1) - ABS(var2))
3793                                          * ----------
3794                                          */
3795                                         sub_abs(var1, var2, result);
3796                                         result->sign = NUMERIC_NEG;
3797                                         break;
3798
3799                                 case -1:
3800                                         /* ----------
3801                                          * ABS(var1) < ABS(var2)
3802                                          * result = +(ABS(var2) - ABS(var1))
3803                                          * ----------
3804                                          */
3805                                         sub_abs(var2, var1, result);
3806                                         result->sign = NUMERIC_POS;
3807                                         break;
3808                         }
3809                 }
3810                 else
3811                 {
3812                         /* ----------
3813                          * var1 is negative, var2 is positive
3814                          * result = -(ABS(var1) + ABS(var2))
3815                          * ----------
3816                          */
3817                         add_abs(var1, var2, result);
3818                         result->sign = NUMERIC_NEG;
3819                 }
3820         }
3821 }
3822
3823
3824 /*
3825  * mul_var() -
3826  *
3827  *      Multiplication on variable level. Product of var1 * var2 is stored
3828  *      in result.      Result is rounded to no more than rscale fractional digits.
3829  */
3830 static void
3831 mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
3832                 int rscale)
3833 {
3834         int                     res_ndigits;
3835         int                     res_sign;
3836         int                     res_weight;
3837         int                     maxdigits;
3838         int                *dig;
3839         int                     carry;
3840         int                     maxdig;
3841         int                     newdig;
3842         NumericDigit *res_digits;
3843         int                     i,
3844                                 ri,
3845                                 i1,
3846                                 i2;
3847
3848         /* copy these values into local vars for speed in inner loop */
3849         int                     var1ndigits = var1->ndigits;
3850         int                     var2ndigits = var2->ndigits;
3851         NumericDigit *var1digits = var1->digits;
3852         NumericDigit *var2digits = var2->digits;
3853
3854         if (var1ndigits == 0 || var2ndigits == 0)
3855         {
3856                 /* one or both inputs is zero; so is result */
3857                 zero_var(result);
3858                 result->dscale = rscale;
3859                 return;
3860         }
3861
3862         /* Determine result sign and (maximum possible) weight */
3863         if (var1->sign == var2->sign)
3864                 res_sign = NUMERIC_POS;
3865         else
3866                 res_sign = NUMERIC_NEG;
3867         res_weight = var1->weight + var2->weight + 2;
3868
3869         /*
3870          * Determine number of result digits to compute.  If the exact result
3871          * would have more than rscale fractional digits, truncate the computation
3872          * with MUL_GUARD_DIGITS guard digits.  We do that by pretending that one
3873          * or both inputs have fewer digits than they really do.
3874          */
3875         res_ndigits = var1ndigits + var2ndigits + 1;
3876         maxdigits = res_weight + 1 + (rscale * DEC_DIGITS) + MUL_GUARD_DIGITS;
3877         if (res_ndigits > maxdigits)
3878         {
3879                 if (maxdigits < 3)
3880                 {
3881                         /* no useful precision at all in the result... */
3882                         zero_var(result);
3883                         result->dscale = rscale;
3884                         return;
3885                 }
3886                 /* force maxdigits odd so that input ndigits can be equal */
3887                 if ((maxdigits & 1) == 0)
3888                         maxdigits++;
3889                 if (var1ndigits > var2ndigits)
3890                 {
3891                         var1ndigits -= res_ndigits - maxdigits;
3892                         if (var1ndigits < var2ndigits)
3893                                 var1ndigits = var2ndigits = (var1ndigits + var2ndigits) / 2;
3894                 }
3895                 else
3896                 {
3897                         var2ndigits -= res_ndigits - maxdigits;
3898                         if (var2ndigits < var1ndigits)
3899                                 var1ndigits = var2ndigits = (var1ndigits + var2ndigits) / 2;
3900                 }
3901                 res_ndigits = maxdigits;
3902                 Assert(res_ndigits == var1ndigits + var2ndigits + 1);
3903         }
3904
3905         /*
3906          * We do the arithmetic in an array "dig[]" of signed int's.  Since
3907          * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom
3908          * to avoid normalizing carries immediately.
3909          *
3910          * maxdig tracks the maximum possible value of any dig[] entry; when this
3911          * threatens to exceed INT_MAX, we take the time to propagate carries. To
3912          * avoid overflow in maxdig itself, it actually represents the max
3913          * possible value divided by NBASE-1.
3914          */
3915         dig = (int *) palloc0(res_ndigits * sizeof(int));
3916         maxdig = 0;
3917
3918         ri = res_ndigits - 1;
3919         for (i1 = var1ndigits - 1; i1 >= 0; ri--, i1--)
3920         {
3921                 int                     var1digit = var1digits[i1];
3922
3923                 if (var1digit == 0)
3924                         continue;
3925
3926                 /* Time to normalize? */
3927                 maxdig += var1digit;
3928                 if (maxdig > INT_MAX / (NBASE - 1))
3929                 {
3930                         /* Yes, do it */
3931                         carry = 0;
3932                         for (i = res_ndigits - 1; i >= 0; i--)
3933                         {
3934                                 newdig = dig[i] + carry;
3935                                 if (newdig >= NBASE)
3936                                 {
3937                                         carry = newdig / NBASE;
3938                                         newdig -= carry * NBASE;
3939                                 }
3940                                 else
3941                                         carry = 0;
3942                                 dig[i] = newdig;
3943                         }
3944                         Assert(carry == 0);
3945                         /* Reset maxdig to indicate new worst-case */
3946                         maxdig = 1 + var1digit;
3947                 }
3948
3949                 /* Add appropriate multiple of var2 into the accumulator */
3950                 i = ri;
3951                 for (i2 = var2ndigits - 1; i2 >= 0; i2--)
3952                         dig[i--] += var1digit * var2digits[i2];
3953         }
3954
3955         /*
3956          * Now we do a final carry propagation pass to normalize the result, which
3957          * we combine with storing the result digits into the output. Note that
3958          * this is still done at full precision w/guard digits.
3959          */
3960         alloc_var(result, res_ndigits);
3961         res_digits = result->digits;
3962         carry = 0;
3963         for (i = res_ndigits - 1; i >= 0; i--)
3964         {
3965                 newdig = dig[i] + carry;
3966                 if (newdig >= NBASE)
3967                 {
3968                         carry = newdig / NBASE;
3969                         newdig -= carry * NBASE;
3970                 }
3971                 else
3972                         carry = 0;
3973                 res_digits[i] = newdig;
3974         }
3975         Assert(carry == 0);
3976
3977         pfree(dig);
3978
3979         /*
3980          * Finally, round the result to the requested precision.
3981          */
3982         result->weight = res_weight;
3983         result->sign = res_sign;
3984
3985         /* Round to target rscale (and set result->dscale) */
3986         round_var(result, rscale);
3987
3988         /* Strip leading and trailing zeroes */
3989         strip_var(result);
3990 }
3991
3992
3993 /*
3994  * div_var() -
3995  *
3996  *      Division on variable level. Quotient of var1 / var2 is stored
3997  *      in result.      Result is rounded to no more than rscale fractional digits.
3998  */
3999 static void
4000 div_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
4001                 int rscale, bool round)
4002 {
4003         int                     div_ndigits;
4004         int                     res_sign;
4005         int                     res_weight;
4006         int                *div;
4007         int                     qdigit;
4008         int                     carry;
4009         int                     maxdiv;
4010         int                     newdig;
4011         NumericDigit *res_digits;
4012         double          fdividend,
4013                                 fdivisor,
4014                                 fdivisorinverse,
4015                                 fquotient;
4016         int                     qi;
4017         int                     i;
4018
4019         /* copy these values into local vars for speed in inner loop */
4020         int                     var1ndigits = var1->ndigits;
4021         int                     var2ndigits = var2->ndigits;
4022         NumericDigit *var1digits = var1->digits;
4023         NumericDigit *var2digits = var2->digits;
4024
4025         /*
4026          * First of all division by zero check; we must not be handed an
4027          * unnormalized divisor.
4028          */
4029         if (var2ndigits == 0 || var2digits[0] == 0)
4030                 ereport(ERROR,
4031                                 (errcode(ERRCODE_DIVISION_BY_ZERO),
4032                                  errmsg("division by zero")));
4033
4034         /*
4035          * Now result zero check
4036          */
4037         if (var1ndigits == 0)
4038         {
4039                 zero_var(result);
4040                 result->dscale = rscale;
4041                 return;
4042         }
4043
4044         /*
4045          * Determine the result sign, weight and number of digits to calculate
4046          */
4047         if (var1->sign == var2->sign)
4048                 res_sign = NUMERIC_POS;
4049         else
4050                 res_sign = NUMERIC_NEG;
4051         res_weight = var1->weight - var2->weight + 1;
4052         /* The number of accurate result digits we need to produce: */
4053         div_ndigits = res_weight + 1 + (rscale + DEC_DIGITS - 1) / DEC_DIGITS;
4054         /* Add guard digits for roundoff error */
4055         div_ndigits += DIV_GUARD_DIGITS;
4056         if (div_ndigits < DIV_GUARD_DIGITS)
4057                 div_ndigits = DIV_GUARD_DIGITS;
4058         /* Must be at least var1ndigits, too, to simplify data-loading loop */
4059         if (div_ndigits < var1ndigits)
4060                 div_ndigits = var1ndigits;
4061
4062         /*
4063          * We do the arithmetic in an array "div[]" of signed int's.  Since
4064          * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom
4065          * to avoid normalizing carries immediately.
4066          *
4067          * We start with div[] containing one zero digit followed by the
4068          * dividend's digits (plus appended zeroes to reach the desired precision
4069          * including guard digits).  Each step of the main loop computes an
4070          * (approximate) quotient digit and stores it into div[], removing one
4071          * position of dividend space.  A final pass of carry propagation takes
4072          * care of any mistaken quotient digits.
4073          */
4074         div = (int *) palloc0((div_ndigits + 1) * sizeof(int));
4075         for (i = 0; i < var1ndigits; i++)
4076                 div[i + 1] = var1digits[i];
4077
4078         /*
4079          * We estimate each quotient digit using floating-point arithmetic, taking
4080          * the first four digits of the (current) dividend and divisor. This must
4081          * be float to avoid overflow.
4082          */
4083         fdivisor = (double) var2digits[0];
4084         for (i = 1; i < 4; i++)
4085         {
4086                 fdivisor *= NBASE;
4087                 if (i < var2ndigits)
4088                         fdivisor += (double) var2digits[i];
4089         }
4090         fdivisorinverse = 1.0 / fdivisor;
4091
4092         /*
4093          * maxdiv tracks the maximum possible absolute value of any div[] entry;
4094          * when this threatens to exceed INT_MAX, we take the time to propagate
4095          * carries.  To avoid overflow in maxdiv itself, it actually represents
4096          * the max possible abs. value divided by NBASE-1.
4097          */
4098         maxdiv = 1;
4099
4100         /*
4101          * Outer loop computes next quotient digit, which will go into div[qi]
4102          */
4103         for (qi = 0; qi < div_ndigits; qi++)
4104         {
4105                 /* Approximate the current dividend value */
4106                 fdividend = (double) div[qi];
4107                 for (i = 1; i < 4; i++)
4108                 {
4109                         fdividend *= NBASE;
4110                         if (qi + i <= div_ndigits)
4111                                 fdividend += (double) div[qi + i];
4112                 }
4113                 /* Compute the (approximate) quotient digit */
4114                 fquotient = fdividend * fdivisorinverse;
4115                 qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
4116                         (((int) fquotient) - 1);        /* truncate towards -infinity */
4117
4118                 if (qdigit != 0)
4119                 {
4120                         /* Do we need to normalize now? */
4121                         maxdiv += Abs(qdigit);
4122                         if (maxdiv > INT_MAX / (NBASE - 1))
4123                         {
4124                                 /* Yes, do it */
4125                                 carry = 0;
4126                                 for (i = div_ndigits; i > qi; i--)
4127                                 {
4128                                         newdig = div[i] + carry;
4129                                         if (newdig < 0)
4130                                         {
4131                                                 carry = -((-newdig - 1) / NBASE) - 1;
4132                                                 newdig -= carry * NBASE;
4133                                         }
4134                                         else if (newdig >= NBASE)
4135                                         {
4136                                                 carry = newdig / NBASE;
4137                                                 newdig -= carry * NBASE;
4138                                         }
4139                                         else
4140                                                 carry = 0;
4141                                         div[i] = newdig;
4142                                 }
4143                                 newdig = div[qi] + carry;
4144                                 div[qi] = newdig;
4145
4146                                 /*
4147                                  * All the div[] digits except possibly div[qi] are now in the
4148                                  * range 0..NBASE-1.
4149                                  */
4150                                 maxdiv = Abs(newdig) / (NBASE - 1);
4151                                 maxdiv = Max(maxdiv, 1);
4152
4153                                 /*
4154                                  * Recompute the quotient digit since new info may have
4155                                  * propagated into the top four dividend digits
4156                                  */
4157                                 fdividend = (double) div[qi];
4158                                 for (i = 1; i < 4; i++)
4159                                 {
4160                                         fdividend *= NBASE;
4161                                         if (qi + i <= div_ndigits)
4162                                                 fdividend += (double) div[qi + i];
4163                                 }
4164                                 /* Compute the (approximate) quotient digit */
4165                                 fquotient = fdividend * fdivisorinverse;
4166                                 qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
4167                                         (((int) fquotient) - 1);        /* truncate towards -infinity */
4168                                 maxdiv += Abs(qdigit);
4169                         }
4170
4171                         /* Subtract off the appropriate multiple of the divisor */
4172                         if (qdigit != 0)
4173                         {
4174                                 int                     istop = Min(var2ndigits, div_ndigits - qi + 1);
4175
4176                                 for (i = 0; i < istop; i++)
4177                                         div[qi + i] -= qdigit * var2digits[i];
4178                         }
4179                 }
4180
4181                 /*
4182                  * The dividend digit we are about to replace might still be nonzero.
4183                  * Fold it into the next digit position.  We don't need to worry about
4184                  * overflow here since this should nearly cancel with the subtraction
4185                  * of the divisor.
4186                  */
4187                 div[qi + 1] += div[qi] * NBASE;
4188
4189                 div[qi] = qdigit;
4190         }
4191
4192         /*
4193          * Approximate and store the last quotient digit (div[div_ndigits])
4194          */
4195         fdividend = (double) div[qi];
4196         for (i = 1; i < 4; i++)
4197                 fdividend *= NBASE;
4198         fquotient = fdividend * fdivisorinverse;
4199         qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
4200                 (((int) fquotient) - 1);        /* truncate towards -infinity */
4201         div[qi] = qdigit;
4202
4203         /*
4204          * Now we do a final carry propagation pass to normalize the result, which
4205          * we combine with storing the result digits into the output. Note that
4206          * this is still done at full precision w/guard digits.
4207          */
4208         alloc_var(result, div_ndigits + 1);
4209         res_digits = result->digits;
4210         carry = 0;
4211         for (i = div_ndigits; i >= 0; i--)
4212         {
4213                 newdig = div[i] + carry;
4214                 if (newdig < 0)
4215                 {
4216                         carry = -((-newdig - 1) / NBASE) - 1;
4217                         newdig -= carry * NBASE;
4218                 }
4219                 else if (newdig >= NBASE)
4220                 {
4221                         carry = newdig / NBASE;
4222                         newdig -= carry * NBASE;
4223                 }
4224                 else
4225                         carry = 0;
4226                 res_digits[i] = newdig;
4227         }
4228         Assert(carry == 0);
4229
4230         pfree(div);
4231
4232         /*
4233          * Finally, round the result to the requested precision.
4234          */
4235         result->weight = res_weight;
4236         result->sign = res_sign;
4237
4238         /* Round to target rscale (and set result->dscale) */
4239         if (round)
4240                 round_var(result, rscale);
4241         else
4242                 trunc_var(result, rscale);
4243
4244         /* Strip leading and trailing zeroes */
4245         strip_var(result);
4246 }
4247
4248
4249 /*
4250  * Default scale selection for division
4251  *
4252  * Returns the appropriate result scale for the division result.
4253  */
4254 static int
4255 select_div_scale(NumericVar *var1, NumericVar *var2)
4256 {
4257         int                     weight1,
4258                                 weight2,
4259                                 qweight,
4260                                 i;
4261         NumericDigit firstdigit1,
4262                                 firstdigit2;
4263         int                     rscale;
4264
4265         /*
4266          * The result scale of a division isn't specified in any SQL standard. For
4267          * PostgreSQL we select a result scale that will give at least
4268          * NUMERIC_MIN_SIG_DIGITS significant digits, so that numeric gives a
4269          * result no less accurate than float8; but use a scale not less than
4270          * either input's display scale.
4271          */
4272
4273         /* Get the actual (normalized) weight and first digit of each input */
4274
4275         weight1 = 0;                            /* values to use if var1 is zero */
4276         firstdigit1 = 0;
4277         for (i = 0; i < var1->ndigits; i++)
4278         {
4279                 firstdigit1 = var1->digits[i];
4280                 if (firstdigit1 != 0)
4281                 {
4282                         weight1 = var1->weight - i;
4283                         break;
4284                 }
4285         }
4286
4287         weight2 = 0;                            /* values to use if var2 is zero */
4288         firstdigit2 = 0;
4289         for (i = 0; i < var2->ndigits; i++)
4290         {
4291                 firstdigit2 = var2->digits[i];
4292                 if (firstdigit2 != 0)
4293                 {
4294                         weight2 = var2->weight - i;
4295                         break;
4296                 }
4297         }
4298
4299         /*
4300          * Estimate weight of quotient.  If the two first digits are equal, we
4301          * can't be sure, but assume that var1 is less than var2.
4302          */
4303         qweight = weight1 - weight2;
4304         if (firstdigit1 <= firstdigit2)
4305                 qweight--;
4306
4307         /* Select result scale */
4308         rscale = NUMERIC_MIN_SIG_DIGITS - qweight * DEC_DIGITS;
4309         rscale = Max(rscale, var1->dscale);
4310         rscale = Max(rscale, var2->dscale);
4311         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
4312         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
4313
4314         return rscale;
4315 }
4316
4317
4318 /*
4319  * mod_var() -
4320  *
4321  *      Calculate the modulo of two numerics at variable level
4322  */
4323 static void
4324 mod_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
4325 {
4326         NumericVar      tmp;
4327         int                     rscale;
4328
4329         init_var(&tmp);
4330
4331         /* ---------
4332          * We do this using the equation
4333          *              mod(x,y) = x - trunc(x/y)*y
4334          * We set rscale the same way numeric_div and numeric_mul do
4335          * to get the right answer from the equation.  The final result,
4336          * however, need not be displayed to more precision than the inputs.
4337          * ----------
4338          */
4339         rscale = select_div_scale(var1, var2);
4340
4341         div_var(var1, var2, &tmp, rscale, false);
4342
4343         trunc_var(&tmp, 0);
4344
4345         mul_var(var2, &tmp, &tmp, var2->dscale + tmp.dscale);
4346
4347         sub_var(var1, &tmp, result);
4348
4349         round_var(result, Max(var1->dscale, var2->dscale));
4350
4351         free_var(&tmp);
4352 }
4353
4354
4355 /*
4356  * ceil_var() -
4357  *
4358  *      Return the smallest integer greater than or equal to the argument
4359  *      on variable level
4360  */
4361 static void
4362 ceil_var(NumericVar *var, NumericVar *result)
4363 {
4364         NumericVar      tmp;
4365
4366         init_var(&tmp);
4367         set_var_from_var(var, &tmp);
4368
4369         trunc_var(&tmp, 0);
4370
4371         if (var->sign == NUMERIC_POS && cmp_var(var, &tmp) != 0)
4372                 add_var(&tmp, &const_one, &tmp);
4373
4374         set_var_from_var(&tmp, result);
4375         free_var(&tmp);
4376 }
4377
4378
4379 /*
4380  * floor_var() -
4381  *
4382  *      Return the largest integer equal to or less than the argument
4383  *      on variable level
4384  */
4385 static void
4386 floor_var(NumericVar *var, NumericVar *result)
4387 {
4388         NumericVar      tmp;
4389
4390         init_var(&tmp);
4391         set_var_from_var(var, &tmp);
4392
4393         trunc_var(&tmp, 0);
4394
4395         if (var->sign == NUMERIC_NEG && cmp_var(var, &tmp) != 0)
4396                 sub_var(&tmp, &const_one, &tmp);
4397
4398         set_var_from_var(&tmp, result);
4399         free_var(&tmp);
4400 }
4401
4402
4403 /*
4404  * sqrt_var() -
4405  *
4406  *      Compute the square root of x using Newton's algorithm
4407  */
4408 static void
4409 sqrt_var(NumericVar *arg, NumericVar *result, int rscale)
4410 {
4411         NumericVar      tmp_arg;
4412         NumericVar      tmp_val;
4413         NumericVar      last_val;
4414         int                     local_rscale;
4415         int                     stat;
4416
4417         local_rscale = rscale + 8;
4418
4419         stat = cmp_var(arg, &const_zero);
4420         if (stat == 0)
4421         {
4422                 zero_var(result);
4423                 result->dscale = rscale;
4424                 return;
4425         }
4426
4427         /*
4428          * SQL2003 defines sqrt() in terms of power, so we need to emit the right
4429          * SQLSTATE error code if the operand is negative.
4430          */
4431         if (stat < 0)
4432                 ereport(ERROR,
4433                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION),
4434                                  errmsg("cannot take square root of a negative number")));
4435
4436         init_var(&tmp_arg);
4437         init_var(&tmp_val);
4438         init_var(&last_val);
4439
4440         /* Copy arg in case it is the same var as result */
4441         set_var_from_var(arg, &tmp_arg);
4442
4443         /*
4444          * Initialize the result to the first guess
4445          */
4446         alloc_var(result, 1);
4447         result->digits[0] = tmp_arg.digits[0] / 2;
4448         if (result->digits[0] == 0)
4449                 result->digits[0] = 1;
4450         result->weight = tmp_arg.weight / 2;
4451         result->sign = NUMERIC_POS;
4452
4453         set_var_from_var(result, &last_val);
4454
4455         for (;;)
4456         {
4457                 div_var(&tmp_arg, result, &tmp_val, local_rscale, true);
4458
4459                 add_var(result, &tmp_val, result);
4460                 mul_var(result, &const_zero_point_five, result, local_rscale);
4461
4462                 if (cmp_var(&last_val, result) == 0)
4463                         break;
4464                 set_var_from_var(result, &last_val);
4465         }
4466
4467         free_var(&last_val);
4468         free_var(&tmp_val);
4469         free_var(&tmp_arg);
4470
4471         /* Round to requested precision */
4472         round_var(result, rscale);
4473 }
4474
4475
4476 /*
4477  * exp_var() -
4478  *
4479  *      Raise e to the power of x
4480  */
4481 static void
4482 exp_var(NumericVar *arg, NumericVar *result, int rscale)
4483 {
4484         NumericVar      x;
4485         int                     xintval;
4486         bool            xneg = FALSE;
4487         int                     local_rscale;
4488
4489         /*----------
4490          * We separate the integral and fraction parts of x, then compute
4491          *              e^x = e^xint * e^xfrac
4492          * where e = exp(1) and e^xfrac = exp(xfrac) are computed by
4493          * exp_var_internal; the limited range of inputs allows that routine
4494          * to do a good job with a simple Taylor series.  Raising e^xint is
4495          * done by repeated multiplications in power_var_int.
4496          *----------
4497          */
4498         init_var(&x);
4499
4500         set_var_from_var(arg, &x);
4501
4502         if (x.sign == NUMERIC_NEG)
4503         {
4504                 xneg = TRUE;
4505                 x.sign = NUMERIC_POS;
4506         }
4507
4508         /* Extract the integer part, remove it from x */
4509         xintval = 0;
4510         while (x.weight >= 0)
4511         {
4512                 xintval *= NBASE;
4513                 if (x.ndigits > 0)
4514                 {
4515                         xintval += x.digits[0];
4516                         x.digits++;
4517                         x.ndigits--;
4518                 }
4519                 x.weight--;
4520                 /* Guard against overflow */
4521                 if (xintval >= NUMERIC_MAX_RESULT_SCALE * 3)
4522                         ereport(ERROR,
4523                                         (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
4524                                          errmsg("argument for function \"exp\" too big")));
4525         }
4526
4527         /* Select an appropriate scale for internal calculation */
4528         local_rscale = rscale + MUL_GUARD_DIGITS * 2;
4529
4530         /* Compute e^xfrac */
4531         exp_var_internal(&x, result, local_rscale);
4532
4533         /* If there's an integer part, multiply by e^xint */
4534         if (xintval > 0)
4535         {
4536                 NumericVar      e;
4537
4538                 init_var(&e);
4539                 exp_var_internal(&const_one, &e, local_rscale);
4540                 power_var_int(&e, xintval, &e, local_rscale);
4541                 mul_var(&e, result, result, local_rscale);
4542                 free_var(&e);
4543         }
4544
4545         /* Compensate for input sign, and round to requested rscale */
4546         if (xneg)
4547                 div_var(&const_one, result, result, rscale, true);
4548         else
4549                 round_var(result, rscale);
4550
4551         free_var(&x);
4552 }
4553
4554
4555 /*
4556  * exp_var_internal() -
4557  *
4558  *      Raise e to the power of x, where 0 <= x <= 1
4559  *
4560  * NB: the result should be good to at least rscale digits, but it has
4561  * *not* been rounded off; the caller must do that if wanted.
4562  */
4563 static void
4564 exp_var_internal(NumericVar *arg, NumericVar *result, int rscale)
4565 {
4566         NumericVar      x;
4567         NumericVar      xpow;
4568         NumericVar      ifac;
4569         NumericVar      elem;
4570         NumericVar      ni;
4571         int                     ndiv2 = 0;
4572         int                     local_rscale;
4573
4574         init_var(&x);
4575         init_var(&xpow);
4576         init_var(&ifac);
4577         init_var(&elem);
4578         init_var(&ni);
4579
4580         set_var_from_var(arg, &x);
4581
4582         Assert(x.sign == NUMERIC_POS);
4583
4584         local_rscale = rscale + 8;
4585
4586         /* Reduce input into range 0 <= x <= 0.01 */
4587         while (cmp_var(&x, &const_zero_point_01) > 0)
4588         {
4589                 ndiv2++;
4590                 local_rscale++;
4591                 mul_var(&x, &const_zero_point_five, &x, x.dscale + 1);
4592         }
4593
4594         /*
4595          * Use the Taylor series
4596          *
4597          * exp(x) = 1 + x + x^2/2! + x^3/3! + ...
4598          *
4599          * Given the limited range of x, this should converge reasonably quickly.
4600          * We run the series until the terms fall below the local_rscale limit.
4601          */
4602         add_var(&const_one, &x, result);
4603         set_var_from_var(&x, &xpow);
4604         set_var_from_var(&const_one, &ifac);
4605         set_var_from_var(&const_one, &ni);
4606
4607         for (;;)
4608         {
4609                 add_var(&ni, &const_one, &ni);
4610                 mul_var(&xpow, &x, &xpow, local_rscale);
4611                 mul_var(&ifac, &ni, &ifac, 0);
4612                 div_var(&xpow, &ifac, &elem, local_rscale, true);
4613
4614                 if (elem.ndigits == 0)
4615                         break;
4616
4617                 add_var(result, &elem, result);
4618         }
4619
4620         /* Compensate for argument range reduction */
4621         while (ndiv2-- > 0)
4622                 mul_var(result, result, result, local_rscale);
4623
4624         free_var(&x);
4625         free_var(&xpow);
4626         free_var(&ifac);
4627         free_var(&elem);
4628         free_var(&ni);
4629 }
4630
4631
4632 /*
4633  * ln_var() -
4634  *
4635  *      Compute the natural log of x
4636  */
4637 static void
4638 ln_var(NumericVar *arg, NumericVar *result, int rscale)
4639 {
4640         NumericVar      x;
4641         NumericVar      xx;
4642         NumericVar      ni;
4643         NumericVar      elem;
4644         NumericVar      fact;
4645         int                     local_rscale;
4646         int                     cmp;
4647
4648         cmp = cmp_var(arg, &const_zero);
4649         if (cmp == 0)
4650                 ereport(ERROR,
4651                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_LOG),
4652                                  errmsg("cannot take logarithm of zero")));
4653         else if (cmp < 0)
4654                 ereport(ERROR,
4655                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_LOG),
4656                                  errmsg("cannot take logarithm of a negative number")));
4657
4658         local_rscale = rscale + 8;
4659
4660         init_var(&x);
4661         init_var(&xx);
4662         init_var(&ni);
4663         init_var(&elem);
4664         init_var(&fact);
4665
4666         set_var_from_var(arg, &x);
4667         set_var_from_var(&const_two, &fact);
4668
4669         /* Reduce input into range 0.9 < x < 1.1 */
4670         while (cmp_var(&x, &const_zero_point_nine) <= 0)
4671         {
4672                 local_rscale++;
4673                 sqrt_var(&x, &x, local_rscale);
4674                 mul_var(&fact, &const_two, &fact, 0);
4675         }
4676         while (cmp_var(&x, &const_one_point_one) >= 0)
4677         {
4678                 local_rscale++;
4679                 sqrt_var(&x, &x, local_rscale);
4680                 mul_var(&fact, &const_two, &fact, 0);
4681         }
4682
4683         /*
4684          * We use the Taylor series for 0.5 * ln((1+z)/(1-z)),
4685          *
4686          * z + z^3/3 + z^5/5 + ...
4687          *
4688          * where z = (x-1)/(x+1) is in the range (approximately) -0.053 .. 0.048
4689          * due to the above range-reduction of x.
4690          *
4691          * The convergence of this is not as fast as one would like, but is
4692          * tolerable given that z is small.
4693          */
4694         sub_var(&x, &const_one, result);
4695         add_var(&x, &const_one, &elem);
4696         div_var(result, &elem, result, local_rscale, true);
4697         set_var_from_var(result, &xx);
4698         mul_var(result, result, &x, local_rscale);
4699
4700         set_var_from_var(&const_one, &ni);
4701
4702         for (;;)
4703         {
4704                 add_var(&ni, &const_two, &ni);
4705                 mul_var(&xx, &x, &xx, local_rscale);
4706                 div_var(&xx, &ni, &elem, local_rscale, true);
4707
4708                 if (elem.ndigits == 0)
4709                         break;
4710
4711                 add_var(result, &elem, result);
4712
4713                 if (elem.weight < (result->weight - local_rscale * 2 / DEC_DIGITS))
4714                         break;
4715         }
4716
4717         /* Compensate for argument range reduction, round to requested rscale */
4718         mul_var(result, &fact, result, rscale);
4719
4720         free_var(&x);
4721         free_var(&xx);
4722         free_var(&ni);
4723         free_var(&elem);
4724         free_var(&fact);
4725 }
4726
4727
4728 /*
4729  * log_var() -
4730  *
4731  *      Compute the logarithm of num in a given base.
4732  *
4733  *      Note: this routine chooses dscale of the result.
4734  */
4735 static void
4736 log_var(NumericVar *base, NumericVar *num, NumericVar *result)
4737 {
4738         NumericVar      ln_base;
4739         NumericVar      ln_num;
4740         int                     dec_digits;
4741         int                     rscale;
4742         int                     local_rscale;
4743
4744         init_var(&ln_base);
4745         init_var(&ln_num);
4746
4747         /* Set scale for ln() calculations --- compare numeric_ln() */
4748
4749         /* Approx decimal digits before decimal point */
4750         dec_digits = (num->weight + 1) * DEC_DIGITS;
4751
4752         if (dec_digits > 1)
4753                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(dec_digits - 1);
4754         else if (dec_digits < 1)
4755                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(1 - dec_digits);
4756         else
4757                 rscale = NUMERIC_MIN_SIG_DIGITS;
4758
4759         rscale = Max(rscale, base->dscale);
4760         rscale = Max(rscale, num->dscale);
4761         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
4762         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
4763
4764         local_rscale = rscale + 8;
4765
4766         /* Form natural logarithms */
4767         ln_var(base, &ln_base, local_rscale);
4768         ln_var(num, &ln_num, local_rscale);
4769
4770         ln_base.dscale = rscale;
4771         ln_num.dscale = rscale;
4772
4773         /* Select scale for division result */
4774         rscale = select_div_scale(&ln_num, &ln_base);
4775
4776         div_var(&ln_num, &ln_base, result, rscale, true);
4777
4778         free_var(&ln_num);
4779         free_var(&ln_base);
4780 }
4781
4782
4783 /*
4784  * power_var() -
4785  *
4786  *      Raise base to the power of exp
4787  *
4788  *      Note: this routine chooses dscale of the result.
4789  */
4790 static void
4791 power_var(NumericVar *base, NumericVar *exp, NumericVar *result)
4792 {
4793         NumericVar      ln_base;
4794         NumericVar      ln_num;
4795         int                     dec_digits;
4796         int                     rscale;
4797         int                     local_rscale;
4798         double          val;
4799
4800         /* If exp can be represented as an integer, use power_var_int */
4801         if (exp->ndigits == 0 || exp->ndigits <= exp->weight + 1)
4802         {
4803                 /* exact integer, but does it fit in int? */
4804                 NumericVar      x;
4805                 int64           expval64;
4806
4807                 /* must copy because numericvar_to_int8() scribbles on input */
4808                 init_var(&x);
4809                 set_var_from_var(exp, &x);
4810                 if (numericvar_to_int8(&x, &expval64))
4811                 {
4812                         int                     expval = (int) expval64;
4813
4814                         /* Test for overflow by reverse-conversion. */
4815                         if ((int64) expval == expval64)
4816                         {
4817                                 /* Okay, select rscale */
4818                                 rscale = NUMERIC_MIN_SIG_DIGITS;
4819                                 rscale = Max(rscale, base->dscale);
4820                                 rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
4821                                 rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
4822
4823                                 power_var_int(base, expval, result, rscale);
4824
4825                                 free_var(&x);
4826                                 return;
4827                         }
4828                 }
4829                 free_var(&x);
4830         }
4831
4832         init_var(&ln_base);
4833         init_var(&ln_num);
4834
4835         /* Set scale for ln() calculation --- need extra accuracy here */
4836
4837         /* Approx decimal digits before decimal point */
4838         dec_digits = (base->weight + 1) * DEC_DIGITS;
4839
4840         if (dec_digits > 1)
4841                 rscale = NUMERIC_MIN_SIG_DIGITS * 2 - (int) log10(dec_digits - 1);
4842         else if (dec_digits < 1)
4843                 rscale = NUMERIC_MIN_SIG_DIGITS * 2 - (int) log10(1 - dec_digits);
4844         else
4845                 rscale = NUMERIC_MIN_SIG_DIGITS * 2;
4846
4847         rscale = Max(rscale, base->dscale * 2);
4848         rscale = Max(rscale, exp->dscale * 2);
4849         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE * 2);
4850         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE * 2);
4851
4852         local_rscale = rscale + 8;
4853
4854         ln_var(base, &ln_base, local_rscale);
4855
4856         mul_var(&ln_base, exp, &ln_num, local_rscale);
4857
4858         /* Set scale for exp() -- compare numeric_exp() */
4859
4860         /* convert input to float8, ignoring overflow */
4861         val = numericvar_to_double_no_overflow(&ln_num);
4862
4863         /*
4864          * log10(result) = num * log10(e), so this is approximately the weight:
4865          */
4866         val *= 0.434294481903252;
4867
4868         /* limit to something that won't cause integer overflow */
4869         val = Max(val, -NUMERIC_MAX_RESULT_SCALE);
4870         val = Min(val, NUMERIC_MAX_RESULT_SCALE);
4871
4872         rscale = NUMERIC_MIN_SIG_DIGITS - (int) val;
4873         rscale = Max(rscale, base->dscale);
4874         rscale = Max(rscale, exp->dscale);
4875         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
4876         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
4877
4878         exp_var(&ln_num, result, rscale);
4879
4880         free_var(&ln_num);
4881         free_var(&ln_base);
4882 }
4883
4884 /*
4885  * power_var_int() -
4886  *
4887  *      Raise base to the power of exp, where exp is an integer.
4888  */
4889 static void
4890 power_var_int(NumericVar *base, int exp, NumericVar *result, int rscale)
4891 {
4892         bool            neg;
4893         NumericVar      base_prod;
4894         int                     local_rscale;
4895
4896         /* Detect some special cases, particularly 0^0. */
4897
4898         switch (exp)
4899         {
4900                 case 0:
4901                         if (base->ndigits == 0)
4902                                 ereport(ERROR,
4903                                                 (errcode(ERRCODE_FLOATING_POINT_EXCEPTION),
4904                                                  errmsg("zero raised to zero is undefined")));
4905                         set_var_from_var(&const_one, result);
4906                         result->dscale = rscale;        /* no need to round */
4907                         return;
4908                 case 1:
4909                         set_var_from_var(base, result);
4910                         round_var(result, rscale);
4911                         return;
4912                 case -1:
4913                         div_var(&const_one, base, result, rscale, true);
4914                         return;
4915                 case 2:
4916                         mul_var(base, base, result, rscale);
4917                         return;
4918                 default:
4919                         break;
4920         }
4921
4922         /*
4923          * The general case repeatedly multiplies base according to the bit
4924          * pattern of exp.      We do the multiplications with some extra precision.
4925          */
4926         neg = (exp < 0);
4927         exp = Abs(exp);
4928
4929         local_rscale = rscale + MUL_GUARD_DIGITS * 2;
4930
4931         init_var(&base_prod);
4932         set_var_from_var(base, &base_prod);
4933
4934         if (exp & 1)
4935                 set_var_from_var(base, result);
4936         else
4937                 set_var_from_var(&const_one, result);
4938
4939         while ((exp >>= 1) > 0)
4940         {
4941                 mul_var(&base_prod, &base_prod, &base_prod, local_rscale);
4942                 if (exp & 1)
4943                         mul_var(&base_prod, result, result, local_rscale);
4944         }
4945
4946         free_var(&base_prod);
4947
4948         /* Compensate for input sign, and round to requested rscale */
4949         if (neg)
4950                 div_var(&const_one, result, result, rscale, true);
4951         else
4952                 round_var(result, rscale);
4953 }
4954
4955
4956 /* ----------------------------------------------------------------------
4957  *
4958  * Following are the lowest level functions that operate unsigned
4959  * on the variable level
4960  *
4961  * ----------------------------------------------------------------------
4962  */
4963
4964
4965 /* ----------
4966  * cmp_abs() -
4967  *
4968  *      Compare the absolute values of var1 and var2
4969  *      Returns:        -1 for ABS(var1) < ABS(var2)
4970  *                              0  for ABS(var1) == ABS(var2)
4971  *                              1  for ABS(var1) > ABS(var2)
4972  * ----------
4973  */
4974 static int
4975 cmp_abs(NumericVar *var1, NumericVar *var2)
4976 {
4977         return cmp_abs_common(var1->digits, var1->ndigits, var1->weight,
4978                                                   var2->digits, var2->ndigits, var2->weight);
4979 }
4980
4981 /* ----------
4982  * cmp_abs_common() -
4983  *
4984  *      Main routine of cmp_abs(). This function can be used by both
4985  *      NumericVar and Numeric.
4986  * ----------
4987  */
4988 static int
4989 cmp_abs_common(const NumericDigit *var1digits, int var1ndigits, int var1weight,
4990                          const NumericDigit *var2digits, int var2ndigits, int var2weight)
4991 {
4992         int                     i1 = 0;
4993         int                     i2 = 0;
4994
4995         /* Check any digits before the first common digit */
4996
4997         while (var1weight > var2weight && i1 < var1ndigits)
4998         {
4999                 if (var1digits[i1++] != 0)
5000                         return 1;
5001                 var1weight--;
5002         }
5003         while (var2weight > var1weight && i2 < var2ndigits)
5004         {
5005                 if (var2digits[i2++] != 0)
5006                         return -1;
5007                 var2weight--;
5008         }
5009
5010         /* At this point, either w1 == w2 or we've run out of digits */
5011
5012         if (var1weight == var2weight)
5013         {
5014                 while (i1 < var1ndigits && i2 < var2ndigits)
5015                 {
5016                         int                     stat = var1digits[i1++] - var2digits[i2++];
5017
5018                         if (stat)
5019                         {
5020                                 if (stat > 0)
5021                                         return 1;
5022                                 return -1;
5023                         }
5024                 }
5025         }
5026
5027         /*
5028          * At this point, we've run out of digits on one side or the other; so any
5029          * remaining nonzero digits imply that side is larger
5030          */
5031         while (i1 < var1ndigits)
5032         {
5033                 if (var1digits[i1++] != 0)
5034                         return 1;
5035         }
5036         while (i2 < var2ndigits)
5037         {
5038                 if (var2digits[i2++] != 0)
5039                         return -1;
5040         }
5041
5042         return 0;
5043 }
5044
5045
5046 /*
5047  * add_abs() -
5048  *
5049  *      Add the absolute values of two variables into result.
5050  *      result might point to one of the operands without danger.
5051  */
5052 static void
5053 add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
5054 {
5055         NumericDigit *res_buf;
5056         NumericDigit *res_digits;
5057         int                     res_ndigits;
5058         int                     res_weight;
5059         int                     res_rscale,
5060                                 rscale1,
5061                                 rscale2;
5062         int                     res_dscale;
5063         int                     i,
5064                                 i1,
5065                                 i2;
5066         int                     carry = 0;
5067
5068         /* copy these values into local vars for speed in inner loop */
5069         int                     var1ndigits = var1->ndigits;
5070         int                     var2ndigits = var2->ndigits;
5071         NumericDigit *var1digits = var1->digits;
5072         NumericDigit *var2digits = var2->digits;
5073
5074         res_weight = Max(var1->weight, var2->weight) + 1;
5075
5076         res_dscale = Max(var1->dscale, var2->dscale);
5077
5078         /* Note: here we are figuring rscale in base-NBASE digits */
5079         rscale1 = var1->ndigits - var1->weight - 1;
5080         rscale2 = var2->ndigits - var2->weight - 1;
5081         res_rscale = Max(rscale1, rscale2);
5082
5083         res_ndigits = res_rscale + res_weight + 1;
5084         if (res_ndigits <= 0)
5085                 res_ndigits = 1;
5086
5087         res_buf = digitbuf_alloc(res_ndigits + 1);
5088         res_buf[0] = 0;                         /* spare digit for later rounding */
5089         res_digits = res_buf + 1;
5090
5091         i1 = res_rscale + var1->weight + 1;
5092         i2 = res_rscale + var2->weight + 1;
5093         for (i = res_ndigits - 1; i >= 0; i--)
5094         {
5095                 i1--;
5096                 i2--;
5097                 if (i1 >= 0 && i1 < var1ndigits)
5098                         carry += var1digits[i1];
5099                 if (i2 >= 0 && i2 < var2ndigits)
5100                         carry += var2digits[i2];
5101
5102                 if (carry >= NBASE)
5103                 {
5104                         res_digits[i] = carry - NBASE;
5105                         carry = 1;
5106                 }
5107                 else
5108                 {
5109                         res_digits[i] = carry;
5110                         carry = 0;
5111                 }
5112         }
5113
5114         Assert(carry == 0);                     /* else we failed to allow for carry out */
5115
5116         digitbuf_free(result->buf);
5117         result->ndigits = res_ndigits;
5118         result->buf = res_buf;
5119         result->digits = res_digits;
5120         result->weight = res_weight;
5121         result->dscale = res_dscale;
5122
5123         /* Remove leading/trailing zeroes */
5124         strip_var(result);
5125 }
5126
5127
5128 /*
5129  * sub_abs()
5130  *
5131  *      Subtract the absolute value of var2 from the absolute value of var1
5132  *      and store in result. result might point to one of the operands
5133  *      without danger.
5134  *
5135  *      ABS(var1) MUST BE GREATER OR EQUAL ABS(var2) !!!
5136  */
5137 static void
5138 sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
5139 {
5140         NumericDigit *res_buf;
5141         NumericDigit *res_digits;
5142         int                     res_ndigits;
5143         int                     res_weight;
5144         int                     res_rscale,
5145                                 rscale1,
5146                                 rscale2;
5147         int                     res_dscale;
5148         int                     i,
5149                                 i1,
5150                                 i2;
5151         int                     borrow = 0;
5152
5153         /* copy these values into local vars for speed in inner loop */
5154         int                     var1ndigits = var1->ndigits;
5155         int                     var2ndigits = var2->ndigits;
5156         NumericDigit *var1digits = var1->digits;
5157         NumericDigit *var2digits = var2->digits;
5158
5159         res_weight = var1->weight;
5160
5161         res_dscale = Max(var1->dscale, var2->dscale);
5162
5163         /* Note: here we are figuring rscale in base-NBASE digits */
5164         rscale1 = var1->ndigits - var1->weight - 1;
5165         rscale2 = var2->ndigits - var2->weight - 1;
5166         res_rscale = Max(rscale1, rscale2);
5167
5168         res_ndigits = res_rscale + res_weight + 1;
5169         if (res_ndigits <= 0)
5170                 res_ndigits = 1;
5171
5172         res_buf = digitbuf_alloc(res_ndigits + 1);
5173         res_buf[0] = 0;                         /* spare digit for later rounding */
5174         res_digits = res_buf + 1;
5175
5176         i1 = res_rscale + var1->weight + 1;
5177         i2 = res_rscale + var2->weight + 1;
5178         for (i = res_ndigits - 1; i >= 0; i--)
5179         {
5180                 i1--;
5181                 i2--;
5182                 if (i1 >= 0 && i1 < var1ndigits)
5183                         borrow += var1digits[i1];
5184                 if (i2 >= 0 && i2 < var2ndigits)
5185                         borrow -= var2digits[i2];
5186
5187                 if (borrow < 0)
5188                 {
5189                         res_digits[i] = borrow + NBASE;
5190                         borrow = -1;
5191                 }
5192                 else
5193                 {
5194                         res_digits[i] = borrow;
5195                         borrow = 0;
5196                 }
5197         }
5198
5199         Assert(borrow == 0);            /* else caller gave us var1 < var2 */
5200
5201         digitbuf_free(result->buf);
5202         result->ndigits = res_ndigits;
5203         result->buf = res_buf;
5204         result->digits = res_digits;
5205         result->weight = res_weight;
5206         result->dscale = res_dscale;
5207
5208         /* Remove leading/trailing zeroes */
5209         strip_var(result);
5210 }
5211
5212 /*
5213  * round_var
5214  *
5215  * Round the value of a variable to no more than rscale decimal digits
5216  * after the decimal point.  NOTE: we allow rscale < 0 here, implying
5217  * rounding before the decimal point.
5218  */
5219 static void
5220 round_var(NumericVar *var, int rscale)
5221 {
5222         NumericDigit *digits = var->digits;
5223         int                     di;
5224         int                     ndigits;
5225         int                     carry;
5226
5227         var->dscale = rscale;
5228
5229         /* decimal digits wanted */
5230         di = (var->weight + 1) * DEC_DIGITS + rscale;
5231
5232         /*
5233          * If di = 0, the value loses all digits, but could round up to 1 if its
5234          * first extra digit is >= 5.  If di < 0 the result must be 0.
5235          */
5236         if (di < 0)
5237         {
5238                 var->ndigits = 0;
5239                 var->weight = 0;
5240                 var->sign = NUMERIC_POS;
5241         }
5242         else
5243         {
5244                 /* NBASE digits wanted */
5245                 ndigits = (di + DEC_DIGITS - 1) / DEC_DIGITS;
5246
5247                 /* 0, or number of decimal digits to keep in last NBASE digit */
5248                 di %= DEC_DIGITS;
5249
5250                 if (ndigits < var->ndigits ||
5251                         (ndigits == var->ndigits && di > 0))
5252                 {
5253                         var->ndigits = ndigits;
5254
5255 #if DEC_DIGITS == 1
5256                         /* di must be zero */
5257                         carry = (digits[ndigits] >= HALF_NBASE) ? 1 : 0;
5258 #else
5259                         if (di == 0)
5260                                 carry = (digits[ndigits] >= HALF_NBASE) ? 1 : 0;
5261                         else
5262                         {
5263                                 /* Must round within last NBASE digit */
5264                                 int                     extra,
5265                                                         pow10;
5266
5267 #if DEC_DIGITS == 4
5268                                 pow10 = round_powers[di];
5269 #elif DEC_DIGITS == 2
5270                                 pow10 = 10;
5271 #else
5272 #error unsupported NBASE
5273 #endif
5274                                 extra = digits[--ndigits] % pow10;
5275                                 digits[ndigits] -= extra;
5276                                 carry = 0;
5277                                 if (extra >= pow10 / 2)
5278                                 {
5279                                         pow10 += digits[ndigits];
5280                                         if (pow10 >= NBASE)
5281                                         {
5282                                                 pow10 -= NBASE;
5283                                                 carry = 1;
5284                                         }
5285                                         digits[ndigits] = pow10;
5286                                 }
5287                         }
5288 #endif
5289
5290                         /* Propagate carry if needed */
5291                         while (carry)
5292                         {
5293                                 carry += digits[--ndigits];
5294                                 if (carry >= NBASE)
5295                                 {
5296                                         digits[ndigits] = carry - NBASE;
5297                                         carry = 1;
5298                                 }
5299                                 else
5300                                 {
5301                                         digits[ndigits] = carry;
5302                                         carry = 0;
5303                                 }
5304                         }
5305
5306                         if (ndigits < 0)
5307                         {
5308                                 Assert(ndigits == -1);  /* better not have added > 1 digit */
5309                                 Assert(var->digits > var->buf);
5310                                 var->digits--;
5311                                 var->ndigits++;
5312                                 var->weight++;
5313                         }
5314                 }
5315         }
5316 }
5317
5318 /*
5319  * trunc_var
5320  *
5321  * Truncate the value of a variable at rscale decimal digits after the
5322  * decimal point.  NOTE: we allow rscale < 0 here, implying
5323  * truncation before the decimal point.
5324  */
5325 static void
5326 trunc_var(NumericVar *var, int rscale)
5327 {
5328         int                     di;
5329         int                     ndigits;
5330
5331         var->dscale = rscale;
5332
5333         /* decimal digits wanted */
5334         di = (var->weight + 1) * DEC_DIGITS + rscale;
5335
5336         /*
5337          * If di <= 0, the value loses all digits.
5338          */
5339         if (di <= 0)
5340         {
5341                 var->ndigits = 0;
5342                 var->weight = 0;
5343                 var->sign = NUMERIC_POS;
5344         }
5345         else
5346         {
5347                 /* NBASE digits wanted */
5348                 ndigits = (di + DEC_DIGITS - 1) / DEC_DIGITS;
5349
5350                 if (ndigits <= var->ndigits)
5351                 {
5352                         var->ndigits = ndigits;
5353
5354 #if DEC_DIGITS == 1
5355                         /* no within-digit stuff to worry about */
5356 #else
5357                         /* 0, or number of decimal digits to keep in last NBASE digit */
5358                         di %= DEC_DIGITS;
5359
5360                         if (di > 0)
5361                         {
5362                                 /* Must truncate within last NBASE digit */
5363                                 NumericDigit *digits = var->digits;
5364                                 int                     extra,
5365                                                         pow10;
5366
5367 #if DEC_DIGITS == 4
5368                                 pow10 = round_powers[di];
5369 #elif DEC_DIGITS == 2
5370                                 pow10 = 10;
5371 #else
5372 #error unsupported NBASE
5373 #endif
5374                                 extra = digits[--ndigits] % pow10;
5375                                 digits[ndigits] -= extra;
5376                         }
5377 #endif
5378                 }
5379         }
5380 }
5381
5382 /*
5383  * strip_var
5384  *
5385  * Strip any leading and trailing zeroes from a numeric variable
5386  */
5387 static void
5388 strip_var(NumericVar *var)
5389 {
5390         NumericDigit *digits = var->digits;
5391         int                     ndigits = var->ndigits;
5392
5393         /* Strip leading zeroes */
5394         while (ndigits > 0 && *digits == 0)
5395         {
5396                 digits++;
5397                 var->weight--;
5398                 ndigits--;
5399         }
5400
5401         /* Strip trailing zeroes */
5402         while (ndigits > 0 && digits[ndigits - 1] == 0)
5403                 ndigits--;
5404
5405         /* If it's zero, normalize the sign and weight */
5406         if (ndigits == 0)
5407         {
5408                 var->sign = NUMERIC_POS;
5409                 var->weight = 0;
5410         }
5411
5412         var->digits = digits;
5413         var->ndigits = ndigits;
5414 }