]> granicus.if.org Git - postgresql/blob - src/backend/utils/adt/numeric.c
Re-implement division for numeric values using the traditional "schoolbook"
[postgresql] / src / backend / utils / adt / numeric.c
1 /*-------------------------------------------------------------------------
2  *
3  * numeric.c
4  *        An exact numeric data type for the Postgres database system
5  *
6  * Original coding 1998, Jan Wieck.  Heavily revised 2003, Tom Lane.
7  *
8  * Many of the algorithmic ideas are borrowed from David M. Smith's "FM"
9  * multiple-precision math library, most recently published as Algorithm
10  * 786: Multiple-Precision Complex Arithmetic and Functions, ACM
11  * Transactions on Mathematical Software, Vol. 24, No. 4, December 1998,
12  * pages 359-367.
13  *
14  * Copyright (c) 1998-2008, PostgreSQL Global Development Group
15  *
16  * IDENTIFICATION
17  *        $PostgreSQL: pgsql/src/backend/utils/adt/numeric.c,v 1.109 2008/04/04 18:45:36 tgl Exp $
18  *
19  *-------------------------------------------------------------------------
20  */
21
22 #include "postgres.h"
23
24 #include <ctype.h>
25 #include <float.h>
26 #include <limits.h>
27 #include <math.h>
28
29 #include "access/hash.h"
30 #include "catalog/pg_type.h"
31 #include "libpq/pqformat.h"
32 #include "miscadmin.h"
33 #include "utils/array.h"
34 #include "utils/builtins.h"
35 #include "utils/int8.h"
36 #include "utils/numeric.h"
37
38 /* ----------
39  * Uncomment the following to enable compilation of dump_numeric()
40  * and dump_var() and to get a dump of any result produced by make_result().
41  * ----------
42 #define NUMERIC_DEBUG
43  */
44
45
46 /* ----------
47  * Local data types
48  *
49  * Numeric values are represented in a base-NBASE floating point format.
50  * Each "digit" ranges from 0 to NBASE-1.  The type NumericDigit is signed
51  * and wide enough to store a digit.  We assume that NBASE*NBASE can fit in
52  * an int.      Although the purely calculational routines could handle any even
53  * NBASE that's less than sqrt(INT_MAX), in practice we are only interested
54  * in NBASE a power of ten, so that I/O conversions and decimal rounding
55  * are easy.  Also, it's actually more efficient if NBASE is rather less than
56  * sqrt(INT_MAX), so that there is "headroom" for mul_var and div_var_fast to
57  * postpone processing carries.
58  * ----------
59  */
60
61 #if 0
62 #define NBASE           10
63 #define HALF_NBASE      5
64 #define DEC_DIGITS      1                       /* decimal digits per NBASE digit */
65 #define MUL_GUARD_DIGITS        4       /* these are measured in NBASE digits */
66 #define DIV_GUARD_DIGITS        8
67
68 typedef signed char NumericDigit;
69 #endif
70
71 #if 0
72 #define NBASE           100
73 #define HALF_NBASE      50
74 #define DEC_DIGITS      2                       /* decimal digits per NBASE digit */
75 #define MUL_GUARD_DIGITS        3       /* these are measured in NBASE digits */
76 #define DIV_GUARD_DIGITS        6
77
78 typedef signed char NumericDigit;
79 #endif
80
81 #if 1
82 #define NBASE           10000
83 #define HALF_NBASE      5000
84 #define DEC_DIGITS      4                       /* decimal digits per NBASE digit */
85 #define MUL_GUARD_DIGITS        2       /* these are measured in NBASE digits */
86 #define DIV_GUARD_DIGITS        4
87
88 typedef int16 NumericDigit;
89 #endif
90
91
92 /* ----------
93  * NumericVar is the format we use for arithmetic.  The digit-array part
94  * is the same as the NumericData storage format, but the header is more
95  * complex.
96  *
97  * The value represented by a NumericVar is determined by the sign, weight,
98  * ndigits, and digits[] array.
99  * Note: the first digit of a NumericVar's value is assumed to be multiplied
100  * by NBASE ** weight.  Another way to say it is that there are weight+1
101  * digits before the decimal point.  It is possible to have weight < 0.
102  *
103  * buf points at the physical start of the palloc'd digit buffer for the
104  * NumericVar.  digits points at the first digit in actual use (the one
105  * with the specified weight).  We normally leave an unused digit or two
106  * (preset to zeroes) between buf and digits, so that there is room to store
107  * a carry out of the top digit without reallocating space.  We just need to
108  * decrement digits (and increment weight) to make room for the carry digit.
109  * (There is no such extra space in a numeric value stored in the database,
110  * only in a NumericVar in memory.)
111  *
112  * If buf is NULL then the digit buffer isn't actually palloc'd and should
113  * not be freed --- see the constants below for an example.
114  *
115  * dscale, or display scale, is the nominal precision expressed as number
116  * of digits after the decimal point (it must always be >= 0 at present).
117  * dscale may be more than the number of physically stored fractional digits,
118  * implying that we have suppressed storage of significant trailing zeroes.
119  * It should never be less than the number of stored digits, since that would
120  * imply hiding digits that are present.  NOTE that dscale is always expressed
121  * in *decimal* digits, and so it may correspond to a fractional number of
122  * base-NBASE digits --- divide by DEC_DIGITS to convert to NBASE digits.
123  *
124  * rscale, or result scale, is the target precision for a computation.
125  * Like dscale it is expressed as number of *decimal* digits after the decimal
126  * point, and is always >= 0 at present.
127  * Note that rscale is not stored in variables --- it's figured on-the-fly
128  * from the dscales of the inputs.
129  *
130  * NB: All the variable-level functions are written in a style that makes it
131  * possible to give one and the same variable as argument and destination.
132  * This is feasible because the digit buffer is separate from the variable.
133  * ----------
134  */
135 typedef struct NumericVar
136 {
137         int                     ndigits;                /* # of digits in digits[] - can be 0! */
138         int                     weight;                 /* weight of first digit */
139         int                     sign;                   /* NUMERIC_POS, NUMERIC_NEG, or NUMERIC_NAN */
140         int                     dscale;                 /* display scale */
141         NumericDigit *buf;                      /* start of palloc'd space for digits[] */
142         NumericDigit *digits;           /* base-NBASE digits */
143 } NumericVar;
144
145
146 /* ----------
147  * Some preinitialized constants
148  * ----------
149  */
150 static NumericDigit const_zero_data[1] = {0};
151 static NumericVar const_zero =
152 {0, 0, NUMERIC_POS, 0, NULL, const_zero_data};
153
154 static NumericDigit const_one_data[1] = {1};
155 static NumericVar const_one =
156 {1, 0, NUMERIC_POS, 0, NULL, const_one_data};
157
158 static NumericDigit const_two_data[1] = {2};
159 static NumericVar const_two =
160 {1, 0, NUMERIC_POS, 0, NULL, const_two_data};
161
162 #if DEC_DIGITS == 4
163 static NumericDigit const_zero_point_five_data[1] = {5000};
164 #elif DEC_DIGITS == 2
165 static NumericDigit const_zero_point_five_data[1] = {50};
166 #elif DEC_DIGITS == 1
167 static NumericDigit const_zero_point_five_data[1] = {5};
168 #endif
169 static NumericVar const_zero_point_five =
170 {1, -1, NUMERIC_POS, 1, NULL, const_zero_point_five_data};
171
172 #if DEC_DIGITS == 4
173 static NumericDigit const_zero_point_nine_data[1] = {9000};
174 #elif DEC_DIGITS == 2
175 static NumericDigit const_zero_point_nine_data[1] = {90};
176 #elif DEC_DIGITS == 1
177 static NumericDigit const_zero_point_nine_data[1] = {9};
178 #endif
179 static NumericVar const_zero_point_nine =
180 {1, -1, NUMERIC_POS, 1, NULL, const_zero_point_nine_data};
181
182 #if DEC_DIGITS == 4
183 static NumericDigit const_zero_point_01_data[1] = {100};
184 static NumericVar const_zero_point_01 =
185 {1, -1, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
186 #elif DEC_DIGITS == 2
187 static NumericDigit const_zero_point_01_data[1] = {1};
188 static NumericVar const_zero_point_01 =
189 {1, -1, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
190 #elif DEC_DIGITS == 1
191 static NumericDigit const_zero_point_01_data[1] = {1};
192 static NumericVar const_zero_point_01 =
193 {1, -2, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
194 #endif
195
196 #if DEC_DIGITS == 4
197 static NumericDigit const_one_point_one_data[2] = {1, 1000};
198 #elif DEC_DIGITS == 2
199 static NumericDigit const_one_point_one_data[2] = {1, 10};
200 #elif DEC_DIGITS == 1
201 static NumericDigit const_one_point_one_data[2] = {1, 1};
202 #endif
203 static NumericVar const_one_point_one =
204 {2, 0, NUMERIC_POS, 1, NULL, const_one_point_one_data};
205
206 static NumericVar const_nan =
207 {0, 0, NUMERIC_NAN, 0, NULL, NULL};
208
209 #if DEC_DIGITS == 4
210 static const int round_powers[4] = {0, 1000, 100, 10};
211 #endif
212
213
214 /* ----------
215  * Local functions
216  * ----------
217  */
218
219 #ifdef NUMERIC_DEBUG
220 static void dump_numeric(const char *str, Numeric num);
221 static void dump_var(const char *str, NumericVar *var);
222 #else
223 #define dump_numeric(s,n)
224 #define dump_var(s,v)
225 #endif
226
227 #define digitbuf_alloc(ndigits)  \
228         ((NumericDigit *) palloc((ndigits) * sizeof(NumericDigit)))
229 #define digitbuf_free(buf)      \
230         do { \
231                  if ((buf) != NULL) \
232                          pfree(buf); \
233         } while (0)
234
235 #define init_var(v)             MemSetAligned(v, 0, sizeof(NumericVar))
236
237 #define NUMERIC_DIGITS(num) ((NumericDigit *)(num)->n_data)
238 #define NUMERIC_NDIGITS(num) \
239         ((VARSIZE(num) - NUMERIC_HDRSZ) / sizeof(NumericDigit))
240
241 static void alloc_var(NumericVar *var, int ndigits);
242 static void free_var(NumericVar *var);
243 static void zero_var(NumericVar *var);
244
245 static void set_var_from_str(const char *str, NumericVar *dest);
246 static void set_var_from_num(Numeric value, NumericVar *dest);
247 static void set_var_from_var(NumericVar *value, NumericVar *dest);
248 static char *get_str_from_var(NumericVar *var, int dscale);
249
250 static Numeric make_result(NumericVar *var);
251
252 static void apply_typmod(NumericVar *var, int32 typmod);
253
254 static int32 numericvar_to_int4(NumericVar *var);
255 static bool numericvar_to_int8(NumericVar *var, int64 *result);
256 static void int8_to_numericvar(int64 val, NumericVar *var);
257 static double numeric_to_double_no_overflow(Numeric num);
258 static double numericvar_to_double_no_overflow(NumericVar *var);
259
260 static int      cmp_numerics(Numeric num1, Numeric num2);
261 static int      cmp_var(NumericVar *var1, NumericVar *var2);
262 static int cmp_var_common(const NumericDigit *var1digits, int var1ndigits,
263                            int var1weight, int var1sign,
264                            const NumericDigit *var2digits, int var2ndigits,
265                            int var2weight, int var2sign);
266 static void add_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
267 static void sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
268 static void mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
269                 int rscale);
270 static void div_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
271                 int rscale, bool round);
272 static void div_var_fast(NumericVar *var1, NumericVar *var2, NumericVar *result,
273                 int rscale, bool round);
274 static int      select_div_scale(NumericVar *var1, NumericVar *var2);
275 static void mod_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
276 static void ceil_var(NumericVar *var, NumericVar *result);
277 static void floor_var(NumericVar *var, NumericVar *result);
278
279 static void sqrt_var(NumericVar *arg, NumericVar *result, int rscale);
280 static void exp_var(NumericVar *arg, NumericVar *result, int rscale);
281 static void exp_var_internal(NumericVar *arg, NumericVar *result, int rscale);
282 static void ln_var(NumericVar *arg, NumericVar *result, int rscale);
283 static void log_var(NumericVar *base, NumericVar *num, NumericVar *result);
284 static void power_var(NumericVar *base, NumericVar *exp, NumericVar *result);
285 static void power_var_int(NumericVar *base, int exp, NumericVar *result,
286                           int rscale);
287
288 static int      cmp_abs(NumericVar *var1, NumericVar *var2);
289 static int cmp_abs_common(const NumericDigit *var1digits, int var1ndigits,
290                            int var1weight,
291                            const NumericDigit *var2digits, int var2ndigits,
292                            int var2weight);
293 static void add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result);
294 static void sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result);
295 static void round_var(NumericVar *var, int rscale);
296 static void trunc_var(NumericVar *var, int rscale);
297 static void strip_var(NumericVar *var);
298 static void compute_bucket(Numeric operand, Numeric bound1, Numeric bound2,
299                            NumericVar *count_var, NumericVar *result_var);
300
301
302 /* ----------------------------------------------------------------------
303  *
304  * Input-, output- and rounding-functions
305  *
306  * ----------------------------------------------------------------------
307  */
308
309
310 /*
311  * numeric_in() -
312  *
313  *      Input function for numeric data type
314  */
315 Datum
316 numeric_in(PG_FUNCTION_ARGS)
317 {
318         char       *str = PG_GETARG_CSTRING(0);
319
320 #ifdef NOT_USED
321         Oid                     typelem = PG_GETARG_OID(1);
322 #endif
323         int32           typmod = PG_GETARG_INT32(2);
324         NumericVar      value;
325         Numeric         res;
326
327         /*
328          * Check for NaN
329          */
330         if (pg_strcasecmp(str, "NaN") == 0)
331                 PG_RETURN_NUMERIC(make_result(&const_nan));
332
333         /*
334          * Use set_var_from_str() to parse the input string and return it in the
335          * packed DB storage format
336          */
337         init_var(&value);
338         set_var_from_str(str, &value);
339
340         apply_typmod(&value, typmod);
341
342         res = make_result(&value);
343         free_var(&value);
344
345         PG_RETURN_NUMERIC(res);
346 }
347
348
349 /*
350  * numeric_out() -
351  *
352  *      Output function for numeric data type
353  */
354 Datum
355 numeric_out(PG_FUNCTION_ARGS)
356 {
357         Numeric         num = PG_GETARG_NUMERIC(0);
358         NumericVar      x;
359         char       *str;
360
361         /*
362          * Handle NaN
363          */
364         if (NUMERIC_IS_NAN(num))
365                 PG_RETURN_CSTRING(pstrdup("NaN"));
366
367         /*
368          * Get the number in the variable format.
369          *
370          * Even if we didn't need to change format, we'd still need to copy the
371          * value to have a modifiable copy for rounding.  set_var_from_num() also
372          * guarantees there is extra digit space in case we produce a carry out
373          * from rounding.
374          */
375         init_var(&x);
376         set_var_from_num(num, &x);
377
378         str = get_str_from_var(&x, x.dscale);
379
380         free_var(&x);
381
382         PG_RETURN_CSTRING(str);
383 }
384
385 /*
386  *              numeric_recv                    - converts external binary format to numeric
387  *
388  * External format is a sequence of int16's:
389  * ndigits, weight, sign, dscale, NumericDigits.
390  */
391 Datum
392 numeric_recv(PG_FUNCTION_ARGS)
393 {
394         StringInfo      buf = (StringInfo) PG_GETARG_POINTER(0);
395
396 #ifdef NOT_USED
397         Oid                     typelem = PG_GETARG_OID(1);
398 #endif
399         int32           typmod = PG_GETARG_INT32(2);
400         NumericVar      value;
401         Numeric         res;
402         int                     len,
403                                 i;
404
405         init_var(&value);
406
407         len = (uint16) pq_getmsgint(buf, sizeof(uint16));
408         if (len < 0 || len > NUMERIC_MAX_PRECISION + NUMERIC_MAX_RESULT_SCALE)
409                 ereport(ERROR,
410                                 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
411                                  errmsg("invalid length in external \"numeric\" value")));
412
413         alloc_var(&value, len);
414
415         value.weight = (int16) pq_getmsgint(buf, sizeof(int16));
416         value.sign = (uint16) pq_getmsgint(buf, sizeof(uint16));
417         if (!(value.sign == NUMERIC_POS ||
418                   value.sign == NUMERIC_NEG ||
419                   value.sign == NUMERIC_NAN))
420                 ereport(ERROR,
421                                 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
422                                  errmsg("invalid sign in external \"numeric\" value")));
423
424         value.dscale = (uint16) pq_getmsgint(buf, sizeof(uint16));
425         for (i = 0; i < len; i++)
426         {
427                 NumericDigit d = pq_getmsgint(buf, sizeof(NumericDigit));
428
429                 if (d < 0 || d >= NBASE)
430                         ereport(ERROR,
431                                         (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
432                                          errmsg("invalid digit in external \"numeric\" value")));
433                 value.digits[i] = d;
434         }
435
436         apply_typmod(&value, typmod);
437
438         res = make_result(&value);
439         free_var(&value);
440
441         PG_RETURN_NUMERIC(res);
442 }
443
444 /*
445  *              numeric_send                    - converts numeric to binary format
446  */
447 Datum
448 numeric_send(PG_FUNCTION_ARGS)
449 {
450         Numeric         num = PG_GETARG_NUMERIC(0);
451         NumericVar      x;
452         StringInfoData buf;
453         int                     i;
454
455         init_var(&x);
456         set_var_from_num(num, &x);
457
458         pq_begintypsend(&buf);
459
460         pq_sendint(&buf, x.ndigits, sizeof(int16));
461         pq_sendint(&buf, x.weight, sizeof(int16));
462         pq_sendint(&buf, x.sign, sizeof(int16));
463         pq_sendint(&buf, x.dscale, sizeof(int16));
464         for (i = 0; i < x.ndigits; i++)
465                 pq_sendint(&buf, x.digits[i], sizeof(NumericDigit));
466
467         free_var(&x);
468
469         PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
470 }
471
472
473 /*
474  * numeric() -
475  *
476  *      This is a special function called by the Postgres database system
477  *      before a value is stored in a tuple's attribute. The precision and
478  *      scale of the attribute have to be applied on the value.
479  */
480 Datum
481 numeric         (PG_FUNCTION_ARGS)
482 {
483         Numeric         num = PG_GETARG_NUMERIC(0);
484         int32           typmod = PG_GETARG_INT32(1);
485         Numeric         new;
486         int32           tmp_typmod;
487         int                     precision;
488         int                     scale;
489         int                     ddigits;
490         int                     maxdigits;
491         NumericVar      var;
492
493         /*
494          * Handle NaN
495          */
496         if (NUMERIC_IS_NAN(num))
497                 PG_RETURN_NUMERIC(make_result(&const_nan));
498
499         /*
500          * If the value isn't a valid type modifier, simply return a copy of the
501          * input value
502          */
503         if (typmod < (int32) (VARHDRSZ))
504         {
505                 new = (Numeric) palloc(VARSIZE(num));
506                 memcpy(new, num, VARSIZE(num));
507                 PG_RETURN_NUMERIC(new);
508         }
509
510         /*
511          * Get the precision and scale out of the typmod value
512          */
513         tmp_typmod = typmod - VARHDRSZ;
514         precision = (tmp_typmod >> 16) & 0xffff;
515         scale = tmp_typmod & 0xffff;
516         maxdigits = precision - scale;
517
518         /*
519          * If the number is certainly in bounds and due to the target scale no
520          * rounding could be necessary, just make a copy of the input and modify
521          * its scale fields.  (Note we assume the existing dscale is honest...)
522          */
523         ddigits = (num->n_weight + 1) * DEC_DIGITS;
524         if (ddigits <= maxdigits && scale >= NUMERIC_DSCALE(num))
525         {
526                 new = (Numeric) palloc(VARSIZE(num));
527                 memcpy(new, num, VARSIZE(num));
528                 new->n_sign_dscale = NUMERIC_SIGN(new) |
529                         ((uint16) scale & NUMERIC_DSCALE_MASK);
530                 PG_RETURN_NUMERIC(new);
531         }
532
533         /*
534          * We really need to fiddle with things - unpack the number into a
535          * variable and let apply_typmod() do it.
536          */
537         init_var(&var);
538
539         set_var_from_num(num, &var);
540         apply_typmod(&var, typmod);
541         new = make_result(&var);
542
543         free_var(&var);
544
545         PG_RETURN_NUMERIC(new);
546 }
547
548 Datum
549 numerictypmodin(PG_FUNCTION_ARGS)
550 {
551         ArrayType  *ta = PG_GETARG_ARRAYTYPE_P(0);
552         int32      *tl;
553         int                     n;
554         int32           typmod;
555
556         tl = ArrayGetIntegerTypmods(ta, &n);
557
558         if (n == 2)
559         {
560                 if (tl[0] < 1 || tl[0] > NUMERIC_MAX_PRECISION)
561                         ereport(ERROR,
562                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
563                                          errmsg("NUMERIC precision %d must be between 1 and %d",
564                                                         tl[0], NUMERIC_MAX_PRECISION)));
565                 if (tl[1] < 0 || tl[1] > tl[0])
566                         ereport(ERROR,
567                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
568                                 errmsg("NUMERIC scale %d must be between 0 and precision %d",
569                                            tl[1], tl[0])));
570                 typmod = ((tl[0] << 16) | tl[1]) + VARHDRSZ;
571         }
572         else if (n == 1)
573         {
574                 if (tl[0] < 1 || tl[0] > NUMERIC_MAX_PRECISION)
575                         ereport(ERROR,
576                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
577                                          errmsg("NUMERIC precision %d must be between 1 and %d",
578                                                         tl[0], NUMERIC_MAX_PRECISION)));
579                 /* scale defaults to zero */
580                 typmod = (tl[0] << 16) + VARHDRSZ;
581         }
582         else
583         {
584                 ereport(ERROR,
585                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
586                                  errmsg("invalid NUMERIC type modifier")));
587                 typmod = 0;                             /* keep compiler quiet */
588         }
589
590         PG_RETURN_INT32(typmod);
591 }
592
593 Datum
594 numerictypmodout(PG_FUNCTION_ARGS)
595 {
596         int32           typmod = PG_GETARG_INT32(0);
597         char       *res = (char *) palloc(64);
598
599         if (typmod >= 0)
600                 snprintf(res, 64, "(%d,%d)",
601                                  ((typmod - VARHDRSZ) >> 16) & 0xffff,
602                                  (typmod - VARHDRSZ) & 0xffff);
603         else
604                 *res = '\0';
605
606         PG_RETURN_CSTRING(res);
607 }
608
609
610 /* ----------------------------------------------------------------------
611  *
612  * Sign manipulation, rounding and the like
613  *
614  * ----------------------------------------------------------------------
615  */
616
617 Datum
618 numeric_abs(PG_FUNCTION_ARGS)
619 {
620         Numeric         num = PG_GETARG_NUMERIC(0);
621         Numeric         res;
622
623         /*
624          * Handle NaN
625          */
626         if (NUMERIC_IS_NAN(num))
627                 PG_RETURN_NUMERIC(make_result(&const_nan));
628
629         /*
630          * Do it the easy way directly on the packed format
631          */
632         res = (Numeric) palloc(VARSIZE(num));
633         memcpy(res, num, VARSIZE(num));
634
635         res->n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num);
636
637         PG_RETURN_NUMERIC(res);
638 }
639
640
641 Datum
642 numeric_uminus(PG_FUNCTION_ARGS)
643 {
644         Numeric         num = PG_GETARG_NUMERIC(0);
645         Numeric         res;
646
647         /*
648          * Handle NaN
649          */
650         if (NUMERIC_IS_NAN(num))
651                 PG_RETURN_NUMERIC(make_result(&const_nan));
652
653         /*
654          * Do it the easy way directly on the packed format
655          */
656         res = (Numeric) palloc(VARSIZE(num));
657         memcpy(res, num, VARSIZE(num));
658
659         /*
660          * The packed format is known to be totally zero digit trimmed always. So
661          * we can identify a ZERO by the fact that there are no digits at all.  Do
662          * nothing to a zero.
663          */
664         if (VARSIZE(num) != NUMERIC_HDRSZ)
665         {
666                 /* Else, flip the sign */
667                 if (NUMERIC_SIGN(num) == NUMERIC_POS)
668                         res->n_sign_dscale = NUMERIC_NEG | NUMERIC_DSCALE(num);
669                 else
670                         res->n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num);
671         }
672
673         PG_RETURN_NUMERIC(res);
674 }
675
676
677 Datum
678 numeric_uplus(PG_FUNCTION_ARGS)
679 {
680         Numeric         num = PG_GETARG_NUMERIC(0);
681         Numeric         res;
682
683         res = (Numeric) palloc(VARSIZE(num));
684         memcpy(res, num, VARSIZE(num));
685
686         PG_RETURN_NUMERIC(res);
687 }
688
689 /*
690  * numeric_sign() -
691  *
692  * returns -1 if the argument is less than 0, 0 if the argument is equal
693  * to 0, and 1 if the argument is greater than zero.
694  */
695 Datum
696 numeric_sign(PG_FUNCTION_ARGS)
697 {
698         Numeric         num = PG_GETARG_NUMERIC(0);
699         Numeric         res;
700         NumericVar      result;
701
702         /*
703          * Handle NaN
704          */
705         if (NUMERIC_IS_NAN(num))
706                 PG_RETURN_NUMERIC(make_result(&const_nan));
707
708         init_var(&result);
709
710         /*
711          * The packed format is known to be totally zero digit trimmed always. So
712          * we can identify a ZERO by the fact that there are no digits at all.
713          */
714         if (VARSIZE(num) == NUMERIC_HDRSZ)
715                 set_var_from_var(&const_zero, &result);
716         else
717         {
718                 /*
719                  * And if there are some, we return a copy of ONE with the sign of our
720                  * argument
721                  */
722                 set_var_from_var(&const_one, &result);
723                 result.sign = NUMERIC_SIGN(num);
724         }
725
726         res = make_result(&result);
727         free_var(&result);
728
729         PG_RETURN_NUMERIC(res);
730 }
731
732
733 /*
734  * numeric_round() -
735  *
736  *      Round a value to have 'scale' digits after the decimal point.
737  *      We allow negative 'scale', implying rounding before the decimal
738  *      point --- Oracle interprets rounding that way.
739  */
740 Datum
741 numeric_round(PG_FUNCTION_ARGS)
742 {
743         Numeric         num = PG_GETARG_NUMERIC(0);
744         int32           scale = PG_GETARG_INT32(1);
745         Numeric         res;
746         NumericVar      arg;
747
748         /*
749          * Handle NaN
750          */
751         if (NUMERIC_IS_NAN(num))
752                 PG_RETURN_NUMERIC(make_result(&const_nan));
753
754         /*
755          * Limit the scale value to avoid possible overflow in calculations
756          */
757         scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE);
758         scale = Min(scale, NUMERIC_MAX_RESULT_SCALE);
759
760         /*
761          * Unpack the argument and round it at the proper digit position
762          */
763         init_var(&arg);
764         set_var_from_num(num, &arg);
765
766         round_var(&arg, scale);
767
768         /* We don't allow negative output dscale */
769         if (scale < 0)
770                 arg.dscale = 0;
771
772         /*
773          * Return the rounded result
774          */
775         res = make_result(&arg);
776
777         free_var(&arg);
778         PG_RETURN_NUMERIC(res);
779 }
780
781
782 /*
783  * numeric_trunc() -
784  *
785  *      Truncate a value to have 'scale' digits after the decimal point.
786  *      We allow negative 'scale', implying a truncation before the decimal
787  *      point --- Oracle interprets truncation that way.
788  */
789 Datum
790 numeric_trunc(PG_FUNCTION_ARGS)
791 {
792         Numeric         num = PG_GETARG_NUMERIC(0);
793         int32           scale = PG_GETARG_INT32(1);
794         Numeric         res;
795         NumericVar      arg;
796
797         /*
798          * Handle NaN
799          */
800         if (NUMERIC_IS_NAN(num))
801                 PG_RETURN_NUMERIC(make_result(&const_nan));
802
803         /*
804          * Limit the scale value to avoid possible overflow in calculations
805          */
806         scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE);
807         scale = Min(scale, NUMERIC_MAX_RESULT_SCALE);
808
809         /*
810          * Unpack the argument and truncate it at the proper digit position
811          */
812         init_var(&arg);
813         set_var_from_num(num, &arg);
814
815         trunc_var(&arg, scale);
816
817         /* We don't allow negative output dscale */
818         if (scale < 0)
819                 arg.dscale = 0;
820
821         /*
822          * Return the truncated result
823          */
824         res = make_result(&arg);
825
826         free_var(&arg);
827         PG_RETURN_NUMERIC(res);
828 }
829
830
831 /*
832  * numeric_ceil() -
833  *
834  *      Return the smallest integer greater than or equal to the argument
835  */
836 Datum
837 numeric_ceil(PG_FUNCTION_ARGS)
838 {
839         Numeric         num = PG_GETARG_NUMERIC(0);
840         Numeric         res;
841         NumericVar      result;
842
843         if (NUMERIC_IS_NAN(num))
844                 PG_RETURN_NUMERIC(make_result(&const_nan));
845
846         init_var(&result);
847
848         set_var_from_num(num, &result);
849         ceil_var(&result, &result);
850
851         res = make_result(&result);
852         free_var(&result);
853
854         PG_RETURN_NUMERIC(res);
855 }
856
857
858 /*
859  * numeric_floor() -
860  *
861  *      Return the largest integer equal to or less than the argument
862  */
863 Datum
864 numeric_floor(PG_FUNCTION_ARGS)
865 {
866         Numeric         num = PG_GETARG_NUMERIC(0);
867         Numeric         res;
868         NumericVar      result;
869
870         if (NUMERIC_IS_NAN(num))
871                 PG_RETURN_NUMERIC(make_result(&const_nan));
872
873         init_var(&result);
874
875         set_var_from_num(num, &result);
876         floor_var(&result, &result);
877
878         res = make_result(&result);
879         free_var(&result);
880
881         PG_RETURN_NUMERIC(res);
882 }
883
884 /*
885  * Implements the numeric version of the width_bucket() function
886  * defined by SQL2003. See also width_bucket_float8().
887  *
888  * 'bound1' and 'bound2' are the lower and upper bounds of the
889  * histogram's range, respectively. 'count' is the number of buckets
890  * in the histogram. width_bucket() returns an integer indicating the
891  * bucket number that 'operand' belongs to in an equiwidth histogram
892  * with the specified characteristics. An operand smaller than the
893  * lower bound is assigned to bucket 0. An operand greater than the
894  * upper bound is assigned to an additional bucket (with number
895  * count+1). We don't allow "NaN" for any of the numeric arguments.
896  */
897 Datum
898 width_bucket_numeric(PG_FUNCTION_ARGS)
899 {
900         Numeric         operand = PG_GETARG_NUMERIC(0);
901         Numeric         bound1 = PG_GETARG_NUMERIC(1);
902         Numeric         bound2 = PG_GETARG_NUMERIC(2);
903         int32           count = PG_GETARG_INT32(3);
904         NumericVar      count_var;
905         NumericVar      result_var;
906         int32           result;
907
908         if (count <= 0)
909                 ereport(ERROR,
910                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
911                                  errmsg("count must be greater than zero")));
912
913         if (NUMERIC_IS_NAN(operand) ||
914                 NUMERIC_IS_NAN(bound1) ||
915                 NUMERIC_IS_NAN(bound2))
916                 ereport(ERROR,
917                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
918                           errmsg("operand, lower bound and upper bound cannot be NaN")));
919
920         init_var(&result_var);
921         init_var(&count_var);
922
923         /* Convert 'count' to a numeric, for ease of use later */
924         int8_to_numericvar((int64) count, &count_var);
925
926         switch (cmp_numerics(bound1, bound2))
927         {
928                 case 0:
929                         ereport(ERROR,
930                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
931                                  errmsg("lower bound cannot equal upper bound")));
932
933                         /* bound1 < bound2 */
934                 case -1:
935                         if (cmp_numerics(operand, bound1) < 0)
936                                 set_var_from_var(&const_zero, &result_var);
937                         else if (cmp_numerics(operand, bound2) >= 0)
938                                 add_var(&count_var, &const_one, &result_var);
939                         else
940                                 compute_bucket(operand, bound1, bound2,
941                                                            &count_var, &result_var);
942                         break;
943
944                         /* bound1 > bound2 */
945                 case 1:
946                         if (cmp_numerics(operand, bound1) > 0)
947                                 set_var_from_var(&const_zero, &result_var);
948                         else if (cmp_numerics(operand, bound2) <= 0)
949                                 add_var(&count_var, &const_one, &result_var);
950                         else
951                                 compute_bucket(operand, bound1, bound2,
952                                                            &count_var, &result_var);
953                         break;
954         }
955
956         /* if result exceeds the range of a legal int4, we ereport here */
957         result = numericvar_to_int4(&result_var);
958
959         free_var(&count_var);
960         free_var(&result_var);
961
962         PG_RETURN_INT32(result);
963 }
964
965 /*
966  * If 'operand' is not outside the bucket range, determine the correct
967  * bucket for it to go. The calculations performed by this function
968  * are derived directly from the SQL2003 spec.
969  */
970 static void
971 compute_bucket(Numeric operand, Numeric bound1, Numeric bound2,
972                            NumericVar *count_var, NumericVar *result_var)
973 {
974         NumericVar      bound1_var;
975         NumericVar      bound2_var;
976         NumericVar      operand_var;
977
978         init_var(&bound1_var);
979         init_var(&bound2_var);
980         init_var(&operand_var);
981
982         set_var_from_num(bound1, &bound1_var);
983         set_var_from_num(bound2, &bound2_var);
984         set_var_from_num(operand, &operand_var);
985
986         if (cmp_var(&bound1_var, &bound2_var) < 0)
987         {
988                 sub_var(&operand_var, &bound1_var, &operand_var);
989                 sub_var(&bound2_var, &bound1_var, &bound2_var);
990                 div_var(&operand_var, &bound2_var, result_var,
991                                 select_div_scale(&operand_var, &bound2_var), true);
992         }
993         else
994         {
995                 sub_var(&bound1_var, &operand_var, &operand_var);
996                 sub_var(&bound1_var, &bound2_var, &bound1_var);
997                 div_var(&operand_var, &bound1_var, result_var,
998                                 select_div_scale(&operand_var, &bound1_var), true);
999         }
1000
1001         mul_var(result_var, count_var, result_var,
1002                         result_var->dscale + count_var->dscale);
1003         add_var(result_var, &const_one, result_var);
1004         floor_var(result_var, result_var);
1005
1006         free_var(&bound1_var);
1007         free_var(&bound2_var);
1008         free_var(&operand_var);
1009 }
1010
1011 /* ----------------------------------------------------------------------
1012  *
1013  * Comparison functions
1014  *
1015  * Note: btree indexes need these routines not to leak memory; therefore,
1016  * be careful to free working copies of toasted datums.  Most places don't
1017  * need to be so careful.
1018  * ----------------------------------------------------------------------
1019  */
1020
1021
1022 Datum
1023 numeric_cmp(PG_FUNCTION_ARGS)
1024 {
1025         Numeric         num1 = PG_GETARG_NUMERIC(0);
1026         Numeric         num2 = PG_GETARG_NUMERIC(1);
1027         int                     result;
1028
1029         result = cmp_numerics(num1, num2);
1030
1031         PG_FREE_IF_COPY(num1, 0);
1032         PG_FREE_IF_COPY(num2, 1);
1033
1034         PG_RETURN_INT32(result);
1035 }
1036
1037
1038 Datum
1039 numeric_eq(PG_FUNCTION_ARGS)
1040 {
1041         Numeric         num1 = PG_GETARG_NUMERIC(0);
1042         Numeric         num2 = PG_GETARG_NUMERIC(1);
1043         bool            result;
1044
1045         result = cmp_numerics(num1, num2) == 0;
1046
1047         PG_FREE_IF_COPY(num1, 0);
1048         PG_FREE_IF_COPY(num2, 1);
1049
1050         PG_RETURN_BOOL(result);
1051 }
1052
1053 Datum
1054 numeric_ne(PG_FUNCTION_ARGS)
1055 {
1056         Numeric         num1 = PG_GETARG_NUMERIC(0);
1057         Numeric         num2 = PG_GETARG_NUMERIC(1);
1058         bool            result;
1059
1060         result = cmp_numerics(num1, num2) != 0;
1061
1062         PG_FREE_IF_COPY(num1, 0);
1063         PG_FREE_IF_COPY(num2, 1);
1064
1065         PG_RETURN_BOOL(result);
1066 }
1067
1068 Datum
1069 numeric_gt(PG_FUNCTION_ARGS)
1070 {
1071         Numeric         num1 = PG_GETARG_NUMERIC(0);
1072         Numeric         num2 = PG_GETARG_NUMERIC(1);
1073         bool            result;
1074
1075         result = cmp_numerics(num1, num2) > 0;
1076
1077         PG_FREE_IF_COPY(num1, 0);
1078         PG_FREE_IF_COPY(num2, 1);
1079
1080         PG_RETURN_BOOL(result);
1081 }
1082
1083 Datum
1084 numeric_ge(PG_FUNCTION_ARGS)
1085 {
1086         Numeric         num1 = PG_GETARG_NUMERIC(0);
1087         Numeric         num2 = PG_GETARG_NUMERIC(1);
1088         bool            result;
1089
1090         result = cmp_numerics(num1, num2) >= 0;
1091
1092         PG_FREE_IF_COPY(num1, 0);
1093         PG_FREE_IF_COPY(num2, 1);
1094
1095         PG_RETURN_BOOL(result);
1096 }
1097
1098 Datum
1099 numeric_lt(PG_FUNCTION_ARGS)
1100 {
1101         Numeric         num1 = PG_GETARG_NUMERIC(0);
1102         Numeric         num2 = PG_GETARG_NUMERIC(1);
1103         bool            result;
1104
1105         result = cmp_numerics(num1, num2) < 0;
1106
1107         PG_FREE_IF_COPY(num1, 0);
1108         PG_FREE_IF_COPY(num2, 1);
1109
1110         PG_RETURN_BOOL(result);
1111 }
1112
1113 Datum
1114 numeric_le(PG_FUNCTION_ARGS)
1115 {
1116         Numeric         num1 = PG_GETARG_NUMERIC(0);
1117         Numeric         num2 = PG_GETARG_NUMERIC(1);
1118         bool            result;
1119
1120         result = cmp_numerics(num1, num2) <= 0;
1121
1122         PG_FREE_IF_COPY(num1, 0);
1123         PG_FREE_IF_COPY(num2, 1);
1124
1125         PG_RETURN_BOOL(result);
1126 }
1127
1128 static int
1129 cmp_numerics(Numeric num1, Numeric num2)
1130 {
1131         int                     result;
1132
1133         /*
1134          * We consider all NANs to be equal and larger than any non-NAN. This is
1135          * somewhat arbitrary; the important thing is to have a consistent sort
1136          * order.
1137          */
1138         if (NUMERIC_IS_NAN(num1))
1139         {
1140                 if (NUMERIC_IS_NAN(num2))
1141                         result = 0;                     /* NAN = NAN */
1142                 else
1143                         result = 1;                     /* NAN > non-NAN */
1144         }
1145         else if (NUMERIC_IS_NAN(num2))
1146         {
1147                 result = -1;                    /* non-NAN < NAN */
1148         }
1149         else
1150         {
1151                 result = cmp_var_common(NUMERIC_DIGITS(num1), NUMERIC_NDIGITS(num1),
1152                                                                 num1->n_weight, NUMERIC_SIGN(num1),
1153                                                                 NUMERIC_DIGITS(num2), NUMERIC_NDIGITS(num2),
1154                                                                 num2->n_weight, NUMERIC_SIGN(num2));
1155         }
1156
1157         return result;
1158 }
1159
1160 Datum
1161 hash_numeric(PG_FUNCTION_ARGS)
1162 {
1163         Numeric         key = PG_GETARG_NUMERIC(0);
1164         Datum           digit_hash;
1165         Datum           result;
1166         int                     weight;
1167         int                     start_offset;
1168         int                     end_offset;
1169         int                     i;
1170         int                     hash_len;
1171
1172         /* If it's NaN, don't try to hash the rest of the fields */
1173         if (NUMERIC_IS_NAN(key))
1174                 PG_RETURN_UINT32(0);
1175
1176         weight = key->n_weight;
1177         start_offset = 0;
1178         end_offset = 0;
1179
1180         /*
1181          * Omit any leading or trailing zeros from the input to the hash. The
1182          * numeric implementation *should* guarantee that leading and trailing
1183          * zeros are suppressed, but we're paranoid. Note that we measure the
1184          * starting and ending offsets in units of NumericDigits, not bytes.
1185          */
1186         for (i = 0; i < NUMERIC_NDIGITS(key); i++)
1187         {
1188                 if (NUMERIC_DIGITS(key)[i] != (NumericDigit) 0)
1189                         break;
1190
1191                 start_offset++;
1192
1193                 /*
1194                  * The weight is effectively the # of digits before the decimal point,
1195                  * so decrement it for each leading zero we skip.
1196                  */
1197                 weight--;
1198         }
1199
1200         /*
1201          * If there are no non-zero digits, then the value of the number is zero,
1202          * regardless of any other fields.
1203          */
1204         if (NUMERIC_NDIGITS(key) == start_offset)
1205                 PG_RETURN_UINT32(-1);
1206
1207         for (i = NUMERIC_NDIGITS(key) - 1; i >= 0; i--)
1208         {
1209                 if (NUMERIC_DIGITS(key)[i] != (NumericDigit) 0)
1210                         break;
1211
1212                 end_offset++;
1213         }
1214
1215         /* If we get here, there should be at least one non-zero digit */
1216         Assert(start_offset + end_offset < NUMERIC_NDIGITS(key));
1217
1218         /*
1219          * Note that we don't hash on the Numeric's scale, since two numerics can
1220          * compare equal but have different scales. We also don't hash on the
1221          * sign, although we could: since a sign difference implies inequality,
1222          * this shouldn't affect correctness.
1223          */
1224         hash_len = NUMERIC_NDIGITS(key) - start_offset - end_offset;
1225         digit_hash = hash_any((unsigned char *) (NUMERIC_DIGITS(key) + start_offset),
1226                                                   hash_len * sizeof(NumericDigit));
1227
1228         /* Mix in the weight, via XOR */
1229         result = digit_hash ^ weight;
1230
1231         PG_RETURN_DATUM(result);
1232 }
1233
1234
1235 /* ----------------------------------------------------------------------
1236  *
1237  * Basic arithmetic functions
1238  *
1239  * ----------------------------------------------------------------------
1240  */
1241
1242
1243 /*
1244  * numeric_add() -
1245  *
1246  *      Add two numerics
1247  */
1248 Datum
1249 numeric_add(PG_FUNCTION_ARGS)
1250 {
1251         Numeric         num1 = PG_GETARG_NUMERIC(0);
1252         Numeric         num2 = PG_GETARG_NUMERIC(1);
1253         NumericVar      arg1;
1254         NumericVar      arg2;
1255         NumericVar      result;
1256         Numeric         res;
1257
1258         /*
1259          * Handle NaN
1260          */
1261         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1262                 PG_RETURN_NUMERIC(make_result(&const_nan));
1263
1264         /*
1265          * Unpack the values, let add_var() compute the result and return it.
1266          */
1267         init_var(&arg1);
1268         init_var(&arg2);
1269         init_var(&result);
1270
1271         set_var_from_num(num1, &arg1);
1272         set_var_from_num(num2, &arg2);
1273
1274         add_var(&arg1, &arg2, &result);
1275
1276         res = make_result(&result);
1277
1278         free_var(&arg1);
1279         free_var(&arg2);
1280         free_var(&result);
1281
1282         PG_RETURN_NUMERIC(res);
1283 }
1284
1285
1286 /*
1287  * numeric_sub() -
1288  *
1289  *      Subtract one numeric from another
1290  */
1291 Datum
1292 numeric_sub(PG_FUNCTION_ARGS)
1293 {
1294         Numeric         num1 = PG_GETARG_NUMERIC(0);
1295         Numeric         num2 = PG_GETARG_NUMERIC(1);
1296         NumericVar      arg1;
1297         NumericVar      arg2;
1298         NumericVar      result;
1299         Numeric         res;
1300
1301         /*
1302          * Handle NaN
1303          */
1304         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1305                 PG_RETURN_NUMERIC(make_result(&const_nan));
1306
1307         /*
1308          * Unpack the values, let sub_var() compute the result and return it.
1309          */
1310         init_var(&arg1);
1311         init_var(&arg2);
1312         init_var(&result);
1313
1314         set_var_from_num(num1, &arg1);
1315         set_var_from_num(num2, &arg2);
1316
1317         sub_var(&arg1, &arg2, &result);
1318
1319         res = make_result(&result);
1320
1321         free_var(&arg1);
1322         free_var(&arg2);
1323         free_var(&result);
1324
1325         PG_RETURN_NUMERIC(res);
1326 }
1327
1328
1329 /*
1330  * numeric_mul() -
1331  *
1332  *      Calculate the product of two numerics
1333  */
1334 Datum
1335 numeric_mul(PG_FUNCTION_ARGS)
1336 {
1337         Numeric         num1 = PG_GETARG_NUMERIC(0);
1338         Numeric         num2 = PG_GETARG_NUMERIC(1);
1339         NumericVar      arg1;
1340         NumericVar      arg2;
1341         NumericVar      result;
1342         Numeric         res;
1343
1344         /*
1345          * Handle NaN
1346          */
1347         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1348                 PG_RETURN_NUMERIC(make_result(&const_nan));
1349
1350         /*
1351          * Unpack the values, let mul_var() compute the result and return it.
1352          * Unlike add_var() and sub_var(), mul_var() will round its result. In the
1353          * case of numeric_mul(), which is invoked for the * operator on numerics,
1354          * we request exact representation for the product (rscale = sum(dscale of
1355          * arg1, dscale of arg2)).
1356          */
1357         init_var(&arg1);
1358         init_var(&arg2);
1359         init_var(&result);
1360
1361         set_var_from_num(num1, &arg1);
1362         set_var_from_num(num2, &arg2);
1363
1364         mul_var(&arg1, &arg2, &result, arg1.dscale + arg2.dscale);
1365
1366         res = make_result(&result);
1367
1368         free_var(&arg1);
1369         free_var(&arg2);
1370         free_var(&result);
1371
1372         PG_RETURN_NUMERIC(res);
1373 }
1374
1375
1376 /*
1377  * numeric_div() -
1378  *
1379  *      Divide one numeric into another
1380  */
1381 Datum
1382 numeric_div(PG_FUNCTION_ARGS)
1383 {
1384         Numeric         num1 = PG_GETARG_NUMERIC(0);
1385         Numeric         num2 = PG_GETARG_NUMERIC(1);
1386         NumericVar      arg1;
1387         NumericVar      arg2;
1388         NumericVar      result;
1389         Numeric         res;
1390         int                     rscale;
1391
1392         /*
1393          * Handle NaN
1394          */
1395         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1396                 PG_RETURN_NUMERIC(make_result(&const_nan));
1397
1398         /*
1399          * Unpack the arguments
1400          */
1401         init_var(&arg1);
1402         init_var(&arg2);
1403         init_var(&result);
1404
1405         set_var_from_num(num1, &arg1);
1406         set_var_from_num(num2, &arg2);
1407
1408         /*
1409          * Select scale for division result
1410          */
1411         rscale = select_div_scale(&arg1, &arg2);
1412
1413         /*
1414          * Do the divide and return the result
1415          */
1416         div_var(&arg1, &arg2, &result, rscale, true);
1417
1418         res = make_result(&result);
1419
1420         free_var(&arg1);
1421         free_var(&arg2);
1422         free_var(&result);
1423
1424         PG_RETURN_NUMERIC(res);
1425 }
1426
1427
1428 /*
1429  * numeric_div_trunc() -
1430  *
1431  *      Divide one numeric into another, truncating the result to an integer
1432  */
1433 Datum
1434 numeric_div_trunc(PG_FUNCTION_ARGS)
1435 {
1436         Numeric         num1 = PG_GETARG_NUMERIC(0);
1437         Numeric         num2 = PG_GETARG_NUMERIC(1);
1438         NumericVar      arg1;
1439         NumericVar      arg2;
1440         NumericVar      result;
1441         Numeric         res;
1442
1443         /*
1444          * Handle NaN
1445          */
1446         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1447                 PG_RETURN_NUMERIC(make_result(&const_nan));
1448
1449         /*
1450          * Unpack the arguments
1451          */
1452         init_var(&arg1);
1453         init_var(&arg2);
1454         init_var(&result);
1455
1456         set_var_from_num(num1, &arg1);
1457         set_var_from_num(num2, &arg2);
1458
1459         /*
1460          * Do the divide and return the result
1461          */
1462         div_var(&arg1, &arg2, &result, 0, false);
1463
1464         res = make_result(&result);
1465
1466         free_var(&arg1);
1467         free_var(&arg2);
1468         free_var(&result);
1469
1470         PG_RETURN_NUMERIC(res);
1471 }
1472
1473
1474 /*
1475  * numeric_mod() -
1476  *
1477  *      Calculate the modulo of two numerics
1478  */
1479 Datum
1480 numeric_mod(PG_FUNCTION_ARGS)
1481 {
1482         Numeric         num1 = PG_GETARG_NUMERIC(0);
1483         Numeric         num2 = PG_GETARG_NUMERIC(1);
1484         Numeric         res;
1485         NumericVar      arg1;
1486         NumericVar      arg2;
1487         NumericVar      result;
1488
1489         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1490                 PG_RETURN_NUMERIC(make_result(&const_nan));
1491
1492         init_var(&arg1);
1493         init_var(&arg2);
1494         init_var(&result);
1495
1496         set_var_from_num(num1, &arg1);
1497         set_var_from_num(num2, &arg2);
1498
1499         mod_var(&arg1, &arg2, &result);
1500
1501         res = make_result(&result);
1502
1503         free_var(&result);
1504         free_var(&arg2);
1505         free_var(&arg1);
1506
1507         PG_RETURN_NUMERIC(res);
1508 }
1509
1510
1511 /*
1512  * numeric_inc() -
1513  *
1514  *      Increment a number by one
1515  */
1516 Datum
1517 numeric_inc(PG_FUNCTION_ARGS)
1518 {
1519         Numeric         num = PG_GETARG_NUMERIC(0);
1520         NumericVar      arg;
1521         Numeric         res;
1522
1523         /*
1524          * Handle NaN
1525          */
1526         if (NUMERIC_IS_NAN(num))
1527                 PG_RETURN_NUMERIC(make_result(&const_nan));
1528
1529         /*
1530          * Compute the result and return it
1531          */
1532         init_var(&arg);
1533
1534         set_var_from_num(num, &arg);
1535
1536         add_var(&arg, &const_one, &arg);
1537
1538         res = make_result(&arg);
1539
1540         free_var(&arg);
1541
1542         PG_RETURN_NUMERIC(res);
1543 }
1544
1545
1546 /*
1547  * numeric_smaller() -
1548  *
1549  *      Return the smaller of two numbers
1550  */
1551 Datum
1552 numeric_smaller(PG_FUNCTION_ARGS)
1553 {
1554         Numeric         num1 = PG_GETARG_NUMERIC(0);
1555         Numeric         num2 = PG_GETARG_NUMERIC(1);
1556
1557         /*
1558          * Use cmp_numerics so that this will agree with the comparison operators,
1559          * particularly as regards comparisons involving NaN.
1560          */
1561         if (cmp_numerics(num1, num2) < 0)
1562                 PG_RETURN_NUMERIC(num1);
1563         else
1564                 PG_RETURN_NUMERIC(num2);
1565 }
1566
1567
1568 /*
1569  * numeric_larger() -
1570  *
1571  *      Return the larger of two numbers
1572  */
1573 Datum
1574 numeric_larger(PG_FUNCTION_ARGS)
1575 {
1576         Numeric         num1 = PG_GETARG_NUMERIC(0);
1577         Numeric         num2 = PG_GETARG_NUMERIC(1);
1578
1579         /*
1580          * Use cmp_numerics so that this will agree with the comparison operators,
1581          * particularly as regards comparisons involving NaN.
1582          */
1583         if (cmp_numerics(num1, num2) > 0)
1584                 PG_RETURN_NUMERIC(num1);
1585         else
1586                 PG_RETURN_NUMERIC(num2);
1587 }
1588
1589
1590 /* ----------------------------------------------------------------------
1591  *
1592  * Advanced math functions
1593  *
1594  * ----------------------------------------------------------------------
1595  */
1596
1597 /*
1598  * numeric_fac()
1599  *
1600  * Compute factorial
1601  */
1602 Datum
1603 numeric_fac(PG_FUNCTION_ARGS)
1604 {
1605         int64           num = PG_GETARG_INT64(0);
1606         Numeric         res;
1607         NumericVar      fact;
1608         NumericVar      result;
1609
1610         if (num <= 1)
1611         {
1612                 res = make_result(&const_one);
1613                 PG_RETURN_NUMERIC(res);
1614         }
1615         /* Fail immediately if the result would overflow */
1616         if (num > 32177)
1617                 ereport(ERROR,
1618                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1619                                  errmsg("value overflows numeric format")));
1620
1621         init_var(&fact);
1622         init_var(&result);
1623
1624         int8_to_numericvar(num, &result);
1625
1626         for (num = num - 1; num > 1; num--)
1627         {
1628                 /* this loop can take awhile, so allow it to be interrupted */
1629                 CHECK_FOR_INTERRUPTS();
1630
1631                 int8_to_numericvar(num, &fact);
1632
1633                 mul_var(&result, &fact, &result, 0);
1634         }
1635
1636         res = make_result(&result);
1637
1638         free_var(&fact);
1639         free_var(&result);
1640
1641         PG_RETURN_NUMERIC(res);
1642 }
1643
1644
1645 /*
1646  * numeric_sqrt() -
1647  *
1648  *      Compute the square root of a numeric.
1649  */
1650 Datum
1651 numeric_sqrt(PG_FUNCTION_ARGS)
1652 {
1653         Numeric         num = PG_GETARG_NUMERIC(0);
1654         Numeric         res;
1655         NumericVar      arg;
1656         NumericVar      result;
1657         int                     sweight;
1658         int                     rscale;
1659
1660         /*
1661          * Handle NaN
1662          */
1663         if (NUMERIC_IS_NAN(num))
1664                 PG_RETURN_NUMERIC(make_result(&const_nan));
1665
1666         /*
1667          * Unpack the argument and determine the result scale.  We choose a scale
1668          * to give at least NUMERIC_MIN_SIG_DIGITS significant digits; but in any
1669          * case not less than the input's dscale.
1670          */
1671         init_var(&arg);
1672         init_var(&result);
1673
1674         set_var_from_num(num, &arg);
1675
1676         /* Assume the input was normalized, so arg.weight is accurate */
1677         sweight = (arg.weight + 1) * DEC_DIGITS / 2 - 1;
1678
1679         rscale = NUMERIC_MIN_SIG_DIGITS - sweight;
1680         rscale = Max(rscale, arg.dscale);
1681         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
1682         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
1683
1684         /*
1685          * Let sqrt_var() do the calculation and return the result.
1686          */
1687         sqrt_var(&arg, &result, rscale);
1688
1689         res = make_result(&result);
1690
1691         free_var(&result);
1692         free_var(&arg);
1693
1694         PG_RETURN_NUMERIC(res);
1695 }
1696
1697
1698 /*
1699  * numeric_exp() -
1700  *
1701  *      Raise e to the power of x
1702  */
1703 Datum
1704 numeric_exp(PG_FUNCTION_ARGS)
1705 {
1706         Numeric         num = PG_GETARG_NUMERIC(0);
1707         Numeric         res;
1708         NumericVar      arg;
1709         NumericVar      result;
1710         int                     rscale;
1711         double          val;
1712
1713         /*
1714          * Handle NaN
1715          */
1716         if (NUMERIC_IS_NAN(num))
1717                 PG_RETURN_NUMERIC(make_result(&const_nan));
1718
1719         /*
1720          * Unpack the argument and determine the result scale.  We choose a scale
1721          * to give at least NUMERIC_MIN_SIG_DIGITS significant digits; but in any
1722          * case not less than the input's dscale.
1723          */
1724         init_var(&arg);
1725         init_var(&result);
1726
1727         set_var_from_num(num, &arg);
1728
1729         /* convert input to float8, ignoring overflow */
1730         val = numericvar_to_double_no_overflow(&arg);
1731
1732         /*
1733          * log10(result) = num * log10(e), so this is approximately the decimal
1734          * weight of the result:
1735          */
1736         val *= 0.434294481903252;
1737
1738         /* limit to something that won't cause integer overflow */
1739         val = Max(val, -NUMERIC_MAX_RESULT_SCALE);
1740         val = Min(val, NUMERIC_MAX_RESULT_SCALE);
1741
1742         rscale = NUMERIC_MIN_SIG_DIGITS - (int) val;
1743         rscale = Max(rscale, arg.dscale);
1744         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
1745         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
1746
1747         /*
1748          * Let exp_var() do the calculation and return the result.
1749          */
1750         exp_var(&arg, &result, rscale);
1751
1752         res = make_result(&result);
1753
1754         free_var(&result);
1755         free_var(&arg);
1756
1757         PG_RETURN_NUMERIC(res);
1758 }
1759
1760
1761 /*
1762  * numeric_ln() -
1763  *
1764  *      Compute the natural logarithm of x
1765  */
1766 Datum
1767 numeric_ln(PG_FUNCTION_ARGS)
1768 {
1769         Numeric         num = PG_GETARG_NUMERIC(0);
1770         Numeric         res;
1771         NumericVar      arg;
1772         NumericVar      result;
1773         int                     dec_digits;
1774         int                     rscale;
1775
1776         /*
1777          * Handle NaN
1778          */
1779         if (NUMERIC_IS_NAN(num))
1780                 PG_RETURN_NUMERIC(make_result(&const_nan));
1781
1782         init_var(&arg);
1783         init_var(&result);
1784
1785         set_var_from_num(num, &arg);
1786
1787         /* Approx decimal digits before decimal point */
1788         dec_digits = (arg.weight + 1) * DEC_DIGITS;
1789
1790         if (dec_digits > 1)
1791                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(dec_digits - 1);
1792         else if (dec_digits < 1)
1793                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(1 - dec_digits);
1794         else
1795                 rscale = NUMERIC_MIN_SIG_DIGITS;
1796
1797         rscale = Max(rscale, arg.dscale);
1798         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
1799         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
1800
1801         ln_var(&arg, &result, rscale);
1802
1803         res = make_result(&result);
1804
1805         free_var(&result);
1806         free_var(&arg);
1807
1808         PG_RETURN_NUMERIC(res);
1809 }
1810
1811
1812 /*
1813  * numeric_log() -
1814  *
1815  *      Compute the logarithm of x in a given base
1816  */
1817 Datum
1818 numeric_log(PG_FUNCTION_ARGS)
1819 {
1820         Numeric         num1 = PG_GETARG_NUMERIC(0);
1821         Numeric         num2 = PG_GETARG_NUMERIC(1);
1822         Numeric         res;
1823         NumericVar      arg1;
1824         NumericVar      arg2;
1825         NumericVar      result;
1826
1827         /*
1828          * Handle NaN
1829          */
1830         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1831                 PG_RETURN_NUMERIC(make_result(&const_nan));
1832
1833         /*
1834          * Initialize things
1835          */
1836         init_var(&arg1);
1837         init_var(&arg2);
1838         init_var(&result);
1839
1840         set_var_from_num(num1, &arg1);
1841         set_var_from_num(num2, &arg2);
1842
1843         /*
1844          * Call log_var() to compute and return the result; note it handles scale
1845          * selection itself.
1846          */
1847         log_var(&arg1, &arg2, &result);
1848
1849         res = make_result(&result);
1850
1851         free_var(&result);
1852         free_var(&arg2);
1853         free_var(&arg1);
1854
1855         PG_RETURN_NUMERIC(res);
1856 }
1857
1858
1859 /*
1860  * numeric_power() -
1861  *
1862  *      Raise b to the power of x
1863  */
1864 Datum
1865 numeric_power(PG_FUNCTION_ARGS)
1866 {
1867         Numeric         num1 = PG_GETARG_NUMERIC(0);
1868         Numeric         num2 = PG_GETARG_NUMERIC(1);
1869         Numeric         res;
1870         NumericVar      arg1;
1871         NumericVar      arg2;
1872         NumericVar      arg2_trunc;
1873         NumericVar      result;
1874
1875         /*
1876          * Handle NaN
1877          */
1878         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1879                 PG_RETURN_NUMERIC(make_result(&const_nan));
1880
1881         /*
1882          * Initialize things
1883          */
1884         init_var(&arg1);
1885         init_var(&arg2);
1886         init_var(&arg2_trunc);
1887         init_var(&result);
1888
1889         set_var_from_num(num1, &arg1);
1890         set_var_from_num(num2, &arg2);
1891         set_var_from_var(&arg2, &arg2_trunc);
1892
1893         trunc_var(&arg2_trunc, 0);
1894
1895         /*
1896          * Return special SQLSTATE error codes for a few conditions mandated by
1897          * the standard.
1898          */
1899         if ((cmp_var(&arg1, &const_zero) == 0 &&
1900                  cmp_var(&arg2, &const_zero) < 0) ||
1901                 (cmp_var(&arg1, &const_zero) < 0 &&
1902                  cmp_var(&arg2, &arg2_trunc) != 0))
1903                 ereport(ERROR,
1904                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION),
1905                                  errmsg("invalid argument for power function")));
1906
1907         /*
1908          * Call power_var() to compute and return the result; note it handles
1909          * scale selection itself.
1910          */
1911         power_var(&arg1, &arg2, &result);
1912
1913         res = make_result(&result);
1914
1915         free_var(&result);
1916         free_var(&arg2);
1917         free_var(&arg2_trunc);
1918         free_var(&arg1);
1919
1920         PG_RETURN_NUMERIC(res);
1921 }
1922
1923
1924 /* ----------------------------------------------------------------------
1925  *
1926  * Type conversion functions
1927  *
1928  * ----------------------------------------------------------------------
1929  */
1930
1931
1932 Datum
1933 int4_numeric(PG_FUNCTION_ARGS)
1934 {
1935         int32           val = PG_GETARG_INT32(0);
1936         Numeric         res;
1937         NumericVar      result;
1938
1939         init_var(&result);
1940
1941         int8_to_numericvar((int64) val, &result);
1942
1943         res = make_result(&result);
1944
1945         free_var(&result);
1946
1947         PG_RETURN_NUMERIC(res);
1948 }
1949
1950
1951 Datum
1952 numeric_int4(PG_FUNCTION_ARGS)
1953 {
1954         Numeric         num = PG_GETARG_NUMERIC(0);
1955         NumericVar      x;
1956         int32           result;
1957
1958         /* XXX would it be better to return NULL? */
1959         if (NUMERIC_IS_NAN(num))
1960                 ereport(ERROR,
1961                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1962                                  errmsg("cannot convert NaN to integer")));
1963
1964         /* Convert to variable format, then convert to int4 */
1965         init_var(&x);
1966         set_var_from_num(num, &x);
1967         result = numericvar_to_int4(&x);
1968         free_var(&x);
1969         PG_RETURN_INT32(result);
1970 }
1971
1972 /*
1973  * Given a NumericVar, convert it to an int32. If the NumericVar
1974  * exceeds the range of an int32, raise the appropriate error via
1975  * ereport(). The input NumericVar is *not* free'd.
1976  */
1977 static int32
1978 numericvar_to_int4(NumericVar *var)
1979 {
1980         int32           result;
1981         int64           val;
1982
1983         if (!numericvar_to_int8(var, &val))
1984                 ereport(ERROR,
1985                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1986                                  errmsg("integer out of range")));
1987
1988         /* Down-convert to int4 */
1989         result = (int32) val;
1990
1991         /* Test for overflow by reverse-conversion. */
1992         if ((int64) result != val)
1993                 ereport(ERROR,
1994                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1995                                  errmsg("integer out of range")));
1996
1997         return result;
1998 }
1999
2000 Datum
2001 int8_numeric(PG_FUNCTION_ARGS)
2002 {
2003         int64           val = PG_GETARG_INT64(0);
2004         Numeric         res;
2005         NumericVar      result;
2006
2007         init_var(&result);
2008
2009         int8_to_numericvar(val, &result);
2010
2011         res = make_result(&result);
2012
2013         free_var(&result);
2014
2015         PG_RETURN_NUMERIC(res);
2016 }
2017
2018
2019 Datum
2020 numeric_int8(PG_FUNCTION_ARGS)
2021 {
2022         Numeric         num = PG_GETARG_NUMERIC(0);
2023         NumericVar      x;
2024         int64           result;
2025
2026         /* XXX would it be better to return NULL? */
2027         if (NUMERIC_IS_NAN(num))
2028                 ereport(ERROR,
2029                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2030                                  errmsg("cannot convert NaN to bigint")));
2031
2032         /* Convert to variable format and thence to int8 */
2033         init_var(&x);
2034         set_var_from_num(num, &x);
2035
2036         if (!numericvar_to_int8(&x, &result))
2037                 ereport(ERROR,
2038                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
2039                                  errmsg("bigint out of range")));
2040
2041         free_var(&x);
2042
2043         PG_RETURN_INT64(result);
2044 }
2045
2046
2047 Datum
2048 int2_numeric(PG_FUNCTION_ARGS)
2049 {
2050         int16           val = PG_GETARG_INT16(0);
2051         Numeric         res;
2052         NumericVar      result;
2053
2054         init_var(&result);
2055
2056         int8_to_numericvar((int64) val, &result);
2057
2058         res = make_result(&result);
2059
2060         free_var(&result);
2061
2062         PG_RETURN_NUMERIC(res);
2063 }
2064
2065
2066 Datum
2067 numeric_int2(PG_FUNCTION_ARGS)
2068 {
2069         Numeric         num = PG_GETARG_NUMERIC(0);
2070         NumericVar      x;
2071         int64           val;
2072         int16           result;
2073
2074         /* XXX would it be better to return NULL? */
2075         if (NUMERIC_IS_NAN(num))
2076                 ereport(ERROR,
2077                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2078                                  errmsg("cannot convert NaN to smallint")));
2079
2080         /* Convert to variable format and thence to int8 */
2081         init_var(&x);
2082         set_var_from_num(num, &x);
2083
2084         if (!numericvar_to_int8(&x, &val))
2085                 ereport(ERROR,
2086                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
2087                                  errmsg("smallint out of range")));
2088
2089         free_var(&x);
2090
2091         /* Down-convert to int2 */
2092         result = (int16) val;
2093
2094         /* Test for overflow by reverse-conversion. */
2095         if ((int64) result != val)
2096                 ereport(ERROR,
2097                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
2098                                  errmsg("smallint out of range")));
2099
2100         PG_RETURN_INT16(result);
2101 }
2102
2103
2104 Datum
2105 float8_numeric(PG_FUNCTION_ARGS)
2106 {
2107         float8          val = PG_GETARG_FLOAT8(0);
2108         Numeric         res;
2109         NumericVar      result;
2110         char            buf[DBL_DIG + 100];
2111
2112         if (isnan(val))
2113                 PG_RETURN_NUMERIC(make_result(&const_nan));
2114
2115         sprintf(buf, "%.*g", DBL_DIG, val);
2116
2117         init_var(&result);
2118
2119         set_var_from_str(buf, &result);
2120         res = make_result(&result);
2121
2122         free_var(&result);
2123
2124         PG_RETURN_NUMERIC(res);
2125 }
2126
2127
2128 Datum
2129 numeric_float8(PG_FUNCTION_ARGS)
2130 {
2131         Numeric         num = PG_GETARG_NUMERIC(0);
2132         char       *tmp;
2133         Datum           result;
2134
2135         if (NUMERIC_IS_NAN(num))
2136                 PG_RETURN_FLOAT8(get_float8_nan());
2137
2138         tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
2139                                                                                           NumericGetDatum(num)));
2140
2141         result = DirectFunctionCall1(float8in, CStringGetDatum(tmp));
2142
2143         pfree(tmp);
2144
2145         PG_RETURN_DATUM(result);
2146 }
2147
2148
2149 /* Convert numeric to float8; if out of range, return +/- HUGE_VAL */
2150 Datum
2151 numeric_float8_no_overflow(PG_FUNCTION_ARGS)
2152 {
2153         Numeric         num = PG_GETARG_NUMERIC(0);
2154         double          val;
2155
2156         if (NUMERIC_IS_NAN(num))
2157                 PG_RETURN_FLOAT8(get_float8_nan());
2158
2159         val = numeric_to_double_no_overflow(num);
2160
2161         PG_RETURN_FLOAT8(val);
2162 }
2163
2164 Datum
2165 float4_numeric(PG_FUNCTION_ARGS)
2166 {
2167         float4          val = PG_GETARG_FLOAT4(0);
2168         Numeric         res;
2169         NumericVar      result;
2170         char            buf[FLT_DIG + 100];
2171
2172         if (isnan(val))
2173                 PG_RETURN_NUMERIC(make_result(&const_nan));
2174
2175         sprintf(buf, "%.*g", FLT_DIG, val);
2176
2177         init_var(&result);
2178
2179         set_var_from_str(buf, &result);
2180         res = make_result(&result);
2181
2182         free_var(&result);
2183
2184         PG_RETURN_NUMERIC(res);
2185 }
2186
2187
2188 Datum
2189 numeric_float4(PG_FUNCTION_ARGS)
2190 {
2191         Numeric         num = PG_GETARG_NUMERIC(0);
2192         char       *tmp;
2193         Datum           result;
2194
2195         if (NUMERIC_IS_NAN(num))
2196                 PG_RETURN_FLOAT4(get_float4_nan());
2197
2198         tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
2199                                                                                           NumericGetDatum(num)));
2200
2201         result = DirectFunctionCall1(float4in, CStringGetDatum(tmp));
2202
2203         pfree(tmp);
2204
2205         PG_RETURN_DATUM(result);
2206 }
2207
2208
2209 /* ----------------------------------------------------------------------
2210  *
2211  * Aggregate functions
2212  *
2213  * The transition datatype for all these aggregates is a 3-element array
2214  * of Numeric, holding the values N, sum(X), sum(X*X) in that order.
2215  *
2216  * We represent N as a numeric mainly to avoid having to build a special
2217  * datatype; it's unlikely it'd overflow an int4, but ...
2218  *
2219  * ----------------------------------------------------------------------
2220  */
2221
2222 static ArrayType *
2223 do_numeric_accum(ArrayType *transarray, Numeric newval)
2224 {
2225         Datum      *transdatums;
2226         int                     ndatums;
2227         Datum           N,
2228                                 sumX,
2229                                 sumX2;
2230         ArrayType  *result;
2231
2232         /* We assume the input is array of numeric */
2233         deconstruct_array(transarray,
2234                                           NUMERICOID, -1, false, 'i',
2235                                           &transdatums, NULL, &ndatums);
2236         if (ndatums != 3)
2237                 elog(ERROR, "expected 3-element numeric array");
2238         N = transdatums[0];
2239         sumX = transdatums[1];
2240         sumX2 = transdatums[2];
2241
2242         N = DirectFunctionCall1(numeric_inc, N);
2243         sumX = DirectFunctionCall2(numeric_add, sumX,
2244                                                            NumericGetDatum(newval));
2245         sumX2 = DirectFunctionCall2(numeric_add, sumX2,
2246                                                                 DirectFunctionCall2(numeric_mul,
2247                                                                                                         NumericGetDatum(newval),
2248                                                                                                         NumericGetDatum(newval)));
2249
2250         transdatums[0] = N;
2251         transdatums[1] = sumX;
2252         transdatums[2] = sumX2;
2253
2254         result = construct_array(transdatums, 3,
2255                                                          NUMERICOID, -1, false, 'i');
2256
2257         return result;
2258 }
2259
2260 /*
2261  * Improve avg performance by not caclulating sum(X*X).
2262  */
2263 static ArrayType *
2264 do_numeric_avg_accum(ArrayType *transarray, Numeric newval)
2265 {
2266         Datum      *transdatums;
2267         int                     ndatums;
2268         Datum           N,
2269                                 sumX;
2270         ArrayType  *result;
2271
2272         /* We assume the input is array of numeric */
2273         deconstruct_array(transarray,
2274                                           NUMERICOID, -1, false, 'i',
2275                                           &transdatums, NULL, &ndatums);
2276         if (ndatums != 2)
2277                 elog(ERROR, "expected 2-element numeric array");
2278         N = transdatums[0];
2279         sumX = transdatums[1];
2280
2281         N = DirectFunctionCall1(numeric_inc, N);
2282         sumX = DirectFunctionCall2(numeric_add, sumX,
2283                                                            NumericGetDatum(newval));
2284
2285         transdatums[0] = N;
2286         transdatums[1] = sumX;
2287
2288         result = construct_array(transdatums, 2,
2289                                                          NUMERICOID, -1, false, 'i');
2290
2291         return result;
2292 }
2293
2294 Datum
2295 numeric_accum(PG_FUNCTION_ARGS)
2296 {
2297         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2298         Numeric         newval = PG_GETARG_NUMERIC(1);
2299
2300         PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
2301 }
2302
2303 /*
2304  * Optimized case for average of numeric.
2305  */
2306 Datum
2307 numeric_avg_accum(PG_FUNCTION_ARGS)
2308 {
2309         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2310         Numeric         newval = PG_GETARG_NUMERIC(1);
2311
2312         PG_RETURN_ARRAYTYPE_P(do_numeric_avg_accum(transarray, newval));
2313 }
2314
2315 /*
2316  * Integer data types all use Numeric accumulators to share code and
2317  * avoid risk of overflow.      For int2 and int4 inputs, Numeric accumulation
2318  * is overkill for the N and sum(X) values, but definitely not overkill
2319  * for the sum(X*X) value.      Hence, we use int2_accum and int4_accum only
2320  * for stddev/variance --- there are faster special-purpose accumulator
2321  * routines for SUM and AVG of these datatypes.
2322  */
2323
2324 Datum
2325 int2_accum(PG_FUNCTION_ARGS)
2326 {
2327         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2328         Datum           newval2 = PG_GETARG_DATUM(1);
2329         Numeric         newval;
2330
2331         newval = DatumGetNumeric(DirectFunctionCall1(int2_numeric, newval2));
2332
2333         PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
2334 }
2335
2336 Datum
2337 int4_accum(PG_FUNCTION_ARGS)
2338 {
2339         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2340         Datum           newval4 = PG_GETARG_DATUM(1);
2341         Numeric         newval;
2342
2343         newval = DatumGetNumeric(DirectFunctionCall1(int4_numeric, newval4));
2344
2345         PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
2346 }
2347
2348 Datum
2349 int8_accum(PG_FUNCTION_ARGS)
2350 {
2351         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2352         Datum           newval8 = PG_GETARG_DATUM(1);
2353         Numeric         newval;
2354
2355         newval = DatumGetNumeric(DirectFunctionCall1(int8_numeric, newval8));
2356
2357         PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
2358 }
2359
2360 /*
2361  * Optimized case for average of int8.
2362  */
2363 Datum
2364 int8_avg_accum(PG_FUNCTION_ARGS)
2365 {
2366         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2367         Datum           newval8 = PG_GETARG_DATUM(1);
2368         Numeric         newval;
2369
2370         newval = DatumGetNumeric(DirectFunctionCall1(int8_numeric, newval8));
2371
2372         PG_RETURN_ARRAYTYPE_P(do_numeric_avg_accum(transarray, newval));
2373 }
2374
2375
2376 Datum
2377 numeric_avg(PG_FUNCTION_ARGS)
2378 {
2379         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2380         Datum      *transdatums;
2381         int                     ndatums;
2382         Numeric         N,
2383                                 sumX;
2384
2385         /* We assume the input is array of numeric */
2386         deconstruct_array(transarray,
2387                                           NUMERICOID, -1, false, 'i',
2388                                           &transdatums, NULL, &ndatums);
2389         if (ndatums != 2)
2390                 elog(ERROR, "expected 2-element numeric array");
2391         N = DatumGetNumeric(transdatums[0]);
2392         sumX = DatumGetNumeric(transdatums[1]);
2393
2394         /* SQL92 defines AVG of no values to be NULL */
2395         /* N is zero iff no digits (cf. numeric_uminus) */
2396         if (VARSIZE(N) == NUMERIC_HDRSZ)
2397                 PG_RETURN_NULL();
2398
2399         PG_RETURN_DATUM(DirectFunctionCall2(numeric_div,
2400                                                                                 NumericGetDatum(sumX),
2401                                                                                 NumericGetDatum(N)));
2402 }
2403
2404 /*
2405  * Workhorse routine for the standard deviance and variance
2406  * aggregates. 'transarray' is the aggregate's transition
2407  * array. 'variance' specifies whether we should calculate the
2408  * variance or the standard deviation. 'sample' indicates whether the
2409  * caller is interested in the sample or the population
2410  * variance/stddev.
2411  *
2412  * If appropriate variance statistic is undefined for the input,
2413  * *is_null is set to true and NULL is returned.
2414  */
2415 static Numeric
2416 numeric_stddev_internal(ArrayType *transarray,
2417                                                 bool variance, bool sample,
2418                                                 bool *is_null)
2419 {
2420         Datum      *transdatums;
2421         int                     ndatums;
2422         Numeric         N,
2423                                 sumX,
2424                                 sumX2,
2425                                 res;
2426         NumericVar      vN,
2427                                 vsumX,
2428                                 vsumX2,
2429                                 vNminus1;
2430         NumericVar *comp;
2431         int                     rscale;
2432
2433         *is_null = false;
2434
2435         /* We assume the input is array of numeric */
2436         deconstruct_array(transarray,
2437                                           NUMERICOID, -1, false, 'i',
2438                                           &transdatums, NULL, &ndatums);
2439         if (ndatums != 3)
2440                 elog(ERROR, "expected 3-element numeric array");
2441         N = DatumGetNumeric(transdatums[0]);
2442         sumX = DatumGetNumeric(transdatums[1]);
2443         sumX2 = DatumGetNumeric(transdatums[2]);
2444
2445         if (NUMERIC_IS_NAN(N) || NUMERIC_IS_NAN(sumX) || NUMERIC_IS_NAN(sumX2))
2446                 return make_result(&const_nan);
2447
2448         init_var(&vN);
2449         set_var_from_num(N, &vN);
2450
2451         /*
2452          * Sample stddev and variance are undefined when N <= 1; population stddev
2453          * is undefined when N == 0. Return NULL in either case.
2454          */
2455         if (sample)
2456                 comp = &const_one;
2457         else
2458                 comp = &const_zero;
2459
2460         if (cmp_var(&vN, comp) <= 0)
2461         {
2462                 free_var(&vN);
2463                 *is_null = true;
2464                 return NULL;
2465         }
2466
2467         init_var(&vNminus1);
2468         sub_var(&vN, &const_one, &vNminus1);
2469
2470         init_var(&vsumX);
2471         set_var_from_num(sumX, &vsumX);
2472         init_var(&vsumX2);
2473         set_var_from_num(sumX2, &vsumX2);
2474
2475         /* compute rscale for mul_var calls */
2476         rscale = vsumX.dscale * 2;
2477
2478         mul_var(&vsumX, &vsumX, &vsumX, rscale);        /* vsumX = sumX * sumX */
2479         mul_var(&vN, &vsumX2, &vsumX2, rscale);         /* vsumX2 = N * sumX2 */
2480         sub_var(&vsumX2, &vsumX, &vsumX2);      /* N * sumX2 - sumX * sumX */
2481
2482         if (cmp_var(&vsumX2, &const_zero) <= 0)
2483         {
2484                 /* Watch out for roundoff error producing a negative numerator */
2485                 res = make_result(&const_zero);
2486         }
2487         else
2488         {
2489                 if (sample)
2490                         mul_var(&vN, &vNminus1, &vNminus1, 0);          /* N * (N - 1) */
2491                 else
2492                         mul_var(&vN, &vN, &vNminus1, 0);        /* N * N */
2493                 rscale = select_div_scale(&vsumX2, &vNminus1);
2494                 div_var(&vsumX2, &vNminus1, &vsumX, rscale, true);              /* variance */
2495                 if (!variance)
2496                         sqrt_var(&vsumX, &vsumX, rscale);       /* stddev */
2497
2498                 res = make_result(&vsumX);
2499         }
2500
2501         free_var(&vN);
2502         free_var(&vNminus1);
2503         free_var(&vsumX);
2504         free_var(&vsumX2);
2505
2506         return res;
2507 }
2508
2509 Datum
2510 numeric_var_samp(PG_FUNCTION_ARGS)
2511 {
2512         Numeric         res;
2513         bool            is_null;
2514
2515         res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
2516                                                                   true, true, &is_null);
2517
2518         if (is_null)
2519                 PG_RETURN_NULL();
2520         else
2521                 PG_RETURN_NUMERIC(res);
2522 }
2523
2524 Datum
2525 numeric_stddev_samp(PG_FUNCTION_ARGS)
2526 {
2527         Numeric         res;
2528         bool            is_null;
2529
2530         res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
2531                                                                   false, true, &is_null);
2532
2533         if (is_null)
2534                 PG_RETURN_NULL();
2535         else
2536                 PG_RETURN_NUMERIC(res);
2537 }
2538
2539 Datum
2540 numeric_var_pop(PG_FUNCTION_ARGS)
2541 {
2542         Numeric         res;
2543         bool            is_null;
2544
2545         res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
2546                                                                   true, false, &is_null);
2547
2548         if (is_null)
2549                 PG_RETURN_NULL();
2550         else
2551                 PG_RETURN_NUMERIC(res);
2552 }
2553
2554 Datum
2555 numeric_stddev_pop(PG_FUNCTION_ARGS)
2556 {
2557         Numeric         res;
2558         bool            is_null;
2559
2560         res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
2561                                                                   false, false, &is_null);
2562
2563         if (is_null)
2564                 PG_RETURN_NULL();
2565         else
2566                 PG_RETURN_NUMERIC(res);
2567 }
2568
2569 /*
2570  * SUM transition functions for integer datatypes.
2571  *
2572  * To avoid overflow, we use accumulators wider than the input datatype.
2573  * A Numeric accumulator is needed for int8 input; for int4 and int2
2574  * inputs, we use int8 accumulators which should be sufficient for practical
2575  * purposes.  (The latter two therefore don't really belong in this file,
2576  * but we keep them here anyway.)
2577  *
2578  * Because SQL92 defines the SUM() of no values to be NULL, not zero,
2579  * the initial condition of the transition data value needs to be NULL. This
2580  * means we can't rely on ExecAgg to automatically insert the first non-null
2581  * data value into the transition data: it doesn't know how to do the type
2582  * conversion.  The upshot is that these routines have to be marked non-strict
2583  * and handle substitution of the first non-null input themselves.
2584  */
2585
2586 Datum
2587 int2_sum(PG_FUNCTION_ARGS)
2588 {
2589         int64           newval;
2590
2591         if (PG_ARGISNULL(0))
2592         {
2593                 /* No non-null input seen so far... */
2594                 if (PG_ARGISNULL(1))
2595                         PG_RETURN_NULL();       /* still no non-null */
2596                 /* This is the first non-null input. */
2597                 newval = (int64) PG_GETARG_INT16(1);
2598                 PG_RETURN_INT64(newval);
2599         }
2600
2601         /*
2602          * If we're invoked by nodeAgg, we can cheat and modify out first
2603          * parameter in-place to avoid palloc overhead. If not, we need to return
2604          * the new value of the transition variable.
2605          */
2606         if (fcinfo->context && IsA(fcinfo->context, AggState))
2607         {
2608                 int64      *oldsum = (int64 *) PG_GETARG_POINTER(0);
2609
2610                 /* Leave the running sum unchanged in the new input is null */
2611                 if (!PG_ARGISNULL(1))
2612                         *oldsum = *oldsum + (int64) PG_GETARG_INT16(1);
2613
2614                 PG_RETURN_POINTER(oldsum);
2615         }
2616         else
2617         {
2618                 int64           oldsum = PG_GETARG_INT64(0);
2619
2620                 /* Leave sum unchanged if new input is null. */
2621                 if (PG_ARGISNULL(1))
2622                         PG_RETURN_INT64(oldsum);
2623
2624                 /* OK to do the addition. */
2625                 newval = oldsum + (int64) PG_GETARG_INT16(1);
2626
2627                 PG_RETURN_INT64(newval);
2628         }
2629 }
2630
2631 Datum
2632 int4_sum(PG_FUNCTION_ARGS)
2633 {
2634         int64           newval;
2635
2636         if (PG_ARGISNULL(0))
2637         {
2638                 /* No non-null input seen so far... */
2639                 if (PG_ARGISNULL(1))
2640                         PG_RETURN_NULL();       /* still no non-null */
2641                 /* This is the first non-null input. */
2642                 newval = (int64) PG_GETARG_INT32(1);
2643                 PG_RETURN_INT64(newval);
2644         }
2645
2646         /*
2647          * If we're invoked by nodeAgg, we can cheat and modify out first
2648          * parameter in-place to avoid palloc overhead. If not, we need to return
2649          * the new value of the transition variable.
2650          */
2651         if (fcinfo->context && IsA(fcinfo->context, AggState))
2652         {
2653                 int64      *oldsum = (int64 *) PG_GETARG_POINTER(0);
2654
2655                 /* Leave the running sum unchanged in the new input is null */
2656                 if (!PG_ARGISNULL(1))
2657                         *oldsum = *oldsum + (int64) PG_GETARG_INT32(1);
2658
2659                 PG_RETURN_POINTER(oldsum);
2660         }
2661         else
2662         {
2663                 int64           oldsum = PG_GETARG_INT64(0);
2664
2665                 /* Leave sum unchanged if new input is null. */
2666                 if (PG_ARGISNULL(1))
2667                         PG_RETURN_INT64(oldsum);
2668
2669                 /* OK to do the addition. */
2670                 newval = oldsum + (int64) PG_GETARG_INT32(1);
2671
2672                 PG_RETURN_INT64(newval);
2673         }
2674 }
2675
2676 Datum
2677 int8_sum(PG_FUNCTION_ARGS)
2678 {
2679         Numeric         oldsum;
2680         Datum           newval;
2681
2682         if (PG_ARGISNULL(0))
2683         {
2684                 /* No non-null input seen so far... */
2685                 if (PG_ARGISNULL(1))
2686                         PG_RETURN_NULL();       /* still no non-null */
2687                 /* This is the first non-null input. */
2688                 newval = DirectFunctionCall1(int8_numeric, PG_GETARG_DATUM(1));
2689                 PG_RETURN_DATUM(newval);
2690         }
2691
2692         /*
2693          * Note that we cannot special-case the nodeAgg case here, as we do for
2694          * int2_sum and int4_sum: numeric is of variable size, so we cannot modify
2695          * our first parameter in-place.
2696          */
2697
2698         oldsum = PG_GETARG_NUMERIC(0);
2699
2700         /* Leave sum unchanged if new input is null. */
2701         if (PG_ARGISNULL(1))
2702                 PG_RETURN_NUMERIC(oldsum);
2703
2704         /* OK to do the addition. */
2705         newval = DirectFunctionCall1(int8_numeric, PG_GETARG_DATUM(1));
2706
2707         PG_RETURN_DATUM(DirectFunctionCall2(numeric_add,
2708                                                                                 NumericGetDatum(oldsum), newval));
2709 }
2710
2711
2712 /*
2713  * Routines for avg(int2) and avg(int4).  The transition datatype
2714  * is a two-element int8 array, holding count and sum.
2715  */
2716
2717 typedef struct Int8TransTypeData
2718 {
2719 #ifndef INT64_IS_BUSTED
2720         int64           count;
2721         int64           sum;
2722 #else
2723         /* "int64" isn't really 64 bits, so fake up properly-aligned fields */
2724         int32           count;
2725         int32           pad1;
2726         int32           sum;
2727         int32           pad2;
2728 #endif
2729 } Int8TransTypeData;
2730
2731 Datum
2732 int2_avg_accum(PG_FUNCTION_ARGS)
2733 {
2734         ArrayType  *transarray;
2735         int16           newval = PG_GETARG_INT16(1);
2736         Int8TransTypeData *transdata;
2737
2738         /*
2739          * If we're invoked by nodeAgg, we can cheat and modify our first
2740          * parameter in-place to reduce palloc overhead. Otherwise we need to make
2741          * a copy of it before scribbling on it.
2742          */
2743         if (fcinfo->context && IsA(fcinfo->context, AggState))
2744                 transarray = PG_GETARG_ARRAYTYPE_P(0);
2745         else
2746                 transarray = PG_GETARG_ARRAYTYPE_P_COPY(0);
2747
2748         if (ARR_HASNULL(transarray) ||
2749                 ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
2750                 elog(ERROR, "expected 2-element int8 array");
2751
2752         transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
2753         transdata->count++;
2754         transdata->sum += newval;
2755
2756         PG_RETURN_ARRAYTYPE_P(transarray);
2757 }
2758
2759 Datum
2760 int4_avg_accum(PG_FUNCTION_ARGS)
2761 {
2762         ArrayType  *transarray;
2763         int32           newval = PG_GETARG_INT32(1);
2764         Int8TransTypeData *transdata;
2765
2766         /*
2767          * If we're invoked by nodeAgg, we can cheat and modify our first
2768          * parameter in-place to reduce palloc overhead. Otherwise we need to make
2769          * a copy of it before scribbling on it.
2770          */
2771         if (fcinfo->context && IsA(fcinfo->context, AggState))
2772                 transarray = PG_GETARG_ARRAYTYPE_P(0);
2773         else
2774                 transarray = PG_GETARG_ARRAYTYPE_P_COPY(0);
2775
2776         if (ARR_HASNULL(transarray) ||
2777                 ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
2778                 elog(ERROR, "expected 2-element int8 array");
2779
2780         transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
2781         transdata->count++;
2782         transdata->sum += newval;
2783
2784         PG_RETURN_ARRAYTYPE_P(transarray);
2785 }
2786
2787 Datum
2788 int8_avg(PG_FUNCTION_ARGS)
2789 {
2790         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2791         Int8TransTypeData *transdata;
2792         Datum           countd,
2793                                 sumd;
2794
2795         if (ARR_HASNULL(transarray) ||
2796                 ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
2797                 elog(ERROR, "expected 2-element int8 array");
2798         transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
2799
2800         /* SQL92 defines AVG of no values to be NULL */
2801         if (transdata->count == 0)
2802                 PG_RETURN_NULL();
2803
2804         countd = DirectFunctionCall1(int8_numeric,
2805                                                                  Int64GetDatumFast(transdata->count));
2806         sumd = DirectFunctionCall1(int8_numeric,
2807                                                            Int64GetDatumFast(transdata->sum));
2808
2809         PG_RETURN_DATUM(DirectFunctionCall2(numeric_div, sumd, countd));
2810 }
2811
2812
2813 /* ----------------------------------------------------------------------
2814  *
2815  * Debug support
2816  *
2817  * ----------------------------------------------------------------------
2818  */
2819
2820 #ifdef NUMERIC_DEBUG
2821
2822 /*
2823  * dump_numeric() - Dump a value in the db storage format for debugging
2824  */
2825 static void
2826 dump_numeric(const char *str, Numeric num)
2827 {
2828         NumericDigit *digits = NUMERIC_DIGITS(num);
2829         int                     ndigits;
2830         int                     i;
2831
2832         ndigits = NUMERIC_NDIGITS(num);
2833
2834         printf("%s: NUMERIC w=%d d=%d ", str, num->n_weight, NUMERIC_DSCALE(num));
2835         switch (NUMERIC_SIGN(num))
2836         {
2837                 case NUMERIC_POS:
2838                         printf("POS");
2839                         break;
2840                 case NUMERIC_NEG:
2841                         printf("NEG");
2842                         break;
2843                 case NUMERIC_NAN:
2844                         printf("NaN");
2845                         break;
2846                 default:
2847                         printf("SIGN=0x%x", NUMERIC_SIGN(num));
2848                         break;
2849         }
2850
2851         for (i = 0; i < ndigits; i++)
2852                 printf(" %0*d", DEC_DIGITS, digits[i]);
2853         printf("\n");
2854 }
2855
2856
2857 /*
2858  * dump_var() - Dump a value in the variable format for debugging
2859  */
2860 static void
2861 dump_var(const char *str, NumericVar *var)
2862 {
2863         int                     i;
2864
2865         printf("%s: VAR w=%d d=%d ", str, var->weight, var->dscale);
2866         switch (var->sign)
2867         {
2868                 case NUMERIC_POS:
2869                         printf("POS");
2870                         break;
2871                 case NUMERIC_NEG:
2872                         printf("NEG");
2873                         break;
2874                 case NUMERIC_NAN:
2875                         printf("NaN");
2876                         break;
2877                 default:
2878                         printf("SIGN=0x%x", var->sign);
2879                         break;
2880         }
2881
2882         for (i = 0; i < var->ndigits; i++)
2883                 printf(" %0*d", DEC_DIGITS, var->digits[i]);
2884
2885         printf("\n");
2886 }
2887 #endif   /* NUMERIC_DEBUG */
2888
2889
2890 /* ----------------------------------------------------------------------
2891  *
2892  * Local functions follow
2893  *
2894  * In general, these do not support NaNs --- callers must eliminate
2895  * the possibility of NaN first.  (make_result() is an exception.)
2896  *
2897  * ----------------------------------------------------------------------
2898  */
2899
2900
2901 /*
2902  * alloc_var() -
2903  *
2904  *      Allocate a digit buffer of ndigits digits (plus a spare digit for rounding)
2905  */
2906 static void
2907 alloc_var(NumericVar *var, int ndigits)
2908 {
2909         digitbuf_free(var->buf);
2910         var->buf = digitbuf_alloc(ndigits + 1);
2911         var->buf[0] = 0;                        /* spare digit for rounding */
2912         var->digits = var->buf + 1;
2913         var->ndigits = ndigits;
2914 }
2915
2916
2917 /*
2918  * free_var() -
2919  *
2920  *      Return the digit buffer of a variable to the free pool
2921  */
2922 static void
2923 free_var(NumericVar *var)
2924 {
2925         digitbuf_free(var->buf);
2926         var->buf = NULL;
2927         var->digits = NULL;
2928         var->sign = NUMERIC_NAN;
2929 }
2930
2931
2932 /*
2933  * zero_var() -
2934  *
2935  *      Set a variable to ZERO.
2936  *      Note: its dscale is not touched.
2937  */
2938 static void
2939 zero_var(NumericVar *var)
2940 {
2941         digitbuf_free(var->buf);
2942         var->buf = NULL;
2943         var->digits = NULL;
2944         var->ndigits = 0;
2945         var->weight = 0;                        /* by convention; doesn't really matter */
2946         var->sign = NUMERIC_POS;        /* anything but NAN... */
2947 }
2948
2949
2950 /*
2951  * set_var_from_str()
2952  *
2953  *      Parse a string and put the number into a variable
2954  */
2955 static void
2956 set_var_from_str(const char *str, NumericVar *dest)
2957 {
2958         const char *cp = str;
2959         bool            have_dp = FALSE;
2960         int                     i;
2961         unsigned char *decdigits;
2962         int                     sign = NUMERIC_POS;
2963         int                     dweight = -1;
2964         int                     ddigits;
2965         int                     dscale = 0;
2966         int                     weight;
2967         int                     ndigits;
2968         int                     offset;
2969         NumericDigit *digits;
2970
2971         /*
2972          * We first parse the string to extract decimal digits and determine the
2973          * correct decimal weight.      Then convert to NBASE representation.
2974          */
2975
2976         /* skip leading spaces */
2977         while (*cp)
2978         {
2979                 if (!isspace((unsigned char) *cp))
2980                         break;
2981                 cp++;
2982         }
2983
2984         switch (*cp)
2985         {
2986                 case '+':
2987                         sign = NUMERIC_POS;
2988                         cp++;
2989                         break;
2990
2991                 case '-':
2992                         sign = NUMERIC_NEG;
2993                         cp++;
2994                         break;
2995         }
2996
2997         if (*cp == '.')
2998         {
2999                 have_dp = TRUE;
3000                 cp++;
3001         }
3002
3003         if (!isdigit((unsigned char) *cp))
3004                 ereport(ERROR,
3005                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3006                           errmsg("invalid input syntax for type numeric: \"%s\"", str)));
3007
3008         decdigits = (unsigned char *) palloc(strlen(cp) + DEC_DIGITS * 2);
3009
3010         /* leading padding for digit alignment later */
3011         memset(decdigits, 0, DEC_DIGITS);
3012         i = DEC_DIGITS;
3013
3014         while (*cp)
3015         {
3016                 if (isdigit((unsigned char) *cp))
3017                 {
3018                         decdigits[i++] = *cp++ - '0';
3019                         if (!have_dp)
3020                                 dweight++;
3021                         else
3022                                 dscale++;
3023                 }
3024                 else if (*cp == '.')
3025                 {
3026                         if (have_dp)
3027                                 ereport(ERROR,
3028                                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3029                                           errmsg("invalid input syntax for type numeric: \"%s\"",
3030                                                          str)));
3031                         have_dp = TRUE;
3032                         cp++;
3033                 }
3034                 else
3035                         break;
3036         }
3037
3038         ddigits = i - DEC_DIGITS;
3039         /* trailing padding for digit alignment later */
3040         memset(decdigits + i, 0, DEC_DIGITS - 1);
3041
3042         /* Handle exponent, if any */
3043         if (*cp == 'e' || *cp == 'E')
3044         {
3045                 long            exponent;
3046                 char       *endptr;
3047
3048                 cp++;
3049                 exponent = strtol(cp, &endptr, 10);
3050                 if (endptr == cp)
3051                         ereport(ERROR,
3052                                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3053                                          errmsg("invalid input syntax for type numeric: \"%s\"",
3054                                                         str)));
3055                 cp = endptr;
3056                 if (exponent > NUMERIC_MAX_PRECISION ||
3057                         exponent < -NUMERIC_MAX_PRECISION)
3058                         ereport(ERROR,
3059                                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3060                                          errmsg("invalid input syntax for type numeric: \"%s\"",
3061                                                         str)));
3062                 dweight += (int) exponent;
3063                 dscale -= (int) exponent;
3064                 if (dscale < 0)
3065                         dscale = 0;
3066         }
3067
3068         /* Should be nothing left but spaces */
3069         while (*cp)
3070         {
3071                 if (!isspace((unsigned char) *cp))
3072                         ereport(ERROR,
3073                                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3074                                          errmsg("invalid input syntax for type numeric: \"%s\"",
3075                                                         str)));
3076                 cp++;
3077         }
3078
3079         /*
3080          * Okay, convert pure-decimal representation to base NBASE.  First we need
3081          * to determine the converted weight and ndigits.  offset is the number of
3082          * decimal zeroes to insert before the first given digit to have a
3083          * correctly aligned first NBASE digit.
3084          */
3085         if (dweight >= 0)
3086                 weight = (dweight + 1 + DEC_DIGITS - 1) / DEC_DIGITS - 1;
3087         else
3088                 weight = -((-dweight - 1) / DEC_DIGITS + 1);
3089         offset = (weight + 1) * DEC_DIGITS - (dweight + 1);
3090         ndigits = (ddigits + offset + DEC_DIGITS - 1) / DEC_DIGITS;
3091
3092         alloc_var(dest, ndigits);
3093         dest->sign = sign;
3094         dest->weight = weight;
3095         dest->dscale = dscale;
3096
3097         i = DEC_DIGITS - offset;
3098         digits = dest->digits;
3099
3100         while (ndigits-- > 0)
3101         {
3102 #if DEC_DIGITS == 4
3103                 *digits++ = ((decdigits[i] * 10 + decdigits[i + 1]) * 10 +
3104                                          decdigits[i + 2]) * 10 + decdigits[i + 3];
3105 #elif DEC_DIGITS == 2
3106                 *digits++ = decdigits[i] * 10 + decdigits[i + 1];
3107 #elif DEC_DIGITS == 1
3108                 *digits++ = decdigits[i];
3109 #else
3110 #error unsupported NBASE
3111 #endif
3112                 i += DEC_DIGITS;
3113         }
3114
3115         pfree(decdigits);
3116
3117         /* Strip any leading/trailing zeroes, and normalize weight if zero */
3118         strip_var(dest);
3119 }
3120
3121
3122 /*
3123  * set_var_from_num() -
3124  *
3125  *      Convert the packed db format into a variable
3126  */
3127 static void
3128 set_var_from_num(Numeric num, NumericVar *dest)
3129 {
3130         int                     ndigits;
3131
3132         ndigits = NUMERIC_NDIGITS(num);
3133
3134         alloc_var(dest, ndigits);
3135
3136         dest->weight = num->n_weight;
3137         dest->sign = NUMERIC_SIGN(num);
3138         dest->dscale = NUMERIC_DSCALE(num);
3139
3140         memcpy(dest->digits, num->n_data, ndigits * sizeof(NumericDigit));
3141 }
3142
3143
3144 /*
3145  * set_var_from_var() -
3146  *
3147  *      Copy one variable into another
3148  */
3149 static void
3150 set_var_from_var(NumericVar *value, NumericVar *dest)
3151 {
3152         NumericDigit *newbuf;
3153
3154         newbuf = digitbuf_alloc(value->ndigits + 1);
3155         newbuf[0] = 0;                          /* spare digit for rounding */
3156         memcpy(newbuf + 1, value->digits, value->ndigits * sizeof(NumericDigit));
3157
3158         digitbuf_free(dest->buf);
3159
3160         memmove(dest, value, sizeof(NumericVar));
3161         dest->buf = newbuf;
3162         dest->digits = newbuf + 1;
3163 }
3164
3165
3166 /*
3167  * get_str_from_var() -
3168  *
3169  *      Convert a var to text representation (guts of numeric_out).
3170  *      CAUTION: var's contents may be modified by rounding!
3171  *      Returns a palloc'd string.
3172  */
3173 static char *
3174 get_str_from_var(NumericVar *var, int dscale)
3175 {
3176         char       *str;
3177         char       *cp;
3178         char       *endcp;
3179         int                     i;
3180         int                     d;
3181         NumericDigit dig;
3182
3183 #if DEC_DIGITS > 1
3184         NumericDigit d1;
3185 #endif
3186
3187         if (dscale < 0)
3188                 dscale = 0;
3189
3190         /*
3191          * Check if we must round up before printing the value and do so.
3192          */
3193         round_var(var, dscale);
3194
3195         /*
3196          * Allocate space for the result.
3197          *
3198          * i is set to to # of decimal digits before decimal point. dscale is the
3199          * # of decimal digits we will print after decimal point. We may generate
3200          * as many as DEC_DIGITS-1 excess digits at the end, and in addition we
3201          * need room for sign, decimal point, null terminator.
3202          */
3203         i = (var->weight + 1) * DEC_DIGITS;
3204         if (i <= 0)
3205                 i = 1;
3206
3207         str = palloc(i + dscale + DEC_DIGITS + 2);
3208         cp = str;
3209
3210         /*
3211          * Output a dash for negative values
3212          */
3213         if (var->sign == NUMERIC_NEG)
3214                 *cp++ = '-';
3215
3216         /*
3217          * Output all digits before the decimal point
3218          */
3219         if (var->weight < 0)
3220         {
3221                 d = var->weight + 1;
3222                 *cp++ = '0';
3223         }
3224         else
3225         {
3226                 for (d = 0; d <= var->weight; d++)
3227                 {
3228                         dig = (d < var->ndigits) ? var->digits[d] : 0;
3229                         /* In the first digit, suppress extra leading decimal zeroes */
3230 #if DEC_DIGITS == 4
3231                         {
3232                                 bool            putit = (d > 0);
3233
3234                                 d1 = dig / 1000;
3235                                 dig -= d1 * 1000;
3236                                 putit |= (d1 > 0);
3237                                 if (putit)
3238                                         *cp++ = d1 + '0';
3239                                 d1 = dig / 100;
3240                                 dig -= d1 * 100;
3241                                 putit |= (d1 > 0);
3242                                 if (putit)
3243                                         *cp++ = d1 + '0';
3244                                 d1 = dig / 10;
3245                                 dig -= d1 * 10;
3246                                 putit |= (d1 > 0);
3247                                 if (putit)
3248                                         *cp++ = d1 + '0';
3249                                 *cp++ = dig + '0';
3250                         }
3251 #elif DEC_DIGITS == 2
3252                         d1 = dig / 10;
3253                         dig -= d1 * 10;
3254                         if (d1 > 0 || d > 0)
3255                                 *cp++ = d1 + '0';
3256                         *cp++ = dig + '0';
3257 #elif DEC_DIGITS == 1
3258                         *cp++ = dig + '0';
3259 #else
3260 #error unsupported NBASE
3261 #endif
3262                 }
3263         }
3264
3265         /*
3266          * If requested, output a decimal point and all the digits that follow it.
3267          * We initially put out a multiple of DEC_DIGITS digits, then truncate if
3268          * needed.
3269          */
3270         if (dscale > 0)
3271         {
3272                 *cp++ = '.';
3273                 endcp = cp + dscale;
3274                 for (i = 0; i < dscale; d++, i += DEC_DIGITS)
3275                 {
3276                         dig = (d >= 0 && d < var->ndigits) ? var->digits[d] : 0;
3277 #if DEC_DIGITS == 4
3278                         d1 = dig / 1000;
3279                         dig -= d1 * 1000;
3280                         *cp++ = d1 + '0';
3281                         d1 = dig / 100;
3282                         dig -= d1 * 100;
3283                         *cp++ = d1 + '0';
3284                         d1 = dig / 10;
3285                         dig -= d1 * 10;
3286                         *cp++ = d1 + '0';
3287                         *cp++ = dig + '0';
3288 #elif DEC_DIGITS == 2
3289                         d1 = dig / 10;
3290                         dig -= d1 * 10;
3291                         *cp++ = d1 + '0';
3292                         *cp++ = dig + '0';
3293 #elif DEC_DIGITS == 1
3294                         *cp++ = dig + '0';
3295 #else
3296 #error unsupported NBASE
3297 #endif
3298                 }
3299                 cp = endcp;
3300         }
3301
3302         /*
3303          * terminate the string and return it
3304          */
3305         *cp = '\0';
3306         return str;
3307 }
3308
3309
3310 /*
3311  * make_result() -
3312  *
3313  *      Create the packed db numeric format in palloc()'d memory from
3314  *      a variable.
3315  */
3316 static Numeric
3317 make_result(NumericVar *var)
3318 {
3319         Numeric         result;
3320         NumericDigit *digits = var->digits;
3321         int                     weight = var->weight;
3322         int                     sign = var->sign;
3323         int                     n;
3324         Size            len;
3325
3326         if (sign == NUMERIC_NAN)
3327         {
3328                 result = (Numeric) palloc(NUMERIC_HDRSZ);
3329
3330                 SET_VARSIZE(result, NUMERIC_HDRSZ);
3331                 result->n_weight = 0;
3332                 result->n_sign_dscale = NUMERIC_NAN;
3333
3334                 dump_numeric("make_result()", result);
3335                 return result;
3336         }
3337
3338         n = var->ndigits;
3339
3340         /* truncate leading zeroes */
3341         while (n > 0 && *digits == 0)
3342         {
3343                 digits++;
3344                 weight--;
3345                 n--;
3346         }
3347         /* truncate trailing zeroes */
3348         while (n > 0 && digits[n - 1] == 0)
3349                 n--;
3350
3351         /* If zero result, force to weight=0 and positive sign */
3352         if (n == 0)
3353         {
3354                 weight = 0;
3355                 sign = NUMERIC_POS;
3356         }
3357
3358         /* Build the result */
3359         len = NUMERIC_HDRSZ + n * sizeof(NumericDigit);
3360         result = (Numeric) palloc(len);
3361         SET_VARSIZE(result, len);
3362         result->n_weight = weight;
3363         result->n_sign_dscale = sign | (var->dscale & NUMERIC_DSCALE_MASK);
3364
3365         memcpy(result->n_data, digits, n * sizeof(NumericDigit));
3366
3367         /* Check for overflow of int16 fields */
3368         if (result->n_weight != weight ||
3369                 NUMERIC_DSCALE(result) != var->dscale)
3370                 ereport(ERROR,
3371                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
3372                                  errmsg("value overflows numeric format")));
3373
3374         dump_numeric("make_result()", result);
3375         return result;
3376 }
3377
3378
3379 /*
3380  * apply_typmod() -
3381  *
3382  *      Do bounds checking and rounding according to the attributes
3383  *      typmod field.
3384  */
3385 static void
3386 apply_typmod(NumericVar *var, int32 typmod)
3387 {
3388         int                     precision;
3389         int                     scale;
3390         int                     maxdigits;
3391         int                     ddigits;
3392         int                     i;
3393
3394         /* Do nothing if we have a default typmod (-1) */
3395         if (typmod < (int32) (VARHDRSZ))
3396                 return;
3397
3398         typmod -= VARHDRSZ;
3399         precision = (typmod >> 16) & 0xffff;
3400         scale = typmod & 0xffff;
3401         maxdigits = precision - scale;
3402
3403         /* Round to target scale (and set var->dscale) */
3404         round_var(var, scale);
3405
3406         /*
3407          * Check for overflow - note we can't do this before rounding, because
3408          * rounding could raise the weight.  Also note that the var's weight could
3409          * be inflated by leading zeroes, which will be stripped before storage
3410          * but perhaps might not have been yet. In any case, we must recognize a
3411          * true zero, whose weight doesn't mean anything.
3412          */
3413         ddigits = (var->weight + 1) * DEC_DIGITS;
3414         if (ddigits > maxdigits)
3415         {
3416                 /* Determine true weight; and check for all-zero result */
3417                 for (i = 0; i < var->ndigits; i++)
3418                 {
3419                         NumericDigit dig = var->digits[i];
3420
3421                         if (dig)
3422                         {
3423                                 /* Adjust for any high-order decimal zero digits */
3424 #if DEC_DIGITS == 4
3425                                 if (dig < 10)
3426                                         ddigits -= 3;
3427                                 else if (dig < 100)
3428                                         ddigits -= 2;
3429                                 else if (dig < 1000)
3430                                         ddigits -= 1;
3431 #elif DEC_DIGITS == 2
3432                                 if (dig < 10)
3433                                         ddigits -= 1;
3434 #elif DEC_DIGITS == 1
3435                                 /* no adjustment */
3436 #else
3437 #error unsupported NBASE
3438 #endif
3439                                 if (ddigits > maxdigits)
3440                                         ereport(ERROR,
3441                                                         (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
3442                                                          errmsg("numeric field overflow"),
3443                                                          errdetail("A field with precision %d, scale %d must round to an absolute value less than %s%d.",
3444                                                                            precision, scale,
3445                                         /* Display 10^0 as 1 */
3446                                                                            maxdigits ? "10^" : "",
3447                                                                            maxdigits ? maxdigits : 1
3448                                                                            )));
3449                                 break;
3450                         }
3451                         ddigits -= DEC_DIGITS;
3452                 }
3453         }
3454 }
3455
3456 /*
3457  * Convert numeric to int8, rounding if needed.
3458  *
3459  * If overflow, return FALSE (no error is raised).      Return TRUE if okay.
3460  *
3461  *      CAUTION: var's contents may be modified by rounding!
3462  */
3463 static bool
3464 numericvar_to_int8(NumericVar *var, int64 *result)
3465 {
3466         NumericDigit *digits;
3467         int                     ndigits;
3468         int                     weight;
3469         int                     i;
3470         int64           val,
3471                                 oldval;
3472         bool            neg;
3473
3474         /* Round to nearest integer */
3475         round_var(var, 0);
3476
3477         /* Check for zero input */
3478         strip_var(var);
3479         ndigits = var->ndigits;
3480         if (ndigits == 0)
3481         {
3482                 *result = 0;
3483                 return true;
3484         }
3485
3486         /*
3487          * For input like 10000000000, we must treat stripped digits as real. So
3488          * the loop assumes there are weight+1 digits before the decimal point.
3489          */
3490         weight = var->weight;
3491         Assert(weight >= 0 && ndigits <= weight + 1);
3492
3493         /* Construct the result */
3494         digits = var->digits;
3495         neg = (var->sign == NUMERIC_NEG);
3496         val = digits[0];
3497         for (i = 1; i <= weight; i++)
3498         {
3499                 oldval = val;
3500                 val *= NBASE;
3501                 if (i < ndigits)
3502                         val += digits[i];
3503
3504                 /*
3505                  * The overflow check is a bit tricky because we want to accept
3506                  * INT64_MIN, which will overflow the positive accumulator.  We can
3507                  * detect this case easily though because INT64_MIN is the only
3508                  * nonzero value for which -val == val (on a two's complement machine,
3509                  * anyway).
3510                  */
3511                 if ((val / NBASE) != oldval)    /* possible overflow? */
3512                 {
3513                         if (!neg || (-val) != val || val == 0 || oldval < 0)
3514                                 return false;
3515                 }
3516         }
3517
3518         *result = neg ? -val : val;
3519         return true;
3520 }
3521
3522 /*
3523  * Convert int8 value to numeric.
3524  */
3525 static void
3526 int8_to_numericvar(int64 val, NumericVar *var)
3527 {
3528         uint64          uval,
3529                                 newuval;
3530         NumericDigit *ptr;
3531         int                     ndigits;
3532
3533         /* int8 can require at most 19 decimal digits; add one for safety */
3534         alloc_var(var, 20 / DEC_DIGITS);
3535         if (val < 0)
3536         {
3537                 var->sign = NUMERIC_NEG;
3538                 uval = -val;
3539         }
3540         else
3541         {
3542                 var->sign = NUMERIC_POS;
3543                 uval = val;
3544         }
3545         var->dscale = 0;
3546         if (val == 0)
3547         {
3548                 var->ndigits = 0;
3549                 var->weight = 0;
3550                 return;
3551         }
3552         ptr = var->digits + var->ndigits;
3553         ndigits = 0;
3554         do
3555         {
3556                 ptr--;
3557                 ndigits++;
3558                 newuval = uval / NBASE;
3559                 *ptr = uval - newuval * NBASE;
3560                 uval = newuval;
3561         } while (uval);
3562         var->digits = ptr;
3563         var->ndigits = ndigits;
3564         var->weight = ndigits - 1;
3565 }
3566
3567 /*
3568  * Convert numeric to float8; if out of range, return +/- HUGE_VAL
3569  */
3570 static double
3571 numeric_to_double_no_overflow(Numeric num)
3572 {
3573         char       *tmp;
3574         double          val;
3575         char       *endptr;
3576
3577         tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
3578                                                                                           NumericGetDatum(num)));
3579
3580         /* unlike float8in, we ignore ERANGE from strtod */
3581         val = strtod(tmp, &endptr);
3582         if (*endptr != '\0')
3583         {
3584                 /* shouldn't happen ... */
3585                 ereport(ERROR,
3586                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3587                          errmsg("invalid input syntax for type double precision: \"%s\"",
3588                                         tmp)));
3589         }
3590
3591         pfree(tmp);
3592
3593         return val;
3594 }
3595
3596 /* As above, but work from a NumericVar */
3597 static double
3598 numericvar_to_double_no_overflow(NumericVar *var)
3599 {
3600         char       *tmp;
3601         double          val;
3602         char       *endptr;
3603
3604         tmp = get_str_from_var(var, var->dscale);
3605
3606         /* unlike float8in, we ignore ERANGE from strtod */
3607         val = strtod(tmp, &endptr);
3608         if (*endptr != '\0')
3609         {
3610                 /* shouldn't happen ... */
3611                 ereport(ERROR,
3612                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3613                          errmsg("invalid input syntax for type double precision: \"%s\"",
3614                                         tmp)));
3615         }
3616
3617         pfree(tmp);
3618
3619         return val;
3620 }
3621
3622
3623 /*
3624  * cmp_var() -
3625  *
3626  *      Compare two values on variable level.  We assume zeroes have been
3627  *      truncated to no digits.
3628  */
3629 static int
3630 cmp_var(NumericVar *var1, NumericVar *var2)
3631 {
3632         return cmp_var_common(var1->digits, var1->ndigits,
3633                                                   var1->weight, var1->sign,
3634                                                   var2->digits, var2->ndigits,
3635                                                   var2->weight, var2->sign);
3636 }
3637
3638 /*
3639  * cmp_var_common() -
3640  *
3641  *      Main routine of cmp_var(). This function can be used by both
3642  *      NumericVar and Numeric.
3643  */
3644 static int
3645 cmp_var_common(const NumericDigit *var1digits, int var1ndigits,
3646                            int var1weight, int var1sign,
3647                            const NumericDigit *var2digits, int var2ndigits,
3648                            int var2weight, int var2sign)
3649 {
3650         if (var1ndigits == 0)
3651         {
3652                 if (var2ndigits == 0)
3653                         return 0;
3654                 if (var2sign == NUMERIC_NEG)
3655                         return 1;
3656                 return -1;
3657         }
3658         if (var2ndigits == 0)
3659         {
3660                 if (var1sign == NUMERIC_POS)
3661                         return 1;
3662                 return -1;
3663         }
3664
3665         if (var1sign == NUMERIC_POS)
3666         {
3667                 if (var2sign == NUMERIC_NEG)
3668                         return 1;
3669                 return cmp_abs_common(var1digits, var1ndigits, var1weight,
3670                                                           var2digits, var2ndigits, var2weight);
3671         }
3672
3673         if (var2sign == NUMERIC_POS)
3674                 return -1;
3675
3676         return cmp_abs_common(var2digits, var2ndigits, var2weight,
3677                                                   var1digits, var1ndigits, var1weight);
3678 }
3679
3680
3681 /*
3682  * add_var() -
3683  *
3684  *      Full version of add functionality on variable level (handling signs).
3685  *      result might point to one of the operands too without danger.
3686  */
3687 static void
3688 add_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
3689 {
3690         /*
3691          * Decide on the signs of the two variables what to do
3692          */
3693         if (var1->sign == NUMERIC_POS)
3694         {
3695                 if (var2->sign == NUMERIC_POS)
3696                 {
3697                         /*
3698                          * Both are positive result = +(ABS(var1) + ABS(var2))
3699                          */
3700                         add_abs(var1, var2, result);
3701                         result->sign = NUMERIC_POS;
3702                 }
3703                 else
3704                 {
3705                         /*
3706                          * var1 is positive, var2 is negative Must compare absolute values
3707                          */
3708                         switch (cmp_abs(var1, var2))
3709                         {
3710                                 case 0:
3711                                         /* ----------
3712                                          * ABS(var1) == ABS(var2)
3713                                          * result = ZERO
3714                                          * ----------
3715                                          */
3716                                         zero_var(result);
3717                                         result->dscale = Max(var1->dscale, var2->dscale);
3718                                         break;
3719
3720                                 case 1:
3721                                         /* ----------
3722                                          * ABS(var1) > ABS(var2)
3723                                          * result = +(ABS(var1) - ABS(var2))
3724                                          * ----------
3725                                          */
3726                                         sub_abs(var1, var2, result);
3727                                         result->sign = NUMERIC_POS;
3728                                         break;
3729
3730                                 case -1:
3731                                         /* ----------
3732                                          * ABS(var1) < ABS(var2)
3733                                          * result = -(ABS(var2) - ABS(var1))
3734                                          * ----------
3735                                          */
3736                                         sub_abs(var2, var1, result);
3737                                         result->sign = NUMERIC_NEG;
3738                                         break;
3739                         }
3740                 }
3741         }
3742         else
3743         {
3744                 if (var2->sign == NUMERIC_POS)
3745                 {
3746                         /* ----------
3747                          * var1 is negative, var2 is positive
3748                          * Must compare absolute values
3749                          * ----------
3750                          */
3751                         switch (cmp_abs(var1, var2))
3752                         {
3753                                 case 0:
3754                                         /* ----------
3755                                          * ABS(var1) == ABS(var2)
3756                                          * result = ZERO
3757                                          * ----------
3758                                          */
3759                                         zero_var(result);
3760                                         result->dscale = Max(var1->dscale, var2->dscale);
3761                                         break;
3762
3763                                 case 1:
3764                                         /* ----------
3765                                          * ABS(var1) > ABS(var2)
3766                                          * result = -(ABS(var1) - ABS(var2))
3767                                          * ----------
3768                                          */
3769                                         sub_abs(var1, var2, result);
3770                                         result->sign = NUMERIC_NEG;
3771                                         break;
3772
3773                                 case -1:
3774                                         /* ----------
3775                                          * ABS(var1) < ABS(var2)
3776                                          * result = +(ABS(var2) - ABS(var1))
3777                                          * ----------
3778                                          */
3779                                         sub_abs(var2, var1, result);
3780                                         result->sign = NUMERIC_POS;
3781                                         break;
3782                         }
3783                 }
3784                 else
3785                 {
3786                         /* ----------
3787                          * Both are negative
3788                          * result = -(ABS(var1) + ABS(var2))
3789                          * ----------
3790                          */
3791                         add_abs(var1, var2, result);
3792                         result->sign = NUMERIC_NEG;
3793                 }
3794         }
3795 }
3796
3797
3798 /*
3799  * sub_var() -
3800  *
3801  *      Full version of sub functionality on variable level (handling signs).
3802  *      result might point to one of the operands too without danger.
3803  */
3804 static void
3805 sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
3806 {
3807         /*
3808          * Decide on the signs of the two variables what to do
3809          */
3810         if (var1->sign == NUMERIC_POS)
3811         {
3812                 if (var2->sign == NUMERIC_NEG)
3813                 {
3814                         /* ----------
3815                          * var1 is positive, var2 is negative
3816                          * result = +(ABS(var1) + ABS(var2))
3817                          * ----------
3818                          */
3819                         add_abs(var1, var2, result);
3820                         result->sign = NUMERIC_POS;
3821                 }
3822                 else
3823                 {
3824                         /* ----------
3825                          * Both are positive
3826                          * Must compare absolute values
3827                          * ----------
3828                          */
3829                         switch (cmp_abs(var1, var2))
3830                         {
3831                                 case 0:
3832                                         /* ----------
3833                                          * ABS(var1) == ABS(var2)
3834                                          * result = ZERO
3835                                          * ----------
3836                                          */
3837                                         zero_var(result);
3838                                         result->dscale = Max(var1->dscale, var2->dscale);
3839                                         break;
3840
3841                                 case 1:
3842                                         /* ----------
3843                                          * ABS(var1) > ABS(var2)
3844                                          * result = +(ABS(var1) - ABS(var2))
3845                                          * ----------
3846                                          */
3847                                         sub_abs(var1, var2, result);
3848                                         result->sign = NUMERIC_POS;
3849                                         break;
3850
3851                                 case -1:
3852                                         /* ----------
3853                                          * ABS(var1) < ABS(var2)
3854                                          * result = -(ABS(var2) - ABS(var1))
3855                                          * ----------
3856                                          */
3857                                         sub_abs(var2, var1, result);
3858                                         result->sign = NUMERIC_NEG;
3859                                         break;
3860                         }
3861                 }
3862         }
3863         else
3864         {
3865                 if (var2->sign == NUMERIC_NEG)
3866                 {
3867                         /* ----------
3868                          * Both are negative
3869                          * Must compare absolute values
3870                          * ----------
3871                          */
3872                         switch (cmp_abs(var1, var2))
3873                         {
3874                                 case 0:
3875                                         /* ----------
3876                                          * ABS(var1) == ABS(var2)
3877                                          * result = ZERO
3878                                          * ----------
3879                                          */
3880                                         zero_var(result);
3881                                         result->dscale = Max(var1->dscale, var2->dscale);
3882                                         break;
3883
3884                                 case 1:
3885                                         /* ----------
3886                                          * ABS(var1) > ABS(var2)
3887                                          * result = -(ABS(var1) - ABS(var2))
3888                                          * ----------
3889                                          */
3890                                         sub_abs(var1, var2, result);
3891                                         result->sign = NUMERIC_NEG;
3892                                         break;
3893
3894                                 case -1:
3895                                         /* ----------
3896                                          * ABS(var1) < ABS(var2)
3897                                          * result = +(ABS(var2) - ABS(var1))
3898                                          * ----------
3899                                          */
3900                                         sub_abs(var2, var1, result);
3901                                         result->sign = NUMERIC_POS;
3902                                         break;
3903                         }
3904                 }
3905                 else
3906                 {
3907                         /* ----------
3908                          * var1 is negative, var2 is positive
3909                          * result = -(ABS(var1) + ABS(var2))
3910                          * ----------
3911                          */
3912                         add_abs(var1, var2, result);
3913                         result->sign = NUMERIC_NEG;
3914                 }
3915         }
3916 }
3917
3918
3919 /*
3920  * mul_var() -
3921  *
3922  *      Multiplication on variable level. Product of var1 * var2 is stored
3923  *      in result.      Result is rounded to no more than rscale fractional digits.
3924  */
3925 static void
3926 mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
3927                 int rscale)
3928 {
3929         int                     res_ndigits;
3930         int                     res_sign;
3931         int                     res_weight;
3932         int                     maxdigits;
3933         int                *dig;
3934         int                     carry;
3935         int                     maxdig;
3936         int                     newdig;
3937         NumericDigit *res_digits;
3938         int                     i,
3939                                 ri,
3940                                 i1,
3941                                 i2;
3942
3943         /* copy these values into local vars for speed in inner loop */
3944         int                     var1ndigits = var1->ndigits;
3945         int                     var2ndigits = var2->ndigits;
3946         NumericDigit *var1digits = var1->digits;
3947         NumericDigit *var2digits = var2->digits;
3948
3949         if (var1ndigits == 0 || var2ndigits == 0)
3950         {
3951                 /* one or both inputs is zero; so is result */
3952                 zero_var(result);
3953                 result->dscale = rscale;
3954                 return;
3955         }
3956
3957         /* Determine result sign and (maximum possible) weight */
3958         if (var1->sign == var2->sign)
3959                 res_sign = NUMERIC_POS;
3960         else
3961                 res_sign = NUMERIC_NEG;
3962         res_weight = var1->weight + var2->weight + 2;
3963
3964         /*
3965          * Determine number of result digits to compute.  If the exact result
3966          * would have more than rscale fractional digits, truncate the computation
3967          * with MUL_GUARD_DIGITS guard digits.  We do that by pretending that one
3968          * or both inputs have fewer digits than they really do.
3969          */
3970         res_ndigits = var1ndigits + var2ndigits + 1;
3971         maxdigits = res_weight + 1 + (rscale * DEC_DIGITS) + MUL_GUARD_DIGITS;
3972         if (res_ndigits > maxdigits)
3973         {
3974                 if (maxdigits < 3)
3975                 {
3976                         /* no useful precision at all in the result... */
3977                         zero_var(result);
3978                         result->dscale = rscale;
3979                         return;
3980                 }
3981                 /* force maxdigits odd so that input ndigits can be equal */
3982                 if ((maxdigits & 1) == 0)
3983                         maxdigits++;
3984                 if (var1ndigits > var2ndigits)
3985                 {
3986                         var1ndigits -= res_ndigits - maxdigits;
3987                         if (var1ndigits < var2ndigits)
3988                                 var1ndigits = var2ndigits = (var1ndigits + var2ndigits) / 2;
3989                 }
3990                 else
3991                 {
3992                         var2ndigits -= res_ndigits - maxdigits;
3993                         if (var2ndigits < var1ndigits)
3994                                 var1ndigits = var2ndigits = (var1ndigits + var2ndigits) / 2;
3995                 }
3996                 res_ndigits = maxdigits;
3997                 Assert(res_ndigits == var1ndigits + var2ndigits + 1);
3998         }
3999
4000         /*
4001          * We do the arithmetic in an array "dig[]" of signed int's.  Since
4002          * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom
4003          * to avoid normalizing carries immediately.
4004          *
4005          * maxdig tracks the maximum possible value of any dig[] entry; when this
4006          * threatens to exceed INT_MAX, we take the time to propagate carries. To
4007          * avoid overflow in maxdig itself, it actually represents the max
4008          * possible value divided by NBASE-1.
4009          */
4010         dig = (int *) palloc0(res_ndigits * sizeof(int));
4011         maxdig = 0;
4012
4013         ri = res_ndigits - 1;
4014         for (i1 = var1ndigits - 1; i1 >= 0; ri--, i1--)
4015         {
4016                 int                     var1digit = var1digits[i1];
4017
4018                 if (var1digit == 0)
4019                         continue;
4020
4021                 /* Time to normalize? */
4022                 maxdig += var1digit;
4023                 if (maxdig > INT_MAX / (NBASE - 1))
4024                 {
4025                         /* Yes, do it */
4026                         carry = 0;
4027                         for (i = res_ndigits - 1; i >= 0; i--)
4028                         {
4029                                 newdig = dig[i] + carry;
4030                                 if (newdig >= NBASE)
4031                                 {
4032                                         carry = newdig / NBASE;
4033                                         newdig -= carry * NBASE;
4034                                 }
4035                                 else
4036                                         carry = 0;
4037                                 dig[i] = newdig;
4038                         }
4039                         Assert(carry == 0);
4040                         /* Reset maxdig to indicate new worst-case */
4041                         maxdig = 1 + var1digit;
4042                 }
4043
4044                 /* Add appropriate multiple of var2 into the accumulator */
4045                 i = ri;
4046                 for (i2 = var2ndigits - 1; i2 >= 0; i2--)
4047                         dig[i--] += var1digit * var2digits[i2];
4048         }
4049
4050         /*
4051          * Now we do a final carry propagation pass to normalize the result, which
4052          * we combine with storing the result digits into the output. Note that
4053          * this is still done at full precision w/guard digits.
4054          */
4055         alloc_var(result, res_ndigits);
4056         res_digits = result->digits;
4057         carry = 0;
4058         for (i = res_ndigits - 1; i >= 0; i--)
4059         {
4060                 newdig = dig[i] + carry;
4061                 if (newdig >= NBASE)
4062                 {
4063                         carry = newdig / NBASE;
4064                         newdig -= carry * NBASE;
4065                 }
4066                 else
4067                         carry = 0;
4068                 res_digits[i] = newdig;
4069         }
4070         Assert(carry == 0);
4071
4072         pfree(dig);
4073
4074         /*
4075          * Finally, round the result to the requested precision.
4076          */
4077         result->weight = res_weight;
4078         result->sign = res_sign;
4079
4080         /* Round to target rscale (and set result->dscale) */
4081         round_var(result, rscale);
4082
4083         /* Strip leading and trailing zeroes */
4084         strip_var(result);
4085 }
4086
4087
4088 /*
4089  * div_var() -
4090  *
4091  *      Division on variable level. Quotient of var1 / var2 is stored in result.
4092  *      The quotient is figured to exactly rscale fractional digits.
4093  *      If round is true, it is rounded at the rscale'th digit; if false, it
4094  *      is truncated (towards zero) at that digit.
4095  */
4096 static void
4097 div_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
4098                 int rscale, bool round)
4099 {
4100         int                     div_ndigits;
4101         int                     res_ndigits;
4102         int                     res_sign;
4103         int                     res_weight;
4104         int                     carry;
4105         int                     borrow;
4106         int                     divisor1;
4107         int                     divisor2;
4108         NumericDigit *dividend;
4109         NumericDigit *divisor;
4110         NumericDigit *res_digits;
4111         int                     i;
4112         int                     j;
4113
4114         /* copy these values into local vars for speed in inner loop */
4115         int                     var1ndigits = var1->ndigits;
4116         int                     var2ndigits = var2->ndigits;
4117
4118         /*
4119          * First of all division by zero check; we must not be handed an
4120          * unnormalized divisor.
4121          */
4122         if (var2ndigits == 0 || var2->digits[0] == 0)
4123                 ereport(ERROR,
4124                                 (errcode(ERRCODE_DIVISION_BY_ZERO),
4125                                  errmsg("division by zero")));
4126
4127         /*
4128          * Now result zero check
4129          */
4130         if (var1ndigits == 0)
4131         {
4132                 zero_var(result);
4133                 result->dscale = rscale;
4134                 return;
4135         }
4136
4137         /*
4138          * Determine the result sign, weight and number of digits to calculate.
4139          * The weight figured here is correct if the emitted quotient has no
4140          * leading zero digits; otherwise strip_var() will fix things up.
4141          */
4142         if (var1->sign == var2->sign)
4143                 res_sign = NUMERIC_POS;
4144         else
4145                 res_sign = NUMERIC_NEG;
4146         res_weight = var1->weight - var2->weight;
4147         /* The number of accurate result digits we need to produce: */
4148         res_ndigits = res_weight + 1 + (rscale + DEC_DIGITS - 1) / DEC_DIGITS;
4149         /* ... but always at least 1 */
4150         res_ndigits = Max(res_ndigits, 1);
4151         /* If rounding needed, figure one more digit to ensure correct result */
4152         if (round)
4153                 res_ndigits++;
4154         /*
4155          * The working dividend normally requires res_ndigits + var2ndigits
4156          * digits, but make it at least var1ndigits so we can load all of var1
4157          * into it.  (There will be an additional digit dividend[0] in the
4158          * dividend space, but for consistency with Knuth's notation we don't
4159          * count that in div_ndigits.)
4160          */
4161         div_ndigits = res_ndigits + var2ndigits;
4162         div_ndigits = Max(div_ndigits, var1ndigits);
4163
4164         /*
4165          * We need a workspace with room for the working dividend (div_ndigits+1
4166          * digits) plus room for the possibly-normalized divisor (var2ndigits
4167          * digits).  It is convenient also to have a zero at divisor[0] with
4168          * the actual divisor data in divisor[1 .. var2ndigits].  Transferring the
4169          * digits into the workspace also allows us to realloc the result (which
4170          * might be the same as either input var) before we begin the main loop.
4171          * Note that we use palloc0 to ensure that divisor[0], dividend[0], and
4172          * any additional dividend positions beyond var1ndigits, start out 0.
4173          */
4174         dividend = (NumericDigit *)
4175                 palloc0((div_ndigits + var2ndigits + 2) * sizeof(NumericDigit));
4176         divisor = dividend + (div_ndigits + 1);
4177         memcpy(dividend + 1, var1->digits, var1ndigits * sizeof(NumericDigit));
4178         memcpy(divisor + 1, var2->digits, var2ndigits * sizeof(NumericDigit));
4179
4180         /*
4181          * Now we can realloc the result to hold the generated quotient digits.
4182          */
4183         alloc_var(result, res_ndigits);
4184         res_digits = result->digits;
4185
4186         if (var2ndigits == 1)
4187         {
4188                 /*
4189                  * If there's only a single divisor digit, we can use a fast path
4190                  * (cf. Knuth section 4.3.1 exercise 16).
4191                  */
4192                 divisor1 = divisor[1];
4193                 carry = 0;
4194                 for (i = 0; i < res_ndigits; i++)
4195                 {
4196                         carry = carry * NBASE + dividend[i + 1];
4197                         res_digits[i] = carry / divisor1;
4198                         carry = carry % divisor1;
4199                 }
4200         }
4201         else
4202         {
4203                 /*
4204                  * The full multiple-place algorithm is taken from Knuth volume 2,
4205                  * Algorithm 4.3.1D.
4206                  *
4207                  * We need the first divisor digit to be >= NBASE/2.  If it isn't,
4208                  * make it so by scaling up both the divisor and dividend by the
4209                  * factor "d".  (The reason for allocating dividend[0] above is to
4210                  * leave room for possible carry here.)
4211                  */
4212                 if (divisor[1] < HALF_NBASE)
4213                 {
4214                         int             d = NBASE / (divisor[1] + 1);
4215
4216                         carry = 0;
4217                         for (i = var2ndigits; i > 0; i--)
4218                         {
4219                                 carry += divisor[i] * d;
4220                                 divisor[i] = carry % NBASE;
4221                                 carry = carry / NBASE;
4222                         }
4223                         Assert(carry == 0);
4224                         carry = 0;
4225                         /* at this point only var1ndigits of dividend can be nonzero */
4226                         for (i = var1ndigits; i >= 0; i--)
4227                         {
4228                                 carry += dividend[i] * d;
4229                                 dividend[i] = carry % NBASE;
4230                                 carry = carry / NBASE;
4231                         }
4232                         Assert(carry == 0);
4233                         Assert(divisor[1] >= HALF_NBASE);
4234                 }
4235                 /* First 2 divisor digits are used repeatedly in main loop */
4236                 divisor1 = divisor[1];
4237                 divisor2 = divisor[2];
4238
4239                 /*
4240                  * Begin the main loop.  Each iteration of this loop produces the
4241                  * j'th quotient digit by dividing dividend[j .. j + var2ndigits]
4242                  * by the divisor; this is essentially the same as the common manual
4243                  * procedure for long division.
4244                  */
4245                 for (j = 0; j < res_ndigits; j++)
4246                 {
4247                         /* Estimate quotient digit from the first two dividend digits */
4248                         int             next2digits = dividend[j] * NBASE + dividend[j+1];
4249                         int             qhat;
4250
4251                         /*
4252                          * If next2digits are 0, then quotient digit must be 0 and there's
4253                          * no need to adjust the working dividend.  It's worth testing
4254                          * here to fall out ASAP when processing trailing zeroes in
4255                          * a dividend.
4256                          */
4257                         if (next2digits == 0)
4258                         {
4259                                 res_digits[j] = 0;
4260                                 continue;
4261                         }
4262
4263                         if (dividend[j] == divisor1)
4264                                 qhat = NBASE - 1;
4265                         else
4266                                 qhat = next2digits / divisor1;
4267                         /*
4268                          * Adjust quotient digit if it's too large.  Knuth proves that
4269                          * after this step, the quotient digit will be either correct
4270                          * or just one too large.  (Note: it's OK to use dividend[j+2]
4271                          * here because we know the divisor length is at least 2.)
4272                          */
4273                         while (divisor2 * qhat >
4274                                    (next2digits - qhat * divisor1) * NBASE + dividend[j+2])
4275                                 qhat--;
4276
4277                         /* As above, need do nothing more when quotient digit is 0 */
4278                         if (qhat > 0)
4279                         {
4280                                 /*
4281                                  * Multiply the divisor by qhat, and subtract that from the
4282                                  * working dividend.  "carry" tracks the multiplication,
4283                                  * "borrow" the subtraction (could we fold these together?)
4284                                  */
4285                                 carry = 0;
4286                                 borrow = 0;
4287                                 for (i = var2ndigits; i >= 0; i--)
4288                                 {
4289                                         carry += divisor[i] * qhat;
4290                                         borrow -= carry % NBASE;
4291                                         carry = carry / NBASE;
4292                                         borrow += dividend[j + i];
4293                                         if (borrow < 0)
4294                                         {
4295                                                 dividend[j + i] = borrow + NBASE;
4296                                                 borrow = -1;
4297                                         }
4298                                         else
4299                                         {
4300                                                 dividend[j + i] = borrow;
4301                                                 borrow = 0;
4302                                         }
4303                                 }
4304                                 Assert(carry == 0);
4305
4306                                 /*
4307                                  * If we got a borrow out of the top dividend digit, then
4308                                  * indeed qhat was one too large.  Fix it, and add back the
4309                                  * divisor to correct the working dividend.  (Knuth proves
4310                                  * that this will occur only about 3/NBASE of the time; hence,
4311                                  * it's a good idea to test this code with small NBASE to be
4312                                  * sure this section gets exercised.)
4313                                  */
4314                                 if (borrow)
4315                                 {
4316                                         qhat--;
4317                                         carry = 0;
4318                                         for (i = var2ndigits; i >= 0; i--)
4319                                         {
4320                                                 carry += dividend[j + i] + divisor[i];
4321                                                 if (carry >= NBASE)
4322                                                 {
4323                                                         dividend[j + i] = carry - NBASE;
4324                                                         carry = 1;
4325                                                 }
4326                                                 else
4327                                                 {
4328                                                         dividend[j + i] = carry;
4329                                                         carry = 0;
4330                                                 }
4331                                         }
4332                                         /* A carry should occur here to cancel the borrow above */
4333                                         Assert(carry == 1);
4334                                 }
4335                         }
4336
4337                         /* And we're done with this quotient digit */
4338                         res_digits[j] = qhat;
4339                 }
4340         }
4341
4342         pfree(dividend);
4343
4344         /*
4345          * Finally, round or truncate the result to the requested precision.
4346          */
4347         result->weight = res_weight;
4348         result->sign = res_sign;
4349
4350         /* Round or truncate to target rscale (and set result->dscale) */
4351         if (round)
4352                 round_var(result, rscale);
4353         else
4354                 trunc_var(result, rscale);
4355
4356         /* Strip leading and trailing zeroes */
4357         strip_var(result);
4358 }
4359
4360
4361 /*
4362  * div_var_fast() -
4363  *
4364  *      This has the same API as div_var, but is implemented using the division
4365  *      algorithm from the "FM" library, rather than Knuth's schoolbook-division
4366  *      approach.  This is significantly faster but can produce inaccurate
4367  *      results, because it sometimes has to propagate rounding to the left,
4368  *      and so we can never be entirely sure that we know the requested digits
4369  *      exactly.  We compute DIV_GUARD_DIGITS extra digits, but there is
4370  *      no certainty that that's enough.  We use this only in the transcendental
4371  *      function calculation routines, where everything is approximate anyway.
4372  */
4373 static void
4374 div_var_fast(NumericVar *var1, NumericVar *var2, NumericVar *result,
4375                          int rscale, bool round)
4376 {
4377         int                     div_ndigits;
4378         int                     res_sign;
4379         int                     res_weight;
4380         int                *div;
4381         int                     qdigit;
4382         int                     carry;
4383         int                     maxdiv;
4384         int                     newdig;
4385         NumericDigit *res_digits;
4386         double          fdividend,
4387                                 fdivisor,
4388                                 fdivisorinverse,
4389                                 fquotient;
4390         int                     qi;
4391         int                     i;
4392
4393         /* copy these values into local vars for speed in inner loop */
4394         int                     var1ndigits = var1->ndigits;
4395         int                     var2ndigits = var2->ndigits;
4396         NumericDigit *var1digits = var1->digits;
4397         NumericDigit *var2digits = var2->digits;
4398
4399         /*
4400          * First of all division by zero check; we must not be handed an
4401          * unnormalized divisor.
4402          */
4403         if (var2ndigits == 0 || var2digits[0] == 0)
4404                 ereport(ERROR,
4405                                 (errcode(ERRCODE_DIVISION_BY_ZERO),
4406                                  errmsg("division by zero")));
4407
4408         /*
4409          * Now result zero check
4410          */
4411         if (var1ndigits == 0)
4412         {
4413                 zero_var(result);
4414                 result->dscale = rscale;
4415                 return;
4416         }
4417
4418         /*
4419          * Determine the result sign, weight and number of digits to calculate
4420          */
4421         if (var1->sign == var2->sign)
4422                 res_sign = NUMERIC_POS;
4423         else
4424                 res_sign = NUMERIC_NEG;
4425         res_weight = var1->weight - var2->weight + 1;
4426         /* The number of accurate result digits we need to produce: */
4427         div_ndigits = res_weight + 1 + (rscale + DEC_DIGITS - 1) / DEC_DIGITS;
4428         /* Add guard digits for roundoff error */
4429         div_ndigits += DIV_GUARD_DIGITS;
4430         if (div_ndigits < DIV_GUARD_DIGITS)
4431                 div_ndigits = DIV_GUARD_DIGITS;
4432         /* Must be at least var1ndigits, too, to simplify data-loading loop */
4433         if (div_ndigits < var1ndigits)
4434                 div_ndigits = var1ndigits;
4435
4436         /*
4437          * We do the arithmetic in an array "div[]" of signed int's.  Since
4438          * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom
4439          * to avoid normalizing carries immediately.
4440          *
4441          * We start with div[] containing one zero digit followed by the
4442          * dividend's digits (plus appended zeroes to reach the desired precision
4443          * including guard digits).  Each step of the main loop computes an
4444          * (approximate) quotient digit and stores it into div[], removing one
4445          * position of dividend space.  A final pass of carry propagation takes
4446          * care of any mistaken quotient digits.
4447          */
4448         div = (int *) palloc0((div_ndigits + 1) * sizeof(int));
4449         for (i = 0; i < var1ndigits; i++)
4450                 div[i + 1] = var1digits[i];
4451
4452         /*
4453          * We estimate each quotient digit using floating-point arithmetic, taking
4454          * the first four digits of the (current) dividend and divisor. This must
4455          * be float to avoid overflow.
4456          */
4457         fdivisor = (double) var2digits[0];
4458         for (i = 1; i < 4; i++)
4459         {
4460                 fdivisor *= NBASE;
4461                 if (i < var2ndigits)
4462                         fdivisor += (double) var2digits[i];
4463         }
4464         fdivisorinverse = 1.0 / fdivisor;
4465
4466         /*
4467          * maxdiv tracks the maximum possible absolute value of any div[] entry;
4468          * when this threatens to exceed INT_MAX, we take the time to propagate
4469          * carries.  To avoid overflow in maxdiv itself, it actually represents
4470          * the max possible abs. value divided by NBASE-1.
4471          */
4472         maxdiv = 1;
4473
4474         /*
4475          * Outer loop computes next quotient digit, which will go into div[qi]
4476          */
4477         for (qi = 0; qi < div_ndigits; qi++)
4478         {
4479                 /* Approximate the current dividend value */
4480                 fdividend = (double) div[qi];
4481                 for (i = 1; i < 4; i++)
4482                 {
4483                         fdividend *= NBASE;
4484                         if (qi + i <= div_ndigits)
4485                                 fdividend += (double) div[qi + i];
4486                 }
4487                 /* Compute the (approximate) quotient digit */
4488                 fquotient = fdividend * fdivisorinverse;
4489                 qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
4490                         (((int) fquotient) - 1);        /* truncate towards -infinity */
4491
4492                 if (qdigit != 0)
4493                 {
4494                         /* Do we need to normalize now? */
4495                         maxdiv += Abs(qdigit);
4496                         if (maxdiv > INT_MAX / (NBASE - 1))
4497                         {
4498                                 /* Yes, do it */
4499                                 carry = 0;
4500                                 for (i = div_ndigits; i > qi; i--)
4501                                 {
4502                                         newdig = div[i] + carry;
4503                                         if (newdig < 0)
4504                                         {
4505                                                 carry = -((-newdig - 1) / NBASE) - 1;
4506                                                 newdig -= carry * NBASE;
4507                                         }
4508                                         else if (newdig >= NBASE)
4509                                         {
4510                                                 carry = newdig / NBASE;
4511                                                 newdig -= carry * NBASE;
4512                                         }
4513                                         else
4514                                                 carry = 0;
4515                                         div[i] = newdig;
4516                                 }
4517                                 newdig = div[qi] + carry;
4518                                 div[qi] = newdig;
4519
4520                                 /*
4521                                  * All the div[] digits except possibly div[qi] are now in the
4522                                  * range 0..NBASE-1.
4523                                  */
4524                                 maxdiv = Abs(newdig) / (NBASE - 1);
4525                                 maxdiv = Max(maxdiv, 1);
4526
4527                                 /*
4528                                  * Recompute the quotient digit since new info may have
4529                                  * propagated into the top four dividend digits
4530                                  */
4531                                 fdividend = (double) div[qi];
4532                                 for (i = 1; i < 4; i++)
4533                                 {
4534                                         fdividend *= NBASE;
4535                                         if (qi + i <= div_ndigits)
4536                                                 fdividend += (double) div[qi + i];
4537                                 }
4538                                 /* Compute the (approximate) quotient digit */
4539                                 fquotient = fdividend * fdivisorinverse;
4540                                 qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
4541                                         (((int) fquotient) - 1);        /* truncate towards -infinity */
4542                                 maxdiv += Abs(qdigit);
4543                         }
4544
4545                         /* Subtract off the appropriate multiple of the divisor */
4546                         if (qdigit != 0)
4547                         {
4548                                 int                     istop = Min(var2ndigits, div_ndigits - qi + 1);
4549
4550                                 for (i = 0; i < istop; i++)
4551                                         div[qi + i] -= qdigit * var2digits[i];
4552                         }
4553                 }
4554
4555                 /*
4556                  * The dividend digit we are about to replace might still be nonzero.
4557                  * Fold it into the next digit position.  We don't need to worry about
4558                  * overflow here since this should nearly cancel with the subtraction
4559                  * of the divisor.
4560                  */
4561                 div[qi + 1] += div[qi] * NBASE;
4562
4563                 div[qi] = qdigit;
4564         }
4565
4566         /*
4567          * Approximate and store the last quotient digit (div[div_ndigits])
4568          */
4569         fdividend = (double) div[qi];
4570         for (i = 1; i < 4; i++)
4571                 fdividend *= NBASE;
4572         fquotient = fdividend * fdivisorinverse;
4573         qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
4574                 (((int) fquotient) - 1);        /* truncate towards -infinity */
4575         div[qi] = qdigit;
4576
4577         /*
4578          * Now we do a final carry propagation pass to normalize the result, which
4579          * we combine with storing the result digits into the output. Note that
4580          * this is still done at full precision w/guard digits.
4581          */
4582         alloc_var(result, div_ndigits + 1);
4583         res_digits = result->digits;
4584         carry = 0;
4585         for (i = div_ndigits; i >= 0; i--)
4586         {
4587                 newdig = div[i] + carry;
4588                 if (newdig < 0)
4589                 {
4590                         carry = -((-newdig - 1) / NBASE) - 1;
4591                         newdig -= carry * NBASE;
4592                 }
4593                 else if (newdig >= NBASE)
4594                 {
4595                         carry = newdig / NBASE;
4596                         newdig -= carry * NBASE;
4597                 }
4598                 else
4599                         carry = 0;
4600                 res_digits[i] = newdig;
4601         }
4602         Assert(carry == 0);
4603
4604         pfree(div);
4605
4606         /*
4607          * Finally, round the result to the requested precision.
4608          */
4609         result->weight = res_weight;
4610         result->sign = res_sign;
4611
4612         /* Round to target rscale (and set result->dscale) */
4613         if (round)
4614                 round_var(result, rscale);
4615         else
4616                 trunc_var(result, rscale);
4617
4618         /* Strip leading and trailing zeroes */
4619         strip_var(result);
4620 }
4621
4622
4623 /*
4624  * Default scale selection for division
4625  *
4626  * Returns the appropriate result scale for the division result.
4627  */
4628 static int
4629 select_div_scale(NumericVar *var1, NumericVar *var2)
4630 {
4631         int                     weight1,
4632                                 weight2,
4633                                 qweight,
4634                                 i;
4635         NumericDigit firstdigit1,
4636                                 firstdigit2;
4637         int                     rscale;
4638
4639         /*
4640          * The result scale of a division isn't specified in any SQL standard. For
4641          * PostgreSQL we select a result scale that will give at least
4642          * NUMERIC_MIN_SIG_DIGITS significant digits, so that numeric gives a
4643          * result no less accurate than float8; but use a scale not less than
4644          * either input's display scale.
4645          */
4646
4647         /* Get the actual (normalized) weight and first digit of each input */
4648
4649         weight1 = 0;                            /* values to use if var1 is zero */
4650         firstdigit1 = 0;
4651         for (i = 0; i < var1->ndigits; i++)
4652         {
4653                 firstdigit1 = var1->digits[i];
4654                 if (firstdigit1 != 0)
4655                 {
4656                         weight1 = var1->weight - i;
4657                         break;
4658                 }
4659         }
4660
4661         weight2 = 0;                            /* values to use if var2 is zero */
4662         firstdigit2 = 0;
4663         for (i = 0; i < var2->ndigits; i++)
4664         {
4665                 firstdigit2 = var2->digits[i];
4666                 if (firstdigit2 != 0)
4667                 {
4668                         weight2 = var2->weight - i;
4669                         break;
4670                 }
4671         }
4672
4673         /*
4674          * Estimate weight of quotient.  If the two first digits are equal, we
4675          * can't be sure, but assume that var1 is less than var2.
4676          */
4677         qweight = weight1 - weight2;
4678         if (firstdigit1 <= firstdigit2)
4679                 qweight--;
4680
4681         /* Select result scale */
4682         rscale = NUMERIC_MIN_SIG_DIGITS - qweight * DEC_DIGITS;
4683         rscale = Max(rscale, var1->dscale);
4684         rscale = Max(rscale, var2->dscale);
4685         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
4686         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
4687
4688         return rscale;
4689 }
4690
4691
4692 /*
4693  * mod_var() -
4694  *
4695  *      Calculate the modulo of two numerics at variable level
4696  */
4697 static void
4698 mod_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
4699 {
4700         NumericVar      tmp;
4701
4702         init_var(&tmp);
4703
4704         /* ---------
4705          * We do this using the equation
4706          *              mod(x,y) = x - trunc(x/y)*y
4707          * div_var can be persuaded to give us trunc(x/y) directly.
4708          * ----------
4709          */
4710         div_var(var1, var2, &tmp, 0, false);
4711
4712         mul_var(var2, &tmp, &tmp, var2->dscale);
4713
4714         sub_var(var1, &tmp, result);
4715
4716         free_var(&tmp);
4717 }
4718
4719
4720 /*
4721  * ceil_var() -
4722  *
4723  *      Return the smallest integer greater than or equal to the argument
4724  *      on variable level
4725  */
4726 static void
4727 ceil_var(NumericVar *var, NumericVar *result)
4728 {
4729         NumericVar      tmp;
4730
4731         init_var(&tmp);
4732         set_var_from_var(var, &tmp);
4733
4734         trunc_var(&tmp, 0);
4735
4736         if (var->sign == NUMERIC_POS && cmp_var(var, &tmp) != 0)
4737                 add_var(&tmp, &const_one, &tmp);
4738
4739         set_var_from_var(&tmp, result);
4740         free_var(&tmp);
4741 }
4742
4743
4744 /*
4745  * floor_var() -
4746  *
4747  *      Return the largest integer equal to or less than the argument
4748  *      on variable level
4749  */
4750 static void
4751 floor_var(NumericVar *var, NumericVar *result)
4752 {
4753         NumericVar      tmp;
4754
4755         init_var(&tmp);
4756         set_var_from_var(var, &tmp);
4757
4758         trunc_var(&tmp, 0);
4759
4760         if (var->sign == NUMERIC_NEG && cmp_var(var, &tmp) != 0)
4761                 sub_var(&tmp, &const_one, &tmp);
4762
4763         set_var_from_var(&tmp, result);
4764         free_var(&tmp);
4765 }
4766
4767
4768 /*
4769  * sqrt_var() -
4770  *
4771  *      Compute the square root of x using Newton's algorithm
4772  */
4773 static void
4774 sqrt_var(NumericVar *arg, NumericVar *result, int rscale)
4775 {
4776         NumericVar      tmp_arg;
4777         NumericVar      tmp_val;
4778         NumericVar      last_val;
4779         int                     local_rscale;
4780         int                     stat;
4781
4782         local_rscale = rscale + 8;
4783
4784         stat = cmp_var(arg, &const_zero);
4785         if (stat == 0)
4786         {
4787                 zero_var(result);
4788                 result->dscale = rscale;
4789                 return;
4790         }
4791
4792         /*
4793          * SQL2003 defines sqrt() in terms of power, so we need to emit the right
4794          * SQLSTATE error code if the operand is negative.
4795          */
4796         if (stat < 0)
4797                 ereport(ERROR,
4798                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION),
4799                                  errmsg("cannot take square root of a negative number")));
4800
4801         init_var(&tmp_arg);
4802         init_var(&tmp_val);
4803         init_var(&last_val);
4804
4805         /* Copy arg in case it is the same var as result */
4806         set_var_from_var(arg, &tmp_arg);
4807
4808         /*
4809          * Initialize the result to the first guess
4810          */
4811         alloc_var(result, 1);
4812         result->digits[0] = tmp_arg.digits[0] / 2;
4813         if (result->digits[0] == 0)
4814                 result->digits[0] = 1;
4815         result->weight = tmp_arg.weight / 2;
4816         result->sign = NUMERIC_POS;
4817
4818         set_var_from_var(result, &last_val);
4819
4820         for (;;)
4821         {
4822                 div_var_fast(&tmp_arg, result, &tmp_val, local_rscale, true);
4823
4824                 add_var(result, &tmp_val, result);
4825                 mul_var(result, &const_zero_point_five, result, local_rscale);
4826
4827                 if (cmp_var(&last_val, result) == 0)
4828                         break;
4829                 set_var_from_var(result, &last_val);
4830         }
4831
4832         free_var(&last_val);
4833         free_var(&tmp_val);
4834         free_var(&tmp_arg);
4835
4836         /* Round to requested precision */
4837         round_var(result, rscale);
4838 }
4839
4840
4841 /*
4842  * exp_var() -
4843  *
4844  *      Raise e to the power of x
4845  */
4846 static void
4847 exp_var(NumericVar *arg, NumericVar *result, int rscale)
4848 {
4849         NumericVar      x;
4850         int                     xintval;
4851         bool            xneg = FALSE;
4852         int                     local_rscale;
4853
4854         /*----------
4855          * We separate the integral and fraction parts of x, then compute
4856          *              e^x = e^xint * e^xfrac
4857          * where e = exp(1) and e^xfrac = exp(xfrac) are computed by
4858          * exp_var_internal; the limited range of inputs allows that routine
4859          * to do a good job with a simple Taylor series.  Raising e^xint is
4860          * done by repeated multiplications in power_var_int.
4861          *----------
4862          */
4863         init_var(&x);
4864
4865         set_var_from_var(arg, &x);
4866
4867         if (x.sign == NUMERIC_NEG)
4868         {
4869                 xneg = TRUE;
4870                 x.sign = NUMERIC_POS;
4871         }
4872
4873         /* Extract the integer part, remove it from x */
4874         xintval = 0;
4875         while (x.weight >= 0)
4876         {
4877                 xintval *= NBASE;
4878                 if (x.ndigits > 0)
4879                 {
4880                         xintval += x.digits[0];
4881                         x.digits++;
4882                         x.ndigits--;
4883                 }
4884                 x.weight--;
4885                 /* Guard against overflow */
4886                 if (xintval >= NUMERIC_MAX_RESULT_SCALE * 3)
4887                         ereport(ERROR,
4888                                         (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
4889                                          errmsg("argument for function \"exp\" too big")));
4890         }
4891
4892         /* Select an appropriate scale for internal calculation */
4893         local_rscale = rscale + MUL_GUARD_DIGITS * 2;
4894
4895         /* Compute e^xfrac */
4896         exp_var_internal(&x, result, local_rscale);
4897
4898         /* If there's an integer part, multiply by e^xint */
4899         if (xintval > 0)
4900         {
4901                 NumericVar      e;
4902
4903                 init_var(&e);
4904                 exp_var_internal(&const_one, &e, local_rscale);
4905                 power_var_int(&e, xintval, &e, local_rscale);
4906                 mul_var(&e, result, result, local_rscale);
4907                 free_var(&e);
4908         }
4909
4910         /* Compensate for input sign, and round to requested rscale */
4911         if (xneg)
4912                 div_var_fast(&const_one, result, result, rscale, true);
4913         else
4914                 round_var(result, rscale);
4915
4916         free_var(&x);
4917 }
4918
4919
4920 /*
4921  * exp_var_internal() -
4922  *
4923  *      Raise e to the power of x, where 0 <= x <= 1
4924  *
4925  * NB: the result should be good to at least rscale digits, but it has
4926  * *not* been rounded off; the caller must do that if wanted.
4927  */
4928 static void
4929 exp_var_internal(NumericVar *arg, NumericVar *result, int rscale)
4930 {
4931         NumericVar      x;
4932         NumericVar      xpow;
4933         NumericVar      ifac;
4934         NumericVar      elem;
4935         NumericVar      ni;
4936         int                     ndiv2 = 0;
4937         int                     local_rscale;
4938
4939         init_var(&x);
4940         init_var(&xpow);
4941         init_var(&ifac);
4942         init_var(&elem);
4943         init_var(&ni);
4944
4945         set_var_from_var(arg, &x);
4946
4947         Assert(x.sign == NUMERIC_POS);
4948
4949         local_rscale = rscale + 8;
4950
4951         /* Reduce input into range 0 <= x <= 0.01 */
4952         while (cmp_var(&x, &const_zero_point_01) > 0)
4953         {
4954                 ndiv2++;
4955                 local_rscale++;
4956                 mul_var(&x, &const_zero_point_five, &x, x.dscale + 1);
4957         }
4958
4959         /*
4960          * Use the Taylor series
4961          *
4962          * exp(x) = 1 + x + x^2/2! + x^3/3! + ...
4963          *
4964          * Given the limited range of x, this should converge reasonably quickly.
4965          * We run the series until the terms fall below the local_rscale limit.
4966          */
4967         add_var(&const_one, &x, result);
4968         set_var_from_var(&x, &xpow);
4969         set_var_from_var(&const_one, &ifac);
4970         set_var_from_var(&const_one, &ni);
4971
4972         for (;;)
4973         {
4974                 add_var(&ni, &const_one, &ni);
4975                 mul_var(&xpow, &x, &xpow, local_rscale);
4976                 mul_var(&ifac, &ni, &ifac, 0);
4977                 div_var_fast(&xpow, &ifac, &elem, local_rscale, true);
4978
4979                 if (elem.ndigits == 0)
4980                         break;
4981
4982                 add_var(result, &elem, result);
4983         }
4984
4985         /* Compensate for argument range reduction */
4986         while (ndiv2-- > 0)
4987                 mul_var(result, result, result, local_rscale);
4988
4989         free_var(&x);
4990         free_var(&xpow);
4991         free_var(&ifac);
4992         free_var(&elem);
4993         free_var(&ni);
4994 }
4995
4996
4997 /*
4998  * ln_var() -
4999  *
5000  *      Compute the natural log of x
5001  */
5002 static void
5003 ln_var(NumericVar *arg, NumericVar *result, int rscale)
5004 {
5005         NumericVar      x;
5006         NumericVar      xx;
5007         NumericVar      ni;
5008         NumericVar      elem;
5009         NumericVar      fact;
5010         int                     local_rscale;
5011         int                     cmp;
5012
5013         cmp = cmp_var(arg, &const_zero);
5014         if (cmp == 0)
5015                 ereport(ERROR,
5016                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_LOG),
5017                                  errmsg("cannot take logarithm of zero")));
5018         else if (cmp < 0)
5019                 ereport(ERROR,
5020                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_LOG),
5021                                  errmsg("cannot take logarithm of a negative number")));
5022
5023         local_rscale = rscale + 8;
5024
5025         init_var(&x);
5026         init_var(&xx);
5027         init_var(&ni);
5028         init_var(&elem);
5029         init_var(&fact);
5030
5031         set_var_from_var(arg, &x);
5032         set_var_from_var(&const_two, &fact);
5033
5034         /* Reduce input into range 0.9 < x < 1.1 */
5035         while (cmp_var(&x, &const_zero_point_nine) <= 0)
5036         {
5037                 local_rscale++;
5038                 sqrt_var(&x, &x, local_rscale);
5039                 mul_var(&fact, &const_two, &fact, 0);
5040         }
5041         while (cmp_var(&x, &const_one_point_one) >= 0)
5042         {
5043                 local_rscale++;
5044                 sqrt_var(&x, &x, local_rscale);
5045                 mul_var(&fact, &const_two, &fact, 0);
5046         }
5047
5048         /*
5049          * We use the Taylor series for 0.5 * ln((1+z)/(1-z)),
5050          *
5051          * z + z^3/3 + z^5/5 + ...
5052          *
5053          * where z = (x-1)/(x+1) is in the range (approximately) -0.053 .. 0.048
5054          * due to the above range-reduction of x.
5055          *
5056          * The convergence of this is not as fast as one would like, but is
5057          * tolerable given that z is small.
5058          */
5059         sub_var(&x, &const_one, result);
5060         add_var(&x, &const_one, &elem);
5061         div_var_fast(result, &elem, result, local_rscale, true);
5062         set_var_from_var(result, &xx);
5063         mul_var(result, result, &x, local_rscale);
5064
5065         set_var_from_var(&const_one, &ni);
5066
5067         for (;;)
5068         {
5069                 add_var(&ni, &const_two, &ni);
5070                 mul_var(&xx, &x, &xx, local_rscale);
5071                 div_var_fast(&xx, &ni, &elem, local_rscale, true);
5072
5073                 if (elem.ndigits == 0)
5074                         break;
5075
5076                 add_var(result, &elem, result);
5077
5078                 if (elem.weight < (result->weight - local_rscale * 2 / DEC_DIGITS))
5079                         break;
5080         }
5081
5082         /* Compensate for argument range reduction, round to requested rscale */
5083         mul_var(result, &fact, result, rscale);
5084
5085         free_var(&x);
5086         free_var(&xx);
5087         free_var(&ni);
5088         free_var(&elem);
5089         free_var(&fact);
5090 }
5091
5092
5093 /*
5094  * log_var() -
5095  *
5096  *      Compute the logarithm of num in a given base.
5097  *
5098  *      Note: this routine chooses dscale of the result.
5099  */
5100 static void
5101 log_var(NumericVar *base, NumericVar *num, NumericVar *result)
5102 {
5103         NumericVar      ln_base;
5104         NumericVar      ln_num;
5105         int                     dec_digits;
5106         int                     rscale;
5107         int                     local_rscale;
5108
5109         init_var(&ln_base);
5110         init_var(&ln_num);
5111
5112         /* Set scale for ln() calculations --- compare numeric_ln() */
5113
5114         /* Approx decimal digits before decimal point */
5115         dec_digits = (num->weight + 1) * DEC_DIGITS;
5116
5117         if (dec_digits > 1)
5118                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(dec_digits - 1);
5119         else if (dec_digits < 1)
5120                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(1 - dec_digits);
5121         else
5122                 rscale = NUMERIC_MIN_SIG_DIGITS;
5123
5124         rscale = Max(rscale, base->dscale);
5125         rscale = Max(rscale, num->dscale);
5126         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
5127         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
5128
5129         local_rscale = rscale + 8;
5130
5131         /* Form natural logarithms */
5132         ln_var(base, &ln_base, local_rscale);
5133         ln_var(num, &ln_num, local_rscale);
5134
5135         ln_base.dscale = rscale;
5136         ln_num.dscale = rscale;
5137
5138         /* Select scale for division result */
5139         rscale = select_div_scale(&ln_num, &ln_base);
5140
5141         div_var_fast(&ln_num, &ln_base, result, rscale, true);
5142
5143         free_var(&ln_num);
5144         free_var(&ln_base);
5145 }
5146
5147
5148 /*
5149  * power_var() -
5150  *
5151  *      Raise base to the power of exp
5152  *
5153  *      Note: this routine chooses dscale of the result.
5154  */
5155 static void
5156 power_var(NumericVar *base, NumericVar *exp, NumericVar *result)
5157 {
5158         NumericVar      ln_base;
5159         NumericVar      ln_num;
5160         int                     dec_digits;
5161         int                     rscale;
5162         int                     local_rscale;
5163         double          val;
5164
5165         /* If exp can be represented as an integer, use power_var_int */
5166         if (exp->ndigits == 0 || exp->ndigits <= exp->weight + 1)
5167         {
5168                 /* exact integer, but does it fit in int? */
5169                 NumericVar      x;
5170                 int64           expval64;
5171
5172                 /* must copy because numericvar_to_int8() scribbles on input */
5173                 init_var(&x);
5174                 set_var_from_var(exp, &x);
5175                 if (numericvar_to_int8(&x, &expval64))
5176                 {
5177                         int                     expval = (int) expval64;
5178
5179                         /* Test for overflow by reverse-conversion. */
5180                         if ((int64) expval == expval64)
5181                         {
5182                                 /* Okay, select rscale */
5183                                 rscale = NUMERIC_MIN_SIG_DIGITS;
5184                                 rscale = Max(rscale, base->dscale);
5185                                 rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
5186                                 rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
5187
5188                                 power_var_int(base, expval, result, rscale);
5189
5190                                 free_var(&x);
5191                                 return;
5192                         }
5193                 }
5194                 free_var(&x);
5195         }
5196
5197         init_var(&ln_base);
5198         init_var(&ln_num);
5199
5200         /* Set scale for ln() calculation --- need extra accuracy here */
5201
5202         /* Approx decimal digits before decimal point */
5203         dec_digits = (base->weight + 1) * DEC_DIGITS;
5204
5205         if (dec_digits > 1)
5206                 rscale = NUMERIC_MIN_SIG_DIGITS * 2 - (int) log10(dec_digits - 1);
5207         else if (dec_digits < 1)
5208                 rscale = NUMERIC_MIN_SIG_DIGITS * 2 - (int) log10(1 - dec_digits);
5209         else
5210                 rscale = NUMERIC_MIN_SIG_DIGITS * 2;
5211
5212         rscale = Max(rscale, base->dscale * 2);
5213         rscale = Max(rscale, exp->dscale * 2);
5214         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE * 2);
5215         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE * 2);
5216
5217         local_rscale = rscale + 8;
5218
5219         ln_var(base, &ln_base, local_rscale);
5220
5221         mul_var(&ln_base, exp, &ln_num, local_rscale);
5222
5223         /* Set scale for exp() -- compare numeric_exp() */
5224
5225         /* convert input to float8, ignoring overflow */
5226         val = numericvar_to_double_no_overflow(&ln_num);
5227
5228         /*
5229          * log10(result) = num * log10(e), so this is approximately the weight:
5230          */
5231         val *= 0.434294481903252;
5232
5233         /* limit to something that won't cause integer overflow */
5234         val = Max(val, -NUMERIC_MAX_RESULT_SCALE);
5235         val = Min(val, NUMERIC_MAX_RESULT_SCALE);
5236
5237         rscale = NUMERIC_MIN_SIG_DIGITS - (int) val;
5238         rscale = Max(rscale, base->dscale);
5239         rscale = Max(rscale, exp->dscale);
5240         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
5241         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
5242
5243         exp_var(&ln_num, result, rscale);
5244
5245         free_var(&ln_num);
5246         free_var(&ln_base);
5247 }
5248
5249 /*
5250  * power_var_int() -
5251  *
5252  *      Raise base to the power of exp, where exp is an integer.
5253  */
5254 static void
5255 power_var_int(NumericVar *base, int exp, NumericVar *result, int rscale)
5256 {
5257         bool            neg;
5258         NumericVar      base_prod;
5259         int                     local_rscale;
5260
5261         /* Detect some special cases, particularly 0^0. */
5262
5263         switch (exp)
5264         {
5265                 case 0:
5266                         if (base->ndigits == 0)
5267                                 ereport(ERROR,
5268                                                 (errcode(ERRCODE_FLOATING_POINT_EXCEPTION),
5269                                                  errmsg("zero raised to zero is undefined")));
5270                         set_var_from_var(&const_one, result);
5271                         result->dscale = rscale;        /* no need to round */
5272                         return;
5273                 case 1:
5274                         set_var_from_var(base, result);
5275                         round_var(result, rscale);
5276                         return;
5277                 case -1:
5278                         div_var(&const_one, base, result, rscale, true);
5279                         return;
5280                 case 2:
5281                         mul_var(base, base, result, rscale);
5282                         return;
5283                 default:
5284                         break;
5285         }
5286
5287         /*
5288          * The general case repeatedly multiplies base according to the bit
5289          * pattern of exp.      We do the multiplications with some extra precision.
5290          */
5291         neg = (exp < 0);
5292         exp = Abs(exp);
5293
5294         local_rscale = rscale + MUL_GUARD_DIGITS * 2;
5295
5296         init_var(&base_prod);
5297         set_var_from_var(base, &base_prod);
5298
5299         if (exp & 1)
5300                 set_var_from_var(base, result);
5301         else
5302                 set_var_from_var(&const_one, result);
5303
5304         while ((exp >>= 1) > 0)
5305         {
5306                 mul_var(&base_prod, &base_prod, &base_prod, local_rscale);
5307                 if (exp & 1)
5308                         mul_var(&base_prod, result, result, local_rscale);
5309         }
5310
5311         free_var(&base_prod);
5312
5313         /* Compensate for input sign, and round to requested rscale */
5314         if (neg)
5315                 div_var_fast(&const_one, result, result, rscale, true);
5316         else
5317                 round_var(result, rscale);
5318 }
5319
5320
5321 /* ----------------------------------------------------------------------
5322  *
5323  * Following are the lowest level functions that operate unsigned
5324  * on the variable level
5325  *
5326  * ----------------------------------------------------------------------
5327  */
5328
5329
5330 /* ----------
5331  * cmp_abs() -
5332  *
5333  *      Compare the absolute values of var1 and var2
5334  *      Returns:        -1 for ABS(var1) < ABS(var2)
5335  *                              0  for ABS(var1) == ABS(var2)
5336  *                              1  for ABS(var1) > ABS(var2)
5337  * ----------
5338  */
5339 static int
5340 cmp_abs(NumericVar *var1, NumericVar *var2)
5341 {
5342         return cmp_abs_common(var1->digits, var1->ndigits, var1->weight,
5343                                                   var2->digits, var2->ndigits, var2->weight);
5344 }
5345
5346 /* ----------
5347  * cmp_abs_common() -
5348  *
5349  *      Main routine of cmp_abs(). This function can be used by both
5350  *      NumericVar and Numeric.
5351  * ----------
5352  */
5353 static int
5354 cmp_abs_common(const NumericDigit *var1digits, int var1ndigits, int var1weight,
5355                          const NumericDigit *var2digits, int var2ndigits, int var2weight)
5356 {
5357         int                     i1 = 0;
5358         int                     i2 = 0;
5359
5360         /* Check any digits before the first common digit */
5361
5362         while (var1weight > var2weight && i1 < var1ndigits)
5363         {
5364                 if (var1digits[i1++] != 0)
5365                         return 1;
5366                 var1weight--;
5367         }
5368         while (var2weight > var1weight && i2 < var2ndigits)
5369         {
5370                 if (var2digits[i2++] != 0)
5371                         return -1;
5372                 var2weight--;
5373         }
5374
5375         /* At this point, either w1 == w2 or we've run out of digits */
5376
5377         if (var1weight == var2weight)
5378         {
5379                 while (i1 < var1ndigits && i2 < var2ndigits)
5380                 {
5381                         int                     stat = var1digits[i1++] - var2digits[i2++];
5382
5383                         if (stat)
5384                         {
5385                                 if (stat > 0)
5386                                         return 1;
5387                                 return -1;
5388                         }
5389                 }
5390         }
5391
5392         /*
5393          * At this point, we've run out of digits on one side or the other; so any
5394          * remaining nonzero digits imply that side is larger
5395          */
5396         while (i1 < var1ndigits)
5397         {
5398                 if (var1digits[i1++] != 0)
5399                         return 1;
5400         }
5401         while (i2 < var2ndigits)
5402         {
5403                 if (var2digits[i2++] != 0)
5404                         return -1;
5405         }
5406
5407         return 0;
5408 }
5409
5410
5411 /*
5412  * add_abs() -
5413  *
5414  *      Add the absolute values of two variables into result.
5415  *      result might point to one of the operands without danger.
5416  */
5417 static void
5418 add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
5419 {
5420         NumericDigit *res_buf;
5421         NumericDigit *res_digits;
5422         int                     res_ndigits;
5423         int                     res_weight;
5424         int                     res_rscale,
5425                                 rscale1,
5426                                 rscale2;
5427         int                     res_dscale;
5428         int                     i,
5429                                 i1,
5430                                 i2;
5431         int                     carry = 0;
5432
5433         /* copy these values into local vars for speed in inner loop */
5434         int                     var1ndigits = var1->ndigits;
5435         int                     var2ndigits = var2->ndigits;
5436         NumericDigit *var1digits = var1->digits;
5437         NumericDigit *var2digits = var2->digits;
5438
5439         res_weight = Max(var1->weight, var2->weight) + 1;
5440
5441         res_dscale = Max(var1->dscale, var2->dscale);
5442
5443         /* Note: here we are figuring rscale in base-NBASE digits */
5444         rscale1 = var1->ndigits - var1->weight - 1;
5445         rscale2 = var2->ndigits - var2->weight - 1;
5446         res_rscale = Max(rscale1, rscale2);
5447
5448         res_ndigits = res_rscale + res_weight + 1;
5449         if (res_ndigits <= 0)
5450                 res_ndigits = 1;
5451
5452         res_buf = digitbuf_alloc(res_ndigits + 1);
5453         res_buf[0] = 0;                         /* spare digit for later rounding */
5454         res_digits = res_buf + 1;
5455
5456         i1 = res_rscale + var1->weight + 1;
5457         i2 = res_rscale + var2->weight + 1;
5458         for (i = res_ndigits - 1; i >= 0; i--)
5459         {
5460                 i1--;
5461                 i2--;
5462                 if (i1 >= 0 && i1 < var1ndigits)
5463                         carry += var1digits[i1];
5464                 if (i2 >= 0 && i2 < var2ndigits)
5465                         carry += var2digits[i2];
5466
5467                 if (carry >= NBASE)
5468                 {
5469                         res_digits[i] = carry - NBASE;
5470                         carry = 1;
5471                 }
5472                 else
5473                 {
5474                         res_digits[i] = carry;
5475                         carry = 0;
5476                 }
5477         }
5478
5479         Assert(carry == 0);                     /* else we failed to allow for carry out */
5480
5481         digitbuf_free(result->buf);
5482         result->ndigits = res_ndigits;
5483         result->buf = res_buf;
5484         result->digits = res_digits;
5485         result->weight = res_weight;
5486         result->dscale = res_dscale;
5487
5488         /* Remove leading/trailing zeroes */
5489         strip_var(result);
5490 }
5491
5492
5493 /*
5494  * sub_abs()
5495  *
5496  *      Subtract the absolute value of var2 from the absolute value of var1
5497  *      and store in result. result might point to one of the operands
5498  *      without danger.
5499  *
5500  *      ABS(var1) MUST BE GREATER OR EQUAL ABS(var2) !!!
5501  */
5502 static void
5503 sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
5504 {
5505         NumericDigit *res_buf;
5506         NumericDigit *res_digits;
5507         int                     res_ndigits;
5508         int                     res_weight;
5509         int                     res_rscale,
5510                                 rscale1,
5511                                 rscale2;
5512         int                     res_dscale;
5513         int                     i,
5514                                 i1,
5515                                 i2;
5516         int                     borrow = 0;
5517
5518         /* copy these values into local vars for speed in inner loop */
5519         int                     var1ndigits = var1->ndigits;
5520         int                     var2ndigits = var2->ndigits;
5521         NumericDigit *var1digits = var1->digits;
5522         NumericDigit *var2digits = var2->digits;
5523
5524         res_weight = var1->weight;
5525
5526         res_dscale = Max(var1->dscale, var2->dscale);
5527
5528         /* Note: here we are figuring rscale in base-NBASE digits */
5529         rscale1 = var1->ndigits - var1->weight - 1;
5530         rscale2 = var2->ndigits - var2->weight - 1;
5531         res_rscale = Max(rscale1, rscale2);
5532
5533         res_ndigits = res_rscale + res_weight + 1;
5534         if (res_ndigits <= 0)
5535                 res_ndigits = 1;
5536
5537         res_buf = digitbuf_alloc(res_ndigits + 1);
5538         res_buf[0] = 0;                         /* spare digit for later rounding */
5539         res_digits = res_buf + 1;
5540
5541         i1 = res_rscale + var1->weight + 1;
5542         i2 = res_rscale + var2->weight + 1;
5543         for (i = res_ndigits - 1; i >= 0; i--)
5544         {
5545                 i1--;
5546                 i2--;
5547                 if (i1 >= 0 && i1 < var1ndigits)
5548                         borrow += var1digits[i1];
5549                 if (i2 >= 0 && i2 < var2ndigits)
5550                         borrow -= var2digits[i2];
5551
5552                 if (borrow < 0)
5553                 {
5554                         res_digits[i] = borrow + NBASE;
5555                         borrow = -1;
5556                 }
5557                 else
5558                 {
5559                         res_digits[i] = borrow;
5560                         borrow = 0;
5561                 }
5562         }
5563
5564         Assert(borrow == 0);            /* else caller gave us var1 < var2 */
5565
5566         digitbuf_free(result->buf);
5567         result->ndigits = res_ndigits;
5568         result->buf = res_buf;
5569         result->digits = res_digits;
5570         result->weight = res_weight;
5571         result->dscale = res_dscale;
5572
5573         /* Remove leading/trailing zeroes */
5574         strip_var(result);
5575 }
5576
5577 /*
5578  * round_var
5579  *
5580  * Round the value of a variable to no more than rscale decimal digits
5581  * after the decimal point.  NOTE: we allow rscale < 0 here, implying
5582  * rounding before the decimal point.
5583  */
5584 static void
5585 round_var(NumericVar *var, int rscale)
5586 {
5587         NumericDigit *digits = var->digits;
5588         int                     di;
5589         int                     ndigits;
5590         int                     carry;
5591
5592         var->dscale = rscale;
5593
5594         /* decimal digits wanted */
5595         di = (var->weight + 1) * DEC_DIGITS + rscale;
5596
5597         /*
5598          * If di = 0, the value loses all digits, but could round up to 1 if its
5599          * first extra digit is >= 5.  If di < 0 the result must be 0.
5600          */
5601         if (di < 0)
5602         {
5603                 var->ndigits = 0;
5604                 var->weight = 0;
5605                 var->sign = NUMERIC_POS;
5606         }
5607         else
5608         {
5609                 /* NBASE digits wanted */
5610                 ndigits = (di + DEC_DIGITS - 1) / DEC_DIGITS;
5611
5612                 /* 0, or number of decimal digits to keep in last NBASE digit */
5613                 di %= DEC_DIGITS;
5614
5615                 if (ndigits < var->ndigits ||
5616                         (ndigits == var->ndigits && di > 0))
5617                 {
5618                         var->ndigits = ndigits;
5619
5620 #if DEC_DIGITS == 1
5621                         /* di must be zero */
5622                         carry = (digits[ndigits] >= HALF_NBASE) ? 1 : 0;
5623 #else
5624                         if (di == 0)
5625                                 carry = (digits[ndigits] >= HALF_NBASE) ? 1 : 0;
5626                         else
5627                         {
5628                                 /* Must round within last NBASE digit */
5629                                 int                     extra,
5630                                                         pow10;
5631
5632 #if DEC_DIGITS == 4
5633                                 pow10 = round_powers[di];
5634 #elif DEC_DIGITS == 2
5635                                 pow10 = 10;
5636 #else
5637 #error unsupported NBASE
5638 #endif
5639                                 extra = digits[--ndigits] % pow10;
5640                                 digits[ndigits] -= extra;
5641                                 carry = 0;
5642                                 if (extra >= pow10 / 2)
5643                                 {
5644                                         pow10 += digits[ndigits];
5645                                         if (pow10 >= NBASE)
5646                                         {
5647                                                 pow10 -= NBASE;
5648                                                 carry = 1;
5649                                         }
5650                                         digits[ndigits] = pow10;
5651                                 }
5652                         }
5653 #endif
5654
5655                         /* Propagate carry if needed */
5656                         while (carry)
5657                         {
5658                                 carry += digits[--ndigits];
5659                                 if (carry >= NBASE)
5660                                 {
5661                                         digits[ndigits] = carry - NBASE;
5662                                         carry = 1;
5663                                 }
5664                                 else
5665                                 {
5666                                         digits[ndigits] = carry;
5667                                         carry = 0;
5668                                 }
5669                         }
5670
5671                         if (ndigits < 0)
5672                         {
5673                                 Assert(ndigits == -1);  /* better not have added > 1 digit */
5674                                 Assert(var->digits > var->buf);
5675                                 var->digits--;
5676                                 var->ndigits++;
5677                                 var->weight++;
5678                         }
5679                 }
5680         }
5681 }
5682
5683 /*
5684  * trunc_var
5685  *
5686  * Truncate (towards zero) the value of a variable at rscale decimal digits
5687  * after the decimal point.  NOTE: we allow rscale < 0 here, implying
5688  * truncation before the decimal point.
5689  */
5690 static void
5691 trunc_var(NumericVar *var, int rscale)
5692 {
5693         int                     di;
5694         int                     ndigits;
5695
5696         var->dscale = rscale;
5697
5698         /* decimal digits wanted */
5699         di = (var->weight + 1) * DEC_DIGITS + rscale;
5700
5701         /*
5702          * If di <= 0, the value loses all digits.
5703          */
5704         if (di <= 0)
5705         {
5706                 var->ndigits = 0;
5707                 var->weight = 0;
5708                 var->sign = NUMERIC_POS;
5709         }
5710         else
5711         {
5712                 /* NBASE digits wanted */
5713                 ndigits = (di + DEC_DIGITS - 1) / DEC_DIGITS;
5714
5715                 if (ndigits <= var->ndigits)
5716                 {
5717                         var->ndigits = ndigits;
5718
5719 #if DEC_DIGITS == 1
5720                         /* no within-digit stuff to worry about */
5721 #else
5722                         /* 0, or number of decimal digits to keep in last NBASE digit */
5723                         di %= DEC_DIGITS;
5724
5725                         if (di > 0)
5726                         {
5727                                 /* Must truncate within last NBASE digit */
5728                                 NumericDigit *digits = var->digits;
5729                                 int                     extra,
5730                                                         pow10;
5731
5732 #if DEC_DIGITS == 4
5733                                 pow10 = round_powers[di];
5734 #elif DEC_DIGITS == 2
5735                                 pow10 = 10;
5736 #else
5737 #error unsupported NBASE
5738 #endif
5739                                 extra = digits[--ndigits] % pow10;
5740                                 digits[ndigits] -= extra;
5741                         }
5742 #endif
5743                 }
5744         }
5745 }
5746
5747 /*
5748  * strip_var
5749  *
5750  * Strip any leading and trailing zeroes from a numeric variable
5751  */
5752 static void
5753 strip_var(NumericVar *var)
5754 {
5755         NumericDigit *digits = var->digits;
5756         int                     ndigits = var->ndigits;
5757
5758         /* Strip leading zeroes */
5759         while (ndigits > 0 && *digits == 0)
5760         {
5761                 digits++;
5762                 var->weight--;
5763                 ndigits--;
5764         }
5765
5766         /* Strip trailing zeroes */
5767         while (ndigits > 0 && digits[ndigits - 1] == 0)
5768                 ndigits--;
5769
5770         /* If it's zero, normalize the sign and weight */
5771         if (ndigits == 0)
5772         {
5773                 var->sign = NUMERIC_POS;
5774                 var->weight = 0;
5775         }
5776
5777         var->digits = digits;
5778         var->ndigits = ndigits;
5779 }