]> granicus.if.org Git - postgresql/blob - src/backend/utils/adt/numeric.c
Tweak the API for per-datatype typmodin functions so that they are passed
[postgresql] / src / backend / utils / adt / numeric.c
1 /*-------------------------------------------------------------------------
2  *
3  * numeric.c
4  *        An exact numeric data type for the Postgres database system
5  *
6  * Original coding 1998, Jan Wieck.  Heavily revised 2003, Tom Lane.
7  *
8  * Many of the algorithmic ideas are borrowed from David M. Smith's "FM"
9  * multiple-precision math library, most recently published as Algorithm
10  * 786: Multiple-Precision Complex Arithmetic and Functions, ACM
11  * Transactions on Mathematical Software, Vol. 24, No. 4, December 1998,
12  * pages 359-367.
13  *
14  * Copyright (c) 1998-2007, PostgreSQL Global Development Group
15  *
16  * IDENTIFICATION
17  *        $PostgreSQL: pgsql/src/backend/utils/adt/numeric.c,v 1.105 2007/06/15 20:56:50 tgl Exp $
18  *
19  *-------------------------------------------------------------------------
20  */
21
22 #include "postgres.h"
23
24 #include <ctype.h>
25 #include <float.h>
26 #include <limits.h>
27 #include <math.h>
28
29 #include "access/hash.h"
30 #include "catalog/pg_type.h"
31 #include "libpq/pqformat.h"
32 #include "miscadmin.h"
33 #include "utils/array.h"
34 #include "utils/builtins.h"
35 #include "utils/int8.h"
36 #include "utils/numeric.h"
37
38 /* ----------
39  * Uncomment the following to enable compilation of dump_numeric()
40  * and dump_var() and to get a dump of any result produced by make_result().
41  * ----------
42 #define NUMERIC_DEBUG
43  */
44
45
46 /* ----------
47  * Local data types
48  *
49  * Numeric values are represented in a base-NBASE floating point format.
50  * Each "digit" ranges from 0 to NBASE-1.  The type NumericDigit is signed
51  * and wide enough to store a digit.  We assume that NBASE*NBASE can fit in
52  * an int.      Although the purely calculational routines could handle any even
53  * NBASE that's less than sqrt(INT_MAX), in practice we are only interested
54  * in NBASE a power of ten, so that I/O conversions and decimal rounding
55  * are easy.  Also, it's actually more efficient if NBASE is rather less than
56  * sqrt(INT_MAX), so that there is "headroom" for mul_var and div_var to
57  * postpone processing carries.
58  * ----------
59  */
60
61 #if 0
62 #define NBASE           10
63 #define HALF_NBASE      5
64 #define DEC_DIGITS      1                       /* decimal digits per NBASE digit */
65 #define MUL_GUARD_DIGITS        4       /* these are measured in NBASE digits */
66 #define DIV_GUARD_DIGITS        8
67
68 typedef signed char NumericDigit;
69 #endif
70
71 #if 0
72 #define NBASE           100
73 #define HALF_NBASE      50
74 #define DEC_DIGITS      2                       /* decimal digits per NBASE digit */
75 #define MUL_GUARD_DIGITS        3       /* these are measured in NBASE digits */
76 #define DIV_GUARD_DIGITS        6
77
78 typedef signed char NumericDigit;
79 #endif
80
81 #if 1
82 #define NBASE           10000
83 #define HALF_NBASE      5000
84 #define DEC_DIGITS      4                       /* decimal digits per NBASE digit */
85 #define MUL_GUARD_DIGITS        2       /* these are measured in NBASE digits */
86 #define DIV_GUARD_DIGITS        4
87
88 typedef int16 NumericDigit;
89 #endif
90
91
92 /* ----------
93  * The value represented by a NumericVar is determined by the sign, weight,
94  * ndigits, and digits[] array.
95  * Note: the first digit of a NumericVar's value is assumed to be multiplied
96  * by NBASE ** weight.  Another way to say it is that there are weight+1
97  * digits before the decimal point.  It is possible to have weight < 0.
98  *
99  * buf points at the physical start of the palloc'd digit buffer for the
100  * NumericVar.  digits points at the first digit in actual use (the one
101  * with the specified weight).  We normally leave an unused digit or two
102  * (preset to zeroes) between buf and digits, so that there is room to store
103  * a carry out of the top digit without special pushups.  We just need to
104  * decrement digits (and increment weight) to make room for the carry digit.
105  * (There is no such extra space in a numeric value stored in the database,
106  * only in a NumericVar in memory.)
107  *
108  * If buf is NULL then the digit buffer isn't actually palloc'd and should
109  * not be freed --- see the constants below for an example.
110  *
111  * dscale, or display scale, is the nominal precision expressed as number
112  * of digits after the decimal point (it must always be >= 0 at present).
113  * dscale may be more than the number of physically stored fractional digits,
114  * implying that we have suppressed storage of significant trailing zeroes.
115  * It should never be less than the number of stored digits, since that would
116  * imply hiding digits that are present.  NOTE that dscale is always expressed
117  * in *decimal* digits, and so it may correspond to a fractional number of
118  * base-NBASE digits --- divide by DEC_DIGITS to convert to NBASE digits.
119  *
120  * rscale, or result scale, is the target precision for a computation.
121  * Like dscale it is expressed as number of *decimal* digits after the decimal
122  * point, and is always >= 0 at present.
123  * Note that rscale is not stored in variables --- it's figured on-the-fly
124  * from the dscales of the inputs.
125  *
126  * NB: All the variable-level functions are written in a style that makes it
127  * possible to give one and the same variable as argument and destination.
128  * This is feasible because the digit buffer is separate from the variable.
129  * ----------
130  */
131 typedef struct NumericVar
132 {
133         int                     ndigits;                /* # of digits in digits[] - can be 0! */
134         int                     weight;                 /* weight of first digit */
135         int                     sign;                   /* NUMERIC_POS, NUMERIC_NEG, or NUMERIC_NAN */
136         int                     dscale;                 /* display scale */
137         NumericDigit *buf;                      /* start of palloc'd space for digits[] */
138         NumericDigit *digits;           /* base-NBASE digits */
139 } NumericVar;
140
141
142 /* ----------
143  * Some preinitialized constants
144  * ----------
145  */
146 static NumericDigit const_zero_data[1] = {0};
147 static NumericVar const_zero =
148 {0, 0, NUMERIC_POS, 0, NULL, const_zero_data};
149
150 static NumericDigit const_one_data[1] = {1};
151 static NumericVar const_one =
152 {1, 0, NUMERIC_POS, 0, NULL, const_one_data};
153
154 static NumericDigit const_two_data[1] = {2};
155 static NumericVar const_two =
156 {1, 0, NUMERIC_POS, 0, NULL, const_two_data};
157
158 #if DEC_DIGITS == 4
159 static NumericDigit const_zero_point_five_data[1] = {5000};
160 #elif DEC_DIGITS == 2
161 static NumericDigit const_zero_point_five_data[1] = {50};
162 #elif DEC_DIGITS == 1
163 static NumericDigit const_zero_point_five_data[1] = {5};
164 #endif
165 static NumericVar const_zero_point_five =
166 {1, -1, NUMERIC_POS, 1, NULL, const_zero_point_five_data};
167
168 #if DEC_DIGITS == 4
169 static NumericDigit const_zero_point_nine_data[1] = {9000};
170 #elif DEC_DIGITS == 2
171 static NumericDigit const_zero_point_nine_data[1] = {90};
172 #elif DEC_DIGITS == 1
173 static NumericDigit const_zero_point_nine_data[1] = {9};
174 #endif
175 static NumericVar const_zero_point_nine =
176 {1, -1, NUMERIC_POS, 1, NULL, const_zero_point_nine_data};
177
178 #if DEC_DIGITS == 4
179 static NumericDigit const_zero_point_01_data[1] = {100};
180 static NumericVar const_zero_point_01 =
181 {1, -1, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
182 #elif DEC_DIGITS == 2
183 static NumericDigit const_zero_point_01_data[1] = {1};
184 static NumericVar const_zero_point_01 =
185 {1, -1, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
186 #elif DEC_DIGITS == 1
187 static NumericDigit const_zero_point_01_data[1] = {1};
188 static NumericVar const_zero_point_01 =
189 {1, -2, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
190 #endif
191
192 #if DEC_DIGITS == 4
193 static NumericDigit const_one_point_one_data[2] = {1, 1000};
194 #elif DEC_DIGITS == 2
195 static NumericDigit const_one_point_one_data[2] = {1, 10};
196 #elif DEC_DIGITS == 1
197 static NumericDigit const_one_point_one_data[2] = {1, 1};
198 #endif
199 static NumericVar const_one_point_one =
200 {2, 0, NUMERIC_POS, 1, NULL, const_one_point_one_data};
201
202 static NumericVar const_nan =
203 {0, 0, NUMERIC_NAN, 0, NULL, NULL};
204
205 #if DEC_DIGITS == 4
206 static const int round_powers[4] = {0, 1000, 100, 10};
207 #endif
208
209
210 /* ----------
211  * Local functions
212  * ----------
213  */
214
215 #ifdef NUMERIC_DEBUG
216 static void dump_numeric(const char *str, Numeric num);
217 static void dump_var(const char *str, NumericVar *var);
218 #else
219 #define dump_numeric(s,n)
220 #define dump_var(s,v)
221 #endif
222
223 #define digitbuf_alloc(ndigits)  \
224         ((NumericDigit *) palloc((ndigits) * sizeof(NumericDigit)))
225 #define digitbuf_free(buf)      \
226         do { \
227                  if ((buf) != NULL) \
228                          pfree(buf); \
229         } while (0)
230
231 #define init_var(v)             MemSetAligned(v, 0, sizeof(NumericVar))
232
233 #define NUMERIC_DIGITS(num) ((NumericDigit *)(num)->n_data)
234 #define NUMERIC_NDIGITS(num) \
235         ((VARSIZE(num) - NUMERIC_HDRSZ) / sizeof(NumericDigit))
236
237 static void alloc_var(NumericVar *var, int ndigits);
238 static void free_var(NumericVar *var);
239 static void zero_var(NumericVar *var);
240
241 static void set_var_from_str(const char *str, NumericVar *dest);
242 static void set_var_from_num(Numeric value, NumericVar *dest);
243 static void set_var_from_var(NumericVar *value, NumericVar *dest);
244 static char *get_str_from_var(NumericVar *var, int dscale);
245
246 static Numeric make_result(NumericVar *var);
247
248 static void apply_typmod(NumericVar *var, int32 typmod);
249
250 static int32 numericvar_to_int4(NumericVar *var);
251 static bool numericvar_to_int8(NumericVar *var, int64 *result);
252 static void int8_to_numericvar(int64 val, NumericVar *var);
253 static double numeric_to_double_no_overflow(Numeric num);
254 static double numericvar_to_double_no_overflow(NumericVar *var);
255
256 static int      cmp_numerics(Numeric num1, Numeric num2);
257 static int      cmp_var(NumericVar *var1, NumericVar *var2);
258 static int cmp_var_common(const NumericDigit *var1digits, int var1ndigits,
259                            int var1weight, int var1sign,
260                            const NumericDigit *var2digits, int var2ndigits,
261                            int var2weight, int var2sign);
262 static void add_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
263 static void sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
264 static void mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
265                 int rscale);
266 static void div_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
267                 int rscale, bool round);
268 static int      select_div_scale(NumericVar *var1, NumericVar *var2);
269 static void mod_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
270 static void ceil_var(NumericVar *var, NumericVar *result);
271 static void floor_var(NumericVar *var, NumericVar *result);
272
273 static void sqrt_var(NumericVar *arg, NumericVar *result, int rscale);
274 static void exp_var(NumericVar *arg, NumericVar *result, int rscale);
275 static void exp_var_internal(NumericVar *arg, NumericVar *result, int rscale);
276 static void ln_var(NumericVar *arg, NumericVar *result, int rscale);
277 static void log_var(NumericVar *base, NumericVar *num, NumericVar *result);
278 static void power_var(NumericVar *base, NumericVar *exp, NumericVar *result);
279 static void power_var_int(NumericVar *base, int exp, NumericVar *result,
280                           int rscale);
281
282 static int      cmp_abs(NumericVar *var1, NumericVar *var2);
283 static int cmp_abs_common(const NumericDigit *var1digits, int var1ndigits,
284                            int var1weight,
285                            const NumericDigit *var2digits, int var2ndigits,
286                            int var2weight);
287 static void add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result);
288 static void sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result);
289 static void round_var(NumericVar *var, int rscale);
290 static void trunc_var(NumericVar *var, int rscale);
291 static void strip_var(NumericVar *var);
292 static void compute_bucket(Numeric operand, Numeric bound1, Numeric bound2,
293                            NumericVar *count_var, NumericVar *result_var);
294
295
296 /* ----------------------------------------------------------------------
297  *
298  * Input-, output- and rounding-functions
299  *
300  * ----------------------------------------------------------------------
301  */
302
303
304 /*
305  * numeric_in() -
306  *
307  *      Input function for numeric data type
308  */
309 Datum
310 numeric_in(PG_FUNCTION_ARGS)
311 {
312         char       *str = PG_GETARG_CSTRING(0);
313
314 #ifdef NOT_USED
315         Oid                     typelem = PG_GETARG_OID(1);
316 #endif
317         int32           typmod = PG_GETARG_INT32(2);
318         NumericVar      value;
319         Numeric         res;
320
321         /*
322          * Check for NaN
323          */
324         if (pg_strcasecmp(str, "NaN") == 0)
325                 PG_RETURN_NUMERIC(make_result(&const_nan));
326
327         /*
328          * Use set_var_from_str() to parse the input string and return it in the
329          * packed DB storage format
330          */
331         init_var(&value);
332         set_var_from_str(str, &value);
333
334         apply_typmod(&value, typmod);
335
336         res = make_result(&value);
337         free_var(&value);
338
339         PG_RETURN_NUMERIC(res);
340 }
341
342
343 /*
344  * numeric_out() -
345  *
346  *      Output function for numeric data type
347  */
348 Datum
349 numeric_out(PG_FUNCTION_ARGS)
350 {
351         Numeric         num = PG_GETARG_NUMERIC(0);
352         NumericVar      x;
353         char       *str;
354
355         /*
356          * Handle NaN
357          */
358         if (NUMERIC_IS_NAN(num))
359                 PG_RETURN_CSTRING(pstrdup("NaN"));
360
361         /*
362          * Get the number in the variable format.
363          *
364          * Even if we didn't need to change format, we'd still need to copy the
365          * value to have a modifiable copy for rounding.  set_var_from_num() also
366          * guarantees there is extra digit space in case we produce a carry out
367          * from rounding.
368          */
369         init_var(&x);
370         set_var_from_num(num, &x);
371
372         str = get_str_from_var(&x, x.dscale);
373
374         free_var(&x);
375
376         PG_RETURN_CSTRING(str);
377 }
378
379 /*
380  *              numeric_recv                    - converts external binary format to numeric
381  *
382  * External format is a sequence of int16's:
383  * ndigits, weight, sign, dscale, NumericDigits.
384  */
385 Datum
386 numeric_recv(PG_FUNCTION_ARGS)
387 {
388         StringInfo      buf = (StringInfo) PG_GETARG_POINTER(0);
389
390 #ifdef NOT_USED
391         Oid                     typelem = PG_GETARG_OID(1);
392 #endif
393         int32           typmod = PG_GETARG_INT32(2);
394         NumericVar      value;
395         Numeric         res;
396         int                     len,
397                                 i;
398
399         init_var(&value);
400
401         len = (uint16) pq_getmsgint(buf, sizeof(uint16));
402         if (len < 0 || len > NUMERIC_MAX_PRECISION + NUMERIC_MAX_RESULT_SCALE)
403                 ereport(ERROR,
404                                 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
405                                  errmsg("invalid length in external \"numeric\" value")));
406
407         alloc_var(&value, len);
408
409         value.weight = (int16) pq_getmsgint(buf, sizeof(int16));
410         value.sign = (uint16) pq_getmsgint(buf, sizeof(uint16));
411         if (!(value.sign == NUMERIC_POS ||
412                   value.sign == NUMERIC_NEG ||
413                   value.sign == NUMERIC_NAN))
414                 ereport(ERROR,
415                                 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
416                                  errmsg("invalid sign in external \"numeric\" value")));
417
418         value.dscale = (uint16) pq_getmsgint(buf, sizeof(uint16));
419         for (i = 0; i < len; i++)
420         {
421                 NumericDigit d = pq_getmsgint(buf, sizeof(NumericDigit));
422
423                 if (d < 0 || d >= NBASE)
424                         ereport(ERROR,
425                                         (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
426                                          errmsg("invalid digit in external \"numeric\" value")));
427                 value.digits[i] = d;
428         }
429
430         apply_typmod(&value, typmod);
431
432         res = make_result(&value);
433         free_var(&value);
434
435         PG_RETURN_NUMERIC(res);
436 }
437
438 /*
439  *              numeric_send                    - converts numeric to binary format
440  */
441 Datum
442 numeric_send(PG_FUNCTION_ARGS)
443 {
444         Numeric         num = PG_GETARG_NUMERIC(0);
445         NumericVar      x;
446         StringInfoData buf;
447         int                     i;
448
449         init_var(&x);
450         set_var_from_num(num, &x);
451
452         pq_begintypsend(&buf);
453
454         pq_sendint(&buf, x.ndigits, sizeof(int16));
455         pq_sendint(&buf, x.weight, sizeof(int16));
456         pq_sendint(&buf, x.sign, sizeof(int16));
457         pq_sendint(&buf, x.dscale, sizeof(int16));
458         for (i = 0; i < x.ndigits; i++)
459                 pq_sendint(&buf, x.digits[i], sizeof(NumericDigit));
460
461         free_var(&x);
462
463         PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
464 }
465
466
467 /*
468  * numeric() -
469  *
470  *      This is a special function called by the Postgres database system
471  *      before a value is stored in a tuple's attribute. The precision and
472  *      scale of the attribute have to be applied on the value.
473  */
474 Datum
475 numeric(PG_FUNCTION_ARGS)
476 {
477         Numeric         num = PG_GETARG_NUMERIC(0);
478         int32           typmod = PG_GETARG_INT32(1);
479         Numeric         new;
480         int32           tmp_typmod;
481         int                     precision;
482         int                     scale;
483         int                     ddigits;
484         int                     maxdigits;
485         NumericVar      var;
486
487         /*
488          * Handle NaN
489          */
490         if (NUMERIC_IS_NAN(num))
491                 PG_RETURN_NUMERIC(make_result(&const_nan));
492
493         /*
494          * If the value isn't a valid type modifier, simply return a copy of the
495          * input value
496          */
497         if (typmod < (int32) (VARHDRSZ))
498         {
499                 new = (Numeric) palloc(VARSIZE(num));
500                 memcpy(new, num, VARSIZE(num));
501                 PG_RETURN_NUMERIC(new);
502         }
503
504         /*
505          * Get the precision and scale out of the typmod value
506          */
507         tmp_typmod = typmod - VARHDRSZ;
508         precision = (tmp_typmod >> 16) & 0xffff;
509         scale = tmp_typmod & 0xffff;
510         maxdigits = precision - scale;
511
512         /*
513          * If the number is certainly in bounds and due to the target scale no
514          * rounding could be necessary, just make a copy of the input and modify
515          * its scale fields.  (Note we assume the existing dscale is honest...)
516          */
517         ddigits = (num->n_weight + 1) * DEC_DIGITS;
518         if (ddigits <= maxdigits && scale >= NUMERIC_DSCALE(num))
519         {
520                 new = (Numeric) palloc(VARSIZE(num));
521                 memcpy(new, num, VARSIZE(num));
522                 new->n_sign_dscale = NUMERIC_SIGN(new) |
523                         ((uint16) scale & NUMERIC_DSCALE_MASK);
524                 PG_RETURN_NUMERIC(new);
525         }
526
527         /*
528          * We really need to fiddle with things - unpack the number into a
529          * variable and let apply_typmod() do it.
530          */
531         init_var(&var);
532
533         set_var_from_num(num, &var);
534         apply_typmod(&var, typmod);
535         new = make_result(&var);
536
537         free_var(&var);
538
539         PG_RETURN_NUMERIC(new);
540 }
541
542 Datum
543 numerictypmodin(PG_FUNCTION_ARGS)
544 {
545         ArrayType   *ta = PG_GETARG_ARRAYTYPE_P(0);
546         int32           *tl;
547         int                     n;
548         int32           typmod;
549
550         tl = ArrayGetIntegerTypmods(ta, &n);
551
552         if (n == 2)
553         {
554                 if (tl[0] < 1 || tl[0] > NUMERIC_MAX_PRECISION)
555                         ereport(ERROR,
556                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
557                                          errmsg("NUMERIC precision %d must be between 1 and %d",
558                                                         tl[0], NUMERIC_MAX_PRECISION)));
559                 if (tl[1] < 0 || tl[1] > tl[0])
560                         ereport(ERROR,
561                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
562                                          errmsg("NUMERIC scale %d must be between 0 and precision %d",
563                                                         tl[1], tl[0])));
564                 typmod = ((tl[0] << 16) | tl[1]) + VARHDRSZ;
565         }
566         else if (n == 1)
567         {
568                 if (tl[0] < 1 || tl[0] > NUMERIC_MAX_PRECISION)
569                         ereport(ERROR,
570                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
571                                          errmsg("NUMERIC precision %d must be between 1 and %d",
572                                                         tl[0], NUMERIC_MAX_PRECISION)));
573                 /* scale defaults to zero */
574                 typmod = (tl[0] << 16) + VARHDRSZ;
575         }
576         else
577         {
578                 ereport(ERROR,
579                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
580                                                 errmsg("invalid NUMERIC type modifier")));
581                 typmod = 0;                             /* keep compiler quiet */
582         }
583
584         PG_RETURN_INT32(typmod);
585 }
586
587 Datum
588 numerictypmodout(PG_FUNCTION_ARGS)
589 {
590         int32   typmod = PG_GETARG_INT32(0);
591         char   *res = (char *) palloc(64);
592
593         if (typmod >= 0)
594                 snprintf(res, 64, "(%d,%d)",
595                                  ((typmod - VARHDRSZ) >> 16) & 0xffff,
596                                  (typmod - VARHDRSZ) & 0xffff);
597         else
598                 *res = '\0';
599
600         PG_RETURN_CSTRING(res);
601 }
602
603
604 /* ----------------------------------------------------------------------
605  *
606  * Sign manipulation, rounding and the like
607  *
608  * ----------------------------------------------------------------------
609  */
610
611 Datum
612 numeric_abs(PG_FUNCTION_ARGS)
613 {
614         Numeric         num = PG_GETARG_NUMERIC(0);
615         Numeric         res;
616
617         /*
618          * Handle NaN
619          */
620         if (NUMERIC_IS_NAN(num))
621                 PG_RETURN_NUMERIC(make_result(&const_nan));
622
623         /*
624          * Do it the easy way directly on the packed format
625          */
626         res = (Numeric) palloc(VARSIZE(num));
627         memcpy(res, num, VARSIZE(num));
628
629         res->n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num);
630
631         PG_RETURN_NUMERIC(res);
632 }
633
634
635 Datum
636 numeric_uminus(PG_FUNCTION_ARGS)
637 {
638         Numeric         num = PG_GETARG_NUMERIC(0);
639         Numeric         res;
640
641         /*
642          * Handle NaN
643          */
644         if (NUMERIC_IS_NAN(num))
645                 PG_RETURN_NUMERIC(make_result(&const_nan));
646
647         /*
648          * Do it the easy way directly on the packed format
649          */
650         res = (Numeric) palloc(VARSIZE(num));
651         memcpy(res, num, VARSIZE(num));
652
653         /*
654          * The packed format is known to be totally zero digit trimmed always. So
655          * we can identify a ZERO by the fact that there are no digits at all.  Do
656          * nothing to a zero.
657          */
658         if (VARSIZE(num) != NUMERIC_HDRSZ)
659         {
660                 /* Else, flip the sign */
661                 if (NUMERIC_SIGN(num) == NUMERIC_POS)
662                         res->n_sign_dscale = NUMERIC_NEG | NUMERIC_DSCALE(num);
663                 else
664                         res->n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num);
665         }
666
667         PG_RETURN_NUMERIC(res);
668 }
669
670
671 Datum
672 numeric_uplus(PG_FUNCTION_ARGS)
673 {
674         Numeric         num = PG_GETARG_NUMERIC(0);
675         Numeric         res;
676
677         res = (Numeric) palloc(VARSIZE(num));
678         memcpy(res, num, VARSIZE(num));
679
680         PG_RETURN_NUMERIC(res);
681 }
682
683 /*
684  * numeric_sign() -
685  *
686  * returns -1 if the argument is less than 0, 0 if the argument is equal
687  * to 0, and 1 if the argument is greater than zero.
688  */
689 Datum
690 numeric_sign(PG_FUNCTION_ARGS)
691 {
692         Numeric         num = PG_GETARG_NUMERIC(0);
693         Numeric         res;
694         NumericVar      result;
695
696         /*
697          * Handle NaN
698          */
699         if (NUMERIC_IS_NAN(num))
700                 PG_RETURN_NUMERIC(make_result(&const_nan));
701
702         init_var(&result);
703
704         /*
705          * The packed format is known to be totally zero digit trimmed always. So
706          * we can identify a ZERO by the fact that there are no digits at all.
707          */
708         if (VARSIZE(num) == NUMERIC_HDRSZ)
709                 set_var_from_var(&const_zero, &result);
710         else
711         {
712                 /*
713                  * And if there are some, we return a copy of ONE with the sign of our
714                  * argument
715                  */
716                 set_var_from_var(&const_one, &result);
717                 result.sign = NUMERIC_SIGN(num);
718         }
719
720         res = make_result(&result);
721         free_var(&result);
722
723         PG_RETURN_NUMERIC(res);
724 }
725
726
727 /*
728  * numeric_round() -
729  *
730  *      Round a value to have 'scale' digits after the decimal point.
731  *      We allow negative 'scale', implying rounding before the decimal
732  *      point --- Oracle interprets rounding that way.
733  */
734 Datum
735 numeric_round(PG_FUNCTION_ARGS)
736 {
737         Numeric         num = PG_GETARG_NUMERIC(0);
738         int32           scale = PG_GETARG_INT32(1);
739         Numeric         res;
740         NumericVar      arg;
741
742         /*
743          * Handle NaN
744          */
745         if (NUMERIC_IS_NAN(num))
746                 PG_RETURN_NUMERIC(make_result(&const_nan));
747
748         /*
749          * Limit the scale value to avoid possible overflow in calculations
750          */
751         scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE);
752         scale = Min(scale, NUMERIC_MAX_RESULT_SCALE);
753
754         /*
755          * Unpack the argument and round it at the proper digit position
756          */
757         init_var(&arg);
758         set_var_from_num(num, &arg);
759
760         round_var(&arg, scale);
761
762         /* We don't allow negative output dscale */
763         if (scale < 0)
764                 arg.dscale = 0;
765
766         /*
767          * Return the rounded result
768          */
769         res = make_result(&arg);
770
771         free_var(&arg);
772         PG_RETURN_NUMERIC(res);
773 }
774
775
776 /*
777  * numeric_trunc() -
778  *
779  *      Truncate a value to have 'scale' digits after the decimal point.
780  *      We allow negative 'scale', implying a truncation before the decimal
781  *      point --- Oracle interprets truncation that way.
782  */
783 Datum
784 numeric_trunc(PG_FUNCTION_ARGS)
785 {
786         Numeric         num = PG_GETARG_NUMERIC(0);
787         int32           scale = PG_GETARG_INT32(1);
788         Numeric         res;
789         NumericVar      arg;
790
791         /*
792          * Handle NaN
793          */
794         if (NUMERIC_IS_NAN(num))
795                 PG_RETURN_NUMERIC(make_result(&const_nan));
796
797         /*
798          * Limit the scale value to avoid possible overflow in calculations
799          */
800         scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE);
801         scale = Min(scale, NUMERIC_MAX_RESULT_SCALE);
802
803         /*
804          * Unpack the argument and truncate it at the proper digit position
805          */
806         init_var(&arg);
807         set_var_from_num(num, &arg);
808
809         trunc_var(&arg, scale);
810
811         /* We don't allow negative output dscale */
812         if (scale < 0)
813                 arg.dscale = 0;
814
815         /*
816          * Return the truncated result
817          */
818         res = make_result(&arg);
819
820         free_var(&arg);
821         PG_RETURN_NUMERIC(res);
822 }
823
824
825 /*
826  * numeric_ceil() -
827  *
828  *      Return the smallest integer greater than or equal to the argument
829  */
830 Datum
831 numeric_ceil(PG_FUNCTION_ARGS)
832 {
833         Numeric         num = PG_GETARG_NUMERIC(0);
834         Numeric         res;
835         NumericVar      result;
836
837         if (NUMERIC_IS_NAN(num))
838                 PG_RETURN_NUMERIC(make_result(&const_nan));
839
840         init_var(&result);
841
842         set_var_from_num(num, &result);
843         ceil_var(&result, &result);
844
845         res = make_result(&result);
846         free_var(&result);
847
848         PG_RETURN_NUMERIC(res);
849 }
850
851
852 /*
853  * numeric_floor() -
854  *
855  *      Return the largest integer equal to or less than the argument
856  */
857 Datum
858 numeric_floor(PG_FUNCTION_ARGS)
859 {
860         Numeric         num = PG_GETARG_NUMERIC(0);
861         Numeric         res;
862         NumericVar      result;
863
864         if (NUMERIC_IS_NAN(num))
865                 PG_RETURN_NUMERIC(make_result(&const_nan));
866
867         init_var(&result);
868
869         set_var_from_num(num, &result);
870         floor_var(&result, &result);
871
872         res = make_result(&result);
873         free_var(&result);
874
875         PG_RETURN_NUMERIC(res);
876 }
877
878 /*
879  * Implements the numeric version of the width_bucket() function
880  * defined by SQL2003. See also width_bucket_float8().
881  *
882  * 'bound1' and 'bound2' are the lower and upper bounds of the
883  * histogram's range, respectively. 'count' is the number of buckets
884  * in the histogram. width_bucket() returns an integer indicating the
885  * bucket number that 'operand' belongs to in an equiwidth histogram
886  * with the specified characteristics. An operand smaller than the
887  * lower bound is assigned to bucket 0. An operand greater than the
888  * upper bound is assigned to an additional bucket (with number
889  * count+1). We don't allow "NaN" for any of the numeric arguments.
890  */
891 Datum
892 width_bucket_numeric(PG_FUNCTION_ARGS)
893 {
894         Numeric         operand = PG_GETARG_NUMERIC(0);
895         Numeric         bound1 = PG_GETARG_NUMERIC(1);
896         Numeric         bound2 = PG_GETARG_NUMERIC(2);
897         int32           count = PG_GETARG_INT32(3);
898         NumericVar      count_var;
899         NumericVar      result_var;
900         int32           result;
901
902         if (count <= 0)
903                 ereport(ERROR,
904                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
905                                  errmsg("count must be greater than zero")));
906
907         if (NUMERIC_IS_NAN(operand) ||
908                 NUMERIC_IS_NAN(bound1) ||
909                 NUMERIC_IS_NAN(bound2))
910                 ereport(ERROR,
911                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
912                                  errmsg("operand, lower bound and upper bound cannot be NaN")));
913
914         init_var(&result_var);
915         init_var(&count_var);
916
917         /* Convert 'count' to a numeric, for ease of use later */
918         int8_to_numericvar((int64) count, &count_var);
919
920         switch (cmp_numerics(bound1, bound2))
921         {
922                 case 0:
923                         ereport(ERROR,
924                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
925                                  errmsg("lower bound cannot equal upper bound")));
926
927                         /* bound1 < bound2 */
928                 case -1:
929                         if (cmp_numerics(operand, bound1) < 0)
930                                 set_var_from_var(&const_zero, &result_var);
931                         else if (cmp_numerics(operand, bound2) >= 0)
932                                 add_var(&count_var, &const_one, &result_var);
933                         else
934                                 compute_bucket(operand, bound1, bound2,
935                                                            &count_var, &result_var);
936                         break;
937
938                         /* bound1 > bound2 */
939                 case 1:
940                         if (cmp_numerics(operand, bound1) > 0)
941                                 set_var_from_var(&const_zero, &result_var);
942                         else if (cmp_numerics(operand, bound2) <= 0)
943                                 add_var(&count_var, &const_one, &result_var);
944                         else
945                                 compute_bucket(operand, bound1, bound2,
946                                                            &count_var, &result_var);
947                         break;
948         }
949
950         /* if result exceeds the range of a legal int4, we ereport here */
951         result = numericvar_to_int4(&result_var);
952
953         free_var(&count_var);
954         free_var(&result_var);
955
956         PG_RETURN_INT32(result);
957 }
958
959 /*
960  * If 'operand' is not outside the bucket range, determine the correct
961  * bucket for it to go. The calculations performed by this function
962  * are derived directly from the SQL2003 spec.
963  */
964 static void
965 compute_bucket(Numeric operand, Numeric bound1, Numeric bound2,
966                            NumericVar *count_var, NumericVar *result_var)
967 {
968         NumericVar      bound1_var;
969         NumericVar      bound2_var;
970         NumericVar      operand_var;
971
972         init_var(&bound1_var);
973         init_var(&bound2_var);
974         init_var(&operand_var);
975
976         set_var_from_num(bound1, &bound1_var);
977         set_var_from_num(bound2, &bound2_var);
978         set_var_from_num(operand, &operand_var);
979
980         if (cmp_var(&bound1_var, &bound2_var) < 0)
981         {
982                 sub_var(&operand_var, &bound1_var, &operand_var);
983                 sub_var(&bound2_var, &bound1_var, &bound2_var);
984                 div_var(&operand_var, &bound2_var, result_var,
985                                 select_div_scale(&operand_var, &bound2_var), true);
986         }
987         else
988         {
989                 sub_var(&bound1_var, &operand_var, &operand_var);
990                 sub_var(&bound1_var, &bound2_var, &bound1_var);
991                 div_var(&operand_var, &bound1_var, result_var,
992                                 select_div_scale(&operand_var, &bound1_var), true);
993         }
994
995         mul_var(result_var, count_var, result_var,
996                         result_var->dscale + count_var->dscale);
997         add_var(result_var, &const_one, result_var);
998         floor_var(result_var, result_var);
999
1000         free_var(&bound1_var);
1001         free_var(&bound2_var);
1002         free_var(&operand_var);
1003 }
1004
1005 /* ----------------------------------------------------------------------
1006  *
1007  * Comparison functions
1008  *
1009  * Note: btree indexes need these routines not to leak memory; therefore,
1010  * be careful to free working copies of toasted datums.  Most places don't
1011  * need to be so careful.
1012  * ----------------------------------------------------------------------
1013  */
1014
1015
1016 Datum
1017 numeric_cmp(PG_FUNCTION_ARGS)
1018 {
1019         Numeric         num1 = PG_GETARG_NUMERIC(0);
1020         Numeric         num2 = PG_GETARG_NUMERIC(1);
1021         int                     result;
1022
1023         result = cmp_numerics(num1, num2);
1024
1025         PG_FREE_IF_COPY(num1, 0);
1026         PG_FREE_IF_COPY(num2, 1);
1027
1028         PG_RETURN_INT32(result);
1029 }
1030
1031
1032 Datum
1033 numeric_eq(PG_FUNCTION_ARGS)
1034 {
1035         Numeric         num1 = PG_GETARG_NUMERIC(0);
1036         Numeric         num2 = PG_GETARG_NUMERIC(1);
1037         bool            result;
1038
1039         result = cmp_numerics(num1, num2) == 0;
1040
1041         PG_FREE_IF_COPY(num1, 0);
1042         PG_FREE_IF_COPY(num2, 1);
1043
1044         PG_RETURN_BOOL(result);
1045 }
1046
1047 Datum
1048 numeric_ne(PG_FUNCTION_ARGS)
1049 {
1050         Numeric         num1 = PG_GETARG_NUMERIC(0);
1051         Numeric         num2 = PG_GETARG_NUMERIC(1);
1052         bool            result;
1053
1054         result = cmp_numerics(num1, num2) != 0;
1055
1056         PG_FREE_IF_COPY(num1, 0);
1057         PG_FREE_IF_COPY(num2, 1);
1058
1059         PG_RETURN_BOOL(result);
1060 }
1061
1062 Datum
1063 numeric_gt(PG_FUNCTION_ARGS)
1064 {
1065         Numeric         num1 = PG_GETARG_NUMERIC(0);
1066         Numeric         num2 = PG_GETARG_NUMERIC(1);
1067         bool            result;
1068
1069         result = cmp_numerics(num1, num2) > 0;
1070
1071         PG_FREE_IF_COPY(num1, 0);
1072         PG_FREE_IF_COPY(num2, 1);
1073
1074         PG_RETURN_BOOL(result);
1075 }
1076
1077 Datum
1078 numeric_ge(PG_FUNCTION_ARGS)
1079 {
1080         Numeric         num1 = PG_GETARG_NUMERIC(0);
1081         Numeric         num2 = PG_GETARG_NUMERIC(1);
1082         bool            result;
1083
1084         result = cmp_numerics(num1, num2) >= 0;
1085
1086         PG_FREE_IF_COPY(num1, 0);
1087         PG_FREE_IF_COPY(num2, 1);
1088
1089         PG_RETURN_BOOL(result);
1090 }
1091
1092 Datum
1093 numeric_lt(PG_FUNCTION_ARGS)
1094 {
1095         Numeric         num1 = PG_GETARG_NUMERIC(0);
1096         Numeric         num2 = PG_GETARG_NUMERIC(1);
1097         bool            result;
1098
1099         result = cmp_numerics(num1, num2) < 0;
1100
1101         PG_FREE_IF_COPY(num1, 0);
1102         PG_FREE_IF_COPY(num2, 1);
1103
1104         PG_RETURN_BOOL(result);
1105 }
1106
1107 Datum
1108 numeric_le(PG_FUNCTION_ARGS)
1109 {
1110         Numeric         num1 = PG_GETARG_NUMERIC(0);
1111         Numeric         num2 = PG_GETARG_NUMERIC(1);
1112         bool            result;
1113
1114         result = cmp_numerics(num1, num2) <= 0;
1115
1116         PG_FREE_IF_COPY(num1, 0);
1117         PG_FREE_IF_COPY(num2, 1);
1118
1119         PG_RETURN_BOOL(result);
1120 }
1121
1122 static int
1123 cmp_numerics(Numeric num1, Numeric num2)
1124 {
1125         int                     result;
1126
1127         /*
1128          * We consider all NANs to be equal and larger than any non-NAN. This is
1129          * somewhat arbitrary; the important thing is to have a consistent sort
1130          * order.
1131          */
1132         if (NUMERIC_IS_NAN(num1))
1133         {
1134                 if (NUMERIC_IS_NAN(num2))
1135                         result = 0;                     /* NAN = NAN */
1136                 else
1137                         result = 1;                     /* NAN > non-NAN */
1138         }
1139         else if (NUMERIC_IS_NAN(num2))
1140         {
1141                 result = -1;                    /* non-NAN < NAN */
1142         }
1143         else
1144         {
1145                 result = cmp_var_common(NUMERIC_DIGITS(num1), NUMERIC_NDIGITS(num1),
1146                                                                 num1->n_weight, NUMERIC_SIGN(num1),
1147                                                                 NUMERIC_DIGITS(num2), NUMERIC_NDIGITS(num2),
1148                                                                 num2->n_weight, NUMERIC_SIGN(num2));
1149         }
1150
1151         return result;
1152 }
1153
1154 Datum
1155 hash_numeric(PG_FUNCTION_ARGS)
1156 {
1157         Numeric         key = PG_GETARG_NUMERIC(0);
1158         Datum           digit_hash;
1159         Datum           result;
1160         int             weight;
1161         int             start_offset;
1162         int             end_offset;
1163         int             i;
1164         int             hash_len;
1165
1166         /* If it's NaN, don't try to hash the rest of the fields */
1167         if (NUMERIC_IS_NAN(key))
1168                 PG_RETURN_UINT32(0);
1169
1170         weight           = key->n_weight;
1171         start_offset = 0;
1172         end_offset       = 0;
1173
1174         /*
1175          * Omit any leading or trailing zeros from the input to the
1176          * hash. The numeric implementation *should* guarantee that
1177          * leading and trailing zeros are suppressed, but we're
1178          * paranoid. Note that we measure the starting and ending offsets
1179          * in units of NumericDigits, not bytes.
1180          */
1181         for (i = 0; i < NUMERIC_NDIGITS(key); i++)
1182         {
1183                 if (NUMERIC_DIGITS(key)[i] != (NumericDigit) 0)
1184                         break;
1185
1186                 start_offset++;
1187                 /*
1188                  * The weight is effectively the # of digits before the
1189                  * decimal point, so decrement it for each leading zero we
1190                  * skip.
1191                  */
1192                 weight--;
1193         }
1194
1195         /*
1196          * If there are no non-zero digits, then the value of the number
1197          * is zero, regardless of any other fields.
1198          */
1199         if (NUMERIC_NDIGITS(key) == start_offset)
1200                 PG_RETURN_UINT32(-1);
1201
1202         for (i = NUMERIC_NDIGITS(key) - 1; i >= 0; i--)
1203         {
1204                 if (NUMERIC_DIGITS(key)[i] != (NumericDigit) 0)
1205                         break;
1206
1207                 end_offset++;
1208         }
1209
1210         /* If we get here, there should be at least one non-zero digit */
1211         Assert(start_offset + end_offset < NUMERIC_NDIGITS(key));
1212
1213         /*
1214          * Note that we don't hash on the Numeric's scale, since two
1215          * numerics can compare equal but have different scales. We also
1216          * don't hash on the sign, although we could: since a sign
1217          * difference implies inequality, this shouldn't affect correctness.
1218          */
1219         hash_len = NUMERIC_NDIGITS(key) - start_offset - end_offset;
1220         digit_hash = hash_any((unsigned char *) (NUMERIC_DIGITS(key) + start_offset),
1221                           hash_len * sizeof(NumericDigit));
1222
1223         /* Mix in the weight, via XOR */
1224         result = digit_hash ^ weight;
1225
1226         PG_RETURN_DATUM(result);
1227 }
1228
1229
1230 /* ----------------------------------------------------------------------
1231  *
1232  * Basic arithmetic functions
1233  *
1234  * ----------------------------------------------------------------------
1235  */
1236
1237
1238 /*
1239  * numeric_add() -
1240  *
1241  *      Add two numerics
1242  */
1243 Datum
1244 numeric_add(PG_FUNCTION_ARGS)
1245 {
1246         Numeric         num1 = PG_GETARG_NUMERIC(0);
1247         Numeric         num2 = PG_GETARG_NUMERIC(1);
1248         NumericVar      arg1;
1249         NumericVar      arg2;
1250         NumericVar      result;
1251         Numeric         res;
1252
1253         /*
1254          * Handle NaN
1255          */
1256         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1257                 PG_RETURN_NUMERIC(make_result(&const_nan));
1258
1259         /*
1260          * Unpack the values, let add_var() compute the result and return it.
1261          */
1262         init_var(&arg1);
1263         init_var(&arg2);
1264         init_var(&result);
1265
1266         set_var_from_num(num1, &arg1);
1267         set_var_from_num(num2, &arg2);
1268
1269         add_var(&arg1, &arg2, &result);
1270
1271         res = make_result(&result);
1272
1273         free_var(&arg1);
1274         free_var(&arg2);
1275         free_var(&result);
1276
1277         PG_RETURN_NUMERIC(res);
1278 }
1279
1280
1281 /*
1282  * numeric_sub() -
1283  *
1284  *      Subtract one numeric from another
1285  */
1286 Datum
1287 numeric_sub(PG_FUNCTION_ARGS)
1288 {
1289         Numeric         num1 = PG_GETARG_NUMERIC(0);
1290         Numeric         num2 = PG_GETARG_NUMERIC(1);
1291         NumericVar      arg1;
1292         NumericVar      arg2;
1293         NumericVar      result;
1294         Numeric         res;
1295
1296         /*
1297          * Handle NaN
1298          */
1299         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1300                 PG_RETURN_NUMERIC(make_result(&const_nan));
1301
1302         /*
1303          * Unpack the values, let sub_var() compute the result and return it.
1304          */
1305         init_var(&arg1);
1306         init_var(&arg2);
1307         init_var(&result);
1308
1309         set_var_from_num(num1, &arg1);
1310         set_var_from_num(num2, &arg2);
1311
1312         sub_var(&arg1, &arg2, &result);
1313
1314         res = make_result(&result);
1315
1316         free_var(&arg1);
1317         free_var(&arg2);
1318         free_var(&result);
1319
1320         PG_RETURN_NUMERIC(res);
1321 }
1322
1323
1324 /*
1325  * numeric_mul() -
1326  *
1327  *      Calculate the product of two numerics
1328  */
1329 Datum
1330 numeric_mul(PG_FUNCTION_ARGS)
1331 {
1332         Numeric         num1 = PG_GETARG_NUMERIC(0);
1333         Numeric         num2 = PG_GETARG_NUMERIC(1);
1334         NumericVar      arg1;
1335         NumericVar      arg2;
1336         NumericVar      result;
1337         Numeric         res;
1338
1339         /*
1340          * Handle NaN
1341          */
1342         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1343                 PG_RETURN_NUMERIC(make_result(&const_nan));
1344
1345         /*
1346          * Unpack the values, let mul_var() compute the result and return it.
1347          * Unlike add_var() and sub_var(), mul_var() will round its result. In the
1348          * case of numeric_mul(), which is invoked for the * operator on numerics,
1349          * we request exact representation for the product (rscale = sum(dscale of
1350          * arg1, dscale of arg2)).
1351          */
1352         init_var(&arg1);
1353         init_var(&arg2);
1354         init_var(&result);
1355
1356         set_var_from_num(num1, &arg1);
1357         set_var_from_num(num2, &arg2);
1358
1359         mul_var(&arg1, &arg2, &result, arg1.dscale + arg2.dscale);
1360
1361         res = make_result(&result);
1362
1363         free_var(&arg1);
1364         free_var(&arg2);
1365         free_var(&result);
1366
1367         PG_RETURN_NUMERIC(res);
1368 }
1369
1370
1371 /*
1372  * numeric_div() -
1373  *
1374  *      Divide one numeric into another
1375  */
1376 Datum
1377 numeric_div(PG_FUNCTION_ARGS)
1378 {
1379         Numeric         num1 = PG_GETARG_NUMERIC(0);
1380         Numeric         num2 = PG_GETARG_NUMERIC(1);
1381         NumericVar      arg1;
1382         NumericVar      arg2;
1383         NumericVar      result;
1384         Numeric         res;
1385         int                     rscale;
1386
1387         /*
1388          * Handle NaN
1389          */
1390         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1391                 PG_RETURN_NUMERIC(make_result(&const_nan));
1392
1393         /*
1394          * Unpack the arguments
1395          */
1396         init_var(&arg1);
1397         init_var(&arg2);
1398         init_var(&result);
1399
1400         set_var_from_num(num1, &arg1);
1401         set_var_from_num(num2, &arg2);
1402
1403         /*
1404          * Select scale for division result
1405          */
1406         rscale = select_div_scale(&arg1, &arg2);
1407
1408         /*
1409          * Do the divide and return the result
1410          */
1411         div_var(&arg1, &arg2, &result, rscale, true);
1412
1413         res = make_result(&result);
1414
1415         free_var(&arg1);
1416         free_var(&arg2);
1417         free_var(&result);
1418
1419         PG_RETURN_NUMERIC(res);
1420 }
1421
1422
1423 /*
1424  * numeric_mod() -
1425  *
1426  *      Calculate the modulo of two numerics
1427  */
1428 Datum
1429 numeric_mod(PG_FUNCTION_ARGS)
1430 {
1431         Numeric         num1 = PG_GETARG_NUMERIC(0);
1432         Numeric         num2 = PG_GETARG_NUMERIC(1);
1433         Numeric         res;
1434         NumericVar      arg1;
1435         NumericVar      arg2;
1436         NumericVar      result;
1437
1438         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1439                 PG_RETURN_NUMERIC(make_result(&const_nan));
1440
1441         init_var(&arg1);
1442         init_var(&arg2);
1443         init_var(&result);
1444
1445         set_var_from_num(num1, &arg1);
1446         set_var_from_num(num2, &arg2);
1447
1448         mod_var(&arg1, &arg2, &result);
1449
1450         res = make_result(&result);
1451
1452         free_var(&result);
1453         free_var(&arg2);
1454         free_var(&arg1);
1455
1456         PG_RETURN_NUMERIC(res);
1457 }
1458
1459
1460 /*
1461  * numeric_inc() -
1462  *
1463  *      Increment a number by one
1464  */
1465 Datum
1466 numeric_inc(PG_FUNCTION_ARGS)
1467 {
1468         Numeric         num = PG_GETARG_NUMERIC(0);
1469         NumericVar      arg;
1470         Numeric         res;
1471
1472         /*
1473          * Handle NaN
1474          */
1475         if (NUMERIC_IS_NAN(num))
1476                 PG_RETURN_NUMERIC(make_result(&const_nan));
1477
1478         /*
1479          * Compute the result and return it
1480          */
1481         init_var(&arg);
1482
1483         set_var_from_num(num, &arg);
1484
1485         add_var(&arg, &const_one, &arg);
1486
1487         res = make_result(&arg);
1488
1489         free_var(&arg);
1490
1491         PG_RETURN_NUMERIC(res);
1492 }
1493
1494
1495 /*
1496  * numeric_smaller() -
1497  *
1498  *      Return the smaller of two numbers
1499  */
1500 Datum
1501 numeric_smaller(PG_FUNCTION_ARGS)
1502 {
1503         Numeric         num1 = PG_GETARG_NUMERIC(0);
1504         Numeric         num2 = PG_GETARG_NUMERIC(1);
1505
1506         /*
1507          * Use cmp_numerics so that this will agree with the comparison operators,
1508          * particularly as regards comparisons involving NaN.
1509          */
1510         if (cmp_numerics(num1, num2) < 0)
1511                 PG_RETURN_NUMERIC(num1);
1512         else
1513                 PG_RETURN_NUMERIC(num2);
1514 }
1515
1516
1517 /*
1518  * numeric_larger() -
1519  *
1520  *      Return the larger of two numbers
1521  */
1522 Datum
1523 numeric_larger(PG_FUNCTION_ARGS)
1524 {
1525         Numeric         num1 = PG_GETARG_NUMERIC(0);
1526         Numeric         num2 = PG_GETARG_NUMERIC(1);
1527
1528         /*
1529          * Use cmp_numerics so that this will agree with the comparison operators,
1530          * particularly as regards comparisons involving NaN.
1531          */
1532         if (cmp_numerics(num1, num2) > 0)
1533                 PG_RETURN_NUMERIC(num1);
1534         else
1535                 PG_RETURN_NUMERIC(num2);
1536 }
1537
1538
1539 /* ----------------------------------------------------------------------
1540  *
1541  * Advanced math functions
1542  *
1543  * ----------------------------------------------------------------------
1544  */
1545
1546 /*
1547  * numeric_fac()
1548  *
1549  * Compute factorial
1550  */
1551 Datum
1552 numeric_fac(PG_FUNCTION_ARGS)
1553 {
1554         int64           num = PG_GETARG_INT64(0);
1555         Numeric         res;
1556         NumericVar      fact;
1557         NumericVar      result;
1558
1559         if (num <= 1)
1560         {
1561                 res = make_result(&const_one);
1562                 PG_RETURN_NUMERIC(res);
1563         }
1564         /* Fail immediately if the result would overflow */
1565         if (num > 32177)
1566                 ereport(ERROR,
1567                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1568                                  errmsg("value overflows numeric format")));
1569
1570         init_var(&fact);
1571         init_var(&result);
1572
1573         int8_to_numericvar(num, &result);
1574
1575         for (num = num - 1; num > 1; num--)
1576         {
1577                 /* this loop can take awhile, so allow it to be interrupted */
1578                 CHECK_FOR_INTERRUPTS();
1579
1580                 int8_to_numericvar(num, &fact);
1581
1582                 mul_var(&result, &fact, &result, 0);
1583         }
1584
1585         res = make_result(&result);
1586
1587         free_var(&fact);
1588         free_var(&result);
1589
1590         PG_RETURN_NUMERIC(res);
1591 }
1592
1593
1594 /*
1595  * numeric_sqrt() -
1596  *
1597  *      Compute the square root of a numeric.
1598  */
1599 Datum
1600 numeric_sqrt(PG_FUNCTION_ARGS)
1601 {
1602         Numeric         num = PG_GETARG_NUMERIC(0);
1603         Numeric         res;
1604         NumericVar      arg;
1605         NumericVar      result;
1606         int                     sweight;
1607         int                     rscale;
1608
1609         /*
1610          * Handle NaN
1611          */
1612         if (NUMERIC_IS_NAN(num))
1613                 PG_RETURN_NUMERIC(make_result(&const_nan));
1614
1615         /*
1616          * Unpack the argument and determine the result scale.  We choose a scale
1617          * to give at least NUMERIC_MIN_SIG_DIGITS significant digits; but in any
1618          * case not less than the input's dscale.
1619          */
1620         init_var(&arg);
1621         init_var(&result);
1622
1623         set_var_from_num(num, &arg);
1624
1625         /* Assume the input was normalized, so arg.weight is accurate */
1626         sweight = (arg.weight + 1) * DEC_DIGITS / 2 - 1;
1627
1628         rscale = NUMERIC_MIN_SIG_DIGITS - sweight;
1629         rscale = Max(rscale, arg.dscale);
1630         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
1631         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
1632
1633         /*
1634          * Let sqrt_var() do the calculation and return the result.
1635          */
1636         sqrt_var(&arg, &result, rscale);
1637
1638         res = make_result(&result);
1639
1640         free_var(&result);
1641         free_var(&arg);
1642
1643         PG_RETURN_NUMERIC(res);
1644 }
1645
1646
1647 /*
1648  * numeric_exp() -
1649  *
1650  *      Raise e to the power of x
1651  */
1652 Datum
1653 numeric_exp(PG_FUNCTION_ARGS)
1654 {
1655         Numeric         num = PG_GETARG_NUMERIC(0);
1656         Numeric         res;
1657         NumericVar      arg;
1658         NumericVar      result;
1659         int                     rscale;
1660         double          val;
1661
1662         /*
1663          * Handle NaN
1664          */
1665         if (NUMERIC_IS_NAN(num))
1666                 PG_RETURN_NUMERIC(make_result(&const_nan));
1667
1668         /*
1669          * Unpack the argument and determine the result scale.  We choose a scale
1670          * to give at least NUMERIC_MIN_SIG_DIGITS significant digits; but in any
1671          * case not less than the input's dscale.
1672          */
1673         init_var(&arg);
1674         init_var(&result);
1675
1676         set_var_from_num(num, &arg);
1677
1678         /* convert input to float8, ignoring overflow */
1679         val = numericvar_to_double_no_overflow(&arg);
1680
1681         /*
1682          * log10(result) = num * log10(e), so this is approximately the decimal
1683          * weight of the result:
1684          */
1685         val *= 0.434294481903252;
1686
1687         /* limit to something that won't cause integer overflow */
1688         val = Max(val, -NUMERIC_MAX_RESULT_SCALE);
1689         val = Min(val, NUMERIC_MAX_RESULT_SCALE);
1690
1691         rscale = NUMERIC_MIN_SIG_DIGITS - (int) val;
1692         rscale = Max(rscale, arg.dscale);
1693         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
1694         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
1695
1696         /*
1697          * Let exp_var() do the calculation and return the result.
1698          */
1699         exp_var(&arg, &result, rscale);
1700
1701         res = make_result(&result);
1702
1703         free_var(&result);
1704         free_var(&arg);
1705
1706         PG_RETURN_NUMERIC(res);
1707 }
1708
1709
1710 /*
1711  * numeric_ln() -
1712  *
1713  *      Compute the natural logarithm of x
1714  */
1715 Datum
1716 numeric_ln(PG_FUNCTION_ARGS)
1717 {
1718         Numeric         num = PG_GETARG_NUMERIC(0);
1719         Numeric         res;
1720         NumericVar      arg;
1721         NumericVar      result;
1722         int                     dec_digits;
1723         int                     rscale;
1724
1725         /*
1726          * Handle NaN
1727          */
1728         if (NUMERIC_IS_NAN(num))
1729                 PG_RETURN_NUMERIC(make_result(&const_nan));
1730
1731         init_var(&arg);
1732         init_var(&result);
1733
1734         set_var_from_num(num, &arg);
1735
1736         /* Approx decimal digits before decimal point */
1737         dec_digits = (arg.weight + 1) * DEC_DIGITS;
1738
1739         if (dec_digits > 1)
1740                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(dec_digits - 1);
1741         else if (dec_digits < 1)
1742                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(1 - dec_digits);
1743         else
1744                 rscale = NUMERIC_MIN_SIG_DIGITS;
1745
1746         rscale = Max(rscale, arg.dscale);
1747         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
1748         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
1749
1750         ln_var(&arg, &result, rscale);
1751
1752         res = make_result(&result);
1753
1754         free_var(&result);
1755         free_var(&arg);
1756
1757         PG_RETURN_NUMERIC(res);
1758 }
1759
1760
1761 /*
1762  * numeric_log() -
1763  *
1764  *      Compute the logarithm of x in a given base
1765  */
1766 Datum
1767 numeric_log(PG_FUNCTION_ARGS)
1768 {
1769         Numeric         num1 = PG_GETARG_NUMERIC(0);
1770         Numeric         num2 = PG_GETARG_NUMERIC(1);
1771         Numeric         res;
1772         NumericVar      arg1;
1773         NumericVar      arg2;
1774         NumericVar      result;
1775
1776         /*
1777          * Handle NaN
1778          */
1779         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1780                 PG_RETURN_NUMERIC(make_result(&const_nan));
1781
1782         /*
1783          * Initialize things
1784          */
1785         init_var(&arg1);
1786         init_var(&arg2);
1787         init_var(&result);
1788
1789         set_var_from_num(num1, &arg1);
1790         set_var_from_num(num2, &arg2);
1791
1792         /*
1793          * Call log_var() to compute and return the result; note it handles scale
1794          * selection itself.
1795          */
1796         log_var(&arg1, &arg2, &result);
1797
1798         res = make_result(&result);
1799
1800         free_var(&result);
1801         free_var(&arg2);
1802         free_var(&arg1);
1803
1804         PG_RETURN_NUMERIC(res);
1805 }
1806
1807
1808 /*
1809  * numeric_power() -
1810  *
1811  *      Raise b to the power of x
1812  */
1813 Datum
1814 numeric_power(PG_FUNCTION_ARGS)
1815 {
1816         Numeric         num1 = PG_GETARG_NUMERIC(0);
1817         Numeric         num2 = PG_GETARG_NUMERIC(1);
1818         Numeric         res;
1819         NumericVar      arg1;
1820         NumericVar      arg2;
1821         NumericVar      arg2_trunc;
1822         NumericVar      result;
1823
1824         /*
1825          * Handle NaN
1826          */
1827         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1828                 PG_RETURN_NUMERIC(make_result(&const_nan));
1829
1830         /*
1831          * Initialize things
1832          */
1833         init_var(&arg1);
1834         init_var(&arg2);
1835         init_var(&arg2_trunc);
1836         init_var(&result);
1837
1838         set_var_from_num(num1, &arg1);
1839         set_var_from_num(num2, &arg2);
1840         set_var_from_var(&arg2, &arg2_trunc);
1841
1842         trunc_var(&arg2_trunc, 0);
1843
1844         /*
1845          * Return special SQLSTATE error codes for a few conditions mandated by
1846          * the standard.
1847          */
1848         if ((cmp_var(&arg1, &const_zero) == 0 &&
1849                  cmp_var(&arg2, &const_zero) < 0) ||
1850                 (cmp_var(&arg1, &const_zero) < 0 &&
1851                  cmp_var(&arg2, &arg2_trunc) != 0))
1852                 ereport(ERROR,
1853                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION),
1854                                  errmsg("invalid argument for power function")));
1855
1856         /*
1857          * Call power_var() to compute and return the result; note it handles
1858          * scale selection itself.
1859          */
1860         power_var(&arg1, &arg2, &result);
1861
1862         res = make_result(&result);
1863
1864         free_var(&result);
1865         free_var(&arg2);
1866         free_var(&arg2_trunc);
1867         free_var(&arg1);
1868
1869         PG_RETURN_NUMERIC(res);
1870 }
1871
1872
1873 /* ----------------------------------------------------------------------
1874  *
1875  * Type conversion functions
1876  *
1877  * ----------------------------------------------------------------------
1878  */
1879
1880
1881 Datum
1882 int4_numeric(PG_FUNCTION_ARGS)
1883 {
1884         int32           val = PG_GETARG_INT32(0);
1885         Numeric         res;
1886         NumericVar      result;
1887
1888         init_var(&result);
1889
1890         int8_to_numericvar((int64) val, &result);
1891
1892         res = make_result(&result);
1893
1894         free_var(&result);
1895
1896         PG_RETURN_NUMERIC(res);
1897 }
1898
1899
1900 Datum
1901 numeric_int4(PG_FUNCTION_ARGS)
1902 {
1903         Numeric         num = PG_GETARG_NUMERIC(0);
1904         NumericVar      x;
1905         int32           result;
1906
1907         /* XXX would it be better to return NULL? */
1908         if (NUMERIC_IS_NAN(num))
1909                 ereport(ERROR,
1910                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1911                                  errmsg("cannot convert NaN to integer")));
1912
1913         /* Convert to variable format, then convert to int4 */
1914         init_var(&x);
1915         set_var_from_num(num, &x);
1916         result = numericvar_to_int4(&x);
1917         free_var(&x);
1918         PG_RETURN_INT32(result);
1919 }
1920
1921 /*
1922  * Given a NumericVar, convert it to an int32. If the NumericVar
1923  * exceeds the range of an int32, raise the appropriate error via
1924  * ereport(). The input NumericVar is *not* free'd.
1925  */
1926 static int32
1927 numericvar_to_int4(NumericVar *var)
1928 {
1929         int32           result;
1930         int64           val;
1931
1932         if (!numericvar_to_int8(var, &val))
1933                 ereport(ERROR,
1934                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1935                                  errmsg("integer out of range")));
1936
1937         /* Down-convert to int4 */
1938         result = (int32) val;
1939
1940         /* Test for overflow by reverse-conversion. */
1941         if ((int64) result != val)
1942                 ereport(ERROR,
1943                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1944                                  errmsg("integer out of range")));
1945
1946         return result;
1947 }
1948
1949 Datum
1950 int8_numeric(PG_FUNCTION_ARGS)
1951 {
1952         int64           val = PG_GETARG_INT64(0);
1953         Numeric         res;
1954         NumericVar      result;
1955
1956         init_var(&result);
1957
1958         int8_to_numericvar(val, &result);
1959
1960         res = make_result(&result);
1961
1962         free_var(&result);
1963
1964         PG_RETURN_NUMERIC(res);
1965 }
1966
1967
1968 Datum
1969 numeric_int8(PG_FUNCTION_ARGS)
1970 {
1971         Numeric         num = PG_GETARG_NUMERIC(0);
1972         NumericVar      x;
1973         int64           result;
1974
1975         /* XXX would it be better to return NULL? */
1976         if (NUMERIC_IS_NAN(num))
1977                 ereport(ERROR,
1978                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1979                                  errmsg("cannot convert NaN to bigint")));
1980
1981         /* Convert to variable format and thence to int8 */
1982         init_var(&x);
1983         set_var_from_num(num, &x);
1984
1985         if (!numericvar_to_int8(&x, &result))
1986                 ereport(ERROR,
1987                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1988                                  errmsg("bigint out of range")));
1989
1990         free_var(&x);
1991
1992         PG_RETURN_INT64(result);
1993 }
1994
1995
1996 Datum
1997 int2_numeric(PG_FUNCTION_ARGS)
1998 {
1999         int16           val = PG_GETARG_INT16(0);
2000         Numeric         res;
2001         NumericVar      result;
2002
2003         init_var(&result);
2004
2005         int8_to_numericvar((int64) val, &result);
2006
2007         res = make_result(&result);
2008
2009         free_var(&result);
2010
2011         PG_RETURN_NUMERIC(res);
2012 }
2013
2014
2015 Datum
2016 numeric_int2(PG_FUNCTION_ARGS)
2017 {
2018         Numeric         num = PG_GETARG_NUMERIC(0);
2019         NumericVar      x;
2020         int64           val;
2021         int16           result;
2022
2023         /* XXX would it be better to return NULL? */
2024         if (NUMERIC_IS_NAN(num))
2025                 ereport(ERROR,
2026                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2027                                  errmsg("cannot convert NaN to smallint")));
2028
2029         /* Convert to variable format and thence to int8 */
2030         init_var(&x);
2031         set_var_from_num(num, &x);
2032
2033         if (!numericvar_to_int8(&x, &val))
2034                 ereport(ERROR,
2035                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
2036                                  errmsg("smallint out of range")));
2037
2038         free_var(&x);
2039
2040         /* Down-convert to int2 */
2041         result = (int16) val;
2042
2043         /* Test for overflow by reverse-conversion. */
2044         if ((int64) result != val)
2045                 ereport(ERROR,
2046                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
2047                                  errmsg("smallint out of range")));
2048
2049         PG_RETURN_INT16(result);
2050 }
2051
2052
2053 Datum
2054 float8_numeric(PG_FUNCTION_ARGS)
2055 {
2056         float8          val = PG_GETARG_FLOAT8(0);
2057         Numeric         res;
2058         NumericVar      result;
2059         char            buf[DBL_DIG + 100];
2060
2061         if (isnan(val))
2062                 PG_RETURN_NUMERIC(make_result(&const_nan));
2063
2064         sprintf(buf, "%.*g", DBL_DIG, val);
2065
2066         init_var(&result);
2067
2068         set_var_from_str(buf, &result);
2069         res = make_result(&result);
2070
2071         free_var(&result);
2072
2073         PG_RETURN_NUMERIC(res);
2074 }
2075
2076
2077 Datum
2078 numeric_float8(PG_FUNCTION_ARGS)
2079 {
2080         Numeric         num = PG_GETARG_NUMERIC(0);
2081         char       *tmp;
2082         Datum           result;
2083
2084         if (NUMERIC_IS_NAN(num))
2085                 PG_RETURN_FLOAT8(get_float8_nan());
2086
2087         tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
2088                                                                                           NumericGetDatum(num)));
2089
2090         result = DirectFunctionCall1(float8in, CStringGetDatum(tmp));
2091
2092         pfree(tmp);
2093
2094         PG_RETURN_DATUM(result);
2095 }
2096
2097
2098 /* Convert numeric to float8; if out of range, return +/- HUGE_VAL */
2099 Datum
2100 numeric_float8_no_overflow(PG_FUNCTION_ARGS)
2101 {
2102         Numeric         num = PG_GETARG_NUMERIC(0);
2103         double          val;
2104
2105         if (NUMERIC_IS_NAN(num))
2106                 PG_RETURN_FLOAT8(get_float8_nan());
2107
2108         val = numeric_to_double_no_overflow(num);
2109
2110         PG_RETURN_FLOAT8(val);
2111 }
2112
2113 Datum
2114 float4_numeric(PG_FUNCTION_ARGS)
2115 {
2116         float4          val = PG_GETARG_FLOAT4(0);
2117         Numeric         res;
2118         NumericVar      result;
2119         char            buf[FLT_DIG + 100];
2120
2121         if (isnan(val))
2122                 PG_RETURN_NUMERIC(make_result(&const_nan));
2123
2124         sprintf(buf, "%.*g", FLT_DIG, val);
2125
2126         init_var(&result);
2127
2128         set_var_from_str(buf, &result);
2129         res = make_result(&result);
2130
2131         free_var(&result);
2132
2133         PG_RETURN_NUMERIC(res);
2134 }
2135
2136
2137 Datum
2138 numeric_float4(PG_FUNCTION_ARGS)
2139 {
2140         Numeric         num = PG_GETARG_NUMERIC(0);
2141         char       *tmp;
2142         Datum           result;
2143
2144         if (NUMERIC_IS_NAN(num))
2145                 PG_RETURN_FLOAT4(get_float4_nan());
2146
2147         tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
2148                                                                                           NumericGetDatum(num)));
2149
2150         result = DirectFunctionCall1(float4in, CStringGetDatum(tmp));
2151
2152         pfree(tmp);
2153
2154         PG_RETURN_DATUM(result);
2155 }
2156
2157
2158 /* ----------------------------------------------------------------------
2159  *
2160  * Aggregate functions
2161  *
2162  * The transition datatype for all these aggregates is a 3-element array
2163  * of Numeric, holding the values N, sum(X), sum(X*X) in that order.
2164  *
2165  * We represent N as a numeric mainly to avoid having to build a special
2166  * datatype; it's unlikely it'd overflow an int4, but ...
2167  *
2168  * ----------------------------------------------------------------------
2169  */
2170
2171 static ArrayType *
2172 do_numeric_accum(ArrayType *transarray, Numeric newval)
2173 {
2174         Datum      *transdatums;
2175         int                     ndatums;
2176         Datum           N,
2177                                 sumX,
2178                                 sumX2;
2179         ArrayType  *result;
2180
2181         /* We assume the input is array of numeric */
2182         deconstruct_array(transarray,
2183                                           NUMERICOID, -1, false, 'i',
2184                                           &transdatums, NULL, &ndatums);
2185         if (ndatums != 3)
2186                 elog(ERROR, "expected 3-element numeric array");
2187         N = transdatums[0];
2188         sumX = transdatums[1];
2189         sumX2 = transdatums[2];
2190
2191         N = DirectFunctionCall1(numeric_inc, N);
2192         sumX = DirectFunctionCall2(numeric_add, sumX,
2193                                                            NumericGetDatum(newval));
2194         sumX2 = DirectFunctionCall2(numeric_add, sumX2,
2195                                                                 DirectFunctionCall2(numeric_mul,
2196                                                                                                         NumericGetDatum(newval),
2197                                                                                                         NumericGetDatum(newval)));
2198
2199         transdatums[0] = N;
2200         transdatums[1] = sumX;
2201         transdatums[2] = sumX2;
2202
2203         result = construct_array(transdatums, 3,
2204                                                          NUMERICOID, -1, false, 'i');
2205
2206         return result;
2207 }
2208
2209 /*
2210  * Improve avg performance by not caclulating sum(X*X).
2211  */
2212 static ArrayType *
2213 do_numeric_avg_accum(ArrayType *transarray, Numeric newval)
2214 {
2215         Datum      *transdatums;
2216         int                     ndatums;
2217         Datum           N,
2218                                 sumX;
2219         ArrayType  *result;
2220
2221         /* We assume the input is array of numeric */
2222         deconstruct_array(transarray,
2223                                           NUMERICOID, -1, false, 'i',
2224                                           &transdatums, NULL, &ndatums);
2225         if (ndatums != 2)
2226                 elog(ERROR, "expected 2-element numeric array");
2227         N = transdatums[0];
2228         sumX = transdatums[1];
2229
2230         N = DirectFunctionCall1(numeric_inc, N);
2231         sumX = DirectFunctionCall2(numeric_add, sumX,
2232                                                            NumericGetDatum(newval));
2233
2234         transdatums[0] = N;
2235         transdatums[1] = sumX;
2236
2237         result = construct_array(transdatums, 2,
2238                                                          NUMERICOID, -1, false, 'i');
2239
2240         return result;
2241 }
2242
2243 Datum
2244 numeric_accum(PG_FUNCTION_ARGS)
2245 {
2246         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2247         Numeric         newval = PG_GETARG_NUMERIC(1);
2248
2249         PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
2250 }
2251
2252 /*
2253  * Optimized case for average of numeric.
2254  */
2255 Datum
2256 numeric_avg_accum(PG_FUNCTION_ARGS)
2257 {
2258         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2259         Numeric         newval = PG_GETARG_NUMERIC(1);
2260
2261         PG_RETURN_ARRAYTYPE_P(do_numeric_avg_accum(transarray, newval));
2262 }
2263
2264 /*
2265  * Integer data types all use Numeric accumulators to share code and
2266  * avoid risk of overflow.      For int2 and int4 inputs, Numeric accumulation
2267  * is overkill for the N and sum(X) values, but definitely not overkill
2268  * for the sum(X*X) value.      Hence, we use int2_accum and int4_accum only
2269  * for stddev/variance --- there are faster special-purpose accumulator
2270  * routines for SUM and AVG of these datatypes.
2271  */
2272
2273 Datum
2274 int2_accum(PG_FUNCTION_ARGS)
2275 {
2276         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2277         Datum           newval2 = PG_GETARG_DATUM(1);
2278         Numeric         newval;
2279
2280         newval = DatumGetNumeric(DirectFunctionCall1(int2_numeric, newval2));
2281
2282         PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
2283 }
2284
2285 Datum
2286 int4_accum(PG_FUNCTION_ARGS)
2287 {
2288         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2289         Datum           newval4 = PG_GETARG_DATUM(1);
2290         Numeric         newval;
2291
2292         newval = DatumGetNumeric(DirectFunctionCall1(int4_numeric, newval4));
2293
2294         PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
2295 }
2296
2297 Datum
2298 int8_accum(PG_FUNCTION_ARGS)
2299 {
2300         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2301         Datum           newval8 = PG_GETARG_DATUM(1);
2302         Numeric         newval;
2303
2304         newval = DatumGetNumeric(DirectFunctionCall1(int8_numeric, newval8));
2305
2306         PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
2307 }
2308
2309 /*
2310  * Optimized case for average of int8.
2311  */
2312 Datum
2313 int8_avg_accum(PG_FUNCTION_ARGS)
2314 {
2315         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2316         Datum           newval8 = PG_GETARG_DATUM(1);
2317         Numeric         newval;
2318
2319         newval = DatumGetNumeric(DirectFunctionCall1(int8_numeric, newval8));
2320
2321         PG_RETURN_ARRAYTYPE_P(do_numeric_avg_accum(transarray, newval));
2322 }
2323
2324
2325 Datum
2326 numeric_avg(PG_FUNCTION_ARGS)
2327 {
2328         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2329         Datum      *transdatums;
2330         int                     ndatums;
2331         Numeric         N,
2332                                 sumX;
2333
2334         /* We assume the input is array of numeric */
2335         deconstruct_array(transarray,
2336                                           NUMERICOID, -1, false, 'i',
2337                                           &transdatums, NULL, &ndatums);
2338         if (ndatums != 2)
2339                 elog(ERROR, "expected 2-element numeric array");
2340         N = DatumGetNumeric(transdatums[0]);
2341         sumX = DatumGetNumeric(transdatums[1]);
2342
2343         /* SQL92 defines AVG of no values to be NULL */
2344         /* N is zero iff no digits (cf. numeric_uminus) */
2345         if (VARSIZE(N) == NUMERIC_HDRSZ)
2346                 PG_RETURN_NULL();
2347
2348         PG_RETURN_DATUM(DirectFunctionCall2(numeric_div,
2349                                                                                 NumericGetDatum(sumX),
2350                                                                                 NumericGetDatum(N)));
2351 }
2352
2353 /*
2354  * Workhorse routine for the standard deviance and variance
2355  * aggregates. 'transarray' is the aggregate's transition
2356  * array. 'variance' specifies whether we should calculate the
2357  * variance or the standard deviation. 'sample' indicates whether the
2358  * caller is interested in the sample or the population
2359  * variance/stddev.
2360  *
2361  * If appropriate variance statistic is undefined for the input,
2362  * *is_null is set to true and NULL is returned.
2363  */
2364 static Numeric
2365 numeric_stddev_internal(ArrayType *transarray,
2366                                                 bool variance, bool sample,
2367                                                 bool *is_null)
2368 {
2369         Datum      *transdatums;
2370         int                     ndatums;
2371         Numeric         N,
2372                                 sumX,
2373                                 sumX2,
2374                                 res;
2375         NumericVar      vN,
2376                                 vsumX,
2377                                 vsumX2,
2378                                 vNminus1;
2379         NumericVar *comp;
2380         int                     rscale;
2381
2382         *is_null = false;
2383
2384         /* We assume the input is array of numeric */
2385         deconstruct_array(transarray,
2386                                           NUMERICOID, -1, false, 'i',
2387                                           &transdatums, NULL, &ndatums);
2388         if (ndatums != 3)
2389                 elog(ERROR, "expected 3-element numeric array");
2390         N = DatumGetNumeric(transdatums[0]);
2391         sumX = DatumGetNumeric(transdatums[1]);
2392         sumX2 = DatumGetNumeric(transdatums[2]);
2393
2394         if (NUMERIC_IS_NAN(N) || NUMERIC_IS_NAN(sumX) || NUMERIC_IS_NAN(sumX2))
2395                 return make_result(&const_nan);
2396
2397         init_var(&vN);
2398         set_var_from_num(N, &vN);
2399
2400         /*
2401          * Sample stddev and variance are undefined when N <= 1; population stddev
2402          * is undefined when N == 0. Return NULL in either case.
2403          */
2404         if (sample)
2405                 comp = &const_one;
2406         else
2407                 comp = &const_zero;
2408
2409         if (cmp_var(&vN, comp) <= 0)
2410         {
2411                 free_var(&vN);
2412                 *is_null = true;
2413                 return NULL;
2414         }
2415
2416         init_var(&vNminus1);
2417         sub_var(&vN, &const_one, &vNminus1);
2418
2419         init_var(&vsumX);
2420         set_var_from_num(sumX, &vsumX);
2421         init_var(&vsumX2);
2422         set_var_from_num(sumX2, &vsumX2);
2423
2424         /* compute rscale for mul_var calls */
2425         rscale = vsumX.dscale * 2;
2426
2427         mul_var(&vsumX, &vsumX, &vsumX, rscale);        /* vsumX = sumX * sumX */
2428         mul_var(&vN, &vsumX2, &vsumX2, rscale);         /* vsumX2 = N * sumX2 */
2429         sub_var(&vsumX2, &vsumX, &vsumX2);      /* N * sumX2 - sumX * sumX */
2430
2431         if (cmp_var(&vsumX2, &const_zero) <= 0)
2432         {
2433                 /* Watch out for roundoff error producing a negative numerator */
2434                 res = make_result(&const_zero);
2435         }
2436         else
2437         {
2438                 mul_var(&vN, &vNminus1, &vNminus1, 0);  /* N * (N - 1) */
2439                 rscale = select_div_scale(&vsumX2, &vNminus1);
2440                 div_var(&vsumX2, &vNminus1, &vsumX, rscale, true);              /* variance */
2441                 if (!variance)
2442                         sqrt_var(&vsumX, &vsumX, rscale);       /* stddev */
2443
2444                 res = make_result(&vsumX);
2445         }
2446
2447         free_var(&vN);
2448         free_var(&vNminus1);
2449         free_var(&vsumX);
2450         free_var(&vsumX2);
2451
2452         return res;
2453 }
2454
2455 Datum
2456 numeric_var_samp(PG_FUNCTION_ARGS)
2457 {
2458         Numeric         res;
2459         bool            is_null;
2460
2461         res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
2462                                                                   true, true, &is_null);
2463
2464         if (is_null)
2465                 PG_RETURN_NULL();
2466         else
2467                 PG_RETURN_NUMERIC(res);
2468 }
2469
2470 Datum
2471 numeric_stddev_samp(PG_FUNCTION_ARGS)
2472 {
2473         Numeric         res;
2474         bool            is_null;
2475
2476         res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
2477                                                                   false, true, &is_null);
2478
2479         if (is_null)
2480                 PG_RETURN_NULL();
2481         else
2482                 PG_RETURN_NUMERIC(res);
2483 }
2484
2485 Datum
2486 numeric_var_pop(PG_FUNCTION_ARGS)
2487 {
2488         Numeric         res;
2489         bool            is_null;
2490
2491         res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
2492                                                                   true, false, &is_null);
2493
2494         if (is_null)
2495                 PG_RETURN_NULL();
2496         else
2497                 PG_RETURN_NUMERIC(res);
2498 }
2499
2500 Datum
2501 numeric_stddev_pop(PG_FUNCTION_ARGS)
2502 {
2503         Numeric         res;
2504         bool            is_null;
2505
2506         res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
2507                                                                   false, false, &is_null);
2508
2509         if (is_null)
2510                 PG_RETURN_NULL();
2511         else
2512                 PG_RETURN_NUMERIC(res);
2513 }
2514
2515 /*
2516  * SUM transition functions for integer datatypes.
2517  *
2518  * To avoid overflow, we use accumulators wider than the input datatype.
2519  * A Numeric accumulator is needed for int8 input; for int4 and int2
2520  * inputs, we use int8 accumulators which should be sufficient for practical
2521  * purposes.  (The latter two therefore don't really belong in this file,
2522  * but we keep them here anyway.)
2523  *
2524  * Because SQL92 defines the SUM() of no values to be NULL, not zero,
2525  * the initial condition of the transition data value needs to be NULL. This
2526  * means we can't rely on ExecAgg to automatically insert the first non-null
2527  * data value into the transition data: it doesn't know how to do the type
2528  * conversion.  The upshot is that these routines have to be marked non-strict
2529  * and handle substitution of the first non-null input themselves.
2530  */
2531
2532 Datum
2533 int2_sum(PG_FUNCTION_ARGS)
2534 {
2535         int64           newval;
2536
2537         if (PG_ARGISNULL(0))
2538         {
2539                 /* No non-null input seen so far... */
2540                 if (PG_ARGISNULL(1))
2541                         PG_RETURN_NULL();       /* still no non-null */
2542                 /* This is the first non-null input. */
2543                 newval = (int64) PG_GETARG_INT16(1);
2544                 PG_RETURN_INT64(newval);
2545         }
2546
2547         /*
2548          * If we're invoked by nodeAgg, we can cheat and modify out first
2549          * parameter in-place to avoid palloc overhead. If not, we need to return
2550          * the new value of the transition variable.
2551          */
2552         if (fcinfo->context && IsA(fcinfo->context, AggState))
2553         {
2554                 int64      *oldsum = (int64 *) PG_GETARG_POINTER(0);
2555
2556                 /* Leave the running sum unchanged in the new input is null */
2557                 if (!PG_ARGISNULL(1))
2558                         *oldsum = *oldsum + (int64) PG_GETARG_INT16(1);
2559
2560                 PG_RETURN_POINTER(oldsum);
2561         }
2562         else
2563         {
2564                 int64           oldsum = PG_GETARG_INT64(0);
2565
2566                 /* Leave sum unchanged if new input is null. */
2567                 if (PG_ARGISNULL(1))
2568                         PG_RETURN_INT64(oldsum);
2569
2570                 /* OK to do the addition. */
2571                 newval = oldsum + (int64) PG_GETARG_INT16(1);
2572
2573                 PG_RETURN_INT64(newval);
2574         }
2575 }
2576
2577 Datum
2578 int4_sum(PG_FUNCTION_ARGS)
2579 {
2580         int64           newval;
2581
2582         if (PG_ARGISNULL(0))
2583         {
2584                 /* No non-null input seen so far... */
2585                 if (PG_ARGISNULL(1))
2586                         PG_RETURN_NULL();       /* still no non-null */
2587                 /* This is the first non-null input. */
2588                 newval = (int64) PG_GETARG_INT32(1);
2589                 PG_RETURN_INT64(newval);
2590         }
2591
2592         /*
2593          * If we're invoked by nodeAgg, we can cheat and modify out first
2594          * parameter in-place to avoid palloc overhead. If not, we need to return
2595          * the new value of the transition variable.
2596          */
2597         if (fcinfo->context && IsA(fcinfo->context, AggState))
2598         {
2599                 int64      *oldsum = (int64 *) PG_GETARG_POINTER(0);
2600
2601                 /* Leave the running sum unchanged in the new input is null */
2602                 if (!PG_ARGISNULL(1))
2603                         *oldsum = *oldsum + (int64) PG_GETARG_INT32(1);
2604
2605                 PG_RETURN_POINTER(oldsum);
2606         }
2607         else
2608         {
2609                 int64           oldsum = PG_GETARG_INT64(0);
2610
2611                 /* Leave sum unchanged if new input is null. */
2612                 if (PG_ARGISNULL(1))
2613                         PG_RETURN_INT64(oldsum);
2614
2615                 /* OK to do the addition. */
2616                 newval = oldsum + (int64) PG_GETARG_INT32(1);
2617
2618                 PG_RETURN_INT64(newval);
2619         }
2620 }
2621
2622 Datum
2623 int8_sum(PG_FUNCTION_ARGS)
2624 {
2625         Numeric         oldsum;
2626         Datum           newval;
2627
2628         if (PG_ARGISNULL(0))
2629         {
2630                 /* No non-null input seen so far... */
2631                 if (PG_ARGISNULL(1))
2632                         PG_RETURN_NULL();       /* still no non-null */
2633                 /* This is the first non-null input. */
2634                 newval = DirectFunctionCall1(int8_numeric, PG_GETARG_DATUM(1));
2635                 PG_RETURN_DATUM(newval);
2636         }
2637
2638         /*
2639          * Note that we cannot special-case the nodeAgg case here, as we do for
2640          * int2_sum and int4_sum: numeric is of variable size, so we cannot modify
2641          * our first parameter in-place.
2642          */
2643
2644         oldsum = PG_GETARG_NUMERIC(0);
2645
2646         /* Leave sum unchanged if new input is null. */
2647         if (PG_ARGISNULL(1))
2648                 PG_RETURN_NUMERIC(oldsum);
2649
2650         /* OK to do the addition. */
2651         newval = DirectFunctionCall1(int8_numeric, PG_GETARG_DATUM(1));
2652
2653         PG_RETURN_DATUM(DirectFunctionCall2(numeric_add,
2654                                                                                 NumericGetDatum(oldsum), newval));
2655 }
2656
2657
2658 /*
2659  * Routines for avg(int2) and avg(int4).  The transition datatype
2660  * is a two-element int8 array, holding count and sum.
2661  */
2662
2663 typedef struct Int8TransTypeData
2664 {
2665 #ifndef INT64_IS_BUSTED
2666         int64           count;
2667         int64           sum;
2668 #else
2669         /* "int64" isn't really 64 bits, so fake up properly-aligned fields */
2670         int32           count;
2671         int32           pad1;
2672         int32           sum;
2673         int32           pad2;
2674 #endif
2675 } Int8TransTypeData;
2676
2677 Datum
2678 int2_avg_accum(PG_FUNCTION_ARGS)
2679 {
2680         ArrayType  *transarray;
2681         int16           newval = PG_GETARG_INT16(1);
2682         Int8TransTypeData *transdata;
2683
2684         /*
2685          * If we're invoked by nodeAgg, we can cheat and modify our first
2686          * parameter in-place to reduce palloc overhead. Otherwise we need to make
2687          * a copy of it before scribbling on it.
2688          */
2689         if (fcinfo->context && IsA(fcinfo->context, AggState))
2690                 transarray = PG_GETARG_ARRAYTYPE_P(0);
2691         else
2692                 transarray = PG_GETARG_ARRAYTYPE_P_COPY(0);
2693
2694         if (ARR_HASNULL(transarray) ||
2695                 ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
2696                 elog(ERROR, "expected 2-element int8 array");
2697
2698         transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
2699         transdata->count++;
2700         transdata->sum += newval;
2701
2702         PG_RETURN_ARRAYTYPE_P(transarray);
2703 }
2704
2705 Datum
2706 int4_avg_accum(PG_FUNCTION_ARGS)
2707 {
2708         ArrayType  *transarray;
2709         int32           newval = PG_GETARG_INT32(1);
2710         Int8TransTypeData *transdata;
2711
2712         /*
2713          * If we're invoked by nodeAgg, we can cheat and modify our first
2714          * parameter in-place to reduce palloc overhead. Otherwise we need to make
2715          * a copy of it before scribbling on it.
2716          */
2717         if (fcinfo->context && IsA(fcinfo->context, AggState))
2718                 transarray = PG_GETARG_ARRAYTYPE_P(0);
2719         else
2720                 transarray = PG_GETARG_ARRAYTYPE_P_COPY(0);
2721
2722         if (ARR_HASNULL(transarray) ||
2723                 ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
2724                 elog(ERROR, "expected 2-element int8 array");
2725
2726         transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
2727         transdata->count++;
2728         transdata->sum += newval;
2729
2730         PG_RETURN_ARRAYTYPE_P(transarray);
2731 }
2732
2733 Datum
2734 int8_avg(PG_FUNCTION_ARGS)
2735 {
2736         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2737         Int8TransTypeData *transdata;
2738         Datum           countd,
2739                                 sumd;
2740
2741         if (ARR_HASNULL(transarray) ||
2742                 ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
2743                 elog(ERROR, "expected 2-element int8 array");
2744         transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
2745
2746         /* SQL92 defines AVG of no values to be NULL */
2747         if (transdata->count == 0)
2748                 PG_RETURN_NULL();
2749
2750         countd = DirectFunctionCall1(int8_numeric,
2751                                                                  Int64GetDatumFast(transdata->count));
2752         sumd = DirectFunctionCall1(int8_numeric,
2753                                                            Int64GetDatumFast(transdata->sum));
2754
2755         PG_RETURN_DATUM(DirectFunctionCall2(numeric_div, sumd, countd));
2756 }
2757
2758
2759 /* ----------------------------------------------------------------------
2760  *
2761  * Debug support
2762  *
2763  * ----------------------------------------------------------------------
2764  */
2765
2766 #ifdef NUMERIC_DEBUG
2767
2768 /*
2769  * dump_numeric() - Dump a value in the db storage format for debugging
2770  */
2771 static void
2772 dump_numeric(const char *str, Numeric num)
2773 {
2774         NumericDigit *digits = NUMERIC_DIGITS(num);
2775         int                     ndigits;
2776         int                     i;
2777
2778         ndigits = NUMERIC_NDIGITS(num);
2779
2780         printf("%s: NUMERIC w=%d d=%d ", str, num->n_weight, NUMERIC_DSCALE(num));
2781         switch (NUMERIC_SIGN(num))
2782         {
2783                 case NUMERIC_POS:
2784                         printf("POS");
2785                         break;
2786                 case NUMERIC_NEG:
2787                         printf("NEG");
2788                         break;
2789                 case NUMERIC_NAN:
2790                         printf("NaN");
2791                         break;
2792                 default:
2793                         printf("SIGN=0x%x", NUMERIC_SIGN(num));
2794                         break;
2795         }
2796
2797         for (i = 0; i < ndigits; i++)
2798                 printf(" %0*d", DEC_DIGITS, digits[i]);
2799         printf("\n");
2800 }
2801
2802
2803 /*
2804  * dump_var() - Dump a value in the variable format for debugging
2805  */
2806 static void
2807 dump_var(const char *str, NumericVar *var)
2808 {
2809         int                     i;
2810
2811         printf("%s: VAR w=%d d=%d ", str, var->weight, var->dscale);
2812         switch (var->sign)
2813         {
2814                 case NUMERIC_POS:
2815                         printf("POS");
2816                         break;
2817                 case NUMERIC_NEG:
2818                         printf("NEG");
2819                         break;
2820                 case NUMERIC_NAN:
2821                         printf("NaN");
2822                         break;
2823                 default:
2824                         printf("SIGN=0x%x", var->sign);
2825                         break;
2826         }
2827
2828         for (i = 0; i < var->ndigits; i++)
2829                 printf(" %0*d", DEC_DIGITS, var->digits[i]);
2830
2831         printf("\n");
2832 }
2833 #endif   /* NUMERIC_DEBUG */
2834
2835
2836 /* ----------------------------------------------------------------------
2837  *
2838  * Local functions follow
2839  *
2840  * In general, these do not support NaNs --- callers must eliminate
2841  * the possibility of NaN first.  (make_result() is an exception.)
2842  *
2843  * ----------------------------------------------------------------------
2844  */
2845
2846
2847 /*
2848  * alloc_var() -
2849  *
2850  *      Allocate a digit buffer of ndigits digits (plus a spare digit for rounding)
2851  */
2852 static void
2853 alloc_var(NumericVar *var, int ndigits)
2854 {
2855         digitbuf_free(var->buf);
2856         var->buf = digitbuf_alloc(ndigits + 1);
2857         var->buf[0] = 0;                        /* spare digit for rounding */
2858         var->digits = var->buf + 1;
2859         var->ndigits = ndigits;
2860 }
2861
2862
2863 /*
2864  * free_var() -
2865  *
2866  *      Return the digit buffer of a variable to the free pool
2867  */
2868 static void
2869 free_var(NumericVar *var)
2870 {
2871         digitbuf_free(var->buf);
2872         var->buf = NULL;
2873         var->digits = NULL;
2874         var->sign = NUMERIC_NAN;
2875 }
2876
2877
2878 /*
2879  * zero_var() -
2880  *
2881  *      Set a variable to ZERO.
2882  *      Note: its dscale is not touched.
2883  */
2884 static void
2885 zero_var(NumericVar *var)
2886 {
2887         digitbuf_free(var->buf);
2888         var->buf = NULL;
2889         var->digits = NULL;
2890         var->ndigits = 0;
2891         var->weight = 0;                        /* by convention; doesn't really matter */
2892         var->sign = NUMERIC_POS;        /* anything but NAN... */
2893 }
2894
2895
2896 /*
2897  * set_var_from_str()
2898  *
2899  *      Parse a string and put the number into a variable
2900  */
2901 static void
2902 set_var_from_str(const char *str, NumericVar *dest)
2903 {
2904         const char *cp = str;
2905         bool            have_dp = FALSE;
2906         int                     i;
2907         unsigned char *decdigits;
2908         int                     sign = NUMERIC_POS;
2909         int                     dweight = -1;
2910         int                     ddigits;
2911         int                     dscale = 0;
2912         int                     weight;
2913         int                     ndigits;
2914         int                     offset;
2915         NumericDigit *digits;
2916
2917         /*
2918          * We first parse the string to extract decimal digits and determine the
2919          * correct decimal weight.      Then convert to NBASE representation.
2920          */
2921
2922         /* skip leading spaces */
2923         while (*cp)
2924         {
2925                 if (!isspace((unsigned char) *cp))
2926                         break;
2927                 cp++;
2928         }
2929
2930         switch (*cp)
2931         {
2932                 case '+':
2933                         sign = NUMERIC_POS;
2934                         cp++;
2935                         break;
2936
2937                 case '-':
2938                         sign = NUMERIC_NEG;
2939                         cp++;
2940                         break;
2941         }
2942
2943         if (*cp == '.')
2944         {
2945                 have_dp = TRUE;
2946                 cp++;
2947         }
2948
2949         if (!isdigit((unsigned char) *cp))
2950                 ereport(ERROR,
2951                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
2952                           errmsg("invalid input syntax for type numeric: \"%s\"", str)));
2953
2954         decdigits = (unsigned char *) palloc(strlen(cp) + DEC_DIGITS * 2);
2955
2956         /* leading padding for digit alignment later */
2957         memset(decdigits, 0, DEC_DIGITS);
2958         i = DEC_DIGITS;
2959
2960         while (*cp)
2961         {
2962                 if (isdigit((unsigned char) *cp))
2963                 {
2964                         decdigits[i++] = *cp++ - '0';
2965                         if (!have_dp)
2966                                 dweight++;
2967                         else
2968                                 dscale++;
2969                 }
2970                 else if (*cp == '.')
2971                 {
2972                         if (have_dp)
2973                                 ereport(ERROR,
2974                                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
2975                                           errmsg("invalid input syntax for type numeric: \"%s\"",
2976                                                          str)));
2977                         have_dp = TRUE;
2978                         cp++;
2979                 }
2980                 else
2981                         break;
2982         }
2983
2984         ddigits = i - DEC_DIGITS;
2985         /* trailing padding for digit alignment later */
2986         memset(decdigits + i, 0, DEC_DIGITS - 1);
2987
2988         /* Handle exponent, if any */
2989         if (*cp == 'e' || *cp == 'E')
2990         {
2991                 long            exponent;
2992                 char       *endptr;
2993
2994                 cp++;
2995                 exponent = strtol(cp, &endptr, 10);
2996                 if (endptr == cp)
2997                         ereport(ERROR,
2998                                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
2999                                          errmsg("invalid input syntax for type numeric: \"%s\"",
3000                                                         str)));
3001                 cp = endptr;
3002                 if (exponent > NUMERIC_MAX_PRECISION ||
3003                         exponent < -NUMERIC_MAX_PRECISION)
3004                         ereport(ERROR,
3005                                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3006                                          errmsg("invalid input syntax for type numeric: \"%s\"",
3007                                                         str)));
3008                 dweight += (int) exponent;
3009                 dscale -= (int) exponent;
3010                 if (dscale < 0)
3011                         dscale = 0;
3012         }
3013
3014         /* Should be nothing left but spaces */
3015         while (*cp)
3016         {
3017                 if (!isspace((unsigned char) *cp))
3018                         ereport(ERROR,
3019                                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3020                                          errmsg("invalid input syntax for type numeric: \"%s\"",
3021                                                         str)));
3022                 cp++;
3023         }
3024
3025         /*
3026          * Okay, convert pure-decimal representation to base NBASE.  First we need
3027          * to determine the converted weight and ndigits.  offset is the number of
3028          * decimal zeroes to insert before the first given digit to have a
3029          * correctly aligned first NBASE digit.
3030          */
3031         if (dweight >= 0)
3032                 weight = (dweight + 1 + DEC_DIGITS - 1) / DEC_DIGITS - 1;
3033         else
3034                 weight = -((-dweight - 1) / DEC_DIGITS + 1);
3035         offset = (weight + 1) * DEC_DIGITS - (dweight + 1);
3036         ndigits = (ddigits + offset + DEC_DIGITS - 1) / DEC_DIGITS;
3037
3038         alloc_var(dest, ndigits);
3039         dest->sign = sign;
3040         dest->weight = weight;
3041         dest->dscale = dscale;
3042
3043         i = DEC_DIGITS - offset;
3044         digits = dest->digits;
3045
3046         while (ndigits-- > 0)
3047         {
3048 #if DEC_DIGITS == 4
3049                 *digits++ = ((decdigits[i] * 10 + decdigits[i + 1]) * 10 +
3050                                          decdigits[i + 2]) * 10 + decdigits[i + 3];
3051 #elif DEC_DIGITS == 2
3052                 *digits++ = decdigits[i] * 10 + decdigits[i + 1];
3053 #elif DEC_DIGITS == 1
3054                 *digits++ = decdigits[i];
3055 #else
3056 #error unsupported NBASE
3057 #endif
3058                 i += DEC_DIGITS;
3059         }
3060
3061         pfree(decdigits);
3062
3063         /* Strip any leading/trailing zeroes, and normalize weight if zero */
3064         strip_var(dest);
3065 }
3066
3067
3068 /*
3069  * set_var_from_num() -
3070  *
3071  *      Convert the packed db format into a variable
3072  */
3073 static void
3074 set_var_from_num(Numeric num, NumericVar *dest)
3075 {
3076         int                     ndigits;
3077
3078         ndigits = NUMERIC_NDIGITS(num);
3079
3080         alloc_var(dest, ndigits);
3081
3082         dest->weight = num->n_weight;
3083         dest->sign = NUMERIC_SIGN(num);
3084         dest->dscale = NUMERIC_DSCALE(num);
3085
3086         memcpy(dest->digits, num->n_data, ndigits * sizeof(NumericDigit));
3087 }
3088
3089
3090 /*
3091  * set_var_from_var() -
3092  *
3093  *      Copy one variable into another
3094  */
3095 static void
3096 set_var_from_var(NumericVar *value, NumericVar *dest)
3097 {
3098         NumericDigit *newbuf;
3099
3100         newbuf = digitbuf_alloc(value->ndigits + 1);
3101         newbuf[0] = 0;                          /* spare digit for rounding */
3102         memcpy(newbuf + 1, value->digits, value->ndigits * sizeof(NumericDigit));
3103
3104         digitbuf_free(dest->buf);
3105
3106         memmove(dest, value, sizeof(NumericVar));
3107         dest->buf = newbuf;
3108         dest->digits = newbuf + 1;
3109 }
3110
3111
3112 /*
3113  * get_str_from_var() -
3114  *
3115  *      Convert a var to text representation (guts of numeric_out).
3116  *      CAUTION: var's contents may be modified by rounding!
3117  *      Returns a palloc'd string.
3118  */
3119 static char *
3120 get_str_from_var(NumericVar *var, int dscale)
3121 {
3122         char       *str;
3123         char       *cp;
3124         char       *endcp;
3125         int                     i;
3126         int                     d;
3127         NumericDigit dig;
3128
3129 #if DEC_DIGITS > 1
3130         NumericDigit d1;
3131 #endif
3132
3133         if (dscale < 0)
3134                 dscale = 0;
3135
3136         /*
3137          * Check if we must round up before printing the value and do so.
3138          */
3139         round_var(var, dscale);
3140
3141         /*
3142          * Allocate space for the result.
3143          *
3144          * i is set to to # of decimal digits before decimal point. dscale is the
3145          * # of decimal digits we will print after decimal point. We may generate
3146          * as many as DEC_DIGITS-1 excess digits at the end, and in addition we
3147          * need room for sign, decimal point, null terminator.
3148          */
3149         i = (var->weight + 1) * DEC_DIGITS;
3150         if (i <= 0)
3151                 i = 1;
3152
3153         str = palloc(i + dscale + DEC_DIGITS + 2);
3154         cp = str;
3155
3156         /*
3157          * Output a dash for negative values
3158          */
3159         if (var->sign == NUMERIC_NEG)
3160                 *cp++ = '-';
3161
3162         /*
3163          * Output all digits before the decimal point
3164          */
3165         if (var->weight < 0)
3166         {
3167                 d = var->weight + 1;
3168                 *cp++ = '0';
3169         }
3170         else
3171         {
3172                 for (d = 0; d <= var->weight; d++)
3173                 {
3174                         dig = (d < var->ndigits) ? var->digits[d] : 0;
3175                         /* In the first digit, suppress extra leading decimal zeroes */
3176 #if DEC_DIGITS == 4
3177                         {
3178                                 bool            putit = (d > 0);
3179
3180                                 d1 = dig / 1000;
3181                                 dig -= d1 * 1000;
3182                                 putit |= (d1 > 0);
3183                                 if (putit)
3184                                         *cp++ = d1 + '0';
3185                                 d1 = dig / 100;
3186                                 dig -= d1 * 100;
3187                                 putit |= (d1 > 0);
3188                                 if (putit)
3189                                         *cp++ = d1 + '0';
3190                                 d1 = dig / 10;
3191                                 dig -= d1 * 10;
3192                                 putit |= (d1 > 0);
3193                                 if (putit)
3194                                         *cp++ = d1 + '0';
3195                                 *cp++ = dig + '0';
3196                         }
3197 #elif DEC_DIGITS == 2
3198                         d1 = dig / 10;
3199                         dig -= d1 * 10;
3200                         if (d1 > 0 || d > 0)
3201                                 *cp++ = d1 + '0';
3202                         *cp++ = dig + '0';
3203 #elif DEC_DIGITS == 1
3204                         *cp++ = dig + '0';
3205 #else
3206 #error unsupported NBASE
3207 #endif
3208                 }
3209         }
3210
3211         /*
3212          * If requested, output a decimal point and all the digits that follow it.
3213          * We initially put out a multiple of DEC_DIGITS digits, then truncate if
3214          * needed.
3215          */
3216         if (dscale > 0)
3217         {
3218                 *cp++ = '.';
3219                 endcp = cp + dscale;
3220                 for (i = 0; i < dscale; d++, i += DEC_DIGITS)
3221                 {
3222                         dig = (d >= 0 && d < var->ndigits) ? var->digits[d] : 0;
3223 #if DEC_DIGITS == 4
3224                         d1 = dig / 1000;
3225                         dig -= d1 * 1000;
3226                         *cp++ = d1 + '0';
3227                         d1 = dig / 100;
3228                         dig -= d1 * 100;
3229                         *cp++ = d1 + '0';
3230                         d1 = dig / 10;
3231                         dig -= d1 * 10;
3232                         *cp++ = d1 + '0';
3233                         *cp++ = dig + '0';
3234 #elif DEC_DIGITS == 2
3235                         d1 = dig / 10;
3236                         dig -= d1 * 10;
3237                         *cp++ = d1 + '0';
3238                         *cp++ = dig + '0';
3239 #elif DEC_DIGITS == 1
3240                         *cp++ = dig + '0';
3241 #else
3242 #error unsupported NBASE
3243 #endif
3244                 }
3245                 cp = endcp;
3246         }
3247
3248         /*
3249          * terminate the string and return it
3250          */
3251         *cp = '\0';
3252         return str;
3253 }
3254
3255
3256 /*
3257  * make_result() -
3258  *
3259  *      Create the packed db numeric format in palloc()'d memory from
3260  *      a variable.
3261  */
3262 static Numeric
3263 make_result(NumericVar *var)
3264 {
3265         Numeric         result;
3266         NumericDigit *digits = var->digits;
3267         int                     weight = var->weight;
3268         int                     sign = var->sign;
3269         int                     n;
3270         Size            len;
3271
3272         if (sign == NUMERIC_NAN)
3273         {
3274                 result = (Numeric) palloc(NUMERIC_HDRSZ);
3275
3276                 SET_VARSIZE(result, NUMERIC_HDRSZ);
3277                 result->n_weight = 0;
3278                 result->n_sign_dscale = NUMERIC_NAN;
3279
3280                 dump_numeric("make_result()", result);
3281                 return result;
3282         }
3283
3284         n = var->ndigits;
3285
3286         /* truncate leading zeroes */
3287         while (n > 0 && *digits == 0)
3288         {
3289                 digits++;
3290                 weight--;
3291                 n--;
3292         }
3293         /* truncate trailing zeroes */
3294         while (n > 0 && digits[n - 1] == 0)
3295                 n--;
3296
3297         /* If zero result, force to weight=0 and positive sign */
3298         if (n == 0)
3299         {
3300                 weight = 0;
3301                 sign = NUMERIC_POS;
3302         }
3303
3304         /* Build the result */
3305         len = NUMERIC_HDRSZ + n * sizeof(NumericDigit);
3306         result = (Numeric) palloc(len);
3307         SET_VARSIZE(result, len);
3308         result->n_weight = weight;
3309         result->n_sign_dscale = sign | (var->dscale & NUMERIC_DSCALE_MASK);
3310
3311         memcpy(result->n_data, digits, n * sizeof(NumericDigit));
3312
3313         /* Check for overflow of int16 fields */
3314         if (result->n_weight != weight ||
3315                 NUMERIC_DSCALE(result) != var->dscale)
3316                 ereport(ERROR,
3317                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
3318                                  errmsg("value overflows numeric format")));
3319
3320         dump_numeric("make_result()", result);
3321         return result;
3322 }
3323
3324
3325 /*
3326  * apply_typmod() -
3327  *
3328  *      Do bounds checking and rounding according to the attributes
3329  *      typmod field.
3330  */
3331 static void
3332 apply_typmod(NumericVar *var, int32 typmod)
3333 {
3334         int                     precision;
3335         int                     scale;
3336         int                     maxdigits;
3337         int                     ddigits;
3338         int                     i;
3339
3340         /* Do nothing if we have a default typmod (-1) */
3341         if (typmod < (int32) (VARHDRSZ))
3342                 return;
3343
3344         typmod -= VARHDRSZ;
3345         precision = (typmod >> 16) & 0xffff;
3346         scale = typmod & 0xffff;
3347         maxdigits = precision - scale;
3348
3349         /* Round to target scale (and set var->dscale) */
3350         round_var(var, scale);
3351
3352         /*
3353          * Check for overflow - note we can't do this before rounding, because
3354          * rounding could raise the weight.  Also note that the var's weight could
3355          * be inflated by leading zeroes, which will be stripped before storage
3356          * but perhaps might not have been yet. In any case, we must recognize a
3357          * true zero, whose weight doesn't mean anything.
3358          */
3359         ddigits = (var->weight + 1) * DEC_DIGITS;
3360         if (ddigits > maxdigits)
3361         {
3362                 /* Determine true weight; and check for all-zero result */
3363                 for (i = 0; i < var->ndigits; i++)
3364                 {
3365                         NumericDigit dig = var->digits[i];
3366
3367                         if (dig)
3368                         {
3369                                 /* Adjust for any high-order decimal zero digits */
3370 #if DEC_DIGITS == 4
3371                                 if (dig < 10)
3372                                         ddigits -= 3;
3373                                 else if (dig < 100)
3374                                         ddigits -= 2;
3375                                 else if (dig < 1000)
3376                                         ddigits -= 1;
3377 #elif DEC_DIGITS == 2
3378                                 if (dig < 10)
3379                                         ddigits -= 1;
3380 #elif DEC_DIGITS == 1
3381                                 /* no adjustment */
3382 #else
3383 #error unsupported NBASE
3384 #endif
3385                                 if (ddigits > maxdigits)
3386                                         ereport(ERROR,
3387                                                         (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
3388                                                          errmsg("numeric field overflow"),
3389                                                          errdetail("A field with precision %d, scale %d must round to an absolute value less than %s%d.",
3390                                                                            precision, scale,
3391                                         /* Display 10^0 as 1 */
3392                                                                            maxdigits ? "10^" : "",
3393                                                                            maxdigits ? maxdigits : 1
3394                                                                            )));
3395                                 break;
3396                         }
3397                         ddigits -= DEC_DIGITS;
3398                 }
3399         }
3400 }
3401
3402 /*
3403  * Convert numeric to int8, rounding if needed.
3404  *
3405  * If overflow, return FALSE (no error is raised).      Return TRUE if okay.
3406  *
3407  *      CAUTION: var's contents may be modified by rounding!
3408  */
3409 static bool
3410 numericvar_to_int8(NumericVar *var, int64 *result)
3411 {
3412         NumericDigit *digits;
3413         int                     ndigits;
3414         int                     weight;
3415         int                     i;
3416         int64           val,
3417                                 oldval;
3418         bool            neg;
3419
3420         /* Round to nearest integer */
3421         round_var(var, 0);
3422
3423         /* Check for zero input */
3424         strip_var(var);
3425         ndigits = var->ndigits;
3426         if (ndigits == 0)
3427         {
3428                 *result = 0;
3429                 return true;
3430         }
3431
3432         /*
3433          * For input like 10000000000, we must treat stripped digits as real. So
3434          * the loop assumes there are weight+1 digits before the decimal point.
3435          */
3436         weight = var->weight;
3437         Assert(weight >= 0 && ndigits <= weight + 1);
3438
3439         /* Construct the result */
3440         digits = var->digits;
3441         neg = (var->sign == NUMERIC_NEG);
3442         val = digits[0];
3443         for (i = 1; i <= weight; i++)
3444         {
3445                 oldval = val;
3446                 val *= NBASE;
3447                 if (i < ndigits)
3448                         val += digits[i];
3449
3450                 /*
3451                  * The overflow check is a bit tricky because we want to accept
3452                  * INT64_MIN, which will overflow the positive accumulator.  We can
3453                  * detect this case easily though because INT64_MIN is the only
3454                  * nonzero value for which -val == val (on a two's complement machine,
3455                  * anyway).
3456                  */
3457                 if ((val / NBASE) != oldval)    /* possible overflow? */
3458                 {
3459                         if (!neg || (-val) != val || val == 0 || oldval < 0)
3460                                 return false;
3461                 }
3462         }
3463
3464         *result = neg ? -val : val;
3465         return true;
3466 }
3467
3468 /*
3469  * Convert int8 value to numeric.
3470  */
3471 static void
3472 int8_to_numericvar(int64 val, NumericVar *var)
3473 {
3474         uint64          uval,
3475                                 newuval;
3476         NumericDigit *ptr;
3477         int                     ndigits;
3478
3479         /* int8 can require at most 19 decimal digits; add one for safety */
3480         alloc_var(var, 20 / DEC_DIGITS);
3481         if (val < 0)
3482         {
3483                 var->sign = NUMERIC_NEG;
3484                 uval = -val;
3485         }
3486         else
3487         {
3488                 var->sign = NUMERIC_POS;
3489                 uval = val;
3490         }
3491         var->dscale = 0;
3492         if (val == 0)
3493         {
3494                 var->ndigits = 0;
3495                 var->weight = 0;
3496                 return;
3497         }
3498         ptr = var->digits + var->ndigits;
3499         ndigits = 0;
3500         do
3501         {
3502                 ptr--;
3503                 ndigits++;
3504                 newuval = uval / NBASE;
3505                 *ptr = uval - newuval * NBASE;
3506                 uval = newuval;
3507         } while (uval);
3508         var->digits = ptr;
3509         var->ndigits = ndigits;
3510         var->weight = ndigits - 1;
3511 }
3512
3513 /*
3514  * Convert numeric to float8; if out of range, return +/- HUGE_VAL
3515  */
3516 static double
3517 numeric_to_double_no_overflow(Numeric num)
3518 {
3519         char       *tmp;
3520         double          val;
3521         char       *endptr;
3522
3523         tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
3524                                                                                           NumericGetDatum(num)));
3525
3526         /* unlike float8in, we ignore ERANGE from strtod */
3527         val = strtod(tmp, &endptr);
3528         if (*endptr != '\0')
3529         {
3530                 /* shouldn't happen ... */
3531                 ereport(ERROR,
3532                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3533                          errmsg("invalid input syntax for type double precision: \"%s\"",
3534                                         tmp)));
3535         }
3536
3537         pfree(tmp);
3538
3539         return val;
3540 }
3541
3542 /* As above, but work from a NumericVar */
3543 static double
3544 numericvar_to_double_no_overflow(NumericVar *var)
3545 {
3546         char       *tmp;
3547         double          val;
3548         char       *endptr;
3549
3550         tmp = get_str_from_var(var, var->dscale);
3551
3552         /* unlike float8in, we ignore ERANGE from strtod */
3553         val = strtod(tmp, &endptr);
3554         if (*endptr != '\0')
3555         {
3556                 /* shouldn't happen ... */
3557                 ereport(ERROR,
3558                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3559                          errmsg("invalid input syntax for type double precision: \"%s\"",
3560                                         tmp)));
3561         }
3562
3563         pfree(tmp);
3564
3565         return val;
3566 }
3567
3568
3569 /*
3570  * cmp_var() -
3571  *
3572  *      Compare two values on variable level.  We assume zeroes have been
3573  *      truncated to no digits.
3574  */
3575 static int
3576 cmp_var(NumericVar *var1, NumericVar *var2)
3577 {
3578         return cmp_var_common(var1->digits, var1->ndigits,
3579                                                   var1->weight, var1->sign,
3580                                                   var2->digits, var2->ndigits,
3581                                                   var2->weight, var2->sign);
3582 }
3583
3584 /*
3585  * cmp_var_common() -
3586  *
3587  *      Main routine of cmp_var(). This function can be used by both
3588  *      NumericVar and Numeric.
3589  */
3590 static int
3591 cmp_var_common(const NumericDigit *var1digits, int var1ndigits,
3592                            int var1weight, int var1sign,
3593                            const NumericDigit *var2digits, int var2ndigits,
3594                            int var2weight, int var2sign)
3595 {
3596         if (var1ndigits == 0)
3597         {
3598                 if (var2ndigits == 0)
3599                         return 0;
3600                 if (var2sign == NUMERIC_NEG)
3601                         return 1;
3602                 return -1;
3603         }
3604         if (var2ndigits == 0)
3605         {
3606                 if (var1sign == NUMERIC_POS)
3607                         return 1;
3608                 return -1;
3609         }
3610
3611         if (var1sign == NUMERIC_POS)
3612         {
3613                 if (var2sign == NUMERIC_NEG)
3614                         return 1;
3615                 return cmp_abs_common(var1digits, var1ndigits, var1weight,
3616                                                           var2digits, var2ndigits, var2weight);
3617         }
3618
3619         if (var2sign == NUMERIC_POS)
3620                 return -1;
3621
3622         return cmp_abs_common(var2digits, var2ndigits, var2weight,
3623                                                   var1digits, var1ndigits, var1weight);
3624 }
3625
3626
3627 /*
3628  * add_var() -
3629  *
3630  *      Full version of add functionality on variable level (handling signs).
3631  *      result might point to one of the operands too without danger.
3632  */
3633 static void
3634 add_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
3635 {
3636         /*
3637          * Decide on the signs of the two variables what to do
3638          */
3639         if (var1->sign == NUMERIC_POS)
3640         {
3641                 if (var2->sign == NUMERIC_POS)
3642                 {
3643                         /*
3644                          * Both are positive result = +(ABS(var1) + ABS(var2))
3645                          */
3646                         add_abs(var1, var2, result);
3647                         result->sign = NUMERIC_POS;
3648                 }
3649                 else
3650                 {
3651                         /*
3652                          * var1 is positive, var2 is negative Must compare absolute values
3653                          */
3654                         switch (cmp_abs(var1, var2))
3655                         {
3656                                 case 0:
3657                                         /* ----------
3658                                          * ABS(var1) == ABS(var2)
3659                                          * result = ZERO
3660                                          * ----------
3661                                          */
3662                                         zero_var(result);
3663                                         result->dscale = Max(var1->dscale, var2->dscale);
3664                                         break;
3665
3666                                 case 1:
3667                                         /* ----------
3668                                          * ABS(var1) > ABS(var2)
3669                                          * result = +(ABS(var1) - ABS(var2))
3670                                          * ----------
3671                                          */
3672                                         sub_abs(var1, var2, result);
3673                                         result->sign = NUMERIC_POS;
3674                                         break;
3675
3676                                 case -1:
3677                                         /* ----------
3678                                          * ABS(var1) < ABS(var2)
3679                                          * result = -(ABS(var2) - ABS(var1))
3680                                          * ----------
3681                                          */
3682                                         sub_abs(var2, var1, result);
3683                                         result->sign = NUMERIC_NEG;
3684                                         break;
3685                         }
3686                 }
3687         }
3688         else
3689         {
3690                 if (var2->sign == NUMERIC_POS)
3691                 {
3692                         /* ----------
3693                          * var1 is negative, var2 is positive
3694                          * Must compare absolute values
3695                          * ----------
3696                          */
3697                         switch (cmp_abs(var1, var2))
3698                         {
3699                                 case 0:
3700                                         /* ----------
3701                                          * ABS(var1) == ABS(var2)
3702                                          * result = ZERO
3703                                          * ----------
3704                                          */
3705                                         zero_var(result);
3706                                         result->dscale = Max(var1->dscale, var2->dscale);
3707                                         break;
3708
3709                                 case 1:
3710                                         /* ----------
3711                                          * ABS(var1) > ABS(var2)
3712                                          * result = -(ABS(var1) - ABS(var2))
3713                                          * ----------
3714                                          */
3715                                         sub_abs(var1, var2, result);
3716                                         result->sign = NUMERIC_NEG;
3717                                         break;
3718
3719                                 case -1:
3720                                         /* ----------
3721                                          * ABS(var1) < ABS(var2)
3722                                          * result = +(ABS(var2) - ABS(var1))
3723                                          * ----------
3724                                          */
3725                                         sub_abs(var2, var1, result);
3726                                         result->sign = NUMERIC_POS;
3727                                         break;
3728                         }
3729                 }
3730                 else
3731                 {
3732                         /* ----------
3733                          * Both are negative
3734                          * result = -(ABS(var1) + ABS(var2))
3735                          * ----------
3736                          */
3737                         add_abs(var1, var2, result);
3738                         result->sign = NUMERIC_NEG;
3739                 }
3740         }
3741 }
3742
3743
3744 /*
3745  * sub_var() -
3746  *
3747  *      Full version of sub functionality on variable level (handling signs).
3748  *      result might point to one of the operands too without danger.
3749  */
3750 static void
3751 sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
3752 {
3753         /*
3754          * Decide on the signs of the two variables what to do
3755          */
3756         if (var1->sign == NUMERIC_POS)
3757         {
3758                 if (var2->sign == NUMERIC_NEG)
3759                 {
3760                         /* ----------
3761                          * var1 is positive, var2 is negative
3762                          * result = +(ABS(var1) + ABS(var2))
3763                          * ----------
3764                          */
3765                         add_abs(var1, var2, result);
3766                         result->sign = NUMERIC_POS;
3767                 }
3768                 else
3769                 {
3770                         /* ----------
3771                          * Both are positive
3772                          * Must compare absolute values
3773                          * ----------
3774                          */
3775                         switch (cmp_abs(var1, var2))
3776                         {
3777                                 case 0:
3778                                         /* ----------
3779                                          * ABS(var1) == ABS(var2)
3780                                          * result = ZERO
3781                                          * ----------
3782                                          */
3783                                         zero_var(result);
3784                                         result->dscale = Max(var1->dscale, var2->dscale);
3785                                         break;
3786
3787                                 case 1:
3788                                         /* ----------
3789                                          * ABS(var1) > ABS(var2)
3790                                          * result = +(ABS(var1) - ABS(var2))
3791                                          * ----------
3792                                          */
3793                                         sub_abs(var1, var2, result);
3794                                         result->sign = NUMERIC_POS;
3795                                         break;
3796
3797                                 case -1:
3798                                         /* ----------
3799                                          * ABS(var1) < ABS(var2)
3800                                          * result = -(ABS(var2) - ABS(var1))
3801                                          * ----------
3802                                          */
3803                                         sub_abs(var2, var1, result);
3804                                         result->sign = NUMERIC_NEG;
3805                                         break;
3806                         }
3807                 }
3808         }
3809         else
3810         {
3811                 if (var2->sign == NUMERIC_NEG)
3812                 {
3813                         /* ----------
3814                          * Both are negative
3815                          * Must compare absolute values
3816                          * ----------
3817                          */
3818                         switch (cmp_abs(var1, var2))
3819                         {
3820                                 case 0:
3821                                         /* ----------
3822                                          * ABS(var1) == ABS(var2)
3823                                          * result = ZERO
3824                                          * ----------
3825                                          */
3826                                         zero_var(result);
3827                                         result->dscale = Max(var1->dscale, var2->dscale);
3828                                         break;
3829
3830                                 case 1:
3831                                         /* ----------
3832                                          * ABS(var1) > ABS(var2)
3833                                          * result = -(ABS(var1) - ABS(var2))
3834                                          * ----------
3835                                          */
3836                                         sub_abs(var1, var2, result);
3837                                         result->sign = NUMERIC_NEG;
3838                                         break;
3839
3840                                 case -1:
3841                                         /* ----------
3842                                          * ABS(var1) < ABS(var2)
3843                                          * result = +(ABS(var2) - ABS(var1))
3844                                          * ----------
3845                                          */
3846                                         sub_abs(var2, var1, result);
3847                                         result->sign = NUMERIC_POS;
3848                                         break;
3849                         }
3850                 }
3851                 else
3852                 {
3853                         /* ----------
3854                          * var1 is negative, var2 is positive
3855                          * result = -(ABS(var1) + ABS(var2))
3856                          * ----------
3857                          */
3858                         add_abs(var1, var2, result);
3859                         result->sign = NUMERIC_NEG;
3860                 }
3861         }
3862 }
3863
3864
3865 /*
3866  * mul_var() -
3867  *
3868  *      Multiplication on variable level. Product of var1 * var2 is stored
3869  *      in result.      Result is rounded to no more than rscale fractional digits.
3870  */
3871 static void
3872 mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
3873                 int rscale)
3874 {
3875         int                     res_ndigits;
3876         int                     res_sign;
3877         int                     res_weight;
3878         int                     maxdigits;
3879         int                *dig;
3880         int                     carry;
3881         int                     maxdig;
3882         int                     newdig;
3883         NumericDigit *res_digits;
3884         int                     i,
3885                                 ri,
3886                                 i1,
3887                                 i2;
3888
3889         /* copy these values into local vars for speed in inner loop */
3890         int                     var1ndigits = var1->ndigits;
3891         int                     var2ndigits = var2->ndigits;
3892         NumericDigit *var1digits = var1->digits;
3893         NumericDigit *var2digits = var2->digits;
3894
3895         if (var1ndigits == 0 || var2ndigits == 0)
3896         {
3897                 /* one or both inputs is zero; so is result */
3898                 zero_var(result);
3899                 result->dscale = rscale;
3900                 return;
3901         }
3902
3903         /* Determine result sign and (maximum possible) weight */
3904         if (var1->sign == var2->sign)
3905                 res_sign = NUMERIC_POS;
3906         else
3907                 res_sign = NUMERIC_NEG;
3908         res_weight = var1->weight + var2->weight + 2;
3909
3910         /*
3911          * Determine number of result digits to compute.  If the exact result
3912          * would have more than rscale fractional digits, truncate the computation
3913          * with MUL_GUARD_DIGITS guard digits.  We do that by pretending that one
3914          * or both inputs have fewer digits than they really do.
3915          */
3916         res_ndigits = var1ndigits + var2ndigits + 1;
3917         maxdigits = res_weight + 1 + (rscale * DEC_DIGITS) + MUL_GUARD_DIGITS;
3918         if (res_ndigits > maxdigits)
3919         {
3920                 if (maxdigits < 3)
3921                 {
3922                         /* no useful precision at all in the result... */
3923                         zero_var(result);
3924                         result->dscale = rscale;
3925                         return;
3926                 }
3927                 /* force maxdigits odd so that input ndigits can be equal */
3928                 if ((maxdigits & 1) == 0)
3929                         maxdigits++;
3930                 if (var1ndigits > var2ndigits)
3931                 {
3932                         var1ndigits -= res_ndigits - maxdigits;
3933                         if (var1ndigits < var2ndigits)
3934                                 var1ndigits = var2ndigits = (var1ndigits + var2ndigits) / 2;
3935                 }
3936                 else
3937                 {
3938                         var2ndigits -= res_ndigits - maxdigits;
3939                         if (var2ndigits < var1ndigits)
3940                                 var1ndigits = var2ndigits = (var1ndigits + var2ndigits) / 2;
3941                 }
3942                 res_ndigits = maxdigits;
3943                 Assert(res_ndigits == var1ndigits + var2ndigits + 1);
3944         }
3945
3946         /*
3947          * We do the arithmetic in an array "dig[]" of signed int's.  Since
3948          * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom
3949          * to avoid normalizing carries immediately.
3950          *
3951          * maxdig tracks the maximum possible value of any dig[] entry; when this
3952          * threatens to exceed INT_MAX, we take the time to propagate carries. To
3953          * avoid overflow in maxdig itself, it actually represents the max
3954          * possible value divided by NBASE-1.
3955          */
3956         dig = (int *) palloc0(res_ndigits * sizeof(int));
3957         maxdig = 0;
3958
3959         ri = res_ndigits - 1;
3960         for (i1 = var1ndigits - 1; i1 >= 0; ri--, i1--)
3961         {
3962                 int                     var1digit = var1digits[i1];
3963
3964                 if (var1digit == 0)
3965                         continue;
3966
3967                 /* Time to normalize? */
3968                 maxdig += var1digit;
3969                 if (maxdig > INT_MAX / (NBASE - 1))
3970                 {
3971                         /* Yes, do it */
3972                         carry = 0;
3973                         for (i = res_ndigits - 1; i >= 0; i--)
3974                         {
3975                                 newdig = dig[i] + carry;
3976                                 if (newdig >= NBASE)
3977                                 {
3978                                         carry = newdig / NBASE;
3979                                         newdig -= carry * NBASE;
3980                                 }
3981                                 else
3982                                         carry = 0;
3983                                 dig[i] = newdig;
3984                         }
3985                         Assert(carry == 0);
3986                         /* Reset maxdig to indicate new worst-case */
3987                         maxdig = 1 + var1digit;
3988                 }
3989
3990                 /* Add appropriate multiple of var2 into the accumulator */
3991                 i = ri;
3992                 for (i2 = var2ndigits - 1; i2 >= 0; i2--)
3993                         dig[i--] += var1digit * var2digits[i2];
3994         }
3995
3996         /*
3997          * Now we do a final carry propagation pass to normalize the result, which
3998          * we combine with storing the result digits into the output. Note that
3999          * this is still done at full precision w/guard digits.
4000          */
4001         alloc_var(result, res_ndigits);
4002         res_digits = result->digits;
4003         carry = 0;
4004         for (i = res_ndigits - 1; i >= 0; i--)
4005         {
4006                 newdig = dig[i] + carry;
4007                 if (newdig >= NBASE)
4008                 {
4009                         carry = newdig / NBASE;
4010                         newdig -= carry * NBASE;
4011                 }
4012                 else
4013                         carry = 0;
4014                 res_digits[i] = newdig;
4015         }
4016         Assert(carry == 0);
4017
4018         pfree(dig);
4019
4020         /*
4021          * Finally, round the result to the requested precision.
4022          */
4023         result->weight = res_weight;
4024         result->sign = res_sign;
4025
4026         /* Round to target rscale (and set result->dscale) */
4027         round_var(result, rscale);
4028
4029         /* Strip leading and trailing zeroes */
4030         strip_var(result);
4031 }
4032
4033
4034 /*
4035  * div_var() -
4036  *
4037  *      Division on variable level. Quotient of var1 / var2 is stored
4038  *      in result.      Result is rounded to no more than rscale fractional digits.
4039  */
4040 static void
4041 div_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
4042                 int rscale, bool round)
4043 {
4044         int                     div_ndigits;
4045         int                     res_sign;
4046         int                     res_weight;
4047         int                *div;
4048         int                     qdigit;
4049         int                     carry;
4050         int                     maxdiv;
4051         int                     newdig;
4052         NumericDigit *res_digits;
4053         double          fdividend,
4054                                 fdivisor,
4055                                 fdivisorinverse,
4056                                 fquotient;
4057         int                     qi;
4058         int                     i;
4059
4060         /* copy these values into local vars for speed in inner loop */
4061         int                     var1ndigits = var1->ndigits;
4062         int                     var2ndigits = var2->ndigits;
4063         NumericDigit *var1digits = var1->digits;
4064         NumericDigit *var2digits = var2->digits;
4065
4066         /*
4067          * First of all division by zero check; we must not be handed an
4068          * unnormalized divisor.
4069          */
4070         if (var2ndigits == 0 || var2digits[0] == 0)
4071                 ereport(ERROR,
4072                                 (errcode(ERRCODE_DIVISION_BY_ZERO),
4073                                  errmsg("division by zero")));
4074
4075         /*
4076          * Now result zero check
4077          */
4078         if (var1ndigits == 0)
4079         {
4080                 zero_var(result);
4081                 result->dscale = rscale;
4082                 return;
4083         }
4084
4085         /*
4086          * Determine the result sign, weight and number of digits to calculate
4087          */
4088         if (var1->sign == var2->sign)
4089                 res_sign = NUMERIC_POS;
4090         else
4091                 res_sign = NUMERIC_NEG;
4092         res_weight = var1->weight - var2->weight + 1;
4093         /* The number of accurate result digits we need to produce: */
4094         div_ndigits = res_weight + 1 + (rscale + DEC_DIGITS - 1) / DEC_DIGITS;
4095         /* Add guard digits for roundoff error */
4096         div_ndigits += DIV_GUARD_DIGITS;
4097         if (div_ndigits < DIV_GUARD_DIGITS)
4098                 div_ndigits = DIV_GUARD_DIGITS;
4099         /* Must be at least var1ndigits, too, to simplify data-loading loop */
4100         if (div_ndigits < var1ndigits)
4101                 div_ndigits = var1ndigits;
4102
4103         /*
4104          * We do the arithmetic in an array "div[]" of signed int's.  Since
4105          * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom
4106          * to avoid normalizing carries immediately.
4107          *
4108          * We start with div[] containing one zero digit followed by the
4109          * dividend's digits (plus appended zeroes to reach the desired precision
4110          * including guard digits).  Each step of the main loop computes an
4111          * (approximate) quotient digit and stores it into div[], removing one
4112          * position of dividend space.  A final pass of carry propagation takes
4113          * care of any mistaken quotient digits.
4114          */
4115         div = (int *) palloc0((div_ndigits + 1) * sizeof(int));
4116         for (i = 0; i < var1ndigits; i++)
4117                 div[i + 1] = var1digits[i];
4118
4119         /*
4120          * We estimate each quotient digit using floating-point arithmetic, taking
4121          * the first four digits of the (current) dividend and divisor. This must
4122          * be float to avoid overflow.
4123          */
4124         fdivisor = (double) var2digits[0];
4125         for (i = 1; i < 4; i++)
4126         {
4127                 fdivisor *= NBASE;
4128                 if (i < var2ndigits)
4129                         fdivisor += (double) var2digits[i];
4130         }
4131         fdivisorinverse = 1.0 / fdivisor;
4132
4133         /*
4134          * maxdiv tracks the maximum possible absolute value of any div[] entry;
4135          * when this threatens to exceed INT_MAX, we take the time to propagate
4136          * carries.  To avoid overflow in maxdiv itself, it actually represents
4137          * the max possible abs. value divided by NBASE-1.
4138          */
4139         maxdiv = 1;
4140
4141         /*
4142          * Outer loop computes next quotient digit, which will go into div[qi]
4143          */
4144         for (qi = 0; qi < div_ndigits; qi++)
4145         {
4146                 /* Approximate the current dividend value */
4147                 fdividend = (double) div[qi];
4148                 for (i = 1; i < 4; i++)
4149                 {
4150                         fdividend *= NBASE;
4151                         if (qi + i <= div_ndigits)
4152                                 fdividend += (double) div[qi + i];
4153                 }
4154                 /* Compute the (approximate) quotient digit */
4155                 fquotient = fdividend * fdivisorinverse;
4156                 qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
4157                         (((int) fquotient) - 1);        /* truncate towards -infinity */
4158
4159                 if (qdigit != 0)
4160                 {
4161                         /* Do we need to normalize now? */
4162                         maxdiv += Abs(qdigit);
4163                         if (maxdiv > INT_MAX / (NBASE - 1))
4164                         {
4165                                 /* Yes, do it */
4166                                 carry = 0;
4167                                 for (i = div_ndigits; i > qi; i--)
4168                                 {
4169                                         newdig = div[i] + carry;
4170                                         if (newdig < 0)
4171                                         {
4172                                                 carry = -((-newdig - 1) / NBASE) - 1;
4173                                                 newdig -= carry * NBASE;
4174                                         }
4175                                         else if (newdig >= NBASE)
4176                                         {
4177                                                 carry = newdig / NBASE;
4178                                                 newdig -= carry * NBASE;
4179                                         }
4180                                         else
4181                                                 carry = 0;
4182                                         div[i] = newdig;
4183                                 }
4184                                 newdig = div[qi] + carry;
4185                                 div[qi] = newdig;
4186
4187                                 /*
4188                                  * All the div[] digits except possibly div[qi] are now in the
4189                                  * range 0..NBASE-1.
4190                                  */
4191                                 maxdiv = Abs(newdig) / (NBASE - 1);
4192                                 maxdiv = Max(maxdiv, 1);
4193
4194                                 /*
4195                                  * Recompute the quotient digit since new info may have
4196                                  * propagated into the top four dividend digits
4197                                  */
4198                                 fdividend = (double) div[qi];
4199                                 for (i = 1; i < 4; i++)
4200                                 {
4201                                         fdividend *= NBASE;
4202                                         if (qi + i <= div_ndigits)
4203                                                 fdividend += (double) div[qi + i];
4204                                 }
4205                                 /* Compute the (approximate) quotient digit */
4206                                 fquotient = fdividend * fdivisorinverse;
4207                                 qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
4208                                         (((int) fquotient) - 1);        /* truncate towards -infinity */
4209                                 maxdiv += Abs(qdigit);
4210                         }
4211
4212                         /* Subtract off the appropriate multiple of the divisor */
4213                         if (qdigit != 0)
4214                         {
4215                                 int                     istop = Min(var2ndigits, div_ndigits - qi + 1);
4216
4217                                 for (i = 0; i < istop; i++)
4218                                         div[qi + i] -= qdigit * var2digits[i];
4219                         }
4220                 }
4221
4222                 /*
4223                  * The dividend digit we are about to replace might still be nonzero.
4224                  * Fold it into the next digit position.  We don't need to worry about
4225                  * overflow here since this should nearly cancel with the subtraction
4226                  * of the divisor.
4227                  */
4228                 div[qi + 1] += div[qi] * NBASE;
4229
4230                 div[qi] = qdigit;
4231         }
4232
4233         /*
4234          * Approximate and store the last quotient digit (div[div_ndigits])
4235          */
4236         fdividend = (double) div[qi];
4237         for (i = 1; i < 4; i++)
4238                 fdividend *= NBASE;
4239         fquotient = fdividend * fdivisorinverse;
4240         qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
4241                 (((int) fquotient) - 1);        /* truncate towards -infinity */
4242         div[qi] = qdigit;
4243
4244         /*
4245          * Now we do a final carry propagation pass to normalize the result, which
4246          * we combine with storing the result digits into the output. Note that
4247          * this is still done at full precision w/guard digits.
4248          */
4249         alloc_var(result, div_ndigits + 1);
4250         res_digits = result->digits;
4251         carry = 0;
4252         for (i = div_ndigits; i >= 0; i--)
4253         {
4254                 newdig = div[i] + carry;
4255                 if (newdig < 0)
4256                 {
4257                         carry = -((-newdig - 1) / NBASE) - 1;
4258                         newdig -= carry * NBASE;
4259                 }
4260                 else if (newdig >= NBASE)
4261                 {
4262                         carry = newdig / NBASE;
4263                         newdig -= carry * NBASE;
4264                 }
4265                 else
4266                         carry = 0;
4267                 res_digits[i] = newdig;
4268         }
4269         Assert(carry == 0);
4270
4271         pfree(div);
4272
4273         /*
4274          * Finally, round the result to the requested precision.
4275          */
4276         result->weight = res_weight;
4277         result->sign = res_sign;
4278
4279         /* Round to target rscale (and set result->dscale) */
4280         if (round)
4281                 round_var(result, rscale);
4282         else
4283                 trunc_var(result, rscale);
4284
4285         /* Strip leading and trailing zeroes */
4286         strip_var(result);
4287 }
4288
4289
4290 /*
4291  * Default scale selection for division
4292  *
4293  * Returns the appropriate result scale for the division result.
4294  */
4295 static int
4296 select_div_scale(NumericVar *var1, NumericVar *var2)
4297 {
4298         int                     weight1,
4299                                 weight2,
4300                                 qweight,
4301                                 i;
4302         NumericDigit firstdigit1,
4303                                 firstdigit2;
4304         int                     rscale;
4305
4306         /*
4307          * The result scale of a division isn't specified in any SQL standard. For
4308          * PostgreSQL we select a result scale that will give at least
4309          * NUMERIC_MIN_SIG_DIGITS significant digits, so that numeric gives a
4310          * result no less accurate than float8; but use a scale not less than
4311          * either input's display scale.
4312          */
4313
4314         /* Get the actual (normalized) weight and first digit of each input */
4315
4316         weight1 = 0;                            /* values to use if var1 is zero */
4317         firstdigit1 = 0;
4318         for (i = 0; i < var1->ndigits; i++)
4319         {
4320                 firstdigit1 = var1->digits[i];
4321                 if (firstdigit1 != 0)
4322                 {
4323                         weight1 = var1->weight - i;
4324                         break;
4325                 }
4326         }
4327
4328         weight2 = 0;                            /* values to use if var2 is zero */
4329         firstdigit2 = 0;
4330         for (i = 0; i < var2->ndigits; i++)
4331         {
4332                 firstdigit2 = var2->digits[i];
4333                 if (firstdigit2 != 0)
4334                 {
4335                         weight2 = var2->weight - i;
4336                         break;
4337                 }
4338         }
4339
4340         /*
4341          * Estimate weight of quotient.  If the two first digits are equal, we
4342          * can't be sure, but assume that var1 is less than var2.
4343          */
4344         qweight = weight1 - weight2;
4345         if (firstdigit1 <= firstdigit2)
4346                 qweight--;
4347
4348         /* Select result scale */
4349         rscale = NUMERIC_MIN_SIG_DIGITS - qweight * DEC_DIGITS;
4350         rscale = Max(rscale, var1->dscale);
4351         rscale = Max(rscale, var2->dscale);
4352         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
4353         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
4354
4355         return rscale;
4356 }
4357
4358
4359 /*
4360  * mod_var() -
4361  *
4362  *      Calculate the modulo of two numerics at variable level
4363  */
4364 static void
4365 mod_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
4366 {
4367         NumericVar      tmp;
4368         int                     rscale;
4369
4370         init_var(&tmp);
4371
4372         /* ---------
4373          * We do this using the equation
4374          *              mod(x,y) = x - trunc(x/y)*y
4375          * We set rscale the same way numeric_div and numeric_mul do
4376          * to get the right answer from the equation.  The final result,
4377          * however, need not be displayed to more precision than the inputs.
4378          * ----------
4379          */
4380         rscale = select_div_scale(var1, var2);
4381
4382         div_var(var1, var2, &tmp, rscale, false);
4383
4384         trunc_var(&tmp, 0);
4385
4386         mul_var(var2, &tmp, &tmp, var2->dscale + tmp.dscale);
4387
4388         sub_var(var1, &tmp, result);
4389
4390         round_var(result, Max(var1->dscale, var2->dscale));
4391
4392         free_var(&tmp);
4393 }
4394
4395
4396 /*
4397  * ceil_var() -
4398  *
4399  *      Return the smallest integer greater than or equal to the argument
4400  *      on variable level
4401  */
4402 static void
4403 ceil_var(NumericVar *var, NumericVar *result)
4404 {
4405         NumericVar      tmp;
4406
4407         init_var(&tmp);
4408         set_var_from_var(var, &tmp);
4409
4410         trunc_var(&tmp, 0);
4411
4412         if (var->sign == NUMERIC_POS && cmp_var(var, &tmp) != 0)
4413                 add_var(&tmp, &const_one, &tmp);
4414
4415         set_var_from_var(&tmp, result);
4416         free_var(&tmp);
4417 }
4418
4419
4420 /*
4421  * floor_var() -
4422  *
4423  *      Return the largest integer equal to or less than the argument
4424  *      on variable level
4425  */
4426 static void
4427 floor_var(NumericVar *var, NumericVar *result)
4428 {
4429         NumericVar      tmp;
4430
4431         init_var(&tmp);
4432         set_var_from_var(var, &tmp);
4433
4434         trunc_var(&tmp, 0);
4435
4436         if (var->sign == NUMERIC_NEG && cmp_var(var, &tmp) != 0)
4437                 sub_var(&tmp, &const_one, &tmp);
4438
4439         set_var_from_var(&tmp, result);
4440         free_var(&tmp);
4441 }
4442
4443
4444 /*
4445  * sqrt_var() -
4446  *
4447  *      Compute the square root of x using Newton's algorithm
4448  */
4449 static void
4450 sqrt_var(NumericVar *arg, NumericVar *result, int rscale)
4451 {
4452         NumericVar      tmp_arg;
4453         NumericVar      tmp_val;
4454         NumericVar      last_val;
4455         int                     local_rscale;
4456         int                     stat;
4457
4458         local_rscale = rscale + 8;
4459
4460         stat = cmp_var(arg, &const_zero);
4461         if (stat == 0)
4462         {
4463                 zero_var(result);
4464                 result->dscale = rscale;
4465                 return;
4466         }
4467
4468         /*
4469          * SQL2003 defines sqrt() in terms of power, so we need to emit the right
4470          * SQLSTATE error code if the operand is negative.
4471          */
4472         if (stat < 0)
4473                 ereport(ERROR,
4474                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION),
4475                                  errmsg("cannot take square root of a negative number")));
4476
4477         init_var(&tmp_arg);
4478         init_var(&tmp_val);
4479         init_var(&last_val);
4480
4481         /* Copy arg in case it is the same var as result */
4482         set_var_from_var(arg, &tmp_arg);
4483
4484         /*
4485          * Initialize the result to the first guess
4486          */
4487         alloc_var(result, 1);
4488         result->digits[0] = tmp_arg.digits[0] / 2;
4489         if (result->digits[0] == 0)
4490                 result->digits[0] = 1;
4491         result->weight = tmp_arg.weight / 2;
4492         result->sign = NUMERIC_POS;
4493
4494         set_var_from_var(result, &last_val);
4495
4496         for (;;)
4497         {
4498                 div_var(&tmp_arg, result, &tmp_val, local_rscale, true);
4499
4500                 add_var(result, &tmp_val, result);
4501                 mul_var(result, &const_zero_point_five, result, local_rscale);
4502
4503                 if (cmp_var(&last_val, result) == 0)
4504                         break;
4505                 set_var_from_var(result, &last_val);
4506         }
4507
4508         free_var(&last_val);
4509         free_var(&tmp_val);
4510         free_var(&tmp_arg);
4511
4512         /* Round to requested precision */
4513         round_var(result, rscale);
4514 }
4515
4516
4517 /*
4518  * exp_var() -
4519  *
4520  *      Raise e to the power of x
4521  */
4522 static void
4523 exp_var(NumericVar *arg, NumericVar *result, int rscale)
4524 {
4525         NumericVar      x;
4526         int                     xintval;
4527         bool            xneg = FALSE;
4528         int                     local_rscale;
4529
4530         /*----------
4531          * We separate the integral and fraction parts of x, then compute
4532          *              e^x = e^xint * e^xfrac
4533          * where e = exp(1) and e^xfrac = exp(xfrac) are computed by
4534          * exp_var_internal; the limited range of inputs allows that routine
4535          * to do a good job with a simple Taylor series.  Raising e^xint is
4536          * done by repeated multiplications in power_var_int.
4537          *----------
4538          */
4539         init_var(&x);
4540
4541         set_var_from_var(arg, &x);
4542
4543         if (x.sign == NUMERIC_NEG)
4544         {
4545                 xneg = TRUE;
4546                 x.sign = NUMERIC_POS;
4547         }
4548
4549         /* Extract the integer part, remove it from x */
4550         xintval = 0;
4551         while (x.weight >= 0)
4552         {
4553                 xintval *= NBASE;
4554                 if (x.ndigits > 0)
4555                 {
4556                         xintval += x.digits[0];
4557                         x.digits++;
4558                         x.ndigits--;
4559                 }
4560                 x.weight--;
4561                 /* Guard against overflow */
4562                 if (xintval >= NUMERIC_MAX_RESULT_SCALE * 3)
4563                         ereport(ERROR,
4564                                         (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
4565                                          errmsg("argument for function \"exp\" too big")));
4566         }
4567
4568         /* Select an appropriate scale for internal calculation */
4569         local_rscale = rscale + MUL_GUARD_DIGITS * 2;
4570
4571         /* Compute e^xfrac */
4572         exp_var_internal(&x, result, local_rscale);
4573
4574         /* If there's an integer part, multiply by e^xint */
4575         if (xintval > 0)
4576         {
4577                 NumericVar      e;
4578
4579                 init_var(&e);
4580                 exp_var_internal(&const_one, &e, local_rscale);
4581                 power_var_int(&e, xintval, &e, local_rscale);
4582                 mul_var(&e, result, result, local_rscale);
4583                 free_var(&e);
4584         }
4585
4586         /* Compensate for input sign, and round to requested rscale */
4587         if (xneg)
4588                 div_var(&const_one, result, result, rscale, true);
4589         else
4590                 round_var(result, rscale);
4591
4592         free_var(&x);
4593 }
4594
4595
4596 /*
4597  * exp_var_internal() -
4598  *
4599  *      Raise e to the power of x, where 0 <= x <= 1
4600  *
4601  * NB: the result should be good to at least rscale digits, but it has
4602  * *not* been rounded off; the caller must do that if wanted.
4603  */
4604 static void
4605 exp_var_internal(NumericVar *arg, NumericVar *result, int rscale)
4606 {
4607         NumericVar      x;
4608         NumericVar      xpow;
4609         NumericVar      ifac;
4610         NumericVar      elem;
4611         NumericVar      ni;
4612         int                     ndiv2 = 0;
4613         int                     local_rscale;
4614
4615         init_var(&x);
4616         init_var(&xpow);
4617         init_var(&ifac);
4618         init_var(&elem);
4619         init_var(&ni);
4620
4621         set_var_from_var(arg, &x);
4622
4623         Assert(x.sign == NUMERIC_POS);
4624
4625         local_rscale = rscale + 8;
4626
4627         /* Reduce input into range 0 <= x <= 0.01 */
4628         while (cmp_var(&x, &const_zero_point_01) > 0)
4629         {
4630                 ndiv2++;
4631                 local_rscale++;
4632                 mul_var(&x, &const_zero_point_five, &x, x.dscale + 1);
4633         }
4634
4635         /*
4636          * Use the Taylor series
4637          *
4638          * exp(x) = 1 + x + x^2/2! + x^3/3! + ...
4639          *
4640          * Given the limited range of x, this should converge reasonably quickly.
4641          * We run the series until the terms fall below the local_rscale limit.
4642          */
4643         add_var(&const_one, &x, result);
4644         set_var_from_var(&x, &xpow);
4645         set_var_from_var(&const_one, &ifac);
4646         set_var_from_var(&const_one, &ni);
4647
4648         for (;;)
4649         {
4650                 add_var(&ni, &const_one, &ni);
4651                 mul_var(&xpow, &x, &xpow, local_rscale);
4652                 mul_var(&ifac, &ni, &ifac, 0);
4653                 div_var(&xpow, &ifac, &elem, local_rscale, true);
4654
4655                 if (elem.ndigits == 0)
4656                         break;
4657
4658                 add_var(result, &elem, result);
4659         }
4660
4661         /* Compensate for argument range reduction */
4662         while (ndiv2-- > 0)
4663                 mul_var(result, result, result, local_rscale);
4664
4665         free_var(&x);
4666         free_var(&xpow);
4667         free_var(&ifac);
4668         free_var(&elem);
4669         free_var(&ni);
4670 }
4671
4672
4673 /*
4674  * ln_var() -
4675  *
4676  *      Compute the natural log of x
4677  */
4678 static void
4679 ln_var(NumericVar *arg, NumericVar *result, int rscale)
4680 {
4681         NumericVar      x;
4682         NumericVar      xx;
4683         NumericVar      ni;
4684         NumericVar      elem;
4685         NumericVar      fact;
4686         int                     local_rscale;
4687         int                     cmp;
4688
4689         cmp = cmp_var(arg, &const_zero);
4690         if (cmp == 0)
4691                 ereport(ERROR,
4692                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_LOG),
4693                                  errmsg("cannot take logarithm of zero")));
4694         else if (cmp < 0)
4695                 ereport(ERROR,
4696                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_LOG),
4697                                  errmsg("cannot take logarithm of a negative number")));
4698
4699         local_rscale = rscale + 8;
4700
4701         init_var(&x);
4702         init_var(&xx);
4703         init_var(&ni);
4704         init_var(&elem);
4705         init_var(&fact);
4706
4707         set_var_from_var(arg, &x);
4708         set_var_from_var(&const_two, &fact);
4709
4710         /* Reduce input into range 0.9 < x < 1.1 */
4711         while (cmp_var(&x, &const_zero_point_nine) <= 0)
4712         {
4713                 local_rscale++;
4714                 sqrt_var(&x, &x, local_rscale);
4715                 mul_var(&fact, &const_two, &fact, 0);
4716         }
4717         while (cmp_var(&x, &const_one_point_one) >= 0)
4718         {
4719                 local_rscale++;
4720                 sqrt_var(&x, &x, local_rscale);
4721                 mul_var(&fact, &const_two, &fact, 0);
4722         }
4723
4724         /*
4725          * We use the Taylor series for 0.5 * ln((1+z)/(1-z)),
4726          *
4727          * z + z^3/3 + z^5/5 + ...
4728          *
4729          * where z = (x-1)/(x+1) is in the range (approximately) -0.053 .. 0.048
4730          * due to the above range-reduction of x.
4731          *
4732          * The convergence of this is not as fast as one would like, but is
4733          * tolerable given that z is small.
4734          */
4735         sub_var(&x, &const_one, result);
4736         add_var(&x, &const_one, &elem);
4737         div_var(result, &elem, result, local_rscale, true);
4738         set_var_from_var(result, &xx);
4739         mul_var(result, result, &x, local_rscale);
4740
4741         set_var_from_var(&const_one, &ni);
4742
4743         for (;;)
4744         {
4745                 add_var(&ni, &const_two, &ni);
4746                 mul_var(&xx, &x, &xx, local_rscale);
4747                 div_var(&xx, &ni, &elem, local_rscale, true);
4748
4749                 if (elem.ndigits == 0)
4750                         break;
4751
4752                 add_var(result, &elem, result);
4753
4754                 if (elem.weight < (result->weight - local_rscale * 2 / DEC_DIGITS))
4755                         break;
4756         }
4757
4758         /* Compensate for argument range reduction, round to requested rscale */
4759         mul_var(result, &fact, result, rscale);
4760
4761         free_var(&x);
4762         free_var(&xx);
4763         free_var(&ni);
4764         free_var(&elem);
4765         free_var(&fact);
4766 }
4767
4768
4769 /*
4770  * log_var() -
4771  *
4772  *      Compute the logarithm of num in a given base.
4773  *
4774  *      Note: this routine chooses dscale of the result.
4775  */
4776 static void
4777 log_var(NumericVar *base, NumericVar *num, NumericVar *result)
4778 {
4779         NumericVar      ln_base;
4780         NumericVar      ln_num;
4781         int                     dec_digits;
4782         int                     rscale;
4783         int                     local_rscale;
4784
4785         init_var(&ln_base);
4786         init_var(&ln_num);
4787
4788         /* Set scale for ln() calculations --- compare numeric_ln() */
4789
4790         /* Approx decimal digits before decimal point */
4791         dec_digits = (num->weight + 1) * DEC_DIGITS;
4792
4793         if (dec_digits > 1)
4794                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(dec_digits - 1);
4795         else if (dec_digits < 1)
4796                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(1 - dec_digits);
4797         else
4798                 rscale = NUMERIC_MIN_SIG_DIGITS;
4799
4800         rscale = Max(rscale, base->dscale);
4801         rscale = Max(rscale, num->dscale);
4802         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
4803         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
4804
4805         local_rscale = rscale + 8;
4806
4807         /* Form natural logarithms */
4808         ln_var(base, &ln_base, local_rscale);
4809         ln_var(num, &ln_num, local_rscale);
4810
4811         ln_base.dscale = rscale;
4812         ln_num.dscale = rscale;
4813
4814         /* Select scale for division result */
4815         rscale = select_div_scale(&ln_num, &ln_base);
4816
4817         div_var(&ln_num, &ln_base, result, rscale, true);
4818
4819         free_var(&ln_num);
4820         free_var(&ln_base);
4821 }
4822
4823
4824 /*
4825  * power_var() -
4826  *
4827  *      Raise base to the power of exp
4828  *
4829  *      Note: this routine chooses dscale of the result.
4830  */
4831 static void
4832 power_var(NumericVar *base, NumericVar *exp, NumericVar *result)
4833 {
4834         NumericVar      ln_base;
4835         NumericVar      ln_num;
4836         int                     dec_digits;
4837         int                     rscale;
4838         int                     local_rscale;
4839         double          val;
4840
4841         /* If exp can be represented as an integer, use power_var_int */
4842         if (exp->ndigits == 0 || exp->ndigits <= exp->weight + 1)
4843         {
4844                 /* exact integer, but does it fit in int? */
4845                 NumericVar      x;
4846                 int64           expval64;
4847
4848                 /* must copy because numericvar_to_int8() scribbles on input */
4849                 init_var(&x);
4850                 set_var_from_var(exp, &x);
4851                 if (numericvar_to_int8(&x, &expval64))
4852                 {
4853                         int                     expval = (int) expval64;
4854
4855                         /* Test for overflow by reverse-conversion. */
4856                         if ((int64) expval == expval64)
4857                         {
4858                                 /* Okay, select rscale */
4859                                 rscale = NUMERIC_MIN_SIG_DIGITS;
4860                                 rscale = Max(rscale, base->dscale);
4861                                 rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
4862                                 rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
4863
4864                                 power_var_int(base, expval, result, rscale);
4865
4866                                 free_var(&x);
4867                                 return;
4868                         }
4869                 }
4870                 free_var(&x);
4871         }
4872
4873         init_var(&ln_base);
4874         init_var(&ln_num);
4875
4876         /* Set scale for ln() calculation --- need extra accuracy here */
4877
4878         /* Approx decimal digits before decimal point */
4879         dec_digits = (base->weight + 1) * DEC_DIGITS;
4880
4881         if (dec_digits > 1)
4882                 rscale = NUMERIC_MIN_SIG_DIGITS * 2 - (int) log10(dec_digits - 1);
4883         else if (dec_digits < 1)
4884                 rscale = NUMERIC_MIN_SIG_DIGITS * 2 - (int) log10(1 - dec_digits);
4885         else
4886                 rscale = NUMERIC_MIN_SIG_DIGITS * 2;
4887
4888         rscale = Max(rscale, base->dscale * 2);
4889         rscale = Max(rscale, exp->dscale * 2);
4890         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE * 2);
4891         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE * 2);
4892
4893         local_rscale = rscale + 8;
4894
4895         ln_var(base, &ln_base, local_rscale);
4896
4897         mul_var(&ln_base, exp, &ln_num, local_rscale);
4898
4899         /* Set scale for exp() -- compare numeric_exp() */
4900
4901         /* convert input to float8, ignoring overflow */
4902         val = numericvar_to_double_no_overflow(&ln_num);
4903
4904         /*
4905          * log10(result) = num * log10(e), so this is approximately the weight:
4906          */
4907         val *= 0.434294481903252;
4908
4909         /* limit to something that won't cause integer overflow */
4910         val = Max(val, -NUMERIC_MAX_RESULT_SCALE);
4911         val = Min(val, NUMERIC_MAX_RESULT_SCALE);
4912
4913         rscale = NUMERIC_MIN_SIG_DIGITS - (int) val;
4914         rscale = Max(rscale, base->dscale);
4915         rscale = Max(rscale, exp->dscale);
4916         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
4917         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
4918
4919         exp_var(&ln_num, result, rscale);
4920
4921         free_var(&ln_num);
4922         free_var(&ln_base);
4923 }
4924
4925 /*
4926  * power_var_int() -
4927  *
4928  *      Raise base to the power of exp, where exp is an integer.
4929  */
4930 static void
4931 power_var_int(NumericVar *base, int exp, NumericVar *result, int rscale)
4932 {
4933         bool            neg;
4934         NumericVar      base_prod;
4935         int                     local_rscale;
4936
4937         /* Detect some special cases, particularly 0^0. */
4938
4939         switch (exp)
4940         {
4941                 case 0:
4942                         if (base->ndigits == 0)
4943                                 ereport(ERROR,
4944                                                 (errcode(ERRCODE_FLOATING_POINT_EXCEPTION),
4945                                                  errmsg("zero raised to zero is undefined")));
4946                         set_var_from_var(&const_one, result);
4947                         result->dscale = rscale;        /* no need to round */
4948                         return;
4949                 case 1:
4950                         set_var_from_var(base, result);
4951                         round_var(result, rscale);
4952                         return;
4953                 case -1:
4954                         div_var(&const_one, base, result, rscale, true);
4955                         return;
4956                 case 2:
4957                         mul_var(base, base, result, rscale);
4958                         return;
4959                 default:
4960                         break;
4961         }
4962
4963         /*
4964          * The general case repeatedly multiplies base according to the bit
4965          * pattern of exp.      We do the multiplications with some extra precision.
4966          */
4967         neg = (exp < 0);
4968         exp = Abs(exp);
4969
4970         local_rscale = rscale + MUL_GUARD_DIGITS * 2;
4971
4972         init_var(&base_prod);
4973         set_var_from_var(base, &base_prod);
4974
4975         if (exp & 1)
4976                 set_var_from_var(base, result);
4977         else
4978                 set_var_from_var(&const_one, result);
4979
4980         while ((exp >>= 1) > 0)
4981         {
4982                 mul_var(&base_prod, &base_prod, &base_prod, local_rscale);
4983                 if (exp & 1)
4984                         mul_var(&base_prod, result, result, local_rscale);
4985         }
4986
4987         free_var(&base_prod);
4988
4989         /* Compensate for input sign, and round to requested rscale */
4990         if (neg)
4991                 div_var(&const_one, result, result, rscale, true);
4992         else
4993                 round_var(result, rscale);
4994 }
4995
4996
4997 /* ----------------------------------------------------------------------
4998  *
4999  * Following are the lowest level functions that operate unsigned
5000  * on the variable level
5001  *
5002  * ----------------------------------------------------------------------
5003  */
5004
5005
5006 /* ----------
5007  * cmp_abs() -
5008  *
5009  *      Compare the absolute values of var1 and var2
5010  *      Returns:        -1 for ABS(var1) < ABS(var2)
5011  *                              0  for ABS(var1) == ABS(var2)
5012  *                              1  for ABS(var1) > ABS(var2)
5013  * ----------
5014  */
5015 static int
5016 cmp_abs(NumericVar *var1, NumericVar *var2)
5017 {
5018         return cmp_abs_common(var1->digits, var1->ndigits, var1->weight,
5019                                                   var2->digits, var2->ndigits, var2->weight);
5020 }
5021
5022 /* ----------
5023  * cmp_abs_common() -
5024  *
5025  *      Main routine of cmp_abs(). This function can be used by both
5026  *      NumericVar and Numeric.
5027  * ----------
5028  */
5029 static int
5030 cmp_abs_common(const NumericDigit *var1digits, int var1ndigits, int var1weight,
5031                          const NumericDigit *var2digits, int var2ndigits, int var2weight)
5032 {
5033         int                     i1 = 0;
5034         int                     i2 = 0;
5035
5036         /* Check any digits before the first common digit */
5037
5038         while (var1weight > var2weight && i1 < var1ndigits)
5039         {
5040                 if (var1digits[i1++] != 0)
5041                         return 1;
5042                 var1weight--;
5043         }
5044         while (var2weight > var1weight && i2 < var2ndigits)
5045         {
5046                 if (var2digits[i2++] != 0)
5047                         return -1;
5048                 var2weight--;
5049         }
5050
5051         /* At this point, either w1 == w2 or we've run out of digits */
5052
5053         if (var1weight == var2weight)
5054         {
5055                 while (i1 < var1ndigits && i2 < var2ndigits)
5056                 {
5057                         int                     stat = var1digits[i1++] - var2digits[i2++];
5058
5059                         if (stat)
5060                         {
5061                                 if (stat > 0)
5062                                         return 1;
5063                                 return -1;
5064                         }
5065                 }
5066         }
5067
5068         /*
5069          * At this point, we've run out of digits on one side or the other; so any
5070          * remaining nonzero digits imply that side is larger
5071          */
5072         while (i1 < var1ndigits)
5073         {
5074                 if (var1digits[i1++] != 0)
5075                         return 1;
5076         }
5077         while (i2 < var2ndigits)
5078         {
5079                 if (var2digits[i2++] != 0)
5080                         return -1;
5081         }
5082
5083         return 0;
5084 }
5085
5086
5087 /*
5088  * add_abs() -
5089  *
5090  *      Add the absolute values of two variables into result.
5091  *      result might point to one of the operands without danger.
5092  */
5093 static void
5094 add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
5095 {
5096         NumericDigit *res_buf;
5097         NumericDigit *res_digits;
5098         int                     res_ndigits;
5099         int                     res_weight;
5100         int                     res_rscale,
5101                                 rscale1,
5102                                 rscale2;
5103         int                     res_dscale;
5104         int                     i,
5105                                 i1,
5106                                 i2;
5107         int                     carry = 0;
5108
5109         /* copy these values into local vars for speed in inner loop */
5110         int                     var1ndigits = var1->ndigits;
5111         int                     var2ndigits = var2->ndigits;
5112         NumericDigit *var1digits = var1->digits;
5113         NumericDigit *var2digits = var2->digits;
5114
5115         res_weight = Max(var1->weight, var2->weight) + 1;
5116
5117         res_dscale = Max(var1->dscale, var2->dscale);
5118
5119         /* Note: here we are figuring rscale in base-NBASE digits */
5120         rscale1 = var1->ndigits - var1->weight - 1;
5121         rscale2 = var2->ndigits - var2->weight - 1;
5122         res_rscale = Max(rscale1, rscale2);
5123
5124         res_ndigits = res_rscale + res_weight + 1;
5125         if (res_ndigits <= 0)
5126                 res_ndigits = 1;
5127
5128         res_buf = digitbuf_alloc(res_ndigits + 1);
5129         res_buf[0] = 0;                         /* spare digit for later rounding */
5130         res_digits = res_buf + 1;
5131
5132         i1 = res_rscale + var1->weight + 1;
5133         i2 = res_rscale + var2->weight + 1;
5134         for (i = res_ndigits - 1; i >= 0; i--)
5135         {
5136                 i1--;
5137                 i2--;
5138                 if (i1 >= 0 && i1 < var1ndigits)
5139                         carry += var1digits[i1];
5140                 if (i2 >= 0 && i2 < var2ndigits)
5141                         carry += var2digits[i2];
5142
5143                 if (carry >= NBASE)
5144                 {
5145                         res_digits[i] = carry - NBASE;
5146                         carry = 1;
5147                 }
5148                 else
5149                 {
5150                         res_digits[i] = carry;
5151                         carry = 0;
5152                 }
5153         }
5154
5155         Assert(carry == 0);                     /* else we failed to allow for carry out */
5156
5157         digitbuf_free(result->buf);
5158         result->ndigits = res_ndigits;
5159         result->buf = res_buf;
5160         result->digits = res_digits;
5161         result->weight = res_weight;
5162         result->dscale = res_dscale;
5163
5164         /* Remove leading/trailing zeroes */
5165         strip_var(result);
5166 }
5167
5168
5169 /*
5170  * sub_abs()
5171  *
5172  *      Subtract the absolute value of var2 from the absolute value of var1
5173  *      and store in result. result might point to one of the operands
5174  *      without danger.
5175  *
5176  *      ABS(var1) MUST BE GREATER OR EQUAL ABS(var2) !!!
5177  */
5178 static void
5179 sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
5180 {
5181         NumericDigit *res_buf;
5182         NumericDigit *res_digits;
5183         int                     res_ndigits;
5184         int                     res_weight;
5185         int                     res_rscale,
5186                                 rscale1,
5187                                 rscale2;
5188         int                     res_dscale;
5189         int                     i,
5190                                 i1,
5191                                 i2;
5192         int                     borrow = 0;
5193
5194         /* copy these values into local vars for speed in inner loop */
5195         int                     var1ndigits = var1->ndigits;
5196         int                     var2ndigits = var2->ndigits;
5197         NumericDigit *var1digits = var1->digits;
5198         NumericDigit *var2digits = var2->digits;
5199
5200         res_weight = var1->weight;
5201
5202         res_dscale = Max(var1->dscale, var2->dscale);
5203
5204         /* Note: here we are figuring rscale in base-NBASE digits */
5205         rscale1 = var1->ndigits - var1->weight - 1;
5206         rscale2 = var2->ndigits - var2->weight - 1;
5207         res_rscale = Max(rscale1, rscale2);
5208
5209         res_ndigits = res_rscale + res_weight + 1;
5210         if (res_ndigits <= 0)
5211                 res_ndigits = 1;
5212
5213         res_buf = digitbuf_alloc(res_ndigits + 1);
5214         res_buf[0] = 0;                         /* spare digit for later rounding */
5215         res_digits = res_buf + 1;
5216
5217         i1 = res_rscale + var1->weight + 1;
5218         i2 = res_rscale + var2->weight + 1;
5219         for (i = res_ndigits - 1; i >= 0; i--)
5220         {
5221                 i1--;
5222                 i2--;
5223                 if (i1 >= 0 && i1 < var1ndigits)
5224                         borrow += var1digits[i1];
5225                 if (i2 >= 0 && i2 < var2ndigits)
5226                         borrow -= var2digits[i2];
5227
5228                 if (borrow < 0)
5229                 {
5230                         res_digits[i] = borrow + NBASE;
5231                         borrow = -1;
5232                 }
5233                 else
5234                 {
5235                         res_digits[i] = borrow;
5236                         borrow = 0;
5237                 }
5238         }
5239
5240         Assert(borrow == 0);            /* else caller gave us var1 < var2 */
5241
5242         digitbuf_free(result->buf);
5243         result->ndigits = res_ndigits;
5244         result->buf = res_buf;
5245         result->digits = res_digits;
5246         result->weight = res_weight;
5247         result->dscale = res_dscale;
5248
5249         /* Remove leading/trailing zeroes */
5250         strip_var(result);
5251 }
5252
5253 /*
5254  * round_var
5255  *
5256  * Round the value of a variable to no more than rscale decimal digits
5257  * after the decimal point.  NOTE: we allow rscale < 0 here, implying
5258  * rounding before the decimal point.
5259  */
5260 static void
5261 round_var(NumericVar *var, int rscale)
5262 {
5263         NumericDigit *digits = var->digits;
5264         int                     di;
5265         int                     ndigits;
5266         int                     carry;
5267
5268         var->dscale = rscale;
5269
5270         /* decimal digits wanted */
5271         di = (var->weight + 1) * DEC_DIGITS + rscale;
5272
5273         /*
5274          * If di = 0, the value loses all digits, but could round up to 1 if its
5275          * first extra digit is >= 5.  If di < 0 the result must be 0.
5276          */
5277         if (di < 0)
5278         {
5279                 var->ndigits = 0;
5280                 var->weight = 0;
5281                 var->sign = NUMERIC_POS;
5282         }
5283         else
5284         {
5285                 /* NBASE digits wanted */
5286                 ndigits = (di + DEC_DIGITS - 1) / DEC_DIGITS;
5287
5288                 /* 0, or number of decimal digits to keep in last NBASE digit */
5289                 di %= DEC_DIGITS;
5290
5291                 if (ndigits < var->ndigits ||
5292                         (ndigits == var->ndigits && di > 0))
5293                 {
5294                         var->ndigits = ndigits;
5295
5296 #if DEC_DIGITS == 1
5297                         /* di must be zero */
5298                         carry = (digits[ndigits] >= HALF_NBASE) ? 1 : 0;
5299 #else
5300                         if (di == 0)
5301                                 carry = (digits[ndigits] >= HALF_NBASE) ? 1 : 0;
5302                         else
5303                         {
5304                                 /* Must round within last NBASE digit */
5305                                 int                     extra,
5306                                                         pow10;
5307
5308 #if DEC_DIGITS == 4
5309                                 pow10 = round_powers[di];
5310 #elif DEC_DIGITS == 2
5311                                 pow10 = 10;
5312 #else
5313 #error unsupported NBASE
5314 #endif
5315                                 extra = digits[--ndigits] % pow10;
5316                                 digits[ndigits] -= extra;
5317                                 carry = 0;
5318                                 if (extra >= pow10 / 2)
5319                                 {
5320                                         pow10 += digits[ndigits];
5321                                         if (pow10 >= NBASE)
5322                                         {
5323                                                 pow10 -= NBASE;
5324                                                 carry = 1;
5325                                         }
5326                                         digits[ndigits] = pow10;
5327                                 }
5328                         }
5329 #endif
5330
5331                         /* Propagate carry if needed */
5332                         while (carry)
5333                         {
5334                                 carry += digits[--ndigits];
5335                                 if (carry >= NBASE)
5336                                 {
5337                                         digits[ndigits] = carry - NBASE;
5338                                         carry = 1;
5339                                 }
5340                                 else
5341                                 {
5342                                         digits[ndigits] = carry;
5343                                         carry = 0;
5344                                 }
5345                         }
5346
5347                         if (ndigits < 0)
5348                         {
5349                                 Assert(ndigits == -1);  /* better not have added > 1 digit */
5350                                 Assert(var->digits > var->buf);
5351                                 var->digits--;
5352                                 var->ndigits++;
5353                                 var->weight++;
5354                         }
5355                 }
5356         }
5357 }
5358
5359 /*
5360  * trunc_var
5361  *
5362  * Truncate the value of a variable at rscale decimal digits after the
5363  * decimal point.  NOTE: we allow rscale < 0 here, implying
5364  * truncation before the decimal point.
5365  */
5366 static void
5367 trunc_var(NumericVar *var, int rscale)
5368 {
5369         int                     di;
5370         int                     ndigits;
5371
5372         var->dscale = rscale;
5373
5374         /* decimal digits wanted */
5375         di = (var->weight + 1) * DEC_DIGITS + rscale;
5376
5377         /*
5378          * If di <= 0, the value loses all digits.
5379          */
5380         if (di <= 0)
5381         {
5382                 var->ndigits = 0;
5383                 var->weight = 0;
5384                 var->sign = NUMERIC_POS;
5385         }
5386         else
5387         {
5388                 /* NBASE digits wanted */
5389                 ndigits = (di + DEC_DIGITS - 1) / DEC_DIGITS;
5390
5391                 if (ndigits <= var->ndigits)
5392                 {
5393                         var->ndigits = ndigits;
5394
5395 #if DEC_DIGITS == 1
5396                         /* no within-digit stuff to worry about */
5397 #else
5398                         /* 0, or number of decimal digits to keep in last NBASE digit */
5399                         di %= DEC_DIGITS;
5400
5401                         if (di > 0)
5402                         {
5403                                 /* Must truncate within last NBASE digit */
5404                                 NumericDigit *digits = var->digits;
5405                                 int                     extra,
5406                                                         pow10;
5407
5408 #if DEC_DIGITS == 4
5409                                 pow10 = round_powers[di];
5410 #elif DEC_DIGITS == 2
5411                                 pow10 = 10;
5412 #else
5413 #error unsupported NBASE
5414 #endif
5415                                 extra = digits[--ndigits] % pow10;
5416                                 digits[ndigits] -= extra;
5417                         }
5418 #endif
5419                 }
5420         }
5421 }
5422
5423 /*
5424  * strip_var
5425  *
5426  * Strip any leading and trailing zeroes from a numeric variable
5427  */
5428 static void
5429 strip_var(NumericVar *var)
5430 {
5431         NumericDigit *digits = var->digits;
5432         int                     ndigits = var->ndigits;
5433
5434         /* Strip leading zeroes */
5435         while (ndigits > 0 && *digits == 0)
5436         {
5437                 digits++;
5438                 var->weight--;
5439                 ndigits--;
5440         }
5441
5442         /* Strip trailing zeroes */
5443         while (ndigits > 0 && digits[ndigits - 1] == 0)
5444                 ndigits--;
5445
5446         /* If it's zero, normalize the sign and weight */
5447         if (ndigits == 0)
5448         {
5449                 var->sign = NUMERIC_POS;
5450                 var->weight = 0;
5451         }
5452
5453         var->digits = digits;
5454         var->ndigits = ndigits;
5455 }