]> granicus.if.org Git - postgresql/blob - src/backend/utils/adt/numeric.c
Fix stddev_pop(numeric) and var_pop(numeric), which were incorrectly producing
[postgresql] / src / backend / utils / adt / numeric.c
1 /*-------------------------------------------------------------------------
2  *
3  * numeric.c
4  *        An exact numeric data type for the Postgres database system
5  *
6  * Original coding 1998, Jan Wieck.  Heavily revised 2003, Tom Lane.
7  *
8  * Many of the algorithmic ideas are borrowed from David M. Smith's "FM"
9  * multiple-precision math library, most recently published as Algorithm
10  * 786: Multiple-Precision Complex Arithmetic and Functions, ACM
11  * Transactions on Mathematical Software, Vol. 24, No. 4, December 1998,
12  * pages 359-367.
13  *
14  * Copyright (c) 1998-2007, PostgreSQL Global Development Group
15  *
16  * IDENTIFICATION
17  *        $PostgreSQL: pgsql/src/backend/utils/adt/numeric.c,v 1.106 2007/07/09 16:13:57 tgl Exp $
18  *
19  *-------------------------------------------------------------------------
20  */
21
22 #include "postgres.h"
23
24 #include <ctype.h>
25 #include <float.h>
26 #include <limits.h>
27 #include <math.h>
28
29 #include "access/hash.h"
30 #include "catalog/pg_type.h"
31 #include "libpq/pqformat.h"
32 #include "miscadmin.h"
33 #include "utils/array.h"
34 #include "utils/builtins.h"
35 #include "utils/int8.h"
36 #include "utils/numeric.h"
37
38 /* ----------
39  * Uncomment the following to enable compilation of dump_numeric()
40  * and dump_var() and to get a dump of any result produced by make_result().
41  * ----------
42 #define NUMERIC_DEBUG
43  */
44
45
46 /* ----------
47  * Local data types
48  *
49  * Numeric values are represented in a base-NBASE floating point format.
50  * Each "digit" ranges from 0 to NBASE-1.  The type NumericDigit is signed
51  * and wide enough to store a digit.  We assume that NBASE*NBASE can fit in
52  * an int.      Although the purely calculational routines could handle any even
53  * NBASE that's less than sqrt(INT_MAX), in practice we are only interested
54  * in NBASE a power of ten, so that I/O conversions and decimal rounding
55  * are easy.  Also, it's actually more efficient if NBASE is rather less than
56  * sqrt(INT_MAX), so that there is "headroom" for mul_var and div_var to
57  * postpone processing carries.
58  * ----------
59  */
60
61 #if 0
62 #define NBASE           10
63 #define HALF_NBASE      5
64 #define DEC_DIGITS      1                       /* decimal digits per NBASE digit */
65 #define MUL_GUARD_DIGITS        4       /* these are measured in NBASE digits */
66 #define DIV_GUARD_DIGITS        8
67
68 typedef signed char NumericDigit;
69 #endif
70
71 #if 0
72 #define NBASE           100
73 #define HALF_NBASE      50
74 #define DEC_DIGITS      2                       /* decimal digits per NBASE digit */
75 #define MUL_GUARD_DIGITS        3       /* these are measured in NBASE digits */
76 #define DIV_GUARD_DIGITS        6
77
78 typedef signed char NumericDigit;
79 #endif
80
81 #if 1
82 #define NBASE           10000
83 #define HALF_NBASE      5000
84 #define DEC_DIGITS      4                       /* decimal digits per NBASE digit */
85 #define MUL_GUARD_DIGITS        2       /* these are measured in NBASE digits */
86 #define DIV_GUARD_DIGITS        4
87
88 typedef int16 NumericDigit;
89 #endif
90
91
92 /* ----------
93  * The value represented by a NumericVar is determined by the sign, weight,
94  * ndigits, and digits[] array.
95  * Note: the first digit of a NumericVar's value is assumed to be multiplied
96  * by NBASE ** weight.  Another way to say it is that there are weight+1
97  * digits before the decimal point.  It is possible to have weight < 0.
98  *
99  * buf points at the physical start of the palloc'd digit buffer for the
100  * NumericVar.  digits points at the first digit in actual use (the one
101  * with the specified weight).  We normally leave an unused digit or two
102  * (preset to zeroes) between buf and digits, so that there is room to store
103  * a carry out of the top digit without special pushups.  We just need to
104  * decrement digits (and increment weight) to make room for the carry digit.
105  * (There is no such extra space in a numeric value stored in the database,
106  * only in a NumericVar in memory.)
107  *
108  * If buf is NULL then the digit buffer isn't actually palloc'd and should
109  * not be freed --- see the constants below for an example.
110  *
111  * dscale, or display scale, is the nominal precision expressed as number
112  * of digits after the decimal point (it must always be >= 0 at present).
113  * dscale may be more than the number of physically stored fractional digits,
114  * implying that we have suppressed storage of significant trailing zeroes.
115  * It should never be less than the number of stored digits, since that would
116  * imply hiding digits that are present.  NOTE that dscale is always expressed
117  * in *decimal* digits, and so it may correspond to a fractional number of
118  * base-NBASE digits --- divide by DEC_DIGITS to convert to NBASE digits.
119  *
120  * rscale, or result scale, is the target precision for a computation.
121  * Like dscale it is expressed as number of *decimal* digits after the decimal
122  * point, and is always >= 0 at present.
123  * Note that rscale is not stored in variables --- it's figured on-the-fly
124  * from the dscales of the inputs.
125  *
126  * NB: All the variable-level functions are written in a style that makes it
127  * possible to give one and the same variable as argument and destination.
128  * This is feasible because the digit buffer is separate from the variable.
129  * ----------
130  */
131 typedef struct NumericVar
132 {
133         int                     ndigits;                /* # of digits in digits[] - can be 0! */
134         int                     weight;                 /* weight of first digit */
135         int                     sign;                   /* NUMERIC_POS, NUMERIC_NEG, or NUMERIC_NAN */
136         int                     dscale;                 /* display scale */
137         NumericDigit *buf;                      /* start of palloc'd space for digits[] */
138         NumericDigit *digits;           /* base-NBASE digits */
139 } NumericVar;
140
141
142 /* ----------
143  * Some preinitialized constants
144  * ----------
145  */
146 static NumericDigit const_zero_data[1] = {0};
147 static NumericVar const_zero =
148 {0, 0, NUMERIC_POS, 0, NULL, const_zero_data};
149
150 static NumericDigit const_one_data[1] = {1};
151 static NumericVar const_one =
152 {1, 0, NUMERIC_POS, 0, NULL, const_one_data};
153
154 static NumericDigit const_two_data[1] = {2};
155 static NumericVar const_two =
156 {1, 0, NUMERIC_POS, 0, NULL, const_two_data};
157
158 #if DEC_DIGITS == 4
159 static NumericDigit const_zero_point_five_data[1] = {5000};
160 #elif DEC_DIGITS == 2
161 static NumericDigit const_zero_point_five_data[1] = {50};
162 #elif DEC_DIGITS == 1
163 static NumericDigit const_zero_point_five_data[1] = {5};
164 #endif
165 static NumericVar const_zero_point_five =
166 {1, -1, NUMERIC_POS, 1, NULL, const_zero_point_five_data};
167
168 #if DEC_DIGITS == 4
169 static NumericDigit const_zero_point_nine_data[1] = {9000};
170 #elif DEC_DIGITS == 2
171 static NumericDigit const_zero_point_nine_data[1] = {90};
172 #elif DEC_DIGITS == 1
173 static NumericDigit const_zero_point_nine_data[1] = {9};
174 #endif
175 static NumericVar const_zero_point_nine =
176 {1, -1, NUMERIC_POS, 1, NULL, const_zero_point_nine_data};
177
178 #if DEC_DIGITS == 4
179 static NumericDigit const_zero_point_01_data[1] = {100};
180 static NumericVar const_zero_point_01 =
181 {1, -1, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
182 #elif DEC_DIGITS == 2
183 static NumericDigit const_zero_point_01_data[1] = {1};
184 static NumericVar const_zero_point_01 =
185 {1, -1, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
186 #elif DEC_DIGITS == 1
187 static NumericDigit const_zero_point_01_data[1] = {1};
188 static NumericVar const_zero_point_01 =
189 {1, -2, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
190 #endif
191
192 #if DEC_DIGITS == 4
193 static NumericDigit const_one_point_one_data[2] = {1, 1000};
194 #elif DEC_DIGITS == 2
195 static NumericDigit const_one_point_one_data[2] = {1, 10};
196 #elif DEC_DIGITS == 1
197 static NumericDigit const_one_point_one_data[2] = {1, 1};
198 #endif
199 static NumericVar const_one_point_one =
200 {2, 0, NUMERIC_POS, 1, NULL, const_one_point_one_data};
201
202 static NumericVar const_nan =
203 {0, 0, NUMERIC_NAN, 0, NULL, NULL};
204
205 #if DEC_DIGITS == 4
206 static const int round_powers[4] = {0, 1000, 100, 10};
207 #endif
208
209
210 /* ----------
211  * Local functions
212  * ----------
213  */
214
215 #ifdef NUMERIC_DEBUG
216 static void dump_numeric(const char *str, Numeric num);
217 static void dump_var(const char *str, NumericVar *var);
218 #else
219 #define dump_numeric(s,n)
220 #define dump_var(s,v)
221 #endif
222
223 #define digitbuf_alloc(ndigits)  \
224         ((NumericDigit *) palloc((ndigits) * sizeof(NumericDigit)))
225 #define digitbuf_free(buf)      \
226         do { \
227                  if ((buf) != NULL) \
228                          pfree(buf); \
229         } while (0)
230
231 #define init_var(v)             MemSetAligned(v, 0, sizeof(NumericVar))
232
233 #define NUMERIC_DIGITS(num) ((NumericDigit *)(num)->n_data)
234 #define NUMERIC_NDIGITS(num) \
235         ((VARSIZE(num) - NUMERIC_HDRSZ) / sizeof(NumericDigit))
236
237 static void alloc_var(NumericVar *var, int ndigits);
238 static void free_var(NumericVar *var);
239 static void zero_var(NumericVar *var);
240
241 static void set_var_from_str(const char *str, NumericVar *dest);
242 static void set_var_from_num(Numeric value, NumericVar *dest);
243 static void set_var_from_var(NumericVar *value, NumericVar *dest);
244 static char *get_str_from_var(NumericVar *var, int dscale);
245
246 static Numeric make_result(NumericVar *var);
247
248 static void apply_typmod(NumericVar *var, int32 typmod);
249
250 static int32 numericvar_to_int4(NumericVar *var);
251 static bool numericvar_to_int8(NumericVar *var, int64 *result);
252 static void int8_to_numericvar(int64 val, NumericVar *var);
253 static double numeric_to_double_no_overflow(Numeric num);
254 static double numericvar_to_double_no_overflow(NumericVar *var);
255
256 static int      cmp_numerics(Numeric num1, Numeric num2);
257 static int      cmp_var(NumericVar *var1, NumericVar *var2);
258 static int cmp_var_common(const NumericDigit *var1digits, int var1ndigits,
259                            int var1weight, int var1sign,
260                            const NumericDigit *var2digits, int var2ndigits,
261                            int var2weight, int var2sign);
262 static void add_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
263 static void sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
264 static void mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
265                 int rscale);
266 static void div_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
267                 int rscale, bool round);
268 static int      select_div_scale(NumericVar *var1, NumericVar *var2);
269 static void mod_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
270 static void ceil_var(NumericVar *var, NumericVar *result);
271 static void floor_var(NumericVar *var, NumericVar *result);
272
273 static void sqrt_var(NumericVar *arg, NumericVar *result, int rscale);
274 static void exp_var(NumericVar *arg, NumericVar *result, int rscale);
275 static void exp_var_internal(NumericVar *arg, NumericVar *result, int rscale);
276 static void ln_var(NumericVar *arg, NumericVar *result, int rscale);
277 static void log_var(NumericVar *base, NumericVar *num, NumericVar *result);
278 static void power_var(NumericVar *base, NumericVar *exp, NumericVar *result);
279 static void power_var_int(NumericVar *base, int exp, NumericVar *result,
280                           int rscale);
281
282 static int      cmp_abs(NumericVar *var1, NumericVar *var2);
283 static int cmp_abs_common(const NumericDigit *var1digits, int var1ndigits,
284                            int var1weight,
285                            const NumericDigit *var2digits, int var2ndigits,
286                            int var2weight);
287 static void add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result);
288 static void sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result);
289 static void round_var(NumericVar *var, int rscale);
290 static void trunc_var(NumericVar *var, int rscale);
291 static void strip_var(NumericVar *var);
292 static void compute_bucket(Numeric operand, Numeric bound1, Numeric bound2,
293                            NumericVar *count_var, NumericVar *result_var);
294
295
296 /* ----------------------------------------------------------------------
297  *
298  * Input-, output- and rounding-functions
299  *
300  * ----------------------------------------------------------------------
301  */
302
303
304 /*
305  * numeric_in() -
306  *
307  *      Input function for numeric data type
308  */
309 Datum
310 numeric_in(PG_FUNCTION_ARGS)
311 {
312         char       *str = PG_GETARG_CSTRING(0);
313
314 #ifdef NOT_USED
315         Oid                     typelem = PG_GETARG_OID(1);
316 #endif
317         int32           typmod = PG_GETARG_INT32(2);
318         NumericVar      value;
319         Numeric         res;
320
321         /*
322          * Check for NaN
323          */
324         if (pg_strcasecmp(str, "NaN") == 0)
325                 PG_RETURN_NUMERIC(make_result(&const_nan));
326
327         /*
328          * Use set_var_from_str() to parse the input string and return it in the
329          * packed DB storage format
330          */
331         init_var(&value);
332         set_var_from_str(str, &value);
333
334         apply_typmod(&value, typmod);
335
336         res = make_result(&value);
337         free_var(&value);
338
339         PG_RETURN_NUMERIC(res);
340 }
341
342
343 /*
344  * numeric_out() -
345  *
346  *      Output function for numeric data type
347  */
348 Datum
349 numeric_out(PG_FUNCTION_ARGS)
350 {
351         Numeric         num = PG_GETARG_NUMERIC(0);
352         NumericVar      x;
353         char       *str;
354
355         /*
356          * Handle NaN
357          */
358         if (NUMERIC_IS_NAN(num))
359                 PG_RETURN_CSTRING(pstrdup("NaN"));
360
361         /*
362          * Get the number in the variable format.
363          *
364          * Even if we didn't need to change format, we'd still need to copy the
365          * value to have a modifiable copy for rounding.  set_var_from_num() also
366          * guarantees there is extra digit space in case we produce a carry out
367          * from rounding.
368          */
369         init_var(&x);
370         set_var_from_num(num, &x);
371
372         str = get_str_from_var(&x, x.dscale);
373
374         free_var(&x);
375
376         PG_RETURN_CSTRING(str);
377 }
378
379 /*
380  *              numeric_recv                    - converts external binary format to numeric
381  *
382  * External format is a sequence of int16's:
383  * ndigits, weight, sign, dscale, NumericDigits.
384  */
385 Datum
386 numeric_recv(PG_FUNCTION_ARGS)
387 {
388         StringInfo      buf = (StringInfo) PG_GETARG_POINTER(0);
389
390 #ifdef NOT_USED
391         Oid                     typelem = PG_GETARG_OID(1);
392 #endif
393         int32           typmod = PG_GETARG_INT32(2);
394         NumericVar      value;
395         Numeric         res;
396         int                     len,
397                                 i;
398
399         init_var(&value);
400
401         len = (uint16) pq_getmsgint(buf, sizeof(uint16));
402         if (len < 0 || len > NUMERIC_MAX_PRECISION + NUMERIC_MAX_RESULT_SCALE)
403                 ereport(ERROR,
404                                 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
405                                  errmsg("invalid length in external \"numeric\" value")));
406
407         alloc_var(&value, len);
408
409         value.weight = (int16) pq_getmsgint(buf, sizeof(int16));
410         value.sign = (uint16) pq_getmsgint(buf, sizeof(uint16));
411         if (!(value.sign == NUMERIC_POS ||
412                   value.sign == NUMERIC_NEG ||
413                   value.sign == NUMERIC_NAN))
414                 ereport(ERROR,
415                                 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
416                                  errmsg("invalid sign in external \"numeric\" value")));
417
418         value.dscale = (uint16) pq_getmsgint(buf, sizeof(uint16));
419         for (i = 0; i < len; i++)
420         {
421                 NumericDigit d = pq_getmsgint(buf, sizeof(NumericDigit));
422
423                 if (d < 0 || d >= NBASE)
424                         ereport(ERROR,
425                                         (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
426                                          errmsg("invalid digit in external \"numeric\" value")));
427                 value.digits[i] = d;
428         }
429
430         apply_typmod(&value, typmod);
431
432         res = make_result(&value);
433         free_var(&value);
434
435         PG_RETURN_NUMERIC(res);
436 }
437
438 /*
439  *              numeric_send                    - converts numeric to binary format
440  */
441 Datum
442 numeric_send(PG_FUNCTION_ARGS)
443 {
444         Numeric         num = PG_GETARG_NUMERIC(0);
445         NumericVar      x;
446         StringInfoData buf;
447         int                     i;
448
449         init_var(&x);
450         set_var_from_num(num, &x);
451
452         pq_begintypsend(&buf);
453
454         pq_sendint(&buf, x.ndigits, sizeof(int16));
455         pq_sendint(&buf, x.weight, sizeof(int16));
456         pq_sendint(&buf, x.sign, sizeof(int16));
457         pq_sendint(&buf, x.dscale, sizeof(int16));
458         for (i = 0; i < x.ndigits; i++)
459                 pq_sendint(&buf, x.digits[i], sizeof(NumericDigit));
460
461         free_var(&x);
462
463         PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
464 }
465
466
467 /*
468  * numeric() -
469  *
470  *      This is a special function called by the Postgres database system
471  *      before a value is stored in a tuple's attribute. The precision and
472  *      scale of the attribute have to be applied on the value.
473  */
474 Datum
475 numeric(PG_FUNCTION_ARGS)
476 {
477         Numeric         num = PG_GETARG_NUMERIC(0);
478         int32           typmod = PG_GETARG_INT32(1);
479         Numeric         new;
480         int32           tmp_typmod;
481         int                     precision;
482         int                     scale;
483         int                     ddigits;
484         int                     maxdigits;
485         NumericVar      var;
486
487         /*
488          * Handle NaN
489          */
490         if (NUMERIC_IS_NAN(num))
491                 PG_RETURN_NUMERIC(make_result(&const_nan));
492
493         /*
494          * If the value isn't a valid type modifier, simply return a copy of the
495          * input value
496          */
497         if (typmod < (int32) (VARHDRSZ))
498         {
499                 new = (Numeric) palloc(VARSIZE(num));
500                 memcpy(new, num, VARSIZE(num));
501                 PG_RETURN_NUMERIC(new);
502         }
503
504         /*
505          * Get the precision and scale out of the typmod value
506          */
507         tmp_typmod = typmod - VARHDRSZ;
508         precision = (tmp_typmod >> 16) & 0xffff;
509         scale = tmp_typmod & 0xffff;
510         maxdigits = precision - scale;
511
512         /*
513          * If the number is certainly in bounds and due to the target scale no
514          * rounding could be necessary, just make a copy of the input and modify
515          * its scale fields.  (Note we assume the existing dscale is honest...)
516          */
517         ddigits = (num->n_weight + 1) * DEC_DIGITS;
518         if (ddigits <= maxdigits && scale >= NUMERIC_DSCALE(num))
519         {
520                 new = (Numeric) palloc(VARSIZE(num));
521                 memcpy(new, num, VARSIZE(num));
522                 new->n_sign_dscale = NUMERIC_SIGN(new) |
523                         ((uint16) scale & NUMERIC_DSCALE_MASK);
524                 PG_RETURN_NUMERIC(new);
525         }
526
527         /*
528          * We really need to fiddle with things - unpack the number into a
529          * variable and let apply_typmod() do it.
530          */
531         init_var(&var);
532
533         set_var_from_num(num, &var);
534         apply_typmod(&var, typmod);
535         new = make_result(&var);
536
537         free_var(&var);
538
539         PG_RETURN_NUMERIC(new);
540 }
541
542 Datum
543 numerictypmodin(PG_FUNCTION_ARGS)
544 {
545         ArrayType   *ta = PG_GETARG_ARRAYTYPE_P(0);
546         int32           *tl;
547         int                     n;
548         int32           typmod;
549
550         tl = ArrayGetIntegerTypmods(ta, &n);
551
552         if (n == 2)
553         {
554                 if (tl[0] < 1 || tl[0] > NUMERIC_MAX_PRECISION)
555                         ereport(ERROR,
556                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
557                                          errmsg("NUMERIC precision %d must be between 1 and %d",
558                                                         tl[0], NUMERIC_MAX_PRECISION)));
559                 if (tl[1] < 0 || tl[1] > tl[0])
560                         ereport(ERROR,
561                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
562                                          errmsg("NUMERIC scale %d must be between 0 and precision %d",
563                                                         tl[1], tl[0])));
564                 typmod = ((tl[0] << 16) | tl[1]) + VARHDRSZ;
565         }
566         else if (n == 1)
567         {
568                 if (tl[0] < 1 || tl[0] > NUMERIC_MAX_PRECISION)
569                         ereport(ERROR,
570                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
571                                          errmsg("NUMERIC precision %d must be between 1 and %d",
572                                                         tl[0], NUMERIC_MAX_PRECISION)));
573                 /* scale defaults to zero */
574                 typmod = (tl[0] << 16) + VARHDRSZ;
575         }
576         else
577         {
578                 ereport(ERROR,
579                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
580                                                 errmsg("invalid NUMERIC type modifier")));
581                 typmod = 0;                             /* keep compiler quiet */
582         }
583
584         PG_RETURN_INT32(typmod);
585 }
586
587 Datum
588 numerictypmodout(PG_FUNCTION_ARGS)
589 {
590         int32   typmod = PG_GETARG_INT32(0);
591         char   *res = (char *) palloc(64);
592
593         if (typmod >= 0)
594                 snprintf(res, 64, "(%d,%d)",
595                                  ((typmod - VARHDRSZ) >> 16) & 0xffff,
596                                  (typmod - VARHDRSZ) & 0xffff);
597         else
598                 *res = '\0';
599
600         PG_RETURN_CSTRING(res);
601 }
602
603
604 /* ----------------------------------------------------------------------
605  *
606  * Sign manipulation, rounding and the like
607  *
608  * ----------------------------------------------------------------------
609  */
610
611 Datum
612 numeric_abs(PG_FUNCTION_ARGS)
613 {
614         Numeric         num = PG_GETARG_NUMERIC(0);
615         Numeric         res;
616
617         /*
618          * Handle NaN
619          */
620         if (NUMERIC_IS_NAN(num))
621                 PG_RETURN_NUMERIC(make_result(&const_nan));
622
623         /*
624          * Do it the easy way directly on the packed format
625          */
626         res = (Numeric) palloc(VARSIZE(num));
627         memcpy(res, num, VARSIZE(num));
628
629         res->n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num);
630
631         PG_RETURN_NUMERIC(res);
632 }
633
634
635 Datum
636 numeric_uminus(PG_FUNCTION_ARGS)
637 {
638         Numeric         num = PG_GETARG_NUMERIC(0);
639         Numeric         res;
640
641         /*
642          * Handle NaN
643          */
644         if (NUMERIC_IS_NAN(num))
645                 PG_RETURN_NUMERIC(make_result(&const_nan));
646
647         /*
648          * Do it the easy way directly on the packed format
649          */
650         res = (Numeric) palloc(VARSIZE(num));
651         memcpy(res, num, VARSIZE(num));
652
653         /*
654          * The packed format is known to be totally zero digit trimmed always. So
655          * we can identify a ZERO by the fact that there are no digits at all.  Do
656          * nothing to a zero.
657          */
658         if (VARSIZE(num) != NUMERIC_HDRSZ)
659         {
660                 /* Else, flip the sign */
661                 if (NUMERIC_SIGN(num) == NUMERIC_POS)
662                         res->n_sign_dscale = NUMERIC_NEG | NUMERIC_DSCALE(num);
663                 else
664                         res->n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num);
665         }
666
667         PG_RETURN_NUMERIC(res);
668 }
669
670
671 Datum
672 numeric_uplus(PG_FUNCTION_ARGS)
673 {
674         Numeric         num = PG_GETARG_NUMERIC(0);
675         Numeric         res;
676
677         res = (Numeric) palloc(VARSIZE(num));
678         memcpy(res, num, VARSIZE(num));
679
680         PG_RETURN_NUMERIC(res);
681 }
682
683 /*
684  * numeric_sign() -
685  *
686  * returns -1 if the argument is less than 0, 0 if the argument is equal
687  * to 0, and 1 if the argument is greater than zero.
688  */
689 Datum
690 numeric_sign(PG_FUNCTION_ARGS)
691 {
692         Numeric         num = PG_GETARG_NUMERIC(0);
693         Numeric         res;
694         NumericVar      result;
695
696         /*
697          * Handle NaN
698          */
699         if (NUMERIC_IS_NAN(num))
700                 PG_RETURN_NUMERIC(make_result(&const_nan));
701
702         init_var(&result);
703
704         /*
705          * The packed format is known to be totally zero digit trimmed always. So
706          * we can identify a ZERO by the fact that there are no digits at all.
707          */
708         if (VARSIZE(num) == NUMERIC_HDRSZ)
709                 set_var_from_var(&const_zero, &result);
710         else
711         {
712                 /*
713                  * And if there are some, we return a copy of ONE with the sign of our
714                  * argument
715                  */
716                 set_var_from_var(&const_one, &result);
717                 result.sign = NUMERIC_SIGN(num);
718         }
719
720         res = make_result(&result);
721         free_var(&result);
722
723         PG_RETURN_NUMERIC(res);
724 }
725
726
727 /*
728  * numeric_round() -
729  *
730  *      Round a value to have 'scale' digits after the decimal point.
731  *      We allow negative 'scale', implying rounding before the decimal
732  *      point --- Oracle interprets rounding that way.
733  */
734 Datum
735 numeric_round(PG_FUNCTION_ARGS)
736 {
737         Numeric         num = PG_GETARG_NUMERIC(0);
738         int32           scale = PG_GETARG_INT32(1);
739         Numeric         res;
740         NumericVar      arg;
741
742         /*
743          * Handle NaN
744          */
745         if (NUMERIC_IS_NAN(num))
746                 PG_RETURN_NUMERIC(make_result(&const_nan));
747
748         /*
749          * Limit the scale value to avoid possible overflow in calculations
750          */
751         scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE);
752         scale = Min(scale, NUMERIC_MAX_RESULT_SCALE);
753
754         /*
755          * Unpack the argument and round it at the proper digit position
756          */
757         init_var(&arg);
758         set_var_from_num(num, &arg);
759
760         round_var(&arg, scale);
761
762         /* We don't allow negative output dscale */
763         if (scale < 0)
764                 arg.dscale = 0;
765
766         /*
767          * Return the rounded result
768          */
769         res = make_result(&arg);
770
771         free_var(&arg);
772         PG_RETURN_NUMERIC(res);
773 }
774
775
776 /*
777  * numeric_trunc() -
778  *
779  *      Truncate a value to have 'scale' digits after the decimal point.
780  *      We allow negative 'scale', implying a truncation before the decimal
781  *      point --- Oracle interprets truncation that way.
782  */
783 Datum
784 numeric_trunc(PG_FUNCTION_ARGS)
785 {
786         Numeric         num = PG_GETARG_NUMERIC(0);
787         int32           scale = PG_GETARG_INT32(1);
788         Numeric         res;
789         NumericVar      arg;
790
791         /*
792          * Handle NaN
793          */
794         if (NUMERIC_IS_NAN(num))
795                 PG_RETURN_NUMERIC(make_result(&const_nan));
796
797         /*
798          * Limit the scale value to avoid possible overflow in calculations
799          */
800         scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE);
801         scale = Min(scale, NUMERIC_MAX_RESULT_SCALE);
802
803         /*
804          * Unpack the argument and truncate it at the proper digit position
805          */
806         init_var(&arg);
807         set_var_from_num(num, &arg);
808
809         trunc_var(&arg, scale);
810
811         /* We don't allow negative output dscale */
812         if (scale < 0)
813                 arg.dscale = 0;
814
815         /*
816          * Return the truncated result
817          */
818         res = make_result(&arg);
819
820         free_var(&arg);
821         PG_RETURN_NUMERIC(res);
822 }
823
824
825 /*
826  * numeric_ceil() -
827  *
828  *      Return the smallest integer greater than or equal to the argument
829  */
830 Datum
831 numeric_ceil(PG_FUNCTION_ARGS)
832 {
833         Numeric         num = PG_GETARG_NUMERIC(0);
834         Numeric         res;
835         NumericVar      result;
836
837         if (NUMERIC_IS_NAN(num))
838                 PG_RETURN_NUMERIC(make_result(&const_nan));
839
840         init_var(&result);
841
842         set_var_from_num(num, &result);
843         ceil_var(&result, &result);
844
845         res = make_result(&result);
846         free_var(&result);
847
848         PG_RETURN_NUMERIC(res);
849 }
850
851
852 /*
853  * numeric_floor() -
854  *
855  *      Return the largest integer equal to or less than the argument
856  */
857 Datum
858 numeric_floor(PG_FUNCTION_ARGS)
859 {
860         Numeric         num = PG_GETARG_NUMERIC(0);
861         Numeric         res;
862         NumericVar      result;
863
864         if (NUMERIC_IS_NAN(num))
865                 PG_RETURN_NUMERIC(make_result(&const_nan));
866
867         init_var(&result);
868
869         set_var_from_num(num, &result);
870         floor_var(&result, &result);
871
872         res = make_result(&result);
873         free_var(&result);
874
875         PG_RETURN_NUMERIC(res);
876 }
877
878 /*
879  * Implements the numeric version of the width_bucket() function
880  * defined by SQL2003. See also width_bucket_float8().
881  *
882  * 'bound1' and 'bound2' are the lower and upper bounds of the
883  * histogram's range, respectively. 'count' is the number of buckets
884  * in the histogram. width_bucket() returns an integer indicating the
885  * bucket number that 'operand' belongs to in an equiwidth histogram
886  * with the specified characteristics. An operand smaller than the
887  * lower bound is assigned to bucket 0. An operand greater than the
888  * upper bound is assigned to an additional bucket (with number
889  * count+1). We don't allow "NaN" for any of the numeric arguments.
890  */
891 Datum
892 width_bucket_numeric(PG_FUNCTION_ARGS)
893 {
894         Numeric         operand = PG_GETARG_NUMERIC(0);
895         Numeric         bound1 = PG_GETARG_NUMERIC(1);
896         Numeric         bound2 = PG_GETARG_NUMERIC(2);
897         int32           count = PG_GETARG_INT32(3);
898         NumericVar      count_var;
899         NumericVar      result_var;
900         int32           result;
901
902         if (count <= 0)
903                 ereport(ERROR,
904                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
905                                  errmsg("count must be greater than zero")));
906
907         if (NUMERIC_IS_NAN(operand) ||
908                 NUMERIC_IS_NAN(bound1) ||
909                 NUMERIC_IS_NAN(bound2))
910                 ereport(ERROR,
911                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
912                                  errmsg("operand, lower bound and upper bound cannot be NaN")));
913
914         init_var(&result_var);
915         init_var(&count_var);
916
917         /* Convert 'count' to a numeric, for ease of use later */
918         int8_to_numericvar((int64) count, &count_var);
919
920         switch (cmp_numerics(bound1, bound2))
921         {
922                 case 0:
923                         ereport(ERROR,
924                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
925                                  errmsg("lower bound cannot equal upper bound")));
926
927                         /* bound1 < bound2 */
928                 case -1:
929                         if (cmp_numerics(operand, bound1) < 0)
930                                 set_var_from_var(&const_zero, &result_var);
931                         else if (cmp_numerics(operand, bound2) >= 0)
932                                 add_var(&count_var, &const_one, &result_var);
933                         else
934                                 compute_bucket(operand, bound1, bound2,
935                                                            &count_var, &result_var);
936                         break;
937
938                         /* bound1 > bound2 */
939                 case 1:
940                         if (cmp_numerics(operand, bound1) > 0)
941                                 set_var_from_var(&const_zero, &result_var);
942                         else if (cmp_numerics(operand, bound2) <= 0)
943                                 add_var(&count_var, &const_one, &result_var);
944                         else
945                                 compute_bucket(operand, bound1, bound2,
946                                                            &count_var, &result_var);
947                         break;
948         }
949
950         /* if result exceeds the range of a legal int4, we ereport here */
951         result = numericvar_to_int4(&result_var);
952
953         free_var(&count_var);
954         free_var(&result_var);
955
956         PG_RETURN_INT32(result);
957 }
958
959 /*
960  * If 'operand' is not outside the bucket range, determine the correct
961  * bucket for it to go. The calculations performed by this function
962  * are derived directly from the SQL2003 spec.
963  */
964 static void
965 compute_bucket(Numeric operand, Numeric bound1, Numeric bound2,
966                            NumericVar *count_var, NumericVar *result_var)
967 {
968         NumericVar      bound1_var;
969         NumericVar      bound2_var;
970         NumericVar      operand_var;
971
972         init_var(&bound1_var);
973         init_var(&bound2_var);
974         init_var(&operand_var);
975
976         set_var_from_num(bound1, &bound1_var);
977         set_var_from_num(bound2, &bound2_var);
978         set_var_from_num(operand, &operand_var);
979
980         if (cmp_var(&bound1_var, &bound2_var) < 0)
981         {
982                 sub_var(&operand_var, &bound1_var, &operand_var);
983                 sub_var(&bound2_var, &bound1_var, &bound2_var);
984                 div_var(&operand_var, &bound2_var, result_var,
985                                 select_div_scale(&operand_var, &bound2_var), true);
986         }
987         else
988         {
989                 sub_var(&bound1_var, &operand_var, &operand_var);
990                 sub_var(&bound1_var, &bound2_var, &bound1_var);
991                 div_var(&operand_var, &bound1_var, result_var,
992                                 select_div_scale(&operand_var, &bound1_var), true);
993         }
994
995         mul_var(result_var, count_var, result_var,
996                         result_var->dscale + count_var->dscale);
997         add_var(result_var, &const_one, result_var);
998         floor_var(result_var, result_var);
999
1000         free_var(&bound1_var);
1001         free_var(&bound2_var);
1002         free_var(&operand_var);
1003 }
1004
1005 /* ----------------------------------------------------------------------
1006  *
1007  * Comparison functions
1008  *
1009  * Note: btree indexes need these routines not to leak memory; therefore,
1010  * be careful to free working copies of toasted datums.  Most places don't
1011  * need to be so careful.
1012  * ----------------------------------------------------------------------
1013  */
1014
1015
1016 Datum
1017 numeric_cmp(PG_FUNCTION_ARGS)
1018 {
1019         Numeric         num1 = PG_GETARG_NUMERIC(0);
1020         Numeric         num2 = PG_GETARG_NUMERIC(1);
1021         int                     result;
1022
1023         result = cmp_numerics(num1, num2);
1024
1025         PG_FREE_IF_COPY(num1, 0);
1026         PG_FREE_IF_COPY(num2, 1);
1027
1028         PG_RETURN_INT32(result);
1029 }
1030
1031
1032 Datum
1033 numeric_eq(PG_FUNCTION_ARGS)
1034 {
1035         Numeric         num1 = PG_GETARG_NUMERIC(0);
1036         Numeric         num2 = PG_GETARG_NUMERIC(1);
1037         bool            result;
1038
1039         result = cmp_numerics(num1, num2) == 0;
1040
1041         PG_FREE_IF_COPY(num1, 0);
1042         PG_FREE_IF_COPY(num2, 1);
1043
1044         PG_RETURN_BOOL(result);
1045 }
1046
1047 Datum
1048 numeric_ne(PG_FUNCTION_ARGS)
1049 {
1050         Numeric         num1 = PG_GETARG_NUMERIC(0);
1051         Numeric         num2 = PG_GETARG_NUMERIC(1);
1052         bool            result;
1053
1054         result = cmp_numerics(num1, num2) != 0;
1055
1056         PG_FREE_IF_COPY(num1, 0);
1057         PG_FREE_IF_COPY(num2, 1);
1058
1059         PG_RETURN_BOOL(result);
1060 }
1061
1062 Datum
1063 numeric_gt(PG_FUNCTION_ARGS)
1064 {
1065         Numeric         num1 = PG_GETARG_NUMERIC(0);
1066         Numeric         num2 = PG_GETARG_NUMERIC(1);
1067         bool            result;
1068
1069         result = cmp_numerics(num1, num2) > 0;
1070
1071         PG_FREE_IF_COPY(num1, 0);
1072         PG_FREE_IF_COPY(num2, 1);
1073
1074         PG_RETURN_BOOL(result);
1075 }
1076
1077 Datum
1078 numeric_ge(PG_FUNCTION_ARGS)
1079 {
1080         Numeric         num1 = PG_GETARG_NUMERIC(0);
1081         Numeric         num2 = PG_GETARG_NUMERIC(1);
1082         bool            result;
1083
1084         result = cmp_numerics(num1, num2) >= 0;
1085
1086         PG_FREE_IF_COPY(num1, 0);
1087         PG_FREE_IF_COPY(num2, 1);
1088
1089         PG_RETURN_BOOL(result);
1090 }
1091
1092 Datum
1093 numeric_lt(PG_FUNCTION_ARGS)
1094 {
1095         Numeric         num1 = PG_GETARG_NUMERIC(0);
1096         Numeric         num2 = PG_GETARG_NUMERIC(1);
1097         bool            result;
1098
1099         result = cmp_numerics(num1, num2) < 0;
1100
1101         PG_FREE_IF_COPY(num1, 0);
1102         PG_FREE_IF_COPY(num2, 1);
1103
1104         PG_RETURN_BOOL(result);
1105 }
1106
1107 Datum
1108 numeric_le(PG_FUNCTION_ARGS)
1109 {
1110         Numeric         num1 = PG_GETARG_NUMERIC(0);
1111         Numeric         num2 = PG_GETARG_NUMERIC(1);
1112         bool            result;
1113
1114         result = cmp_numerics(num1, num2) <= 0;
1115
1116         PG_FREE_IF_COPY(num1, 0);
1117         PG_FREE_IF_COPY(num2, 1);
1118
1119         PG_RETURN_BOOL(result);
1120 }
1121
1122 static int
1123 cmp_numerics(Numeric num1, Numeric num2)
1124 {
1125         int                     result;
1126
1127         /*
1128          * We consider all NANs to be equal and larger than any non-NAN. This is
1129          * somewhat arbitrary; the important thing is to have a consistent sort
1130          * order.
1131          */
1132         if (NUMERIC_IS_NAN(num1))
1133         {
1134                 if (NUMERIC_IS_NAN(num2))
1135                         result = 0;                     /* NAN = NAN */
1136                 else
1137                         result = 1;                     /* NAN > non-NAN */
1138         }
1139         else if (NUMERIC_IS_NAN(num2))
1140         {
1141                 result = -1;                    /* non-NAN < NAN */
1142         }
1143         else
1144         {
1145                 result = cmp_var_common(NUMERIC_DIGITS(num1), NUMERIC_NDIGITS(num1),
1146                                                                 num1->n_weight, NUMERIC_SIGN(num1),
1147                                                                 NUMERIC_DIGITS(num2), NUMERIC_NDIGITS(num2),
1148                                                                 num2->n_weight, NUMERIC_SIGN(num2));
1149         }
1150
1151         return result;
1152 }
1153
1154 Datum
1155 hash_numeric(PG_FUNCTION_ARGS)
1156 {
1157         Numeric         key = PG_GETARG_NUMERIC(0);
1158         Datum           digit_hash;
1159         Datum           result;
1160         int             weight;
1161         int             start_offset;
1162         int             end_offset;
1163         int             i;
1164         int             hash_len;
1165
1166         /* If it's NaN, don't try to hash the rest of the fields */
1167         if (NUMERIC_IS_NAN(key))
1168                 PG_RETURN_UINT32(0);
1169
1170         weight           = key->n_weight;
1171         start_offset = 0;
1172         end_offset       = 0;
1173
1174         /*
1175          * Omit any leading or trailing zeros from the input to the
1176          * hash. The numeric implementation *should* guarantee that
1177          * leading and trailing zeros are suppressed, but we're
1178          * paranoid. Note that we measure the starting and ending offsets
1179          * in units of NumericDigits, not bytes.
1180          */
1181         for (i = 0; i < NUMERIC_NDIGITS(key); i++)
1182         {
1183                 if (NUMERIC_DIGITS(key)[i] != (NumericDigit) 0)
1184                         break;
1185
1186                 start_offset++;
1187                 /*
1188                  * The weight is effectively the # of digits before the
1189                  * decimal point, so decrement it for each leading zero we
1190                  * skip.
1191                  */
1192                 weight--;
1193         }
1194
1195         /*
1196          * If there are no non-zero digits, then the value of the number
1197          * is zero, regardless of any other fields.
1198          */
1199         if (NUMERIC_NDIGITS(key) == start_offset)
1200                 PG_RETURN_UINT32(-1);
1201
1202         for (i = NUMERIC_NDIGITS(key) - 1; i >= 0; i--)
1203         {
1204                 if (NUMERIC_DIGITS(key)[i] != (NumericDigit) 0)
1205                         break;
1206
1207                 end_offset++;
1208         }
1209
1210         /* If we get here, there should be at least one non-zero digit */
1211         Assert(start_offset + end_offset < NUMERIC_NDIGITS(key));
1212
1213         /*
1214          * Note that we don't hash on the Numeric's scale, since two
1215          * numerics can compare equal but have different scales. We also
1216          * don't hash on the sign, although we could: since a sign
1217          * difference implies inequality, this shouldn't affect correctness.
1218          */
1219         hash_len = NUMERIC_NDIGITS(key) - start_offset - end_offset;
1220         digit_hash = hash_any((unsigned char *) (NUMERIC_DIGITS(key) + start_offset),
1221                           hash_len * sizeof(NumericDigit));
1222
1223         /* Mix in the weight, via XOR */
1224         result = digit_hash ^ weight;
1225
1226         PG_RETURN_DATUM(result);
1227 }
1228
1229
1230 /* ----------------------------------------------------------------------
1231  *
1232  * Basic arithmetic functions
1233  *
1234  * ----------------------------------------------------------------------
1235  */
1236
1237
1238 /*
1239  * numeric_add() -
1240  *
1241  *      Add two numerics
1242  */
1243 Datum
1244 numeric_add(PG_FUNCTION_ARGS)
1245 {
1246         Numeric         num1 = PG_GETARG_NUMERIC(0);
1247         Numeric         num2 = PG_GETARG_NUMERIC(1);
1248         NumericVar      arg1;
1249         NumericVar      arg2;
1250         NumericVar      result;
1251         Numeric         res;
1252
1253         /*
1254          * Handle NaN
1255          */
1256         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1257                 PG_RETURN_NUMERIC(make_result(&const_nan));
1258
1259         /*
1260          * Unpack the values, let add_var() compute the result and return it.
1261          */
1262         init_var(&arg1);
1263         init_var(&arg2);
1264         init_var(&result);
1265
1266         set_var_from_num(num1, &arg1);
1267         set_var_from_num(num2, &arg2);
1268
1269         add_var(&arg1, &arg2, &result);
1270
1271         res = make_result(&result);
1272
1273         free_var(&arg1);
1274         free_var(&arg2);
1275         free_var(&result);
1276
1277         PG_RETURN_NUMERIC(res);
1278 }
1279
1280
1281 /*
1282  * numeric_sub() -
1283  *
1284  *      Subtract one numeric from another
1285  */
1286 Datum
1287 numeric_sub(PG_FUNCTION_ARGS)
1288 {
1289         Numeric         num1 = PG_GETARG_NUMERIC(0);
1290         Numeric         num2 = PG_GETARG_NUMERIC(1);
1291         NumericVar      arg1;
1292         NumericVar      arg2;
1293         NumericVar      result;
1294         Numeric         res;
1295
1296         /*
1297          * Handle NaN
1298          */
1299         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1300                 PG_RETURN_NUMERIC(make_result(&const_nan));
1301
1302         /*
1303          * Unpack the values, let sub_var() compute the result and return it.
1304          */
1305         init_var(&arg1);
1306         init_var(&arg2);
1307         init_var(&result);
1308
1309         set_var_from_num(num1, &arg1);
1310         set_var_from_num(num2, &arg2);
1311
1312         sub_var(&arg1, &arg2, &result);
1313
1314         res = make_result(&result);
1315
1316         free_var(&arg1);
1317         free_var(&arg2);
1318         free_var(&result);
1319
1320         PG_RETURN_NUMERIC(res);
1321 }
1322
1323
1324 /*
1325  * numeric_mul() -
1326  *
1327  *      Calculate the product of two numerics
1328  */
1329 Datum
1330 numeric_mul(PG_FUNCTION_ARGS)
1331 {
1332         Numeric         num1 = PG_GETARG_NUMERIC(0);
1333         Numeric         num2 = PG_GETARG_NUMERIC(1);
1334         NumericVar      arg1;
1335         NumericVar      arg2;
1336         NumericVar      result;
1337         Numeric         res;
1338
1339         /*
1340          * Handle NaN
1341          */
1342         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1343                 PG_RETURN_NUMERIC(make_result(&const_nan));
1344
1345         /*
1346          * Unpack the values, let mul_var() compute the result and return it.
1347          * Unlike add_var() and sub_var(), mul_var() will round its result. In the
1348          * case of numeric_mul(), which is invoked for the * operator on numerics,
1349          * we request exact representation for the product (rscale = sum(dscale of
1350          * arg1, dscale of arg2)).
1351          */
1352         init_var(&arg1);
1353         init_var(&arg2);
1354         init_var(&result);
1355
1356         set_var_from_num(num1, &arg1);
1357         set_var_from_num(num2, &arg2);
1358
1359         mul_var(&arg1, &arg2, &result, arg1.dscale + arg2.dscale);
1360
1361         res = make_result(&result);
1362
1363         free_var(&arg1);
1364         free_var(&arg2);
1365         free_var(&result);
1366
1367         PG_RETURN_NUMERIC(res);
1368 }
1369
1370
1371 /*
1372  * numeric_div() -
1373  *
1374  *      Divide one numeric into another
1375  */
1376 Datum
1377 numeric_div(PG_FUNCTION_ARGS)
1378 {
1379         Numeric         num1 = PG_GETARG_NUMERIC(0);
1380         Numeric         num2 = PG_GETARG_NUMERIC(1);
1381         NumericVar      arg1;
1382         NumericVar      arg2;
1383         NumericVar      result;
1384         Numeric         res;
1385         int                     rscale;
1386
1387         /*
1388          * Handle NaN
1389          */
1390         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1391                 PG_RETURN_NUMERIC(make_result(&const_nan));
1392
1393         /*
1394          * Unpack the arguments
1395          */
1396         init_var(&arg1);
1397         init_var(&arg2);
1398         init_var(&result);
1399
1400         set_var_from_num(num1, &arg1);
1401         set_var_from_num(num2, &arg2);
1402
1403         /*
1404          * Select scale for division result
1405          */
1406         rscale = select_div_scale(&arg1, &arg2);
1407
1408         /*
1409          * Do the divide and return the result
1410          */
1411         div_var(&arg1, &arg2, &result, rscale, true);
1412
1413         res = make_result(&result);
1414
1415         free_var(&arg1);
1416         free_var(&arg2);
1417         free_var(&result);
1418
1419         PG_RETURN_NUMERIC(res);
1420 }
1421
1422
1423 /*
1424  * numeric_mod() -
1425  *
1426  *      Calculate the modulo of two numerics
1427  */
1428 Datum
1429 numeric_mod(PG_FUNCTION_ARGS)
1430 {
1431         Numeric         num1 = PG_GETARG_NUMERIC(0);
1432         Numeric         num2 = PG_GETARG_NUMERIC(1);
1433         Numeric         res;
1434         NumericVar      arg1;
1435         NumericVar      arg2;
1436         NumericVar      result;
1437
1438         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1439                 PG_RETURN_NUMERIC(make_result(&const_nan));
1440
1441         init_var(&arg1);
1442         init_var(&arg2);
1443         init_var(&result);
1444
1445         set_var_from_num(num1, &arg1);
1446         set_var_from_num(num2, &arg2);
1447
1448         mod_var(&arg1, &arg2, &result);
1449
1450         res = make_result(&result);
1451
1452         free_var(&result);
1453         free_var(&arg2);
1454         free_var(&arg1);
1455
1456         PG_RETURN_NUMERIC(res);
1457 }
1458
1459
1460 /*
1461  * numeric_inc() -
1462  *
1463  *      Increment a number by one
1464  */
1465 Datum
1466 numeric_inc(PG_FUNCTION_ARGS)
1467 {
1468         Numeric         num = PG_GETARG_NUMERIC(0);
1469         NumericVar      arg;
1470         Numeric         res;
1471
1472         /*
1473          * Handle NaN
1474          */
1475         if (NUMERIC_IS_NAN(num))
1476                 PG_RETURN_NUMERIC(make_result(&const_nan));
1477
1478         /*
1479          * Compute the result and return it
1480          */
1481         init_var(&arg);
1482
1483         set_var_from_num(num, &arg);
1484
1485         add_var(&arg, &const_one, &arg);
1486
1487         res = make_result(&arg);
1488
1489         free_var(&arg);
1490
1491         PG_RETURN_NUMERIC(res);
1492 }
1493
1494
1495 /*
1496  * numeric_smaller() -
1497  *
1498  *      Return the smaller of two numbers
1499  */
1500 Datum
1501 numeric_smaller(PG_FUNCTION_ARGS)
1502 {
1503         Numeric         num1 = PG_GETARG_NUMERIC(0);
1504         Numeric         num2 = PG_GETARG_NUMERIC(1);
1505
1506         /*
1507          * Use cmp_numerics so that this will agree with the comparison operators,
1508          * particularly as regards comparisons involving NaN.
1509          */
1510         if (cmp_numerics(num1, num2) < 0)
1511                 PG_RETURN_NUMERIC(num1);
1512         else
1513                 PG_RETURN_NUMERIC(num2);
1514 }
1515
1516
1517 /*
1518  * numeric_larger() -
1519  *
1520  *      Return the larger of two numbers
1521  */
1522 Datum
1523 numeric_larger(PG_FUNCTION_ARGS)
1524 {
1525         Numeric         num1 = PG_GETARG_NUMERIC(0);
1526         Numeric         num2 = PG_GETARG_NUMERIC(1);
1527
1528         /*
1529          * Use cmp_numerics so that this will agree with the comparison operators,
1530          * particularly as regards comparisons involving NaN.
1531          */
1532         if (cmp_numerics(num1, num2) > 0)
1533                 PG_RETURN_NUMERIC(num1);
1534         else
1535                 PG_RETURN_NUMERIC(num2);
1536 }
1537
1538
1539 /* ----------------------------------------------------------------------
1540  *
1541  * Advanced math functions
1542  *
1543  * ----------------------------------------------------------------------
1544  */
1545
1546 /*
1547  * numeric_fac()
1548  *
1549  * Compute factorial
1550  */
1551 Datum
1552 numeric_fac(PG_FUNCTION_ARGS)
1553 {
1554         int64           num = PG_GETARG_INT64(0);
1555         Numeric         res;
1556         NumericVar      fact;
1557         NumericVar      result;
1558
1559         if (num <= 1)
1560         {
1561                 res = make_result(&const_one);
1562                 PG_RETURN_NUMERIC(res);
1563         }
1564         /* Fail immediately if the result would overflow */
1565         if (num > 32177)
1566                 ereport(ERROR,
1567                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1568                                  errmsg("value overflows numeric format")));
1569
1570         init_var(&fact);
1571         init_var(&result);
1572
1573         int8_to_numericvar(num, &result);
1574
1575         for (num = num - 1; num > 1; num--)
1576         {
1577                 /* this loop can take awhile, so allow it to be interrupted */
1578                 CHECK_FOR_INTERRUPTS();
1579
1580                 int8_to_numericvar(num, &fact);
1581
1582                 mul_var(&result, &fact, &result, 0);
1583         }
1584
1585         res = make_result(&result);
1586
1587         free_var(&fact);
1588         free_var(&result);
1589
1590         PG_RETURN_NUMERIC(res);
1591 }
1592
1593
1594 /*
1595  * numeric_sqrt() -
1596  *
1597  *      Compute the square root of a numeric.
1598  */
1599 Datum
1600 numeric_sqrt(PG_FUNCTION_ARGS)
1601 {
1602         Numeric         num = PG_GETARG_NUMERIC(0);
1603         Numeric         res;
1604         NumericVar      arg;
1605         NumericVar      result;
1606         int                     sweight;
1607         int                     rscale;
1608
1609         /*
1610          * Handle NaN
1611          */
1612         if (NUMERIC_IS_NAN(num))
1613                 PG_RETURN_NUMERIC(make_result(&const_nan));
1614
1615         /*
1616          * Unpack the argument and determine the result scale.  We choose a scale
1617          * to give at least NUMERIC_MIN_SIG_DIGITS significant digits; but in any
1618          * case not less than the input's dscale.
1619          */
1620         init_var(&arg);
1621         init_var(&result);
1622
1623         set_var_from_num(num, &arg);
1624
1625         /* Assume the input was normalized, so arg.weight is accurate */
1626         sweight = (arg.weight + 1) * DEC_DIGITS / 2 - 1;
1627
1628         rscale = NUMERIC_MIN_SIG_DIGITS - sweight;
1629         rscale = Max(rscale, arg.dscale);
1630         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
1631         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
1632
1633         /*
1634          * Let sqrt_var() do the calculation and return the result.
1635          */
1636         sqrt_var(&arg, &result, rscale);
1637
1638         res = make_result(&result);
1639
1640         free_var(&result);
1641         free_var(&arg);
1642
1643         PG_RETURN_NUMERIC(res);
1644 }
1645
1646
1647 /*
1648  * numeric_exp() -
1649  *
1650  *      Raise e to the power of x
1651  */
1652 Datum
1653 numeric_exp(PG_FUNCTION_ARGS)
1654 {
1655         Numeric         num = PG_GETARG_NUMERIC(0);
1656         Numeric         res;
1657         NumericVar      arg;
1658         NumericVar      result;
1659         int                     rscale;
1660         double          val;
1661
1662         /*
1663          * Handle NaN
1664          */
1665         if (NUMERIC_IS_NAN(num))
1666                 PG_RETURN_NUMERIC(make_result(&const_nan));
1667
1668         /*
1669          * Unpack the argument and determine the result scale.  We choose a scale
1670          * to give at least NUMERIC_MIN_SIG_DIGITS significant digits; but in any
1671          * case not less than the input's dscale.
1672          */
1673         init_var(&arg);
1674         init_var(&result);
1675
1676         set_var_from_num(num, &arg);
1677
1678         /* convert input to float8, ignoring overflow */
1679         val = numericvar_to_double_no_overflow(&arg);
1680
1681         /*
1682          * log10(result) = num * log10(e), so this is approximately the decimal
1683          * weight of the result:
1684          */
1685         val *= 0.434294481903252;
1686
1687         /* limit to something that won't cause integer overflow */
1688         val = Max(val, -NUMERIC_MAX_RESULT_SCALE);
1689         val = Min(val, NUMERIC_MAX_RESULT_SCALE);
1690
1691         rscale = NUMERIC_MIN_SIG_DIGITS - (int) val;
1692         rscale = Max(rscale, arg.dscale);
1693         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
1694         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
1695
1696         /*
1697          * Let exp_var() do the calculation and return the result.
1698          */
1699         exp_var(&arg, &result, rscale);
1700
1701         res = make_result(&result);
1702
1703         free_var(&result);
1704         free_var(&arg);
1705
1706         PG_RETURN_NUMERIC(res);
1707 }
1708
1709
1710 /*
1711  * numeric_ln() -
1712  *
1713  *      Compute the natural logarithm of x
1714  */
1715 Datum
1716 numeric_ln(PG_FUNCTION_ARGS)
1717 {
1718         Numeric         num = PG_GETARG_NUMERIC(0);
1719         Numeric         res;
1720         NumericVar      arg;
1721         NumericVar      result;
1722         int                     dec_digits;
1723         int                     rscale;
1724
1725         /*
1726          * Handle NaN
1727          */
1728         if (NUMERIC_IS_NAN(num))
1729                 PG_RETURN_NUMERIC(make_result(&const_nan));
1730
1731         init_var(&arg);
1732         init_var(&result);
1733
1734         set_var_from_num(num, &arg);
1735
1736         /* Approx decimal digits before decimal point */
1737         dec_digits = (arg.weight + 1) * DEC_DIGITS;
1738
1739         if (dec_digits > 1)
1740                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(dec_digits - 1);
1741         else if (dec_digits < 1)
1742                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(1 - dec_digits);
1743         else
1744                 rscale = NUMERIC_MIN_SIG_DIGITS;
1745
1746         rscale = Max(rscale, arg.dscale);
1747         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
1748         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
1749
1750         ln_var(&arg, &result, rscale);
1751
1752         res = make_result(&result);
1753
1754         free_var(&result);
1755         free_var(&arg);
1756
1757         PG_RETURN_NUMERIC(res);
1758 }
1759
1760
1761 /*
1762  * numeric_log() -
1763  *
1764  *      Compute the logarithm of x in a given base
1765  */
1766 Datum
1767 numeric_log(PG_FUNCTION_ARGS)
1768 {
1769         Numeric         num1 = PG_GETARG_NUMERIC(0);
1770         Numeric         num2 = PG_GETARG_NUMERIC(1);
1771         Numeric         res;
1772         NumericVar      arg1;
1773         NumericVar      arg2;
1774         NumericVar      result;
1775
1776         /*
1777          * Handle NaN
1778          */
1779         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1780                 PG_RETURN_NUMERIC(make_result(&const_nan));
1781
1782         /*
1783          * Initialize things
1784          */
1785         init_var(&arg1);
1786         init_var(&arg2);
1787         init_var(&result);
1788
1789         set_var_from_num(num1, &arg1);
1790         set_var_from_num(num2, &arg2);
1791
1792         /*
1793          * Call log_var() to compute and return the result; note it handles scale
1794          * selection itself.
1795          */
1796         log_var(&arg1, &arg2, &result);
1797
1798         res = make_result(&result);
1799
1800         free_var(&result);
1801         free_var(&arg2);
1802         free_var(&arg1);
1803
1804         PG_RETURN_NUMERIC(res);
1805 }
1806
1807
1808 /*
1809  * numeric_power() -
1810  *
1811  *      Raise b to the power of x
1812  */
1813 Datum
1814 numeric_power(PG_FUNCTION_ARGS)
1815 {
1816         Numeric         num1 = PG_GETARG_NUMERIC(0);
1817         Numeric         num2 = PG_GETARG_NUMERIC(1);
1818         Numeric         res;
1819         NumericVar      arg1;
1820         NumericVar      arg2;
1821         NumericVar      arg2_trunc;
1822         NumericVar      result;
1823
1824         /*
1825          * Handle NaN
1826          */
1827         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1828                 PG_RETURN_NUMERIC(make_result(&const_nan));
1829
1830         /*
1831          * Initialize things
1832          */
1833         init_var(&arg1);
1834         init_var(&arg2);
1835         init_var(&arg2_trunc);
1836         init_var(&result);
1837
1838         set_var_from_num(num1, &arg1);
1839         set_var_from_num(num2, &arg2);
1840         set_var_from_var(&arg2, &arg2_trunc);
1841
1842         trunc_var(&arg2_trunc, 0);
1843
1844         /*
1845          * Return special SQLSTATE error codes for a few conditions mandated by
1846          * the standard.
1847          */
1848         if ((cmp_var(&arg1, &const_zero) == 0 &&
1849                  cmp_var(&arg2, &const_zero) < 0) ||
1850                 (cmp_var(&arg1, &const_zero) < 0 &&
1851                  cmp_var(&arg2, &arg2_trunc) != 0))
1852                 ereport(ERROR,
1853                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION),
1854                                  errmsg("invalid argument for power function")));
1855
1856         /*
1857          * Call power_var() to compute and return the result; note it handles
1858          * scale selection itself.
1859          */
1860         power_var(&arg1, &arg2, &result);
1861
1862         res = make_result(&result);
1863
1864         free_var(&result);
1865         free_var(&arg2);
1866         free_var(&arg2_trunc);
1867         free_var(&arg1);
1868
1869         PG_RETURN_NUMERIC(res);
1870 }
1871
1872
1873 /* ----------------------------------------------------------------------
1874  *
1875  * Type conversion functions
1876  *
1877  * ----------------------------------------------------------------------
1878  */
1879
1880
1881 Datum
1882 int4_numeric(PG_FUNCTION_ARGS)
1883 {
1884         int32           val = PG_GETARG_INT32(0);
1885         Numeric         res;
1886         NumericVar      result;
1887
1888         init_var(&result);
1889
1890         int8_to_numericvar((int64) val, &result);
1891
1892         res = make_result(&result);
1893
1894         free_var(&result);
1895
1896         PG_RETURN_NUMERIC(res);
1897 }
1898
1899
1900 Datum
1901 numeric_int4(PG_FUNCTION_ARGS)
1902 {
1903         Numeric         num = PG_GETARG_NUMERIC(0);
1904         NumericVar      x;
1905         int32           result;
1906
1907         /* XXX would it be better to return NULL? */
1908         if (NUMERIC_IS_NAN(num))
1909                 ereport(ERROR,
1910                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1911                                  errmsg("cannot convert NaN to integer")));
1912
1913         /* Convert to variable format, then convert to int4 */
1914         init_var(&x);
1915         set_var_from_num(num, &x);
1916         result = numericvar_to_int4(&x);
1917         free_var(&x);
1918         PG_RETURN_INT32(result);
1919 }
1920
1921 /*
1922  * Given a NumericVar, convert it to an int32. If the NumericVar
1923  * exceeds the range of an int32, raise the appropriate error via
1924  * ereport(). The input NumericVar is *not* free'd.
1925  */
1926 static int32
1927 numericvar_to_int4(NumericVar *var)
1928 {
1929         int32           result;
1930         int64           val;
1931
1932         if (!numericvar_to_int8(var, &val))
1933                 ereport(ERROR,
1934                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1935                                  errmsg("integer out of range")));
1936
1937         /* Down-convert to int4 */
1938         result = (int32) val;
1939
1940         /* Test for overflow by reverse-conversion. */
1941         if ((int64) result != val)
1942                 ereport(ERROR,
1943                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1944                                  errmsg("integer out of range")));
1945
1946         return result;
1947 }
1948
1949 Datum
1950 int8_numeric(PG_FUNCTION_ARGS)
1951 {
1952         int64           val = PG_GETARG_INT64(0);
1953         Numeric         res;
1954         NumericVar      result;
1955
1956         init_var(&result);
1957
1958         int8_to_numericvar(val, &result);
1959
1960         res = make_result(&result);
1961
1962         free_var(&result);
1963
1964         PG_RETURN_NUMERIC(res);
1965 }
1966
1967
1968 Datum
1969 numeric_int8(PG_FUNCTION_ARGS)
1970 {
1971         Numeric         num = PG_GETARG_NUMERIC(0);
1972         NumericVar      x;
1973         int64           result;
1974
1975         /* XXX would it be better to return NULL? */
1976         if (NUMERIC_IS_NAN(num))
1977                 ereport(ERROR,
1978                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1979                                  errmsg("cannot convert NaN to bigint")));
1980
1981         /* Convert to variable format and thence to int8 */
1982         init_var(&x);
1983         set_var_from_num(num, &x);
1984
1985         if (!numericvar_to_int8(&x, &result))
1986                 ereport(ERROR,
1987                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1988                                  errmsg("bigint out of range")));
1989
1990         free_var(&x);
1991
1992         PG_RETURN_INT64(result);
1993 }
1994
1995
1996 Datum
1997 int2_numeric(PG_FUNCTION_ARGS)
1998 {
1999         int16           val = PG_GETARG_INT16(0);
2000         Numeric         res;
2001         NumericVar      result;
2002
2003         init_var(&result);
2004
2005         int8_to_numericvar((int64) val, &result);
2006
2007         res = make_result(&result);
2008
2009         free_var(&result);
2010
2011         PG_RETURN_NUMERIC(res);
2012 }
2013
2014
2015 Datum
2016 numeric_int2(PG_FUNCTION_ARGS)
2017 {
2018         Numeric         num = PG_GETARG_NUMERIC(0);
2019         NumericVar      x;
2020         int64           val;
2021         int16           result;
2022
2023         /* XXX would it be better to return NULL? */
2024         if (NUMERIC_IS_NAN(num))
2025                 ereport(ERROR,
2026                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2027                                  errmsg("cannot convert NaN to smallint")));
2028
2029         /* Convert to variable format and thence to int8 */
2030         init_var(&x);
2031         set_var_from_num(num, &x);
2032
2033         if (!numericvar_to_int8(&x, &val))
2034                 ereport(ERROR,
2035                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
2036                                  errmsg("smallint out of range")));
2037
2038         free_var(&x);
2039
2040         /* Down-convert to int2 */
2041         result = (int16) val;
2042
2043         /* Test for overflow by reverse-conversion. */
2044         if ((int64) result != val)
2045                 ereport(ERROR,
2046                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
2047                                  errmsg("smallint out of range")));
2048
2049         PG_RETURN_INT16(result);
2050 }
2051
2052
2053 Datum
2054 float8_numeric(PG_FUNCTION_ARGS)
2055 {
2056         float8          val = PG_GETARG_FLOAT8(0);
2057         Numeric         res;
2058         NumericVar      result;
2059         char            buf[DBL_DIG + 100];
2060
2061         if (isnan(val))
2062                 PG_RETURN_NUMERIC(make_result(&const_nan));
2063
2064         sprintf(buf, "%.*g", DBL_DIG, val);
2065
2066         init_var(&result);
2067
2068         set_var_from_str(buf, &result);
2069         res = make_result(&result);
2070
2071         free_var(&result);
2072
2073         PG_RETURN_NUMERIC(res);
2074 }
2075
2076
2077 Datum
2078 numeric_float8(PG_FUNCTION_ARGS)
2079 {
2080         Numeric         num = PG_GETARG_NUMERIC(0);
2081         char       *tmp;
2082         Datum           result;
2083
2084         if (NUMERIC_IS_NAN(num))
2085                 PG_RETURN_FLOAT8(get_float8_nan());
2086
2087         tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
2088                                                                                           NumericGetDatum(num)));
2089
2090         result = DirectFunctionCall1(float8in, CStringGetDatum(tmp));
2091
2092         pfree(tmp);
2093
2094         PG_RETURN_DATUM(result);
2095 }
2096
2097
2098 /* Convert numeric to float8; if out of range, return +/- HUGE_VAL */
2099 Datum
2100 numeric_float8_no_overflow(PG_FUNCTION_ARGS)
2101 {
2102         Numeric         num = PG_GETARG_NUMERIC(0);
2103         double          val;
2104
2105         if (NUMERIC_IS_NAN(num))
2106                 PG_RETURN_FLOAT8(get_float8_nan());
2107
2108         val = numeric_to_double_no_overflow(num);
2109
2110         PG_RETURN_FLOAT8(val);
2111 }
2112
2113 Datum
2114 float4_numeric(PG_FUNCTION_ARGS)
2115 {
2116         float4          val = PG_GETARG_FLOAT4(0);
2117         Numeric         res;
2118         NumericVar      result;
2119         char            buf[FLT_DIG + 100];
2120
2121         if (isnan(val))
2122                 PG_RETURN_NUMERIC(make_result(&const_nan));
2123
2124         sprintf(buf, "%.*g", FLT_DIG, val);
2125
2126         init_var(&result);
2127
2128         set_var_from_str(buf, &result);
2129         res = make_result(&result);
2130
2131         free_var(&result);
2132
2133         PG_RETURN_NUMERIC(res);
2134 }
2135
2136
2137 Datum
2138 numeric_float4(PG_FUNCTION_ARGS)
2139 {
2140         Numeric         num = PG_GETARG_NUMERIC(0);
2141         char       *tmp;
2142         Datum           result;
2143
2144         if (NUMERIC_IS_NAN(num))
2145                 PG_RETURN_FLOAT4(get_float4_nan());
2146
2147         tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
2148                                                                                           NumericGetDatum(num)));
2149
2150         result = DirectFunctionCall1(float4in, CStringGetDatum(tmp));
2151
2152         pfree(tmp);
2153
2154         PG_RETURN_DATUM(result);
2155 }
2156
2157
2158 /* ----------------------------------------------------------------------
2159  *
2160  * Aggregate functions
2161  *
2162  * The transition datatype for all these aggregates is a 3-element array
2163  * of Numeric, holding the values N, sum(X), sum(X*X) in that order.
2164  *
2165  * We represent N as a numeric mainly to avoid having to build a special
2166  * datatype; it's unlikely it'd overflow an int4, but ...
2167  *
2168  * ----------------------------------------------------------------------
2169  */
2170
2171 static ArrayType *
2172 do_numeric_accum(ArrayType *transarray, Numeric newval)
2173 {
2174         Datum      *transdatums;
2175         int                     ndatums;
2176         Datum           N,
2177                                 sumX,
2178                                 sumX2;
2179         ArrayType  *result;
2180
2181         /* We assume the input is array of numeric */
2182         deconstruct_array(transarray,
2183                                           NUMERICOID, -1, false, 'i',
2184                                           &transdatums, NULL, &ndatums);
2185         if (ndatums != 3)
2186                 elog(ERROR, "expected 3-element numeric array");
2187         N = transdatums[0];
2188         sumX = transdatums[1];
2189         sumX2 = transdatums[2];
2190
2191         N = DirectFunctionCall1(numeric_inc, N);
2192         sumX = DirectFunctionCall2(numeric_add, sumX,
2193                                                            NumericGetDatum(newval));
2194         sumX2 = DirectFunctionCall2(numeric_add, sumX2,
2195                                                                 DirectFunctionCall2(numeric_mul,
2196                                                                                                         NumericGetDatum(newval),
2197                                                                                                         NumericGetDatum(newval)));
2198
2199         transdatums[0] = N;
2200         transdatums[1] = sumX;
2201         transdatums[2] = sumX2;
2202
2203         result = construct_array(transdatums, 3,
2204                                                          NUMERICOID, -1, false, 'i');
2205
2206         return result;
2207 }
2208
2209 /*
2210  * Improve avg performance by not caclulating sum(X*X).
2211  */
2212 static ArrayType *
2213 do_numeric_avg_accum(ArrayType *transarray, Numeric newval)
2214 {
2215         Datum      *transdatums;
2216         int                     ndatums;
2217         Datum           N,
2218                                 sumX;
2219         ArrayType  *result;
2220
2221         /* We assume the input is array of numeric */
2222         deconstruct_array(transarray,
2223                                           NUMERICOID, -1, false, 'i',
2224                                           &transdatums, NULL, &ndatums);
2225         if (ndatums != 2)
2226                 elog(ERROR, "expected 2-element numeric array");
2227         N = transdatums[0];
2228         sumX = transdatums[1];
2229
2230         N = DirectFunctionCall1(numeric_inc, N);
2231         sumX = DirectFunctionCall2(numeric_add, sumX,
2232                                                            NumericGetDatum(newval));
2233
2234         transdatums[0] = N;
2235         transdatums[1] = sumX;
2236
2237         result = construct_array(transdatums, 2,
2238                                                          NUMERICOID, -1, false, 'i');
2239
2240         return result;
2241 }
2242
2243 Datum
2244 numeric_accum(PG_FUNCTION_ARGS)
2245 {
2246         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2247         Numeric         newval = PG_GETARG_NUMERIC(1);
2248
2249         PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
2250 }
2251
2252 /*
2253  * Optimized case for average of numeric.
2254  */
2255 Datum
2256 numeric_avg_accum(PG_FUNCTION_ARGS)
2257 {
2258         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2259         Numeric         newval = PG_GETARG_NUMERIC(1);
2260
2261         PG_RETURN_ARRAYTYPE_P(do_numeric_avg_accum(transarray, newval));
2262 }
2263
2264 /*
2265  * Integer data types all use Numeric accumulators to share code and
2266  * avoid risk of overflow.      For int2 and int4 inputs, Numeric accumulation
2267  * is overkill for the N and sum(X) values, but definitely not overkill
2268  * for the sum(X*X) value.      Hence, we use int2_accum and int4_accum only
2269  * for stddev/variance --- there are faster special-purpose accumulator
2270  * routines for SUM and AVG of these datatypes.
2271  */
2272
2273 Datum
2274 int2_accum(PG_FUNCTION_ARGS)
2275 {
2276         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2277         Datum           newval2 = PG_GETARG_DATUM(1);
2278         Numeric         newval;
2279
2280         newval = DatumGetNumeric(DirectFunctionCall1(int2_numeric, newval2));
2281
2282         PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
2283 }
2284
2285 Datum
2286 int4_accum(PG_FUNCTION_ARGS)
2287 {
2288         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2289         Datum           newval4 = PG_GETARG_DATUM(1);
2290         Numeric         newval;
2291
2292         newval = DatumGetNumeric(DirectFunctionCall1(int4_numeric, newval4));
2293
2294         PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
2295 }
2296
2297 Datum
2298 int8_accum(PG_FUNCTION_ARGS)
2299 {
2300         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2301         Datum           newval8 = PG_GETARG_DATUM(1);
2302         Numeric         newval;
2303
2304         newval = DatumGetNumeric(DirectFunctionCall1(int8_numeric, newval8));
2305
2306         PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
2307 }
2308
2309 /*
2310  * Optimized case for average of int8.
2311  */
2312 Datum
2313 int8_avg_accum(PG_FUNCTION_ARGS)
2314 {
2315         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2316         Datum           newval8 = PG_GETARG_DATUM(1);
2317         Numeric         newval;
2318
2319         newval = DatumGetNumeric(DirectFunctionCall1(int8_numeric, newval8));
2320
2321         PG_RETURN_ARRAYTYPE_P(do_numeric_avg_accum(transarray, newval));
2322 }
2323
2324
2325 Datum
2326 numeric_avg(PG_FUNCTION_ARGS)
2327 {
2328         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2329         Datum      *transdatums;
2330         int                     ndatums;
2331         Numeric         N,
2332                                 sumX;
2333
2334         /* We assume the input is array of numeric */
2335         deconstruct_array(transarray,
2336                                           NUMERICOID, -1, false, 'i',
2337                                           &transdatums, NULL, &ndatums);
2338         if (ndatums != 2)
2339                 elog(ERROR, "expected 2-element numeric array");
2340         N = DatumGetNumeric(transdatums[0]);
2341         sumX = DatumGetNumeric(transdatums[1]);
2342
2343         /* SQL92 defines AVG of no values to be NULL */
2344         /* N is zero iff no digits (cf. numeric_uminus) */
2345         if (VARSIZE(N) == NUMERIC_HDRSZ)
2346                 PG_RETURN_NULL();
2347
2348         PG_RETURN_DATUM(DirectFunctionCall2(numeric_div,
2349                                                                                 NumericGetDatum(sumX),
2350                                                                                 NumericGetDatum(N)));
2351 }
2352
2353 /*
2354  * Workhorse routine for the standard deviance and variance
2355  * aggregates. 'transarray' is the aggregate's transition
2356  * array. 'variance' specifies whether we should calculate the
2357  * variance or the standard deviation. 'sample' indicates whether the
2358  * caller is interested in the sample or the population
2359  * variance/stddev.
2360  *
2361  * If appropriate variance statistic is undefined for the input,
2362  * *is_null is set to true and NULL is returned.
2363  */
2364 static Numeric
2365 numeric_stddev_internal(ArrayType *transarray,
2366                                                 bool variance, bool sample,
2367                                                 bool *is_null)
2368 {
2369         Datum      *transdatums;
2370         int                     ndatums;
2371         Numeric         N,
2372                                 sumX,
2373                                 sumX2,
2374                                 res;
2375         NumericVar      vN,
2376                                 vsumX,
2377                                 vsumX2,
2378                                 vNminus1;
2379         NumericVar *comp;
2380         int                     rscale;
2381
2382         *is_null = false;
2383
2384         /* We assume the input is array of numeric */
2385         deconstruct_array(transarray,
2386                                           NUMERICOID, -1, false, 'i',
2387                                           &transdatums, NULL, &ndatums);
2388         if (ndatums != 3)
2389                 elog(ERROR, "expected 3-element numeric array");
2390         N = DatumGetNumeric(transdatums[0]);
2391         sumX = DatumGetNumeric(transdatums[1]);
2392         sumX2 = DatumGetNumeric(transdatums[2]);
2393
2394         if (NUMERIC_IS_NAN(N) || NUMERIC_IS_NAN(sumX) || NUMERIC_IS_NAN(sumX2))
2395                 return make_result(&const_nan);
2396
2397         init_var(&vN);
2398         set_var_from_num(N, &vN);
2399
2400         /*
2401          * Sample stddev and variance are undefined when N <= 1; population stddev
2402          * is undefined when N == 0. Return NULL in either case.
2403          */
2404         if (sample)
2405                 comp = &const_one;
2406         else
2407                 comp = &const_zero;
2408
2409         if (cmp_var(&vN, comp) <= 0)
2410         {
2411                 free_var(&vN);
2412                 *is_null = true;
2413                 return NULL;
2414         }
2415
2416         init_var(&vNminus1);
2417         sub_var(&vN, &const_one, &vNminus1);
2418
2419         init_var(&vsumX);
2420         set_var_from_num(sumX, &vsumX);
2421         init_var(&vsumX2);
2422         set_var_from_num(sumX2, &vsumX2);
2423
2424         /* compute rscale for mul_var calls */
2425         rscale = vsumX.dscale * 2;
2426
2427         mul_var(&vsumX, &vsumX, &vsumX, rscale);        /* vsumX = sumX * sumX */
2428         mul_var(&vN, &vsumX2, &vsumX2, rscale);         /* vsumX2 = N * sumX2 */
2429         sub_var(&vsumX2, &vsumX, &vsumX2);      /* N * sumX2 - sumX * sumX */
2430
2431         if (cmp_var(&vsumX2, &const_zero) <= 0)
2432         {
2433                 /* Watch out for roundoff error producing a negative numerator */
2434                 res = make_result(&const_zero);
2435         }
2436         else
2437         {
2438                 if (sample)
2439                         mul_var(&vN, &vNminus1, &vNminus1, 0);  /* N * (N - 1) */
2440                 else
2441                         mul_var(&vN, &vN, &vNminus1, 0);                /* N * N */
2442                 rscale = select_div_scale(&vsumX2, &vNminus1);
2443                 div_var(&vsumX2, &vNminus1, &vsumX, rscale, true);              /* variance */
2444                 if (!variance)
2445                         sqrt_var(&vsumX, &vsumX, rscale);       /* stddev */
2446
2447                 res = make_result(&vsumX);
2448         }
2449
2450         free_var(&vN);
2451         free_var(&vNminus1);
2452         free_var(&vsumX);
2453         free_var(&vsumX2);
2454
2455         return res;
2456 }
2457
2458 Datum
2459 numeric_var_samp(PG_FUNCTION_ARGS)
2460 {
2461         Numeric         res;
2462         bool            is_null;
2463
2464         res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
2465                                                                   true, true, &is_null);
2466
2467         if (is_null)
2468                 PG_RETURN_NULL();
2469         else
2470                 PG_RETURN_NUMERIC(res);
2471 }
2472
2473 Datum
2474 numeric_stddev_samp(PG_FUNCTION_ARGS)
2475 {
2476         Numeric         res;
2477         bool            is_null;
2478
2479         res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
2480                                                                   false, true, &is_null);
2481
2482         if (is_null)
2483                 PG_RETURN_NULL();
2484         else
2485                 PG_RETURN_NUMERIC(res);
2486 }
2487
2488 Datum
2489 numeric_var_pop(PG_FUNCTION_ARGS)
2490 {
2491         Numeric         res;
2492         bool            is_null;
2493
2494         res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
2495                                                                   true, false, &is_null);
2496
2497         if (is_null)
2498                 PG_RETURN_NULL();
2499         else
2500                 PG_RETURN_NUMERIC(res);
2501 }
2502
2503 Datum
2504 numeric_stddev_pop(PG_FUNCTION_ARGS)
2505 {
2506         Numeric         res;
2507         bool            is_null;
2508
2509         res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
2510                                                                   false, false, &is_null);
2511
2512         if (is_null)
2513                 PG_RETURN_NULL();
2514         else
2515                 PG_RETURN_NUMERIC(res);
2516 }
2517
2518 /*
2519  * SUM transition functions for integer datatypes.
2520  *
2521  * To avoid overflow, we use accumulators wider than the input datatype.
2522  * A Numeric accumulator is needed for int8 input; for int4 and int2
2523  * inputs, we use int8 accumulators which should be sufficient for practical
2524  * purposes.  (The latter two therefore don't really belong in this file,
2525  * but we keep them here anyway.)
2526  *
2527  * Because SQL92 defines the SUM() of no values to be NULL, not zero,
2528  * the initial condition of the transition data value needs to be NULL. This
2529  * means we can't rely on ExecAgg to automatically insert the first non-null
2530  * data value into the transition data: it doesn't know how to do the type
2531  * conversion.  The upshot is that these routines have to be marked non-strict
2532  * and handle substitution of the first non-null input themselves.
2533  */
2534
2535 Datum
2536 int2_sum(PG_FUNCTION_ARGS)
2537 {
2538         int64           newval;
2539
2540         if (PG_ARGISNULL(0))
2541         {
2542                 /* No non-null input seen so far... */
2543                 if (PG_ARGISNULL(1))
2544                         PG_RETURN_NULL();       /* still no non-null */
2545                 /* This is the first non-null input. */
2546                 newval = (int64) PG_GETARG_INT16(1);
2547                 PG_RETURN_INT64(newval);
2548         }
2549
2550         /*
2551          * If we're invoked by nodeAgg, we can cheat and modify out first
2552          * parameter in-place to avoid palloc overhead. If not, we need to return
2553          * the new value of the transition variable.
2554          */
2555         if (fcinfo->context && IsA(fcinfo->context, AggState))
2556         {
2557                 int64      *oldsum = (int64 *) PG_GETARG_POINTER(0);
2558
2559                 /* Leave the running sum unchanged in the new input is null */
2560                 if (!PG_ARGISNULL(1))
2561                         *oldsum = *oldsum + (int64) PG_GETARG_INT16(1);
2562
2563                 PG_RETURN_POINTER(oldsum);
2564         }
2565         else
2566         {
2567                 int64           oldsum = PG_GETARG_INT64(0);
2568
2569                 /* Leave sum unchanged if new input is null. */
2570                 if (PG_ARGISNULL(1))
2571                         PG_RETURN_INT64(oldsum);
2572
2573                 /* OK to do the addition. */
2574                 newval = oldsum + (int64) PG_GETARG_INT16(1);
2575
2576                 PG_RETURN_INT64(newval);
2577         }
2578 }
2579
2580 Datum
2581 int4_sum(PG_FUNCTION_ARGS)
2582 {
2583         int64           newval;
2584
2585         if (PG_ARGISNULL(0))
2586         {
2587                 /* No non-null input seen so far... */
2588                 if (PG_ARGISNULL(1))
2589                         PG_RETURN_NULL();       /* still no non-null */
2590                 /* This is the first non-null input. */
2591                 newval = (int64) PG_GETARG_INT32(1);
2592                 PG_RETURN_INT64(newval);
2593         }
2594
2595         /*
2596          * If we're invoked by nodeAgg, we can cheat and modify out first
2597          * parameter in-place to avoid palloc overhead. If not, we need to return
2598          * the new value of the transition variable.
2599          */
2600         if (fcinfo->context && IsA(fcinfo->context, AggState))
2601         {
2602                 int64      *oldsum = (int64 *) PG_GETARG_POINTER(0);
2603
2604                 /* Leave the running sum unchanged in the new input is null */
2605                 if (!PG_ARGISNULL(1))
2606                         *oldsum = *oldsum + (int64) PG_GETARG_INT32(1);
2607
2608                 PG_RETURN_POINTER(oldsum);
2609         }
2610         else
2611         {
2612                 int64           oldsum = PG_GETARG_INT64(0);
2613
2614                 /* Leave sum unchanged if new input is null. */
2615                 if (PG_ARGISNULL(1))
2616                         PG_RETURN_INT64(oldsum);
2617
2618                 /* OK to do the addition. */
2619                 newval = oldsum + (int64) PG_GETARG_INT32(1);
2620
2621                 PG_RETURN_INT64(newval);
2622         }
2623 }
2624
2625 Datum
2626 int8_sum(PG_FUNCTION_ARGS)
2627 {
2628         Numeric         oldsum;
2629         Datum           newval;
2630
2631         if (PG_ARGISNULL(0))
2632         {
2633                 /* No non-null input seen so far... */
2634                 if (PG_ARGISNULL(1))
2635                         PG_RETURN_NULL();       /* still no non-null */
2636                 /* This is the first non-null input. */
2637                 newval = DirectFunctionCall1(int8_numeric, PG_GETARG_DATUM(1));
2638                 PG_RETURN_DATUM(newval);
2639         }
2640
2641         /*
2642          * Note that we cannot special-case the nodeAgg case here, as we do for
2643          * int2_sum and int4_sum: numeric is of variable size, so we cannot modify
2644          * our first parameter in-place.
2645          */
2646
2647         oldsum = PG_GETARG_NUMERIC(0);
2648
2649         /* Leave sum unchanged if new input is null. */
2650         if (PG_ARGISNULL(1))
2651                 PG_RETURN_NUMERIC(oldsum);
2652
2653         /* OK to do the addition. */
2654         newval = DirectFunctionCall1(int8_numeric, PG_GETARG_DATUM(1));
2655
2656         PG_RETURN_DATUM(DirectFunctionCall2(numeric_add,
2657                                                                                 NumericGetDatum(oldsum), newval));
2658 }
2659
2660
2661 /*
2662  * Routines for avg(int2) and avg(int4).  The transition datatype
2663  * is a two-element int8 array, holding count and sum.
2664  */
2665
2666 typedef struct Int8TransTypeData
2667 {
2668 #ifndef INT64_IS_BUSTED
2669         int64           count;
2670         int64           sum;
2671 #else
2672         /* "int64" isn't really 64 bits, so fake up properly-aligned fields */
2673         int32           count;
2674         int32           pad1;
2675         int32           sum;
2676         int32           pad2;
2677 #endif
2678 } Int8TransTypeData;
2679
2680 Datum
2681 int2_avg_accum(PG_FUNCTION_ARGS)
2682 {
2683         ArrayType  *transarray;
2684         int16           newval = PG_GETARG_INT16(1);
2685         Int8TransTypeData *transdata;
2686
2687         /*
2688          * If we're invoked by nodeAgg, we can cheat and modify our first
2689          * parameter in-place to reduce palloc overhead. Otherwise we need to make
2690          * a copy of it before scribbling on it.
2691          */
2692         if (fcinfo->context && IsA(fcinfo->context, AggState))
2693                 transarray = PG_GETARG_ARRAYTYPE_P(0);
2694         else
2695                 transarray = PG_GETARG_ARRAYTYPE_P_COPY(0);
2696
2697         if (ARR_HASNULL(transarray) ||
2698                 ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
2699                 elog(ERROR, "expected 2-element int8 array");
2700
2701         transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
2702         transdata->count++;
2703         transdata->sum += newval;
2704
2705         PG_RETURN_ARRAYTYPE_P(transarray);
2706 }
2707
2708 Datum
2709 int4_avg_accum(PG_FUNCTION_ARGS)
2710 {
2711         ArrayType  *transarray;
2712         int32           newval = PG_GETARG_INT32(1);
2713         Int8TransTypeData *transdata;
2714
2715         /*
2716          * If we're invoked by nodeAgg, we can cheat and modify our first
2717          * parameter in-place to reduce palloc overhead. Otherwise we need to make
2718          * a copy of it before scribbling on it.
2719          */
2720         if (fcinfo->context && IsA(fcinfo->context, AggState))
2721                 transarray = PG_GETARG_ARRAYTYPE_P(0);
2722         else
2723                 transarray = PG_GETARG_ARRAYTYPE_P_COPY(0);
2724
2725         if (ARR_HASNULL(transarray) ||
2726                 ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
2727                 elog(ERROR, "expected 2-element int8 array");
2728
2729         transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
2730         transdata->count++;
2731         transdata->sum += newval;
2732
2733         PG_RETURN_ARRAYTYPE_P(transarray);
2734 }
2735
2736 Datum
2737 int8_avg(PG_FUNCTION_ARGS)
2738 {
2739         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2740         Int8TransTypeData *transdata;
2741         Datum           countd,
2742                                 sumd;
2743
2744         if (ARR_HASNULL(transarray) ||
2745                 ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
2746                 elog(ERROR, "expected 2-element int8 array");
2747         transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
2748
2749         /* SQL92 defines AVG of no values to be NULL */
2750         if (transdata->count == 0)
2751                 PG_RETURN_NULL();
2752
2753         countd = DirectFunctionCall1(int8_numeric,
2754                                                                  Int64GetDatumFast(transdata->count));
2755         sumd = DirectFunctionCall1(int8_numeric,
2756                                                            Int64GetDatumFast(transdata->sum));
2757
2758         PG_RETURN_DATUM(DirectFunctionCall2(numeric_div, sumd, countd));
2759 }
2760
2761
2762 /* ----------------------------------------------------------------------
2763  *
2764  * Debug support
2765  *
2766  * ----------------------------------------------------------------------
2767  */
2768
2769 #ifdef NUMERIC_DEBUG
2770
2771 /*
2772  * dump_numeric() - Dump a value in the db storage format for debugging
2773  */
2774 static void
2775 dump_numeric(const char *str, Numeric num)
2776 {
2777         NumericDigit *digits = NUMERIC_DIGITS(num);
2778         int                     ndigits;
2779         int                     i;
2780
2781         ndigits = NUMERIC_NDIGITS(num);
2782
2783         printf("%s: NUMERIC w=%d d=%d ", str, num->n_weight, NUMERIC_DSCALE(num));
2784         switch (NUMERIC_SIGN(num))
2785         {
2786                 case NUMERIC_POS:
2787                         printf("POS");
2788                         break;
2789                 case NUMERIC_NEG:
2790                         printf("NEG");
2791                         break;
2792                 case NUMERIC_NAN:
2793                         printf("NaN");
2794                         break;
2795                 default:
2796                         printf("SIGN=0x%x", NUMERIC_SIGN(num));
2797                         break;
2798         }
2799
2800         for (i = 0; i < ndigits; i++)
2801                 printf(" %0*d", DEC_DIGITS, digits[i]);
2802         printf("\n");
2803 }
2804
2805
2806 /*
2807  * dump_var() - Dump a value in the variable format for debugging
2808  */
2809 static void
2810 dump_var(const char *str, NumericVar *var)
2811 {
2812         int                     i;
2813
2814         printf("%s: VAR w=%d d=%d ", str, var->weight, var->dscale);
2815         switch (var->sign)
2816         {
2817                 case NUMERIC_POS:
2818                         printf("POS");
2819                         break;
2820                 case NUMERIC_NEG:
2821                         printf("NEG");
2822                         break;
2823                 case NUMERIC_NAN:
2824                         printf("NaN");
2825                         break;
2826                 default:
2827                         printf("SIGN=0x%x", var->sign);
2828                         break;
2829         }
2830
2831         for (i = 0; i < var->ndigits; i++)
2832                 printf(" %0*d", DEC_DIGITS, var->digits[i]);
2833
2834         printf("\n");
2835 }
2836 #endif   /* NUMERIC_DEBUG */
2837
2838
2839 /* ----------------------------------------------------------------------
2840  *
2841  * Local functions follow
2842  *
2843  * In general, these do not support NaNs --- callers must eliminate
2844  * the possibility of NaN first.  (make_result() is an exception.)
2845  *
2846  * ----------------------------------------------------------------------
2847  */
2848
2849
2850 /*
2851  * alloc_var() -
2852  *
2853  *      Allocate a digit buffer of ndigits digits (plus a spare digit for rounding)
2854  */
2855 static void
2856 alloc_var(NumericVar *var, int ndigits)
2857 {
2858         digitbuf_free(var->buf);
2859         var->buf = digitbuf_alloc(ndigits + 1);
2860         var->buf[0] = 0;                        /* spare digit for rounding */
2861         var->digits = var->buf + 1;
2862         var->ndigits = ndigits;
2863 }
2864
2865
2866 /*
2867  * free_var() -
2868  *
2869  *      Return the digit buffer of a variable to the free pool
2870  */
2871 static void
2872 free_var(NumericVar *var)
2873 {
2874         digitbuf_free(var->buf);
2875         var->buf = NULL;
2876         var->digits = NULL;
2877         var->sign = NUMERIC_NAN;
2878 }
2879
2880
2881 /*
2882  * zero_var() -
2883  *
2884  *      Set a variable to ZERO.
2885  *      Note: its dscale is not touched.
2886  */
2887 static void
2888 zero_var(NumericVar *var)
2889 {
2890         digitbuf_free(var->buf);
2891         var->buf = NULL;
2892         var->digits = NULL;
2893         var->ndigits = 0;
2894         var->weight = 0;                        /* by convention; doesn't really matter */
2895         var->sign = NUMERIC_POS;        /* anything but NAN... */
2896 }
2897
2898
2899 /*
2900  * set_var_from_str()
2901  *
2902  *      Parse a string and put the number into a variable
2903  */
2904 static void
2905 set_var_from_str(const char *str, NumericVar *dest)
2906 {
2907         const char *cp = str;
2908         bool            have_dp = FALSE;
2909         int                     i;
2910         unsigned char *decdigits;
2911         int                     sign = NUMERIC_POS;
2912         int                     dweight = -1;
2913         int                     ddigits;
2914         int                     dscale = 0;
2915         int                     weight;
2916         int                     ndigits;
2917         int                     offset;
2918         NumericDigit *digits;
2919
2920         /*
2921          * We first parse the string to extract decimal digits and determine the
2922          * correct decimal weight.      Then convert to NBASE representation.
2923          */
2924
2925         /* skip leading spaces */
2926         while (*cp)
2927         {
2928                 if (!isspace((unsigned char) *cp))
2929                         break;
2930                 cp++;
2931         }
2932
2933         switch (*cp)
2934         {
2935                 case '+':
2936                         sign = NUMERIC_POS;
2937                         cp++;
2938                         break;
2939
2940                 case '-':
2941                         sign = NUMERIC_NEG;
2942                         cp++;
2943                         break;
2944         }
2945
2946         if (*cp == '.')
2947         {
2948                 have_dp = TRUE;
2949                 cp++;
2950         }
2951
2952         if (!isdigit((unsigned char) *cp))
2953                 ereport(ERROR,
2954                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
2955                           errmsg("invalid input syntax for type numeric: \"%s\"", str)));
2956
2957         decdigits = (unsigned char *) palloc(strlen(cp) + DEC_DIGITS * 2);
2958
2959         /* leading padding for digit alignment later */
2960         memset(decdigits, 0, DEC_DIGITS);
2961         i = DEC_DIGITS;
2962
2963         while (*cp)
2964         {
2965                 if (isdigit((unsigned char) *cp))
2966                 {
2967                         decdigits[i++] = *cp++ - '0';
2968                         if (!have_dp)
2969                                 dweight++;
2970                         else
2971                                 dscale++;
2972                 }
2973                 else if (*cp == '.')
2974                 {
2975                         if (have_dp)
2976                                 ereport(ERROR,
2977                                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
2978                                           errmsg("invalid input syntax for type numeric: \"%s\"",
2979                                                          str)));
2980                         have_dp = TRUE;
2981                         cp++;
2982                 }
2983                 else
2984                         break;
2985         }
2986
2987         ddigits = i - DEC_DIGITS;
2988         /* trailing padding for digit alignment later */
2989         memset(decdigits + i, 0, DEC_DIGITS - 1);
2990
2991         /* Handle exponent, if any */
2992         if (*cp == 'e' || *cp == 'E')
2993         {
2994                 long            exponent;
2995                 char       *endptr;
2996
2997                 cp++;
2998                 exponent = strtol(cp, &endptr, 10);
2999                 if (endptr == cp)
3000                         ereport(ERROR,
3001                                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3002                                          errmsg("invalid input syntax for type numeric: \"%s\"",
3003                                                         str)));
3004                 cp = endptr;
3005                 if (exponent > NUMERIC_MAX_PRECISION ||
3006                         exponent < -NUMERIC_MAX_PRECISION)
3007                         ereport(ERROR,
3008                                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3009                                          errmsg("invalid input syntax for type numeric: \"%s\"",
3010                                                         str)));
3011                 dweight += (int) exponent;
3012                 dscale -= (int) exponent;
3013                 if (dscale < 0)
3014                         dscale = 0;
3015         }
3016
3017         /* Should be nothing left but spaces */
3018         while (*cp)
3019         {
3020                 if (!isspace((unsigned char) *cp))
3021                         ereport(ERROR,
3022                                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3023                                          errmsg("invalid input syntax for type numeric: \"%s\"",
3024                                                         str)));
3025                 cp++;
3026         }
3027
3028         /*
3029          * Okay, convert pure-decimal representation to base NBASE.  First we need
3030          * to determine the converted weight and ndigits.  offset is the number of
3031          * decimal zeroes to insert before the first given digit to have a
3032          * correctly aligned first NBASE digit.
3033          */
3034         if (dweight >= 0)
3035                 weight = (dweight + 1 + DEC_DIGITS - 1) / DEC_DIGITS - 1;
3036         else
3037                 weight = -((-dweight - 1) / DEC_DIGITS + 1);
3038         offset = (weight + 1) * DEC_DIGITS - (dweight + 1);
3039         ndigits = (ddigits + offset + DEC_DIGITS - 1) / DEC_DIGITS;
3040
3041         alloc_var(dest, ndigits);
3042         dest->sign = sign;
3043         dest->weight = weight;
3044         dest->dscale = dscale;
3045
3046         i = DEC_DIGITS - offset;
3047         digits = dest->digits;
3048
3049         while (ndigits-- > 0)
3050         {
3051 #if DEC_DIGITS == 4
3052                 *digits++ = ((decdigits[i] * 10 + decdigits[i + 1]) * 10 +
3053                                          decdigits[i + 2]) * 10 + decdigits[i + 3];
3054 #elif DEC_DIGITS == 2
3055                 *digits++ = decdigits[i] * 10 + decdigits[i + 1];
3056 #elif DEC_DIGITS == 1
3057                 *digits++ = decdigits[i];
3058 #else
3059 #error unsupported NBASE
3060 #endif
3061                 i += DEC_DIGITS;
3062         }
3063
3064         pfree(decdigits);
3065
3066         /* Strip any leading/trailing zeroes, and normalize weight if zero */
3067         strip_var(dest);
3068 }
3069
3070
3071 /*
3072  * set_var_from_num() -
3073  *
3074  *      Convert the packed db format into a variable
3075  */
3076 static void
3077 set_var_from_num(Numeric num, NumericVar *dest)
3078 {
3079         int                     ndigits;
3080
3081         ndigits = NUMERIC_NDIGITS(num);
3082
3083         alloc_var(dest, ndigits);
3084
3085         dest->weight = num->n_weight;
3086         dest->sign = NUMERIC_SIGN(num);
3087         dest->dscale = NUMERIC_DSCALE(num);
3088
3089         memcpy(dest->digits, num->n_data, ndigits * sizeof(NumericDigit));
3090 }
3091
3092
3093 /*
3094  * set_var_from_var() -
3095  *
3096  *      Copy one variable into another
3097  */
3098 static void
3099 set_var_from_var(NumericVar *value, NumericVar *dest)
3100 {
3101         NumericDigit *newbuf;
3102
3103         newbuf = digitbuf_alloc(value->ndigits + 1);
3104         newbuf[0] = 0;                          /* spare digit for rounding */
3105         memcpy(newbuf + 1, value->digits, value->ndigits * sizeof(NumericDigit));
3106
3107         digitbuf_free(dest->buf);
3108
3109         memmove(dest, value, sizeof(NumericVar));
3110         dest->buf = newbuf;
3111         dest->digits = newbuf + 1;
3112 }
3113
3114
3115 /*
3116  * get_str_from_var() -
3117  *
3118  *      Convert a var to text representation (guts of numeric_out).
3119  *      CAUTION: var's contents may be modified by rounding!
3120  *      Returns a palloc'd string.
3121  */
3122 static char *
3123 get_str_from_var(NumericVar *var, int dscale)
3124 {
3125         char       *str;
3126         char       *cp;
3127         char       *endcp;
3128         int                     i;
3129         int                     d;
3130         NumericDigit dig;
3131
3132 #if DEC_DIGITS > 1
3133         NumericDigit d1;
3134 #endif
3135
3136         if (dscale < 0)
3137                 dscale = 0;
3138
3139         /*
3140          * Check if we must round up before printing the value and do so.
3141          */
3142         round_var(var, dscale);
3143
3144         /*
3145          * Allocate space for the result.
3146          *
3147          * i is set to to # of decimal digits before decimal point. dscale is the
3148          * # of decimal digits we will print after decimal point. We may generate
3149          * as many as DEC_DIGITS-1 excess digits at the end, and in addition we
3150          * need room for sign, decimal point, null terminator.
3151          */
3152         i = (var->weight + 1) * DEC_DIGITS;
3153         if (i <= 0)
3154                 i = 1;
3155
3156         str = palloc(i + dscale + DEC_DIGITS + 2);
3157         cp = str;
3158
3159         /*
3160          * Output a dash for negative values
3161          */
3162         if (var->sign == NUMERIC_NEG)
3163                 *cp++ = '-';
3164
3165         /*
3166          * Output all digits before the decimal point
3167          */
3168         if (var->weight < 0)
3169         {
3170                 d = var->weight + 1;
3171                 *cp++ = '0';
3172         }
3173         else
3174         {
3175                 for (d = 0; d <= var->weight; d++)
3176                 {
3177                         dig = (d < var->ndigits) ? var->digits[d] : 0;
3178                         /* In the first digit, suppress extra leading decimal zeroes */
3179 #if DEC_DIGITS == 4
3180                         {
3181                                 bool            putit = (d > 0);
3182
3183                                 d1 = dig / 1000;
3184                                 dig -= d1 * 1000;
3185                                 putit |= (d1 > 0);
3186                                 if (putit)
3187                                         *cp++ = d1 + '0';
3188                                 d1 = dig / 100;
3189                                 dig -= d1 * 100;
3190                                 putit |= (d1 > 0);
3191                                 if (putit)
3192                                         *cp++ = d1 + '0';
3193                                 d1 = dig / 10;
3194                                 dig -= d1 * 10;
3195                                 putit |= (d1 > 0);
3196                                 if (putit)
3197                                         *cp++ = d1 + '0';
3198                                 *cp++ = dig + '0';
3199                         }
3200 #elif DEC_DIGITS == 2
3201                         d1 = dig / 10;
3202                         dig -= d1 * 10;
3203                         if (d1 > 0 || d > 0)
3204                                 *cp++ = d1 + '0';
3205                         *cp++ = dig + '0';
3206 #elif DEC_DIGITS == 1
3207                         *cp++ = dig + '0';
3208 #else
3209 #error unsupported NBASE
3210 #endif
3211                 }
3212         }
3213
3214         /*
3215          * If requested, output a decimal point and all the digits that follow it.
3216          * We initially put out a multiple of DEC_DIGITS digits, then truncate if
3217          * needed.
3218          */
3219         if (dscale > 0)
3220         {
3221                 *cp++ = '.';
3222                 endcp = cp + dscale;
3223                 for (i = 0; i < dscale; d++, i += DEC_DIGITS)
3224                 {
3225                         dig = (d >= 0 && d < var->ndigits) ? var->digits[d] : 0;
3226 #if DEC_DIGITS == 4
3227                         d1 = dig / 1000;
3228                         dig -= d1 * 1000;
3229                         *cp++ = d1 + '0';
3230                         d1 = dig / 100;
3231                         dig -= d1 * 100;
3232                         *cp++ = d1 + '0';
3233                         d1 = dig / 10;
3234                         dig -= d1 * 10;
3235                         *cp++ = d1 + '0';
3236                         *cp++ = dig + '0';
3237 #elif DEC_DIGITS == 2
3238                         d1 = dig / 10;
3239                         dig -= d1 * 10;
3240                         *cp++ = d1 + '0';
3241                         *cp++ = dig + '0';
3242 #elif DEC_DIGITS == 1
3243                         *cp++ = dig + '0';
3244 #else
3245 #error unsupported NBASE
3246 #endif
3247                 }
3248                 cp = endcp;
3249         }
3250
3251         /*
3252          * terminate the string and return it
3253          */
3254         *cp = '\0';
3255         return str;
3256 }
3257
3258
3259 /*
3260  * make_result() -
3261  *
3262  *      Create the packed db numeric format in palloc()'d memory from
3263  *      a variable.
3264  */
3265 static Numeric
3266 make_result(NumericVar *var)
3267 {
3268         Numeric         result;
3269         NumericDigit *digits = var->digits;
3270         int                     weight = var->weight;
3271         int                     sign = var->sign;
3272         int                     n;
3273         Size            len;
3274
3275         if (sign == NUMERIC_NAN)
3276         {
3277                 result = (Numeric) palloc(NUMERIC_HDRSZ);
3278
3279                 SET_VARSIZE(result, NUMERIC_HDRSZ);
3280                 result->n_weight = 0;
3281                 result->n_sign_dscale = NUMERIC_NAN;
3282
3283                 dump_numeric("make_result()", result);
3284                 return result;
3285         }
3286
3287         n = var->ndigits;
3288
3289         /* truncate leading zeroes */
3290         while (n > 0 && *digits == 0)
3291         {
3292                 digits++;
3293                 weight--;
3294                 n--;
3295         }
3296         /* truncate trailing zeroes */
3297         while (n > 0 && digits[n - 1] == 0)
3298                 n--;
3299
3300         /* If zero result, force to weight=0 and positive sign */
3301         if (n == 0)
3302         {
3303                 weight = 0;
3304                 sign = NUMERIC_POS;
3305         }
3306
3307         /* Build the result */
3308         len = NUMERIC_HDRSZ + n * sizeof(NumericDigit);
3309         result = (Numeric) palloc(len);
3310         SET_VARSIZE(result, len);
3311         result->n_weight = weight;
3312         result->n_sign_dscale = sign | (var->dscale & NUMERIC_DSCALE_MASK);
3313
3314         memcpy(result->n_data, digits, n * sizeof(NumericDigit));
3315
3316         /* Check for overflow of int16 fields */
3317         if (result->n_weight != weight ||
3318                 NUMERIC_DSCALE(result) != var->dscale)
3319                 ereport(ERROR,
3320                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
3321                                  errmsg("value overflows numeric format")));
3322
3323         dump_numeric("make_result()", result);
3324         return result;
3325 }
3326
3327
3328 /*
3329  * apply_typmod() -
3330  *
3331  *      Do bounds checking and rounding according to the attributes
3332  *      typmod field.
3333  */
3334 static void
3335 apply_typmod(NumericVar *var, int32 typmod)
3336 {
3337         int                     precision;
3338         int                     scale;
3339         int                     maxdigits;
3340         int                     ddigits;
3341         int                     i;
3342
3343         /* Do nothing if we have a default typmod (-1) */
3344         if (typmod < (int32) (VARHDRSZ))
3345                 return;
3346
3347         typmod -= VARHDRSZ;
3348         precision = (typmod >> 16) & 0xffff;
3349         scale = typmod & 0xffff;
3350         maxdigits = precision - scale;
3351
3352         /* Round to target scale (and set var->dscale) */
3353         round_var(var, scale);
3354
3355         /*
3356          * Check for overflow - note we can't do this before rounding, because
3357          * rounding could raise the weight.  Also note that the var's weight could
3358          * be inflated by leading zeroes, which will be stripped before storage
3359          * but perhaps might not have been yet. In any case, we must recognize a
3360          * true zero, whose weight doesn't mean anything.
3361          */
3362         ddigits = (var->weight + 1) * DEC_DIGITS;
3363         if (ddigits > maxdigits)
3364         {
3365                 /* Determine true weight; and check for all-zero result */
3366                 for (i = 0; i < var->ndigits; i++)
3367                 {
3368                         NumericDigit dig = var->digits[i];
3369
3370                         if (dig)
3371                         {
3372                                 /* Adjust for any high-order decimal zero digits */
3373 #if DEC_DIGITS == 4
3374                                 if (dig < 10)
3375                                         ddigits -= 3;
3376                                 else if (dig < 100)
3377                                         ddigits -= 2;
3378                                 else if (dig < 1000)
3379                                         ddigits -= 1;
3380 #elif DEC_DIGITS == 2
3381                                 if (dig < 10)
3382                                         ddigits -= 1;
3383 #elif DEC_DIGITS == 1
3384                                 /* no adjustment */
3385 #else
3386 #error unsupported NBASE
3387 #endif
3388                                 if (ddigits > maxdigits)
3389                                         ereport(ERROR,
3390                                                         (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
3391                                                          errmsg("numeric field overflow"),
3392                                                          errdetail("A field with precision %d, scale %d must round to an absolute value less than %s%d.",
3393                                                                            precision, scale,
3394                                         /* Display 10^0 as 1 */
3395                                                                            maxdigits ? "10^" : "",
3396                                                                            maxdigits ? maxdigits : 1
3397                                                                            )));
3398                                 break;
3399                         }
3400                         ddigits -= DEC_DIGITS;
3401                 }
3402         }
3403 }
3404
3405 /*
3406  * Convert numeric to int8, rounding if needed.
3407  *
3408  * If overflow, return FALSE (no error is raised).      Return TRUE if okay.
3409  *
3410  *      CAUTION: var's contents may be modified by rounding!
3411  */
3412 static bool
3413 numericvar_to_int8(NumericVar *var, int64 *result)
3414 {
3415         NumericDigit *digits;
3416         int                     ndigits;
3417         int                     weight;
3418         int                     i;
3419         int64           val,
3420                                 oldval;
3421         bool            neg;
3422
3423         /* Round to nearest integer */
3424         round_var(var, 0);
3425
3426         /* Check for zero input */
3427         strip_var(var);
3428         ndigits = var->ndigits;
3429         if (ndigits == 0)
3430         {
3431                 *result = 0;
3432                 return true;
3433         }
3434
3435         /*
3436          * For input like 10000000000, we must treat stripped digits as real. So
3437          * the loop assumes there are weight+1 digits before the decimal point.
3438          */
3439         weight = var->weight;
3440         Assert(weight >= 0 && ndigits <= weight + 1);
3441
3442         /* Construct the result */
3443         digits = var->digits;
3444         neg = (var->sign == NUMERIC_NEG);
3445         val = digits[0];
3446         for (i = 1; i <= weight; i++)
3447         {
3448                 oldval = val;
3449                 val *= NBASE;
3450                 if (i < ndigits)
3451                         val += digits[i];
3452
3453                 /*
3454                  * The overflow check is a bit tricky because we want to accept
3455                  * INT64_MIN, which will overflow the positive accumulator.  We can
3456                  * detect this case easily though because INT64_MIN is the only
3457                  * nonzero value for which -val == val (on a two's complement machine,
3458                  * anyway).
3459                  */
3460                 if ((val / NBASE) != oldval)    /* possible overflow? */
3461                 {
3462                         if (!neg || (-val) != val || val == 0 || oldval < 0)
3463                                 return false;
3464                 }
3465         }
3466
3467         *result = neg ? -val : val;
3468         return true;
3469 }
3470
3471 /*
3472  * Convert int8 value to numeric.
3473  */
3474 static void
3475 int8_to_numericvar(int64 val, NumericVar *var)
3476 {
3477         uint64          uval,
3478                                 newuval;
3479         NumericDigit *ptr;
3480         int                     ndigits;
3481
3482         /* int8 can require at most 19 decimal digits; add one for safety */
3483         alloc_var(var, 20 / DEC_DIGITS);
3484         if (val < 0)
3485         {
3486                 var->sign = NUMERIC_NEG;
3487                 uval = -val;
3488         }
3489         else
3490         {
3491                 var->sign = NUMERIC_POS;
3492                 uval = val;
3493         }
3494         var->dscale = 0;
3495         if (val == 0)
3496         {
3497                 var->ndigits = 0;
3498                 var->weight = 0;
3499                 return;
3500         }
3501         ptr = var->digits + var->ndigits;
3502         ndigits = 0;
3503         do
3504         {
3505                 ptr--;
3506                 ndigits++;
3507                 newuval = uval / NBASE;
3508                 *ptr = uval - newuval * NBASE;
3509                 uval = newuval;
3510         } while (uval);
3511         var->digits = ptr;
3512         var->ndigits = ndigits;
3513         var->weight = ndigits - 1;
3514 }
3515
3516 /*
3517  * Convert numeric to float8; if out of range, return +/- HUGE_VAL
3518  */
3519 static double
3520 numeric_to_double_no_overflow(Numeric num)
3521 {
3522         char       *tmp;
3523         double          val;
3524         char       *endptr;
3525
3526         tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
3527                                                                                           NumericGetDatum(num)));
3528
3529         /* unlike float8in, we ignore ERANGE from strtod */
3530         val = strtod(tmp, &endptr);
3531         if (*endptr != '\0')
3532         {
3533                 /* shouldn't happen ... */
3534                 ereport(ERROR,
3535                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3536                          errmsg("invalid input syntax for type double precision: \"%s\"",
3537                                         tmp)));
3538         }
3539
3540         pfree(tmp);
3541
3542         return val;
3543 }
3544
3545 /* As above, but work from a NumericVar */
3546 static double
3547 numericvar_to_double_no_overflow(NumericVar *var)
3548 {
3549         char       *tmp;
3550         double          val;
3551         char       *endptr;
3552
3553         tmp = get_str_from_var(var, var->dscale);
3554
3555         /* unlike float8in, we ignore ERANGE from strtod */
3556         val = strtod(tmp, &endptr);
3557         if (*endptr != '\0')
3558         {
3559                 /* shouldn't happen ... */
3560                 ereport(ERROR,
3561                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3562                          errmsg("invalid input syntax for type double precision: \"%s\"",
3563                                         tmp)));
3564         }
3565
3566         pfree(tmp);
3567
3568         return val;
3569 }
3570
3571
3572 /*
3573  * cmp_var() -
3574  *
3575  *      Compare two values on variable level.  We assume zeroes have been
3576  *      truncated to no digits.
3577  */
3578 static int
3579 cmp_var(NumericVar *var1, NumericVar *var2)
3580 {
3581         return cmp_var_common(var1->digits, var1->ndigits,
3582                                                   var1->weight, var1->sign,
3583                                                   var2->digits, var2->ndigits,
3584                                                   var2->weight, var2->sign);
3585 }
3586
3587 /*
3588  * cmp_var_common() -
3589  *
3590  *      Main routine of cmp_var(). This function can be used by both
3591  *      NumericVar and Numeric.
3592  */
3593 static int
3594 cmp_var_common(const NumericDigit *var1digits, int var1ndigits,
3595                            int var1weight, int var1sign,
3596                            const NumericDigit *var2digits, int var2ndigits,
3597                            int var2weight, int var2sign)
3598 {
3599         if (var1ndigits == 0)
3600         {
3601                 if (var2ndigits == 0)
3602                         return 0;
3603                 if (var2sign == NUMERIC_NEG)
3604                         return 1;
3605                 return -1;
3606         }
3607         if (var2ndigits == 0)
3608         {
3609                 if (var1sign == NUMERIC_POS)
3610                         return 1;
3611                 return -1;
3612         }
3613
3614         if (var1sign == NUMERIC_POS)
3615         {
3616                 if (var2sign == NUMERIC_NEG)
3617                         return 1;
3618                 return cmp_abs_common(var1digits, var1ndigits, var1weight,
3619                                                           var2digits, var2ndigits, var2weight);
3620         }
3621
3622         if (var2sign == NUMERIC_POS)
3623                 return -1;
3624
3625         return cmp_abs_common(var2digits, var2ndigits, var2weight,
3626                                                   var1digits, var1ndigits, var1weight);
3627 }
3628
3629
3630 /*
3631  * add_var() -
3632  *
3633  *      Full version of add functionality on variable level (handling signs).
3634  *      result might point to one of the operands too without danger.
3635  */
3636 static void
3637 add_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
3638 {
3639         /*
3640          * Decide on the signs of the two variables what to do
3641          */
3642         if (var1->sign == NUMERIC_POS)
3643         {
3644                 if (var2->sign == NUMERIC_POS)
3645                 {
3646                         /*
3647                          * Both are positive result = +(ABS(var1) + ABS(var2))
3648                          */
3649                         add_abs(var1, var2, result);
3650                         result->sign = NUMERIC_POS;
3651                 }
3652                 else
3653                 {
3654                         /*
3655                          * var1 is positive, var2 is negative Must compare absolute values
3656                          */
3657                         switch (cmp_abs(var1, var2))
3658                         {
3659                                 case 0:
3660                                         /* ----------
3661                                          * ABS(var1) == ABS(var2)
3662                                          * result = ZERO
3663                                          * ----------
3664                                          */
3665                                         zero_var(result);
3666                                         result->dscale = Max(var1->dscale, var2->dscale);
3667                                         break;
3668
3669                                 case 1:
3670                                         /* ----------
3671                                          * ABS(var1) > ABS(var2)
3672                                          * result = +(ABS(var1) - ABS(var2))
3673                                          * ----------
3674                                          */
3675                                         sub_abs(var1, var2, result);
3676                                         result->sign = NUMERIC_POS;
3677                                         break;
3678
3679                                 case -1:
3680                                         /* ----------
3681                                          * ABS(var1) < ABS(var2)
3682                                          * result = -(ABS(var2) - ABS(var1))
3683                                          * ----------
3684                                          */
3685                                         sub_abs(var2, var1, result);
3686                                         result->sign = NUMERIC_NEG;
3687                                         break;
3688                         }
3689                 }
3690         }
3691         else
3692         {
3693                 if (var2->sign == NUMERIC_POS)
3694                 {
3695                         /* ----------
3696                          * var1 is negative, var2 is positive
3697                          * Must compare absolute values
3698                          * ----------
3699                          */
3700                         switch (cmp_abs(var1, var2))
3701                         {
3702                                 case 0:
3703                                         /* ----------
3704                                          * ABS(var1) == ABS(var2)
3705                                          * result = ZERO
3706                                          * ----------
3707                                          */
3708                                         zero_var(result);
3709                                         result->dscale = Max(var1->dscale, var2->dscale);
3710                                         break;
3711
3712                                 case 1:
3713                                         /* ----------
3714                                          * ABS(var1) > ABS(var2)
3715                                          * result = -(ABS(var1) - ABS(var2))
3716                                          * ----------
3717                                          */
3718                                         sub_abs(var1, var2, result);
3719                                         result->sign = NUMERIC_NEG;
3720                                         break;
3721
3722                                 case -1:
3723                                         /* ----------
3724                                          * ABS(var1) < ABS(var2)
3725                                          * result = +(ABS(var2) - ABS(var1))
3726                                          * ----------
3727                                          */
3728                                         sub_abs(var2, var1, result);
3729                                         result->sign = NUMERIC_POS;
3730                                         break;
3731                         }
3732                 }
3733                 else
3734                 {
3735                         /* ----------
3736                          * Both are negative
3737                          * result = -(ABS(var1) + ABS(var2))
3738                          * ----------
3739                          */
3740                         add_abs(var1, var2, result);
3741                         result->sign = NUMERIC_NEG;
3742                 }
3743         }
3744 }
3745
3746
3747 /*
3748  * sub_var() -
3749  *
3750  *      Full version of sub functionality on variable level (handling signs).
3751  *      result might point to one of the operands too without danger.
3752  */
3753 static void
3754 sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
3755 {
3756         /*
3757          * Decide on the signs of the two variables what to do
3758          */
3759         if (var1->sign == NUMERIC_POS)
3760         {
3761                 if (var2->sign == NUMERIC_NEG)
3762                 {
3763                         /* ----------
3764                          * var1 is positive, var2 is negative
3765                          * result = +(ABS(var1) + ABS(var2))
3766                          * ----------
3767                          */
3768                         add_abs(var1, var2, result);
3769                         result->sign = NUMERIC_POS;
3770                 }
3771                 else
3772                 {
3773                         /* ----------
3774                          * Both are positive
3775                          * Must compare absolute values
3776                          * ----------
3777                          */
3778                         switch (cmp_abs(var1, var2))
3779                         {
3780                                 case 0:
3781                                         /* ----------
3782                                          * ABS(var1) == ABS(var2)
3783                                          * result = ZERO
3784                                          * ----------
3785                                          */
3786                                         zero_var(result);
3787                                         result->dscale = Max(var1->dscale, var2->dscale);
3788                                         break;
3789
3790                                 case 1:
3791                                         /* ----------
3792                                          * ABS(var1) > ABS(var2)
3793                                          * result = +(ABS(var1) - ABS(var2))
3794                                          * ----------
3795                                          */
3796                                         sub_abs(var1, var2, result);
3797                                         result->sign = NUMERIC_POS;
3798                                         break;
3799
3800                                 case -1:
3801                                         /* ----------
3802                                          * ABS(var1) < ABS(var2)
3803                                          * result = -(ABS(var2) - ABS(var1))
3804                                          * ----------
3805                                          */
3806                                         sub_abs(var2, var1, result);
3807                                         result->sign = NUMERIC_NEG;
3808                                         break;
3809                         }
3810                 }
3811         }
3812         else
3813         {
3814                 if (var2->sign == NUMERIC_NEG)
3815                 {
3816                         /* ----------
3817                          * Both are negative
3818                          * Must compare absolute values
3819                          * ----------
3820                          */
3821                         switch (cmp_abs(var1, var2))
3822                         {
3823                                 case 0:
3824                                         /* ----------
3825                                          * ABS(var1) == ABS(var2)
3826                                          * result = ZERO
3827                                          * ----------
3828                                          */
3829                                         zero_var(result);
3830                                         result->dscale = Max(var1->dscale, var2->dscale);
3831                                         break;
3832
3833                                 case 1:
3834                                         /* ----------
3835                                          * ABS(var1) > ABS(var2)
3836                                          * result = -(ABS(var1) - ABS(var2))
3837                                          * ----------
3838                                          */
3839                                         sub_abs(var1, var2, result);
3840                                         result->sign = NUMERIC_NEG;
3841                                         break;
3842
3843                                 case -1:
3844                                         /* ----------
3845                                          * ABS(var1) < ABS(var2)
3846                                          * result = +(ABS(var2) - ABS(var1))
3847                                          * ----------
3848                                          */
3849                                         sub_abs(var2, var1, result);
3850                                         result->sign = NUMERIC_POS;
3851                                         break;
3852                         }
3853                 }
3854                 else
3855                 {
3856                         /* ----------
3857                          * var1 is negative, var2 is positive
3858                          * result = -(ABS(var1) + ABS(var2))
3859                          * ----------
3860                          */
3861                         add_abs(var1, var2, result);
3862                         result->sign = NUMERIC_NEG;
3863                 }
3864         }
3865 }
3866
3867
3868 /*
3869  * mul_var() -
3870  *
3871  *      Multiplication on variable level. Product of var1 * var2 is stored
3872  *      in result.      Result is rounded to no more than rscale fractional digits.
3873  */
3874 static void
3875 mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
3876                 int rscale)
3877 {
3878         int                     res_ndigits;
3879         int                     res_sign;
3880         int                     res_weight;
3881         int                     maxdigits;
3882         int                *dig;
3883         int                     carry;
3884         int                     maxdig;
3885         int                     newdig;
3886         NumericDigit *res_digits;
3887         int                     i,
3888                                 ri,
3889                                 i1,
3890                                 i2;
3891
3892         /* copy these values into local vars for speed in inner loop */
3893         int                     var1ndigits = var1->ndigits;
3894         int                     var2ndigits = var2->ndigits;
3895         NumericDigit *var1digits = var1->digits;
3896         NumericDigit *var2digits = var2->digits;
3897
3898         if (var1ndigits == 0 || var2ndigits == 0)
3899         {
3900                 /* one or both inputs is zero; so is result */
3901                 zero_var(result);
3902                 result->dscale = rscale;
3903                 return;
3904         }
3905
3906         /* Determine result sign and (maximum possible) weight */
3907         if (var1->sign == var2->sign)
3908                 res_sign = NUMERIC_POS;
3909         else
3910                 res_sign = NUMERIC_NEG;
3911         res_weight = var1->weight + var2->weight + 2;
3912
3913         /*
3914          * Determine number of result digits to compute.  If the exact result
3915          * would have more than rscale fractional digits, truncate the computation
3916          * with MUL_GUARD_DIGITS guard digits.  We do that by pretending that one
3917          * or both inputs have fewer digits than they really do.
3918          */
3919         res_ndigits = var1ndigits + var2ndigits + 1;
3920         maxdigits = res_weight + 1 + (rscale * DEC_DIGITS) + MUL_GUARD_DIGITS;
3921         if (res_ndigits > maxdigits)
3922         {
3923                 if (maxdigits < 3)
3924                 {
3925                         /* no useful precision at all in the result... */
3926                         zero_var(result);
3927                         result->dscale = rscale;
3928                         return;
3929                 }
3930                 /* force maxdigits odd so that input ndigits can be equal */
3931                 if ((maxdigits & 1) == 0)
3932                         maxdigits++;
3933                 if (var1ndigits > var2ndigits)
3934                 {
3935                         var1ndigits -= res_ndigits - maxdigits;
3936                         if (var1ndigits < var2ndigits)
3937                                 var1ndigits = var2ndigits = (var1ndigits + var2ndigits) / 2;
3938                 }
3939                 else
3940                 {
3941                         var2ndigits -= res_ndigits - maxdigits;
3942                         if (var2ndigits < var1ndigits)
3943                                 var1ndigits = var2ndigits = (var1ndigits + var2ndigits) / 2;
3944                 }
3945                 res_ndigits = maxdigits;
3946                 Assert(res_ndigits == var1ndigits + var2ndigits + 1);
3947         }
3948
3949         /*
3950          * We do the arithmetic in an array "dig[]" of signed int's.  Since
3951          * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom
3952          * to avoid normalizing carries immediately.
3953          *
3954          * maxdig tracks the maximum possible value of any dig[] entry; when this
3955          * threatens to exceed INT_MAX, we take the time to propagate carries. To
3956          * avoid overflow in maxdig itself, it actually represents the max
3957          * possible value divided by NBASE-1.
3958          */
3959         dig = (int *) palloc0(res_ndigits * sizeof(int));
3960         maxdig = 0;
3961
3962         ri = res_ndigits - 1;
3963         for (i1 = var1ndigits - 1; i1 >= 0; ri--, i1--)
3964         {
3965                 int                     var1digit = var1digits[i1];
3966
3967                 if (var1digit == 0)
3968                         continue;
3969
3970                 /* Time to normalize? */
3971                 maxdig += var1digit;
3972                 if (maxdig > INT_MAX / (NBASE - 1))
3973                 {
3974                         /* Yes, do it */
3975                         carry = 0;
3976                         for (i = res_ndigits - 1; i >= 0; i--)
3977                         {
3978                                 newdig = dig[i] + carry;
3979                                 if (newdig >= NBASE)
3980                                 {
3981                                         carry = newdig / NBASE;
3982                                         newdig -= carry * NBASE;
3983                                 }
3984                                 else
3985                                         carry = 0;
3986                                 dig[i] = newdig;
3987                         }
3988                         Assert(carry == 0);
3989                         /* Reset maxdig to indicate new worst-case */
3990                         maxdig = 1 + var1digit;
3991                 }
3992
3993                 /* Add appropriate multiple of var2 into the accumulator */
3994                 i = ri;
3995                 for (i2 = var2ndigits - 1; i2 >= 0; i2--)
3996                         dig[i--] += var1digit * var2digits[i2];
3997         }
3998
3999         /*
4000          * Now we do a final carry propagation pass to normalize the result, which
4001          * we combine with storing the result digits into the output. Note that
4002          * this is still done at full precision w/guard digits.
4003          */
4004         alloc_var(result, res_ndigits);
4005         res_digits = result->digits;
4006         carry = 0;
4007         for (i = res_ndigits - 1; i >= 0; i--)
4008         {
4009                 newdig = dig[i] + carry;
4010                 if (newdig >= NBASE)
4011                 {
4012                         carry = newdig / NBASE;
4013                         newdig -= carry * NBASE;
4014                 }
4015                 else
4016                         carry = 0;
4017                 res_digits[i] = newdig;
4018         }
4019         Assert(carry == 0);
4020
4021         pfree(dig);
4022
4023         /*
4024          * Finally, round the result to the requested precision.
4025          */
4026         result->weight = res_weight;
4027         result->sign = res_sign;
4028
4029         /* Round to target rscale (and set result->dscale) */
4030         round_var(result, rscale);
4031
4032         /* Strip leading and trailing zeroes */
4033         strip_var(result);
4034 }
4035
4036
4037 /*
4038  * div_var() -
4039  *
4040  *      Division on variable level. Quotient of var1 / var2 is stored
4041  *      in result.      Result is rounded to no more than rscale fractional digits.
4042  */
4043 static void
4044 div_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
4045                 int rscale, bool round)
4046 {
4047         int                     div_ndigits;
4048         int                     res_sign;
4049         int                     res_weight;
4050         int                *div;
4051         int                     qdigit;
4052         int                     carry;
4053         int                     maxdiv;
4054         int                     newdig;
4055         NumericDigit *res_digits;
4056         double          fdividend,
4057                                 fdivisor,
4058                                 fdivisorinverse,
4059                                 fquotient;
4060         int                     qi;
4061         int                     i;
4062
4063         /* copy these values into local vars for speed in inner loop */
4064         int                     var1ndigits = var1->ndigits;
4065         int                     var2ndigits = var2->ndigits;
4066         NumericDigit *var1digits = var1->digits;
4067         NumericDigit *var2digits = var2->digits;
4068
4069         /*
4070          * First of all division by zero check; we must not be handed an
4071          * unnormalized divisor.
4072          */
4073         if (var2ndigits == 0 || var2digits[0] == 0)
4074                 ereport(ERROR,
4075                                 (errcode(ERRCODE_DIVISION_BY_ZERO),
4076                                  errmsg("division by zero")));
4077
4078         /*
4079          * Now result zero check
4080          */
4081         if (var1ndigits == 0)
4082         {
4083                 zero_var(result);
4084                 result->dscale = rscale;
4085                 return;
4086         }
4087
4088         /*
4089          * Determine the result sign, weight and number of digits to calculate
4090          */
4091         if (var1->sign == var2->sign)
4092                 res_sign = NUMERIC_POS;
4093         else
4094                 res_sign = NUMERIC_NEG;
4095         res_weight = var1->weight - var2->weight + 1;
4096         /* The number of accurate result digits we need to produce: */
4097         div_ndigits = res_weight + 1 + (rscale + DEC_DIGITS - 1) / DEC_DIGITS;
4098         /* Add guard digits for roundoff error */
4099         div_ndigits += DIV_GUARD_DIGITS;
4100         if (div_ndigits < DIV_GUARD_DIGITS)
4101                 div_ndigits = DIV_GUARD_DIGITS;
4102         /* Must be at least var1ndigits, too, to simplify data-loading loop */
4103         if (div_ndigits < var1ndigits)
4104                 div_ndigits = var1ndigits;
4105
4106         /*
4107          * We do the arithmetic in an array "div[]" of signed int's.  Since
4108          * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom
4109          * to avoid normalizing carries immediately.
4110          *
4111          * We start with div[] containing one zero digit followed by the
4112          * dividend's digits (plus appended zeroes to reach the desired precision
4113          * including guard digits).  Each step of the main loop computes an
4114          * (approximate) quotient digit and stores it into div[], removing one
4115          * position of dividend space.  A final pass of carry propagation takes
4116          * care of any mistaken quotient digits.
4117          */
4118         div = (int *) palloc0((div_ndigits + 1) * sizeof(int));
4119         for (i = 0; i < var1ndigits; i++)
4120                 div[i + 1] = var1digits[i];
4121
4122         /*
4123          * We estimate each quotient digit using floating-point arithmetic, taking
4124          * the first four digits of the (current) dividend and divisor. This must
4125          * be float to avoid overflow.
4126          */
4127         fdivisor = (double) var2digits[0];
4128         for (i = 1; i < 4; i++)
4129         {
4130                 fdivisor *= NBASE;
4131                 if (i < var2ndigits)
4132                         fdivisor += (double) var2digits[i];
4133         }
4134         fdivisorinverse = 1.0 / fdivisor;
4135
4136         /*
4137          * maxdiv tracks the maximum possible absolute value of any div[] entry;
4138          * when this threatens to exceed INT_MAX, we take the time to propagate
4139          * carries.  To avoid overflow in maxdiv itself, it actually represents
4140          * the max possible abs. value divided by NBASE-1.
4141          */
4142         maxdiv = 1;
4143
4144         /*
4145          * Outer loop computes next quotient digit, which will go into div[qi]
4146          */
4147         for (qi = 0; qi < div_ndigits; qi++)
4148         {
4149                 /* Approximate the current dividend value */
4150                 fdividend = (double) div[qi];
4151                 for (i = 1; i < 4; i++)
4152                 {
4153                         fdividend *= NBASE;
4154                         if (qi + i <= div_ndigits)
4155                                 fdividend += (double) div[qi + i];
4156                 }
4157                 /* Compute the (approximate) quotient digit */
4158                 fquotient = fdividend * fdivisorinverse;
4159                 qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
4160                         (((int) fquotient) - 1);        /* truncate towards -infinity */
4161
4162                 if (qdigit != 0)
4163                 {
4164                         /* Do we need to normalize now? */
4165                         maxdiv += Abs(qdigit);
4166                         if (maxdiv > INT_MAX / (NBASE - 1))
4167                         {
4168                                 /* Yes, do it */
4169                                 carry = 0;
4170                                 for (i = div_ndigits; i > qi; i--)
4171                                 {
4172                                         newdig = div[i] + carry;
4173                                         if (newdig < 0)
4174                                         {
4175                                                 carry = -((-newdig - 1) / NBASE) - 1;
4176                                                 newdig -= carry * NBASE;
4177                                         }
4178                                         else if (newdig >= NBASE)
4179                                         {
4180                                                 carry = newdig / NBASE;
4181                                                 newdig -= carry * NBASE;
4182                                         }
4183                                         else
4184                                                 carry = 0;
4185                                         div[i] = newdig;
4186                                 }
4187                                 newdig = div[qi] + carry;
4188                                 div[qi] = newdig;
4189
4190                                 /*
4191                                  * All the div[] digits except possibly div[qi] are now in the
4192                                  * range 0..NBASE-1.
4193                                  */
4194                                 maxdiv = Abs(newdig) / (NBASE - 1);
4195                                 maxdiv = Max(maxdiv, 1);
4196
4197                                 /*
4198                                  * Recompute the quotient digit since new info may have
4199                                  * propagated into the top four dividend digits
4200                                  */
4201                                 fdividend = (double) div[qi];
4202                                 for (i = 1; i < 4; i++)
4203                                 {
4204                                         fdividend *= NBASE;
4205                                         if (qi + i <= div_ndigits)
4206                                                 fdividend += (double) div[qi + i];
4207                                 }
4208                                 /* Compute the (approximate) quotient digit */
4209                                 fquotient = fdividend * fdivisorinverse;
4210                                 qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
4211                                         (((int) fquotient) - 1);        /* truncate towards -infinity */
4212                                 maxdiv += Abs(qdigit);
4213                         }
4214
4215                         /* Subtract off the appropriate multiple of the divisor */
4216                         if (qdigit != 0)
4217                         {
4218                                 int                     istop = Min(var2ndigits, div_ndigits - qi + 1);
4219
4220                                 for (i = 0; i < istop; i++)
4221                                         div[qi + i] -= qdigit * var2digits[i];
4222                         }
4223                 }
4224
4225                 /*
4226                  * The dividend digit we are about to replace might still be nonzero.
4227                  * Fold it into the next digit position.  We don't need to worry about
4228                  * overflow here since this should nearly cancel with the subtraction
4229                  * of the divisor.
4230                  */
4231                 div[qi + 1] += div[qi] * NBASE;
4232
4233                 div[qi] = qdigit;
4234         }
4235
4236         /*
4237          * Approximate and store the last quotient digit (div[div_ndigits])
4238          */
4239         fdividend = (double) div[qi];
4240         for (i = 1; i < 4; i++)
4241                 fdividend *= NBASE;
4242         fquotient = fdividend * fdivisorinverse;
4243         qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
4244                 (((int) fquotient) - 1);        /* truncate towards -infinity */
4245         div[qi] = qdigit;
4246
4247         /*
4248          * Now we do a final carry propagation pass to normalize the result, which
4249          * we combine with storing the result digits into the output. Note that
4250          * this is still done at full precision w/guard digits.
4251          */
4252         alloc_var(result, div_ndigits + 1);
4253         res_digits = result->digits;
4254         carry = 0;
4255         for (i = div_ndigits; i >= 0; i--)
4256         {
4257                 newdig = div[i] + carry;
4258                 if (newdig < 0)
4259                 {
4260                         carry = -((-newdig - 1) / NBASE) - 1;
4261                         newdig -= carry * NBASE;
4262                 }
4263                 else if (newdig >= NBASE)
4264                 {
4265                         carry = newdig / NBASE;
4266                         newdig -= carry * NBASE;
4267                 }
4268                 else
4269                         carry = 0;
4270                 res_digits[i] = newdig;
4271         }
4272         Assert(carry == 0);
4273
4274         pfree(div);
4275
4276         /*
4277          * Finally, round the result to the requested precision.
4278          */
4279         result->weight = res_weight;
4280         result->sign = res_sign;
4281
4282         /* Round to target rscale (and set result->dscale) */
4283         if (round)
4284                 round_var(result, rscale);
4285         else
4286                 trunc_var(result, rscale);
4287
4288         /* Strip leading and trailing zeroes */
4289         strip_var(result);
4290 }
4291
4292
4293 /*
4294  * Default scale selection for division
4295  *
4296  * Returns the appropriate result scale for the division result.
4297  */
4298 static int
4299 select_div_scale(NumericVar *var1, NumericVar *var2)
4300 {
4301         int                     weight1,
4302                                 weight2,
4303                                 qweight,
4304                                 i;
4305         NumericDigit firstdigit1,
4306                                 firstdigit2;
4307         int                     rscale;
4308
4309         /*
4310          * The result scale of a division isn't specified in any SQL standard. For
4311          * PostgreSQL we select a result scale that will give at least
4312          * NUMERIC_MIN_SIG_DIGITS significant digits, so that numeric gives a
4313          * result no less accurate than float8; but use a scale not less than
4314          * either input's display scale.
4315          */
4316
4317         /* Get the actual (normalized) weight and first digit of each input */
4318
4319         weight1 = 0;                            /* values to use if var1 is zero */
4320         firstdigit1 = 0;
4321         for (i = 0; i < var1->ndigits; i++)
4322         {
4323                 firstdigit1 = var1->digits[i];
4324                 if (firstdigit1 != 0)
4325                 {
4326                         weight1 = var1->weight - i;
4327                         break;
4328                 }
4329         }
4330
4331         weight2 = 0;                            /* values to use if var2 is zero */
4332         firstdigit2 = 0;
4333         for (i = 0; i < var2->ndigits; i++)
4334         {
4335                 firstdigit2 = var2->digits[i];
4336                 if (firstdigit2 != 0)
4337                 {
4338                         weight2 = var2->weight - i;
4339                         break;
4340                 }
4341         }
4342
4343         /*
4344          * Estimate weight of quotient.  If the two first digits are equal, we
4345          * can't be sure, but assume that var1 is less than var2.
4346          */
4347         qweight = weight1 - weight2;
4348         if (firstdigit1 <= firstdigit2)
4349                 qweight--;
4350
4351         /* Select result scale */
4352         rscale = NUMERIC_MIN_SIG_DIGITS - qweight * DEC_DIGITS;
4353         rscale = Max(rscale, var1->dscale);
4354         rscale = Max(rscale, var2->dscale);
4355         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
4356         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
4357
4358         return rscale;
4359 }
4360
4361
4362 /*
4363  * mod_var() -
4364  *
4365  *      Calculate the modulo of two numerics at variable level
4366  */
4367 static void
4368 mod_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
4369 {
4370         NumericVar      tmp;
4371         int                     rscale;
4372
4373         init_var(&tmp);
4374
4375         /* ---------
4376          * We do this using the equation
4377          *              mod(x,y) = x - trunc(x/y)*y
4378          * We set rscale the same way numeric_div and numeric_mul do
4379          * to get the right answer from the equation.  The final result,
4380          * however, need not be displayed to more precision than the inputs.
4381          * ----------
4382          */
4383         rscale = select_div_scale(var1, var2);
4384
4385         div_var(var1, var2, &tmp, rscale, false);
4386
4387         trunc_var(&tmp, 0);
4388
4389         mul_var(var2, &tmp, &tmp, var2->dscale + tmp.dscale);
4390
4391         sub_var(var1, &tmp, result);
4392
4393         round_var(result, Max(var1->dscale, var2->dscale));
4394
4395         free_var(&tmp);
4396 }
4397
4398
4399 /*
4400  * ceil_var() -
4401  *
4402  *      Return the smallest integer greater than or equal to the argument
4403  *      on variable level
4404  */
4405 static void
4406 ceil_var(NumericVar *var, NumericVar *result)
4407 {
4408         NumericVar      tmp;
4409
4410         init_var(&tmp);
4411         set_var_from_var(var, &tmp);
4412
4413         trunc_var(&tmp, 0);
4414
4415         if (var->sign == NUMERIC_POS && cmp_var(var, &tmp) != 0)
4416                 add_var(&tmp, &const_one, &tmp);
4417
4418         set_var_from_var(&tmp, result);
4419         free_var(&tmp);
4420 }
4421
4422
4423 /*
4424  * floor_var() -
4425  *
4426  *      Return the largest integer equal to or less than the argument
4427  *      on variable level
4428  */
4429 static void
4430 floor_var(NumericVar *var, NumericVar *result)
4431 {
4432         NumericVar      tmp;
4433
4434         init_var(&tmp);
4435         set_var_from_var(var, &tmp);
4436
4437         trunc_var(&tmp, 0);
4438
4439         if (var->sign == NUMERIC_NEG && cmp_var(var, &tmp) != 0)
4440                 sub_var(&tmp, &const_one, &tmp);
4441
4442         set_var_from_var(&tmp, result);
4443         free_var(&tmp);
4444 }
4445
4446
4447 /*
4448  * sqrt_var() -
4449  *
4450  *      Compute the square root of x using Newton's algorithm
4451  */
4452 static void
4453 sqrt_var(NumericVar *arg, NumericVar *result, int rscale)
4454 {
4455         NumericVar      tmp_arg;
4456         NumericVar      tmp_val;
4457         NumericVar      last_val;
4458         int                     local_rscale;
4459         int                     stat;
4460
4461         local_rscale = rscale + 8;
4462
4463         stat = cmp_var(arg, &const_zero);
4464         if (stat == 0)
4465         {
4466                 zero_var(result);
4467                 result->dscale = rscale;
4468                 return;
4469         }
4470
4471         /*
4472          * SQL2003 defines sqrt() in terms of power, so we need to emit the right
4473          * SQLSTATE error code if the operand is negative.
4474          */
4475         if (stat < 0)
4476                 ereport(ERROR,
4477                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION),
4478                                  errmsg("cannot take square root of a negative number")));
4479
4480         init_var(&tmp_arg);
4481         init_var(&tmp_val);
4482         init_var(&last_val);
4483
4484         /* Copy arg in case it is the same var as result */
4485         set_var_from_var(arg, &tmp_arg);
4486
4487         /*
4488          * Initialize the result to the first guess
4489          */
4490         alloc_var(result, 1);
4491         result->digits[0] = tmp_arg.digits[0] / 2;
4492         if (result->digits[0] == 0)
4493                 result->digits[0] = 1;
4494         result->weight = tmp_arg.weight / 2;
4495         result->sign = NUMERIC_POS;
4496
4497         set_var_from_var(result, &last_val);
4498
4499         for (;;)
4500         {
4501                 div_var(&tmp_arg, result, &tmp_val, local_rscale, true);
4502
4503                 add_var(result, &tmp_val, result);
4504                 mul_var(result, &const_zero_point_five, result, local_rscale);
4505
4506                 if (cmp_var(&last_val, result) == 0)
4507                         break;
4508                 set_var_from_var(result, &last_val);
4509         }
4510
4511         free_var(&last_val);
4512         free_var(&tmp_val);
4513         free_var(&tmp_arg);
4514
4515         /* Round to requested precision */
4516         round_var(result, rscale);
4517 }
4518
4519
4520 /*
4521  * exp_var() -
4522  *
4523  *      Raise e to the power of x
4524  */
4525 static void
4526 exp_var(NumericVar *arg, NumericVar *result, int rscale)
4527 {
4528         NumericVar      x;
4529         int                     xintval;
4530         bool            xneg = FALSE;
4531         int                     local_rscale;
4532
4533         /*----------
4534          * We separate the integral and fraction parts of x, then compute
4535          *              e^x = e^xint * e^xfrac
4536          * where e = exp(1) and e^xfrac = exp(xfrac) are computed by
4537          * exp_var_internal; the limited range of inputs allows that routine
4538          * to do a good job with a simple Taylor series.  Raising e^xint is
4539          * done by repeated multiplications in power_var_int.
4540          *----------
4541          */
4542         init_var(&x);
4543
4544         set_var_from_var(arg, &x);
4545
4546         if (x.sign == NUMERIC_NEG)
4547         {
4548                 xneg = TRUE;
4549                 x.sign = NUMERIC_POS;
4550         }
4551
4552         /* Extract the integer part, remove it from x */
4553         xintval = 0;
4554         while (x.weight >= 0)
4555         {
4556                 xintval *= NBASE;
4557                 if (x.ndigits > 0)
4558                 {
4559                         xintval += x.digits[0];
4560                         x.digits++;
4561                         x.ndigits--;
4562                 }
4563                 x.weight--;
4564                 /* Guard against overflow */
4565                 if (xintval >= NUMERIC_MAX_RESULT_SCALE * 3)
4566                         ereport(ERROR,
4567                                         (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
4568                                          errmsg("argument for function \"exp\" too big")));
4569         }
4570
4571         /* Select an appropriate scale for internal calculation */
4572         local_rscale = rscale + MUL_GUARD_DIGITS * 2;
4573
4574         /* Compute e^xfrac */
4575         exp_var_internal(&x, result, local_rscale);
4576
4577         /* If there's an integer part, multiply by e^xint */
4578         if (xintval > 0)
4579         {
4580                 NumericVar      e;
4581
4582                 init_var(&e);
4583                 exp_var_internal(&const_one, &e, local_rscale);
4584                 power_var_int(&e, xintval, &e, local_rscale);
4585                 mul_var(&e, result, result, local_rscale);
4586                 free_var(&e);
4587         }
4588
4589         /* Compensate for input sign, and round to requested rscale */
4590         if (xneg)
4591                 div_var(&const_one, result, result, rscale, true);
4592         else
4593                 round_var(result, rscale);
4594
4595         free_var(&x);
4596 }
4597
4598
4599 /*
4600  * exp_var_internal() -
4601  *
4602  *      Raise e to the power of x, where 0 <= x <= 1
4603  *
4604  * NB: the result should be good to at least rscale digits, but it has
4605  * *not* been rounded off; the caller must do that if wanted.
4606  */
4607 static void
4608 exp_var_internal(NumericVar *arg, NumericVar *result, int rscale)
4609 {
4610         NumericVar      x;
4611         NumericVar      xpow;
4612         NumericVar      ifac;
4613         NumericVar      elem;
4614         NumericVar      ni;
4615         int                     ndiv2 = 0;
4616         int                     local_rscale;
4617
4618         init_var(&x);
4619         init_var(&xpow);
4620         init_var(&ifac);
4621         init_var(&elem);
4622         init_var(&ni);
4623
4624         set_var_from_var(arg, &x);
4625
4626         Assert(x.sign == NUMERIC_POS);
4627
4628         local_rscale = rscale + 8;
4629
4630         /* Reduce input into range 0 <= x <= 0.01 */
4631         while (cmp_var(&x, &const_zero_point_01) > 0)
4632         {
4633                 ndiv2++;
4634                 local_rscale++;
4635                 mul_var(&x, &const_zero_point_five, &x, x.dscale + 1);
4636         }
4637
4638         /*
4639          * Use the Taylor series
4640          *
4641          * exp(x) = 1 + x + x^2/2! + x^3/3! + ...
4642          *
4643          * Given the limited range of x, this should converge reasonably quickly.
4644          * We run the series until the terms fall below the local_rscale limit.
4645          */
4646         add_var(&const_one, &x, result);
4647         set_var_from_var(&x, &xpow);
4648         set_var_from_var(&const_one, &ifac);
4649         set_var_from_var(&const_one, &ni);
4650
4651         for (;;)
4652         {
4653                 add_var(&ni, &const_one, &ni);
4654                 mul_var(&xpow, &x, &xpow, local_rscale);
4655                 mul_var(&ifac, &ni, &ifac, 0);
4656                 div_var(&xpow, &ifac, &elem, local_rscale, true);
4657
4658                 if (elem.ndigits == 0)
4659                         break;
4660
4661                 add_var(result, &elem, result);
4662         }
4663
4664         /* Compensate for argument range reduction */
4665         while (ndiv2-- > 0)
4666                 mul_var(result, result, result, local_rscale);
4667
4668         free_var(&x);
4669         free_var(&xpow);
4670         free_var(&ifac);
4671         free_var(&elem);
4672         free_var(&ni);
4673 }
4674
4675
4676 /*
4677  * ln_var() -
4678  *
4679  *      Compute the natural log of x
4680  */
4681 static void
4682 ln_var(NumericVar *arg, NumericVar *result, int rscale)
4683 {
4684         NumericVar      x;
4685         NumericVar      xx;
4686         NumericVar      ni;
4687         NumericVar      elem;
4688         NumericVar      fact;
4689         int                     local_rscale;
4690         int                     cmp;
4691
4692         cmp = cmp_var(arg, &const_zero);
4693         if (cmp == 0)
4694                 ereport(ERROR,
4695                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_LOG),
4696                                  errmsg("cannot take logarithm of zero")));
4697         else if (cmp < 0)
4698                 ereport(ERROR,
4699                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_LOG),
4700                                  errmsg("cannot take logarithm of a negative number")));
4701
4702         local_rscale = rscale + 8;
4703
4704         init_var(&x);
4705         init_var(&xx);
4706         init_var(&ni);
4707         init_var(&elem);
4708         init_var(&fact);
4709
4710         set_var_from_var(arg, &x);
4711         set_var_from_var(&const_two, &fact);
4712
4713         /* Reduce input into range 0.9 < x < 1.1 */
4714         while (cmp_var(&x, &const_zero_point_nine) <= 0)
4715         {
4716                 local_rscale++;
4717                 sqrt_var(&x, &x, local_rscale);
4718                 mul_var(&fact, &const_two, &fact, 0);
4719         }
4720         while (cmp_var(&x, &const_one_point_one) >= 0)
4721         {
4722                 local_rscale++;
4723                 sqrt_var(&x, &x, local_rscale);
4724                 mul_var(&fact, &const_two, &fact, 0);
4725         }
4726
4727         /*
4728          * We use the Taylor series for 0.5 * ln((1+z)/(1-z)),
4729          *
4730          * z + z^3/3 + z^5/5 + ...
4731          *
4732          * where z = (x-1)/(x+1) is in the range (approximately) -0.053 .. 0.048
4733          * due to the above range-reduction of x.
4734          *
4735          * The convergence of this is not as fast as one would like, but is
4736          * tolerable given that z is small.
4737          */
4738         sub_var(&x, &const_one, result);
4739         add_var(&x, &const_one, &elem);
4740         div_var(result, &elem, result, local_rscale, true);
4741         set_var_from_var(result, &xx);
4742         mul_var(result, result, &x, local_rscale);
4743
4744         set_var_from_var(&const_one, &ni);
4745
4746         for (;;)
4747         {
4748                 add_var(&ni, &const_two, &ni);
4749                 mul_var(&xx, &x, &xx, local_rscale);
4750                 div_var(&xx, &ni, &elem, local_rscale, true);
4751
4752                 if (elem.ndigits == 0)
4753                         break;
4754
4755                 add_var(result, &elem, result);
4756
4757                 if (elem.weight < (result->weight - local_rscale * 2 / DEC_DIGITS))
4758                         break;
4759         }
4760
4761         /* Compensate for argument range reduction, round to requested rscale */
4762         mul_var(result, &fact, result, rscale);
4763
4764         free_var(&x);
4765         free_var(&xx);
4766         free_var(&ni);
4767         free_var(&elem);
4768         free_var(&fact);
4769 }
4770
4771
4772 /*
4773  * log_var() -
4774  *
4775  *      Compute the logarithm of num in a given base.
4776  *
4777  *      Note: this routine chooses dscale of the result.
4778  */
4779 static void
4780 log_var(NumericVar *base, NumericVar *num, NumericVar *result)
4781 {
4782         NumericVar      ln_base;
4783         NumericVar      ln_num;
4784         int                     dec_digits;
4785         int                     rscale;
4786         int                     local_rscale;
4787
4788         init_var(&ln_base);
4789         init_var(&ln_num);
4790
4791         /* Set scale for ln() calculations --- compare numeric_ln() */
4792
4793         /* Approx decimal digits before decimal point */
4794         dec_digits = (num->weight + 1) * DEC_DIGITS;
4795
4796         if (dec_digits > 1)
4797                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(dec_digits - 1);
4798         else if (dec_digits < 1)
4799                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(1 - dec_digits);
4800         else
4801                 rscale = NUMERIC_MIN_SIG_DIGITS;
4802
4803         rscale = Max(rscale, base->dscale);
4804         rscale = Max(rscale, num->dscale);
4805         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
4806         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
4807
4808         local_rscale = rscale + 8;
4809
4810         /* Form natural logarithms */
4811         ln_var(base, &ln_base, local_rscale);
4812         ln_var(num, &ln_num, local_rscale);
4813
4814         ln_base.dscale = rscale;
4815         ln_num.dscale = rscale;
4816
4817         /* Select scale for division result */
4818         rscale = select_div_scale(&ln_num, &ln_base);
4819
4820         div_var(&ln_num, &ln_base, result, rscale, true);
4821
4822         free_var(&ln_num);
4823         free_var(&ln_base);
4824 }
4825
4826
4827 /*
4828  * power_var() -
4829  *
4830  *      Raise base to the power of exp
4831  *
4832  *      Note: this routine chooses dscale of the result.
4833  */
4834 static void
4835 power_var(NumericVar *base, NumericVar *exp, NumericVar *result)
4836 {
4837         NumericVar      ln_base;
4838         NumericVar      ln_num;
4839         int                     dec_digits;
4840         int                     rscale;
4841         int                     local_rscale;
4842         double          val;
4843
4844         /* If exp can be represented as an integer, use power_var_int */
4845         if (exp->ndigits == 0 || exp->ndigits <= exp->weight + 1)
4846         {
4847                 /* exact integer, but does it fit in int? */
4848                 NumericVar      x;
4849                 int64           expval64;
4850
4851                 /* must copy because numericvar_to_int8() scribbles on input */
4852                 init_var(&x);
4853                 set_var_from_var(exp, &x);
4854                 if (numericvar_to_int8(&x, &expval64))
4855                 {
4856                         int                     expval = (int) expval64;
4857
4858                         /* Test for overflow by reverse-conversion. */
4859                         if ((int64) expval == expval64)
4860                         {
4861                                 /* Okay, select rscale */
4862                                 rscale = NUMERIC_MIN_SIG_DIGITS;
4863                                 rscale = Max(rscale, base->dscale);
4864                                 rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
4865                                 rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
4866
4867                                 power_var_int(base, expval, result, rscale);
4868
4869                                 free_var(&x);
4870                                 return;
4871                         }
4872                 }
4873                 free_var(&x);
4874         }
4875
4876         init_var(&ln_base);
4877         init_var(&ln_num);
4878
4879         /* Set scale for ln() calculation --- need extra accuracy here */
4880
4881         /* Approx decimal digits before decimal point */
4882         dec_digits = (base->weight + 1) * DEC_DIGITS;
4883
4884         if (dec_digits > 1)
4885                 rscale = NUMERIC_MIN_SIG_DIGITS * 2 - (int) log10(dec_digits - 1);
4886         else if (dec_digits < 1)
4887                 rscale = NUMERIC_MIN_SIG_DIGITS * 2 - (int) log10(1 - dec_digits);
4888         else
4889                 rscale = NUMERIC_MIN_SIG_DIGITS * 2;
4890
4891         rscale = Max(rscale, base->dscale * 2);
4892         rscale = Max(rscale, exp->dscale * 2);
4893         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE * 2);
4894         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE * 2);
4895
4896         local_rscale = rscale + 8;
4897
4898         ln_var(base, &ln_base, local_rscale);
4899
4900         mul_var(&ln_base, exp, &ln_num, local_rscale);
4901
4902         /* Set scale for exp() -- compare numeric_exp() */
4903
4904         /* convert input to float8, ignoring overflow */
4905         val = numericvar_to_double_no_overflow(&ln_num);
4906
4907         /*
4908          * log10(result) = num * log10(e), so this is approximately the weight:
4909          */
4910         val *= 0.434294481903252;
4911
4912         /* limit to something that won't cause integer overflow */
4913         val = Max(val, -NUMERIC_MAX_RESULT_SCALE);
4914         val = Min(val, NUMERIC_MAX_RESULT_SCALE);
4915
4916         rscale = NUMERIC_MIN_SIG_DIGITS - (int) val;
4917         rscale = Max(rscale, base->dscale);
4918         rscale = Max(rscale, exp->dscale);
4919         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
4920         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
4921
4922         exp_var(&ln_num, result, rscale);
4923
4924         free_var(&ln_num);
4925         free_var(&ln_base);
4926 }
4927
4928 /*
4929  * power_var_int() -
4930  *
4931  *      Raise base to the power of exp, where exp is an integer.
4932  */
4933 static void
4934 power_var_int(NumericVar *base, int exp, NumericVar *result, int rscale)
4935 {
4936         bool            neg;
4937         NumericVar      base_prod;
4938         int                     local_rscale;
4939
4940         /* Detect some special cases, particularly 0^0. */
4941
4942         switch (exp)
4943         {
4944                 case 0:
4945                         if (base->ndigits == 0)
4946                                 ereport(ERROR,
4947                                                 (errcode(ERRCODE_FLOATING_POINT_EXCEPTION),
4948                                                  errmsg("zero raised to zero is undefined")));
4949                         set_var_from_var(&const_one, result);
4950                         result->dscale = rscale;        /* no need to round */
4951                         return;
4952                 case 1:
4953                         set_var_from_var(base, result);
4954                         round_var(result, rscale);
4955                         return;
4956                 case -1:
4957                         div_var(&const_one, base, result, rscale, true);
4958                         return;
4959                 case 2:
4960                         mul_var(base, base, result, rscale);
4961                         return;
4962                 default:
4963                         break;
4964         }
4965
4966         /*
4967          * The general case repeatedly multiplies base according to the bit
4968          * pattern of exp.      We do the multiplications with some extra precision.
4969          */
4970         neg = (exp < 0);
4971         exp = Abs(exp);
4972
4973         local_rscale = rscale + MUL_GUARD_DIGITS * 2;
4974
4975         init_var(&base_prod);
4976         set_var_from_var(base, &base_prod);
4977
4978         if (exp & 1)
4979                 set_var_from_var(base, result);
4980         else
4981                 set_var_from_var(&const_one, result);
4982
4983         while ((exp >>= 1) > 0)
4984         {
4985                 mul_var(&base_prod, &base_prod, &base_prod, local_rscale);
4986                 if (exp & 1)
4987                         mul_var(&base_prod, result, result, local_rscale);
4988         }
4989
4990         free_var(&base_prod);
4991
4992         /* Compensate for input sign, and round to requested rscale */
4993         if (neg)
4994                 div_var(&const_one, result, result, rscale, true);
4995         else
4996                 round_var(result, rscale);
4997 }
4998
4999
5000 /* ----------------------------------------------------------------------
5001  *
5002  * Following are the lowest level functions that operate unsigned
5003  * on the variable level
5004  *
5005  * ----------------------------------------------------------------------
5006  */
5007
5008
5009 /* ----------
5010  * cmp_abs() -
5011  *
5012  *      Compare the absolute values of var1 and var2
5013  *      Returns:        -1 for ABS(var1) < ABS(var2)
5014  *                              0  for ABS(var1) == ABS(var2)
5015  *                              1  for ABS(var1) > ABS(var2)
5016  * ----------
5017  */
5018 static int
5019 cmp_abs(NumericVar *var1, NumericVar *var2)
5020 {
5021         return cmp_abs_common(var1->digits, var1->ndigits, var1->weight,
5022                                                   var2->digits, var2->ndigits, var2->weight);
5023 }
5024
5025 /* ----------
5026  * cmp_abs_common() -
5027  *
5028  *      Main routine of cmp_abs(). This function can be used by both
5029  *      NumericVar and Numeric.
5030  * ----------
5031  */
5032 static int
5033 cmp_abs_common(const NumericDigit *var1digits, int var1ndigits, int var1weight,
5034                          const NumericDigit *var2digits, int var2ndigits, int var2weight)
5035 {
5036         int                     i1 = 0;
5037         int                     i2 = 0;
5038
5039         /* Check any digits before the first common digit */
5040
5041         while (var1weight > var2weight && i1 < var1ndigits)
5042         {
5043                 if (var1digits[i1++] != 0)
5044                         return 1;
5045                 var1weight--;
5046         }
5047         while (var2weight > var1weight && i2 < var2ndigits)
5048         {
5049                 if (var2digits[i2++] != 0)
5050                         return -1;
5051                 var2weight--;
5052         }
5053
5054         /* At this point, either w1 == w2 or we've run out of digits */
5055
5056         if (var1weight == var2weight)
5057         {
5058                 while (i1 < var1ndigits && i2 < var2ndigits)
5059                 {
5060                         int                     stat = var1digits[i1++] - var2digits[i2++];
5061
5062                         if (stat)
5063                         {
5064                                 if (stat > 0)
5065                                         return 1;
5066                                 return -1;
5067                         }
5068                 }
5069         }
5070
5071         /*
5072          * At this point, we've run out of digits on one side or the other; so any
5073          * remaining nonzero digits imply that side is larger
5074          */
5075         while (i1 < var1ndigits)
5076         {
5077                 if (var1digits[i1++] != 0)
5078                         return 1;
5079         }
5080         while (i2 < var2ndigits)
5081         {
5082                 if (var2digits[i2++] != 0)
5083                         return -1;
5084         }
5085
5086         return 0;
5087 }
5088
5089
5090 /*
5091  * add_abs() -
5092  *
5093  *      Add the absolute values of two variables into result.
5094  *      result might point to one of the operands without danger.
5095  */
5096 static void
5097 add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
5098 {
5099         NumericDigit *res_buf;
5100         NumericDigit *res_digits;
5101         int                     res_ndigits;
5102         int                     res_weight;
5103         int                     res_rscale,
5104                                 rscale1,
5105                                 rscale2;
5106         int                     res_dscale;
5107         int                     i,
5108                                 i1,
5109                                 i2;
5110         int                     carry = 0;
5111
5112         /* copy these values into local vars for speed in inner loop */
5113         int                     var1ndigits = var1->ndigits;
5114         int                     var2ndigits = var2->ndigits;
5115         NumericDigit *var1digits = var1->digits;
5116         NumericDigit *var2digits = var2->digits;
5117
5118         res_weight = Max(var1->weight, var2->weight) + 1;
5119
5120         res_dscale = Max(var1->dscale, var2->dscale);
5121
5122         /* Note: here we are figuring rscale in base-NBASE digits */
5123         rscale1 = var1->ndigits - var1->weight - 1;
5124         rscale2 = var2->ndigits - var2->weight - 1;
5125         res_rscale = Max(rscale1, rscale2);
5126
5127         res_ndigits = res_rscale + res_weight + 1;
5128         if (res_ndigits <= 0)
5129                 res_ndigits = 1;
5130
5131         res_buf = digitbuf_alloc(res_ndigits + 1);
5132         res_buf[0] = 0;                         /* spare digit for later rounding */
5133         res_digits = res_buf + 1;
5134
5135         i1 = res_rscale + var1->weight + 1;
5136         i2 = res_rscale + var2->weight + 1;
5137         for (i = res_ndigits - 1; i >= 0; i--)
5138         {
5139                 i1--;
5140                 i2--;
5141                 if (i1 >= 0 && i1 < var1ndigits)
5142                         carry += var1digits[i1];
5143                 if (i2 >= 0 && i2 < var2ndigits)
5144                         carry += var2digits[i2];
5145
5146                 if (carry >= NBASE)
5147                 {
5148                         res_digits[i] = carry - NBASE;
5149                         carry = 1;
5150                 }
5151                 else
5152                 {
5153                         res_digits[i] = carry;
5154                         carry = 0;
5155                 }
5156         }
5157
5158         Assert(carry == 0);                     /* else we failed to allow for carry out */
5159
5160         digitbuf_free(result->buf);
5161         result->ndigits = res_ndigits;
5162         result->buf = res_buf;
5163         result->digits = res_digits;
5164         result->weight = res_weight;
5165         result->dscale = res_dscale;
5166
5167         /* Remove leading/trailing zeroes */
5168         strip_var(result);
5169 }
5170
5171
5172 /*
5173  * sub_abs()
5174  *
5175  *      Subtract the absolute value of var2 from the absolute value of var1
5176  *      and store in result. result might point to one of the operands
5177  *      without danger.
5178  *
5179  *      ABS(var1) MUST BE GREATER OR EQUAL ABS(var2) !!!
5180  */
5181 static void
5182 sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
5183 {
5184         NumericDigit *res_buf;
5185         NumericDigit *res_digits;
5186         int                     res_ndigits;
5187         int                     res_weight;
5188         int                     res_rscale,
5189                                 rscale1,
5190                                 rscale2;
5191         int                     res_dscale;
5192         int                     i,
5193                                 i1,
5194                                 i2;
5195         int                     borrow = 0;
5196
5197         /* copy these values into local vars for speed in inner loop */
5198         int                     var1ndigits = var1->ndigits;
5199         int                     var2ndigits = var2->ndigits;
5200         NumericDigit *var1digits = var1->digits;
5201         NumericDigit *var2digits = var2->digits;
5202
5203         res_weight = var1->weight;
5204
5205         res_dscale = Max(var1->dscale, var2->dscale);
5206
5207         /* Note: here we are figuring rscale in base-NBASE digits */
5208         rscale1 = var1->ndigits - var1->weight - 1;
5209         rscale2 = var2->ndigits - var2->weight - 1;
5210         res_rscale = Max(rscale1, rscale2);
5211
5212         res_ndigits = res_rscale + res_weight + 1;
5213         if (res_ndigits <= 0)
5214                 res_ndigits = 1;
5215
5216         res_buf = digitbuf_alloc(res_ndigits + 1);
5217         res_buf[0] = 0;                         /* spare digit for later rounding */
5218         res_digits = res_buf + 1;
5219
5220         i1 = res_rscale + var1->weight + 1;
5221         i2 = res_rscale + var2->weight + 1;
5222         for (i = res_ndigits - 1; i >= 0; i--)
5223         {
5224                 i1--;
5225                 i2--;
5226                 if (i1 >= 0 && i1 < var1ndigits)
5227                         borrow += var1digits[i1];
5228                 if (i2 >= 0 && i2 < var2ndigits)
5229                         borrow -= var2digits[i2];
5230
5231                 if (borrow < 0)
5232                 {
5233                         res_digits[i] = borrow + NBASE;
5234                         borrow = -1;
5235                 }
5236                 else
5237                 {
5238                         res_digits[i] = borrow;
5239                         borrow = 0;
5240                 }
5241         }
5242
5243         Assert(borrow == 0);            /* else caller gave us var1 < var2 */
5244
5245         digitbuf_free(result->buf);
5246         result->ndigits = res_ndigits;
5247         result->buf = res_buf;
5248         result->digits = res_digits;
5249         result->weight = res_weight;
5250         result->dscale = res_dscale;
5251
5252         /* Remove leading/trailing zeroes */
5253         strip_var(result);
5254 }
5255
5256 /*
5257  * round_var
5258  *
5259  * Round the value of a variable to no more than rscale decimal digits
5260  * after the decimal point.  NOTE: we allow rscale < 0 here, implying
5261  * rounding before the decimal point.
5262  */
5263 static void
5264 round_var(NumericVar *var, int rscale)
5265 {
5266         NumericDigit *digits = var->digits;
5267         int                     di;
5268         int                     ndigits;
5269         int                     carry;
5270
5271         var->dscale = rscale;
5272
5273         /* decimal digits wanted */
5274         di = (var->weight + 1) * DEC_DIGITS + rscale;
5275
5276         /*
5277          * If di = 0, the value loses all digits, but could round up to 1 if its
5278          * first extra digit is >= 5.  If di < 0 the result must be 0.
5279          */
5280         if (di < 0)
5281         {
5282                 var->ndigits = 0;
5283                 var->weight = 0;
5284                 var->sign = NUMERIC_POS;
5285         }
5286         else
5287         {
5288                 /* NBASE digits wanted */
5289                 ndigits = (di + DEC_DIGITS - 1) / DEC_DIGITS;
5290
5291                 /* 0, or number of decimal digits to keep in last NBASE digit */
5292                 di %= DEC_DIGITS;
5293
5294                 if (ndigits < var->ndigits ||
5295                         (ndigits == var->ndigits && di > 0))
5296                 {
5297                         var->ndigits = ndigits;
5298
5299 #if DEC_DIGITS == 1
5300                         /* di must be zero */
5301                         carry = (digits[ndigits] >= HALF_NBASE) ? 1 : 0;
5302 #else
5303                         if (di == 0)
5304                                 carry = (digits[ndigits] >= HALF_NBASE) ? 1 : 0;
5305                         else
5306                         {
5307                                 /* Must round within last NBASE digit */
5308                                 int                     extra,
5309                                                         pow10;
5310
5311 #if DEC_DIGITS == 4
5312                                 pow10 = round_powers[di];
5313 #elif DEC_DIGITS == 2
5314                                 pow10 = 10;
5315 #else
5316 #error unsupported NBASE
5317 #endif
5318                                 extra = digits[--ndigits] % pow10;
5319                                 digits[ndigits] -= extra;
5320                                 carry = 0;
5321                                 if (extra >= pow10 / 2)
5322                                 {
5323                                         pow10 += digits[ndigits];
5324                                         if (pow10 >= NBASE)
5325                                         {
5326                                                 pow10 -= NBASE;
5327                                                 carry = 1;
5328                                         }
5329                                         digits[ndigits] = pow10;
5330                                 }
5331                         }
5332 #endif
5333
5334                         /* Propagate carry if needed */
5335                         while (carry)
5336                         {
5337                                 carry += digits[--ndigits];
5338                                 if (carry >= NBASE)
5339                                 {
5340                                         digits[ndigits] = carry - NBASE;
5341                                         carry = 1;
5342                                 }
5343                                 else
5344                                 {
5345                                         digits[ndigits] = carry;
5346                                         carry = 0;
5347                                 }
5348                         }
5349
5350                         if (ndigits < 0)
5351                         {
5352                                 Assert(ndigits == -1);  /* better not have added > 1 digit */
5353                                 Assert(var->digits > var->buf);
5354                                 var->digits--;
5355                                 var->ndigits++;
5356                                 var->weight++;
5357                         }
5358                 }
5359         }
5360 }
5361
5362 /*
5363  * trunc_var
5364  *
5365  * Truncate the value of a variable at rscale decimal digits after the
5366  * decimal point.  NOTE: we allow rscale < 0 here, implying
5367  * truncation before the decimal point.
5368  */
5369 static void
5370 trunc_var(NumericVar *var, int rscale)
5371 {
5372         int                     di;
5373         int                     ndigits;
5374
5375         var->dscale = rscale;
5376
5377         /* decimal digits wanted */
5378         di = (var->weight + 1) * DEC_DIGITS + rscale;
5379
5380         /*
5381          * If di <= 0, the value loses all digits.
5382          */
5383         if (di <= 0)
5384         {
5385                 var->ndigits = 0;
5386                 var->weight = 0;
5387                 var->sign = NUMERIC_POS;
5388         }
5389         else
5390         {
5391                 /* NBASE digits wanted */
5392                 ndigits = (di + DEC_DIGITS - 1) / DEC_DIGITS;
5393
5394                 if (ndigits <= var->ndigits)
5395                 {
5396                         var->ndigits = ndigits;
5397
5398 #if DEC_DIGITS == 1
5399                         /* no within-digit stuff to worry about */
5400 #else
5401                         /* 0, or number of decimal digits to keep in last NBASE digit */
5402                         di %= DEC_DIGITS;
5403
5404                         if (di > 0)
5405                         {
5406                                 /* Must truncate within last NBASE digit */
5407                                 NumericDigit *digits = var->digits;
5408                                 int                     extra,
5409                                                         pow10;
5410
5411 #if DEC_DIGITS == 4
5412                                 pow10 = round_powers[di];
5413 #elif DEC_DIGITS == 2
5414                                 pow10 = 10;
5415 #else
5416 #error unsupported NBASE
5417 #endif
5418                                 extra = digits[--ndigits] % pow10;
5419                                 digits[ndigits] -= extra;
5420                         }
5421 #endif
5422                 }
5423         }
5424 }
5425
5426 /*
5427  * strip_var
5428  *
5429  * Strip any leading and trailing zeroes from a numeric variable
5430  */
5431 static void
5432 strip_var(NumericVar *var)
5433 {
5434         NumericDigit *digits = var->digits;
5435         int                     ndigits = var->ndigits;
5436
5437         /* Strip leading zeroes */
5438         while (ndigits > 0 && *digits == 0)
5439         {
5440                 digits++;
5441                 var->weight--;
5442                 ndigits--;
5443         }
5444
5445         /* Strip trailing zeroes */
5446         while (ndigits > 0 && digits[ndigits - 1] == 0)
5447                 ndigits--;
5448
5449         /* If it's zero, normalize the sign and weight */
5450         if (ndigits == 0)
5451         {
5452                 var->sign = NUMERIC_POS;
5453                 var->weight = 0;
5454         }
5455
5456         var->digits = digits;
5457         var->ndigits = ndigits;
5458 }