]> granicus.if.org Git - postgresql/blob - src/backend/utils/adt/numeric.c
Update C comments to mention SQL:2003 handling of power return values.
[postgresql] / src / backend / utils / adt / numeric.c
1 /*-------------------------------------------------------------------------
2  *
3  * numeric.c
4  *        An exact numeric data type for the Postgres database system
5  *
6  * Original coding 1998, Jan Wieck.  Heavily revised 2003, Tom Lane.
7  *
8  * Many of the algorithmic ideas are borrowed from David M. Smith's "FM"
9  * multiple-precision math library, most recently published as Algorithm
10  * 786: Multiple-Precision Complex Arithmetic and Functions, ACM
11  * Transactions on Mathematical Software, Vol. 24, No. 4, December 1998,
12  * pages 359-367.
13  *
14  * Copyright (c) 1998-2008, PostgreSQL Global Development Group
15  *
16  * IDENTIFICATION
17  *        $PostgreSQL: pgsql/src/backend/utils/adt/numeric.c,v 1.113 2008/05/09 15:36:06 momjian Exp $
18  *
19  *-------------------------------------------------------------------------
20  */
21
22 #include "postgres.h"
23
24 #include <ctype.h>
25 #include <float.h>
26 #include <limits.h>
27 #include <math.h>
28
29 #include "access/hash.h"
30 #include "catalog/pg_type.h"
31 #include "libpq/pqformat.h"
32 #include "miscadmin.h"
33 #include "utils/array.h"
34 #include "utils/builtins.h"
35 #include "utils/int8.h"
36 #include "utils/numeric.h"
37
38 /* ----------
39  * Uncomment the following to enable compilation of dump_numeric()
40  * and dump_var() and to get a dump of any result produced by make_result().
41  * ----------
42 #define NUMERIC_DEBUG
43  */
44
45
46 /* ----------
47  * Local data types
48  *
49  * Numeric values are represented in a base-NBASE floating point format.
50  * Each "digit" ranges from 0 to NBASE-1.  The type NumericDigit is signed
51  * and wide enough to store a digit.  We assume that NBASE*NBASE can fit in
52  * an int.      Although the purely calculational routines could handle any even
53  * NBASE that's less than sqrt(INT_MAX), in practice we are only interested
54  * in NBASE a power of ten, so that I/O conversions and decimal rounding
55  * are easy.  Also, it's actually more efficient if NBASE is rather less than
56  * sqrt(INT_MAX), so that there is "headroom" for mul_var and div_var_fast to
57  * postpone processing carries.
58  * ----------
59  */
60
61 #if 0
62 #define NBASE           10
63 #define HALF_NBASE      5
64 #define DEC_DIGITS      1                       /* decimal digits per NBASE digit */
65 #define MUL_GUARD_DIGITS        4       /* these are measured in NBASE digits */
66 #define DIV_GUARD_DIGITS        8
67
68 typedef signed char NumericDigit;
69 #endif
70
71 #if 0
72 #define NBASE           100
73 #define HALF_NBASE      50
74 #define DEC_DIGITS      2                       /* decimal digits per NBASE digit */
75 #define MUL_GUARD_DIGITS        3       /* these are measured in NBASE digits */
76 #define DIV_GUARD_DIGITS        6
77
78 typedef signed char NumericDigit;
79 #endif
80
81 #if 1
82 #define NBASE           10000
83 #define HALF_NBASE      5000
84 #define DEC_DIGITS      4                       /* decimal digits per NBASE digit */
85 #define MUL_GUARD_DIGITS        2       /* these are measured in NBASE digits */
86 #define DIV_GUARD_DIGITS        4
87
88 typedef int16 NumericDigit;
89 #endif
90
91
92 /* ----------
93  * NumericVar is the format we use for arithmetic.  The digit-array part
94  * is the same as the NumericData storage format, but the header is more
95  * complex.
96  *
97  * The value represented by a NumericVar is determined by the sign, weight,
98  * ndigits, and digits[] array.
99  * Note: the first digit of a NumericVar's value is assumed to be multiplied
100  * by NBASE ** weight.  Another way to say it is that there are weight+1
101  * digits before the decimal point.  It is possible to have weight < 0.
102  *
103  * buf points at the physical start of the palloc'd digit buffer for the
104  * NumericVar.  digits points at the first digit in actual use (the one
105  * with the specified weight).  We normally leave an unused digit or two
106  * (preset to zeroes) between buf and digits, so that there is room to store
107  * a carry out of the top digit without reallocating space.  We just need to
108  * decrement digits (and increment weight) to make room for the carry digit.
109  * (There is no such extra space in a numeric value stored in the database,
110  * only in a NumericVar in memory.)
111  *
112  * If buf is NULL then the digit buffer isn't actually palloc'd and should
113  * not be freed --- see the constants below for an example.
114  *
115  * dscale, or display scale, is the nominal precision expressed as number
116  * of digits after the decimal point (it must always be >= 0 at present).
117  * dscale may be more than the number of physically stored fractional digits,
118  * implying that we have suppressed storage of significant trailing zeroes.
119  * It should never be less than the number of stored digits, since that would
120  * imply hiding digits that are present.  NOTE that dscale is always expressed
121  * in *decimal* digits, and so it may correspond to a fractional number of
122  * base-NBASE digits --- divide by DEC_DIGITS to convert to NBASE digits.
123  *
124  * rscale, or result scale, is the target precision for a computation.
125  * Like dscale it is expressed as number of *decimal* digits after the decimal
126  * point, and is always >= 0 at present.
127  * Note that rscale is not stored in variables --- it's figured on-the-fly
128  * from the dscales of the inputs.
129  *
130  * NB: All the variable-level functions are written in a style that makes it
131  * possible to give one and the same variable as argument and destination.
132  * This is feasible because the digit buffer is separate from the variable.
133  * ----------
134  */
135 typedef struct NumericVar
136 {
137         int                     ndigits;                /* # of digits in digits[] - can be 0! */
138         int                     weight;                 /* weight of first digit */
139         int                     sign;                   /* NUMERIC_POS, NUMERIC_NEG, or NUMERIC_NAN */
140         int                     dscale;                 /* display scale */
141         NumericDigit *buf;                      /* start of palloc'd space for digits[] */
142         NumericDigit *digits;           /* base-NBASE digits */
143 } NumericVar;
144
145
146 /* ----------
147  * Some preinitialized constants
148  * ----------
149  */
150 static NumericDigit const_zero_data[1] = {0};
151 static NumericVar const_zero =
152 {0, 0, NUMERIC_POS, 0, NULL, const_zero_data};
153
154 static NumericDigit const_one_data[1] = {1};
155 static NumericVar const_one =
156 {1, 0, NUMERIC_POS, 0, NULL, const_one_data};
157
158 static NumericDigit const_two_data[1] = {2};
159 static NumericVar const_two =
160 {1, 0, NUMERIC_POS, 0, NULL, const_two_data};
161
162 #if DEC_DIGITS == 4
163 static NumericDigit const_zero_point_five_data[1] = {5000};
164 #elif DEC_DIGITS == 2
165 static NumericDigit const_zero_point_five_data[1] = {50};
166 #elif DEC_DIGITS == 1
167 static NumericDigit const_zero_point_five_data[1] = {5};
168 #endif
169 static NumericVar const_zero_point_five =
170 {1, -1, NUMERIC_POS, 1, NULL, const_zero_point_five_data};
171
172 #if DEC_DIGITS == 4
173 static NumericDigit const_zero_point_nine_data[1] = {9000};
174 #elif DEC_DIGITS == 2
175 static NumericDigit const_zero_point_nine_data[1] = {90};
176 #elif DEC_DIGITS == 1
177 static NumericDigit const_zero_point_nine_data[1] = {9};
178 #endif
179 static NumericVar const_zero_point_nine =
180 {1, -1, NUMERIC_POS, 1, NULL, const_zero_point_nine_data};
181
182 #if DEC_DIGITS == 4
183 static NumericDigit const_zero_point_01_data[1] = {100};
184 static NumericVar const_zero_point_01 =
185 {1, -1, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
186 #elif DEC_DIGITS == 2
187 static NumericDigit const_zero_point_01_data[1] = {1};
188 static NumericVar const_zero_point_01 =
189 {1, -1, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
190 #elif DEC_DIGITS == 1
191 static NumericDigit const_zero_point_01_data[1] = {1};
192 static NumericVar const_zero_point_01 =
193 {1, -2, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
194 #endif
195
196 #if DEC_DIGITS == 4
197 static NumericDigit const_one_point_one_data[2] = {1, 1000};
198 #elif DEC_DIGITS == 2
199 static NumericDigit const_one_point_one_data[2] = {1, 10};
200 #elif DEC_DIGITS == 1
201 static NumericDigit const_one_point_one_data[2] = {1, 1};
202 #endif
203 static NumericVar const_one_point_one =
204 {2, 0, NUMERIC_POS, 1, NULL, const_one_point_one_data};
205
206 static NumericVar const_nan =
207 {0, 0, NUMERIC_NAN, 0, NULL, NULL};
208
209 #if DEC_DIGITS == 4
210 static const int round_powers[4] = {0, 1000, 100, 10};
211 #endif
212
213
214 /* ----------
215  * Local functions
216  * ----------
217  */
218
219 #ifdef NUMERIC_DEBUG
220 static void dump_numeric(const char *str, Numeric num);
221 static void dump_var(const char *str, NumericVar *var);
222 #else
223 #define dump_numeric(s,n)
224 #define dump_var(s,v)
225 #endif
226
227 #define digitbuf_alloc(ndigits)  \
228         ((NumericDigit *) palloc((ndigits) * sizeof(NumericDigit)))
229 #define digitbuf_free(buf)      \
230         do { \
231                  if ((buf) != NULL) \
232                          pfree(buf); \
233         } while (0)
234
235 #define init_var(v)             MemSetAligned(v, 0, sizeof(NumericVar))
236
237 #define NUMERIC_DIGITS(num) ((NumericDigit *)(num)->n_data)
238 #define NUMERIC_NDIGITS(num) \
239         ((VARSIZE(num) - NUMERIC_HDRSZ) / sizeof(NumericDigit))
240
241 static void alloc_var(NumericVar *var, int ndigits);
242 static void free_var(NumericVar *var);
243 static void zero_var(NumericVar *var);
244
245 static void set_var_from_str(const char *str, NumericVar *dest);
246 static void set_var_from_num(Numeric value, NumericVar *dest);
247 static void set_var_from_var(NumericVar *value, NumericVar *dest);
248 static char *get_str_from_var(NumericVar *var, int dscale);
249
250 static Numeric make_result(NumericVar *var);
251
252 static void apply_typmod(NumericVar *var, int32 typmod);
253
254 static int32 numericvar_to_int4(NumericVar *var);
255 static bool numericvar_to_int8(NumericVar *var, int64 *result);
256 static void int8_to_numericvar(int64 val, NumericVar *var);
257 static double numeric_to_double_no_overflow(Numeric num);
258 static double numericvar_to_double_no_overflow(NumericVar *var);
259
260 static int      cmp_numerics(Numeric num1, Numeric num2);
261 static int      cmp_var(NumericVar *var1, NumericVar *var2);
262 static int cmp_var_common(const NumericDigit *var1digits, int var1ndigits,
263                            int var1weight, int var1sign,
264                            const NumericDigit *var2digits, int var2ndigits,
265                            int var2weight, int var2sign);
266 static void add_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
267 static void sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
268 static void mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
269                 int rscale);
270 static void div_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
271                 int rscale, bool round);
272 static void div_var_fast(NumericVar *var1, NumericVar *var2, NumericVar *result,
273                 int rscale, bool round);
274 static int      select_div_scale(NumericVar *var1, NumericVar *var2);
275 static void mod_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
276 static void ceil_var(NumericVar *var, NumericVar *result);
277 static void floor_var(NumericVar *var, NumericVar *result);
278
279 static void sqrt_var(NumericVar *arg, NumericVar *result, int rscale);
280 static void exp_var(NumericVar *arg, NumericVar *result, int rscale);
281 static void exp_var_internal(NumericVar *arg, NumericVar *result, int rscale);
282 static void ln_var(NumericVar *arg, NumericVar *result, int rscale);
283 static void log_var(NumericVar *base, NumericVar *num, NumericVar *result);
284 static void power_var(NumericVar *base, NumericVar *exp, NumericVar *result);
285 static void power_var_int(NumericVar *base, int exp, NumericVar *result,
286                           int rscale);
287
288 static int      cmp_abs(NumericVar *var1, NumericVar *var2);
289 static int cmp_abs_common(const NumericDigit *var1digits, int var1ndigits,
290                            int var1weight,
291                            const NumericDigit *var2digits, int var2ndigits,
292                            int var2weight);
293 static void add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result);
294 static void sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result);
295 static void round_var(NumericVar *var, int rscale);
296 static void trunc_var(NumericVar *var, int rscale);
297 static void strip_var(NumericVar *var);
298 static void compute_bucket(Numeric operand, Numeric bound1, Numeric bound2,
299                            NumericVar *count_var, NumericVar *result_var);
300
301
302 /* ----------------------------------------------------------------------
303  *
304  * Input-, output- and rounding-functions
305  *
306  * ----------------------------------------------------------------------
307  */
308
309
310 /*
311  * numeric_in() -
312  *
313  *      Input function for numeric data type
314  */
315 Datum
316 numeric_in(PG_FUNCTION_ARGS)
317 {
318         char       *str = PG_GETARG_CSTRING(0);
319
320 #ifdef NOT_USED
321         Oid                     typelem = PG_GETARG_OID(1);
322 #endif
323         int32           typmod = PG_GETARG_INT32(2);
324         NumericVar      value;
325         Numeric         res;
326
327         /*
328          * Check for NaN
329          */
330         if (pg_strcasecmp(str, "NaN") == 0)
331                 PG_RETURN_NUMERIC(make_result(&const_nan));
332
333         /*
334          * Use set_var_from_str() to parse the input string and return it in the
335          * packed DB storage format
336          */
337         init_var(&value);
338         set_var_from_str(str, &value);
339
340         apply_typmod(&value, typmod);
341
342         res = make_result(&value);
343         free_var(&value);
344
345         PG_RETURN_NUMERIC(res);
346 }
347
348
349 /*
350  * numeric_out() -
351  *
352  *      Output function for numeric data type
353  */
354 Datum
355 numeric_out(PG_FUNCTION_ARGS)
356 {
357         Numeric         num = PG_GETARG_NUMERIC(0);
358         NumericVar      x;
359         char       *str;
360
361         /*
362          * Handle NaN
363          */
364         if (NUMERIC_IS_NAN(num))
365                 PG_RETURN_CSTRING(pstrdup("NaN"));
366
367         /*
368          * Get the number in the variable format.
369          *
370          * Even if we didn't need to change format, we'd still need to copy the
371          * value to have a modifiable copy for rounding.  set_var_from_num() also
372          * guarantees there is extra digit space in case we produce a carry out
373          * from rounding.
374          */
375         init_var(&x);
376         set_var_from_num(num, &x);
377
378         str = get_str_from_var(&x, x.dscale);
379
380         free_var(&x);
381
382         PG_RETURN_CSTRING(str);
383 }
384
385 /*
386  *              numeric_recv                    - converts external binary format to numeric
387  *
388  * External format is a sequence of int16's:
389  * ndigits, weight, sign, dscale, NumericDigits.
390  */
391 Datum
392 numeric_recv(PG_FUNCTION_ARGS)
393 {
394         StringInfo      buf = (StringInfo) PG_GETARG_POINTER(0);
395
396 #ifdef NOT_USED
397         Oid                     typelem = PG_GETARG_OID(1);
398 #endif
399         int32           typmod = PG_GETARG_INT32(2);
400         NumericVar      value;
401         Numeric         res;
402         int                     len,
403                                 i;
404
405         init_var(&value);
406
407         len = (uint16) pq_getmsgint(buf, sizeof(uint16));
408         if (len < 0 || len > NUMERIC_MAX_PRECISION + NUMERIC_MAX_RESULT_SCALE)
409                 ereport(ERROR,
410                                 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
411                                  errmsg("invalid length in external \"numeric\" value")));
412
413         alloc_var(&value, len);
414
415         value.weight = (int16) pq_getmsgint(buf, sizeof(int16));
416         value.sign = (uint16) pq_getmsgint(buf, sizeof(uint16));
417         if (!(value.sign == NUMERIC_POS ||
418                   value.sign == NUMERIC_NEG ||
419                   value.sign == NUMERIC_NAN))
420                 ereport(ERROR,
421                                 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
422                                  errmsg("invalid sign in external \"numeric\" value")));
423
424         value.dscale = (uint16) pq_getmsgint(buf, sizeof(uint16));
425         for (i = 0; i < len; i++)
426         {
427                 NumericDigit d = pq_getmsgint(buf, sizeof(NumericDigit));
428
429                 if (d < 0 || d >= NBASE)
430                         ereport(ERROR,
431                                         (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
432                                          errmsg("invalid digit in external \"numeric\" value")));
433                 value.digits[i] = d;
434         }
435
436         apply_typmod(&value, typmod);
437
438         res = make_result(&value);
439         free_var(&value);
440
441         PG_RETURN_NUMERIC(res);
442 }
443
444 /*
445  *              numeric_send                    - converts numeric to binary format
446  */
447 Datum
448 numeric_send(PG_FUNCTION_ARGS)
449 {
450         Numeric         num = PG_GETARG_NUMERIC(0);
451         NumericVar      x;
452         StringInfoData buf;
453         int                     i;
454
455         init_var(&x);
456         set_var_from_num(num, &x);
457
458         pq_begintypsend(&buf);
459
460         pq_sendint(&buf, x.ndigits, sizeof(int16));
461         pq_sendint(&buf, x.weight, sizeof(int16));
462         pq_sendint(&buf, x.sign, sizeof(int16));
463         pq_sendint(&buf, x.dscale, sizeof(int16));
464         for (i = 0; i < x.ndigits; i++)
465                 pq_sendint(&buf, x.digits[i], sizeof(NumericDigit));
466
467         free_var(&x);
468
469         PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
470 }
471
472
473 /*
474  * numeric() -
475  *
476  *      This is a special function called by the Postgres database system
477  *      before a value is stored in a tuple's attribute. The precision and
478  *      scale of the attribute have to be applied on the value.
479  */
480 Datum
481 numeric         (PG_FUNCTION_ARGS)
482 {
483         Numeric         num = PG_GETARG_NUMERIC(0);
484         int32           typmod = PG_GETARG_INT32(1);
485         Numeric         new;
486         int32           tmp_typmod;
487         int                     precision;
488         int                     scale;
489         int                     ddigits;
490         int                     maxdigits;
491         NumericVar      var;
492
493         /*
494          * Handle NaN
495          */
496         if (NUMERIC_IS_NAN(num))
497                 PG_RETURN_NUMERIC(make_result(&const_nan));
498
499         /*
500          * If the value isn't a valid type modifier, simply return a copy of the
501          * input value
502          */
503         if (typmod < (int32) (VARHDRSZ))
504         {
505                 new = (Numeric) palloc(VARSIZE(num));
506                 memcpy(new, num, VARSIZE(num));
507                 PG_RETURN_NUMERIC(new);
508         }
509
510         /*
511          * Get the precision and scale out of the typmod value
512          */
513         tmp_typmod = typmod - VARHDRSZ;
514         precision = (tmp_typmod >> 16) & 0xffff;
515         scale = tmp_typmod & 0xffff;
516         maxdigits = precision - scale;
517
518         /*
519          * If the number is certainly in bounds and due to the target scale no
520          * rounding could be necessary, just make a copy of the input and modify
521          * its scale fields.  (Note we assume the existing dscale is honest...)
522          */
523         ddigits = (num->n_weight + 1) * DEC_DIGITS;
524         if (ddigits <= maxdigits && scale >= NUMERIC_DSCALE(num))
525         {
526                 new = (Numeric) palloc(VARSIZE(num));
527                 memcpy(new, num, VARSIZE(num));
528                 new->n_sign_dscale = NUMERIC_SIGN(new) |
529                         ((uint16) scale & NUMERIC_DSCALE_MASK);
530                 PG_RETURN_NUMERIC(new);
531         }
532
533         /*
534          * We really need to fiddle with things - unpack the number into a
535          * variable and let apply_typmod() do it.
536          */
537         init_var(&var);
538
539         set_var_from_num(num, &var);
540         apply_typmod(&var, typmod);
541         new = make_result(&var);
542
543         free_var(&var);
544
545         PG_RETURN_NUMERIC(new);
546 }
547
548 Datum
549 numerictypmodin(PG_FUNCTION_ARGS)
550 {
551         ArrayType  *ta = PG_GETARG_ARRAYTYPE_P(0);
552         int32      *tl;
553         int                     n;
554         int32           typmod;
555
556         tl = ArrayGetIntegerTypmods(ta, &n);
557
558         if (n == 2)
559         {
560                 if (tl[0] < 1 || tl[0] > NUMERIC_MAX_PRECISION)
561                         ereport(ERROR,
562                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
563                                          errmsg("NUMERIC precision %d must be between 1 and %d",
564                                                         tl[0], NUMERIC_MAX_PRECISION)));
565                 if (tl[1] < 0 || tl[1] > tl[0])
566                         ereport(ERROR,
567                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
568                                 errmsg("NUMERIC scale %d must be between 0 and precision %d",
569                                            tl[1], tl[0])));
570                 typmod = ((tl[0] << 16) | tl[1]) + VARHDRSZ;
571         }
572         else if (n == 1)
573         {
574                 if (tl[0] < 1 || tl[0] > NUMERIC_MAX_PRECISION)
575                         ereport(ERROR,
576                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
577                                          errmsg("NUMERIC precision %d must be between 1 and %d",
578                                                         tl[0], NUMERIC_MAX_PRECISION)));
579                 /* scale defaults to zero */
580                 typmod = (tl[0] << 16) + VARHDRSZ;
581         }
582         else
583         {
584                 ereport(ERROR,
585                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
586                                  errmsg("invalid NUMERIC type modifier")));
587                 typmod = 0;                             /* keep compiler quiet */
588         }
589
590         PG_RETURN_INT32(typmod);
591 }
592
593 Datum
594 numerictypmodout(PG_FUNCTION_ARGS)
595 {
596         int32           typmod = PG_GETARG_INT32(0);
597         char       *res = (char *) palloc(64);
598
599         if (typmod >= 0)
600                 snprintf(res, 64, "(%d,%d)",
601                                  ((typmod - VARHDRSZ) >> 16) & 0xffff,
602                                  (typmod - VARHDRSZ) & 0xffff);
603         else
604                 *res = '\0';
605
606         PG_RETURN_CSTRING(res);
607 }
608
609
610 /* ----------------------------------------------------------------------
611  *
612  * Sign manipulation, rounding and the like
613  *
614  * ----------------------------------------------------------------------
615  */
616
617 Datum
618 numeric_abs(PG_FUNCTION_ARGS)
619 {
620         Numeric         num = PG_GETARG_NUMERIC(0);
621         Numeric         res;
622
623         /*
624          * Handle NaN
625          */
626         if (NUMERIC_IS_NAN(num))
627                 PG_RETURN_NUMERIC(make_result(&const_nan));
628
629         /*
630          * Do it the easy way directly on the packed format
631          */
632         res = (Numeric) palloc(VARSIZE(num));
633         memcpy(res, num, VARSIZE(num));
634
635         res->n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num);
636
637         PG_RETURN_NUMERIC(res);
638 }
639
640
641 Datum
642 numeric_uminus(PG_FUNCTION_ARGS)
643 {
644         Numeric         num = PG_GETARG_NUMERIC(0);
645         Numeric         res;
646
647         /*
648          * Handle NaN
649          */
650         if (NUMERIC_IS_NAN(num))
651                 PG_RETURN_NUMERIC(make_result(&const_nan));
652
653         /*
654          * Do it the easy way directly on the packed format
655          */
656         res = (Numeric) palloc(VARSIZE(num));
657         memcpy(res, num, VARSIZE(num));
658
659         /*
660          * The packed format is known to be totally zero digit trimmed always. So
661          * we can identify a ZERO by the fact that there are no digits at all.  Do
662          * nothing to a zero.
663          */
664         if (VARSIZE(num) != NUMERIC_HDRSZ)
665         {
666                 /* Else, flip the sign */
667                 if (NUMERIC_SIGN(num) == NUMERIC_POS)
668                         res->n_sign_dscale = NUMERIC_NEG | NUMERIC_DSCALE(num);
669                 else
670                         res->n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num);
671         }
672
673         PG_RETURN_NUMERIC(res);
674 }
675
676
677 Datum
678 numeric_uplus(PG_FUNCTION_ARGS)
679 {
680         Numeric         num = PG_GETARG_NUMERIC(0);
681         Numeric         res;
682
683         res = (Numeric) palloc(VARSIZE(num));
684         memcpy(res, num, VARSIZE(num));
685
686         PG_RETURN_NUMERIC(res);
687 }
688
689 /*
690  * numeric_sign() -
691  *
692  * returns -1 if the argument is less than 0, 0 if the argument is equal
693  * to 0, and 1 if the argument is greater than zero.
694  */
695 Datum
696 numeric_sign(PG_FUNCTION_ARGS)
697 {
698         Numeric         num = PG_GETARG_NUMERIC(0);
699         Numeric         res;
700         NumericVar      result;
701
702         /*
703          * Handle NaN
704          */
705         if (NUMERIC_IS_NAN(num))
706                 PG_RETURN_NUMERIC(make_result(&const_nan));
707
708         init_var(&result);
709
710         /*
711          * The packed format is known to be totally zero digit trimmed always. So
712          * we can identify a ZERO by the fact that there are no digits at all.
713          */
714         if (VARSIZE(num) == NUMERIC_HDRSZ)
715                 set_var_from_var(&const_zero, &result);
716         else
717         {
718                 /*
719                  * And if there are some, we return a copy of ONE with the sign of our
720                  * argument
721                  */
722                 set_var_from_var(&const_one, &result);
723                 result.sign = NUMERIC_SIGN(num);
724         }
725
726         res = make_result(&result);
727         free_var(&result);
728
729         PG_RETURN_NUMERIC(res);
730 }
731
732
733 /*
734  * numeric_round() -
735  *
736  *      Round a value to have 'scale' digits after the decimal point.
737  *      We allow negative 'scale', implying rounding before the decimal
738  *      point --- Oracle interprets rounding that way.
739  */
740 Datum
741 numeric_round(PG_FUNCTION_ARGS)
742 {
743         Numeric         num = PG_GETARG_NUMERIC(0);
744         int32           scale = PG_GETARG_INT32(1);
745         Numeric         res;
746         NumericVar      arg;
747
748         /*
749          * Handle NaN
750          */
751         if (NUMERIC_IS_NAN(num))
752                 PG_RETURN_NUMERIC(make_result(&const_nan));
753
754         /*
755          * Limit the scale value to avoid possible overflow in calculations
756          */
757         scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE);
758         scale = Min(scale, NUMERIC_MAX_RESULT_SCALE);
759
760         /*
761          * Unpack the argument and round it at the proper digit position
762          */
763         init_var(&arg);
764         set_var_from_num(num, &arg);
765
766         round_var(&arg, scale);
767
768         /* We don't allow negative output dscale */
769         if (scale < 0)
770                 arg.dscale = 0;
771
772         /*
773          * Return the rounded result
774          */
775         res = make_result(&arg);
776
777         free_var(&arg);
778         PG_RETURN_NUMERIC(res);
779 }
780
781
782 /*
783  * numeric_trunc() -
784  *
785  *      Truncate a value to have 'scale' digits after the decimal point.
786  *      We allow negative 'scale', implying a truncation before the decimal
787  *      point --- Oracle interprets truncation that way.
788  */
789 Datum
790 numeric_trunc(PG_FUNCTION_ARGS)
791 {
792         Numeric         num = PG_GETARG_NUMERIC(0);
793         int32           scale = PG_GETARG_INT32(1);
794         Numeric         res;
795         NumericVar      arg;
796
797         /*
798          * Handle NaN
799          */
800         if (NUMERIC_IS_NAN(num))
801                 PG_RETURN_NUMERIC(make_result(&const_nan));
802
803         /*
804          * Limit the scale value to avoid possible overflow in calculations
805          */
806         scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE);
807         scale = Min(scale, NUMERIC_MAX_RESULT_SCALE);
808
809         /*
810          * Unpack the argument and truncate it at the proper digit position
811          */
812         init_var(&arg);
813         set_var_from_num(num, &arg);
814
815         trunc_var(&arg, scale);
816
817         /* We don't allow negative output dscale */
818         if (scale < 0)
819                 arg.dscale = 0;
820
821         /*
822          * Return the truncated result
823          */
824         res = make_result(&arg);
825
826         free_var(&arg);
827         PG_RETURN_NUMERIC(res);
828 }
829
830
831 /*
832  * numeric_ceil() -
833  *
834  *      Return the smallest integer greater than or equal to the argument
835  */
836 Datum
837 numeric_ceil(PG_FUNCTION_ARGS)
838 {
839         Numeric         num = PG_GETARG_NUMERIC(0);
840         Numeric         res;
841         NumericVar      result;
842
843         if (NUMERIC_IS_NAN(num))
844                 PG_RETURN_NUMERIC(make_result(&const_nan));
845
846         init_var(&result);
847
848         set_var_from_num(num, &result);
849         ceil_var(&result, &result);
850
851         res = make_result(&result);
852         free_var(&result);
853
854         PG_RETURN_NUMERIC(res);
855 }
856
857
858 /*
859  * numeric_floor() -
860  *
861  *      Return the largest integer equal to or less than the argument
862  */
863 Datum
864 numeric_floor(PG_FUNCTION_ARGS)
865 {
866         Numeric         num = PG_GETARG_NUMERIC(0);
867         Numeric         res;
868         NumericVar      result;
869
870         if (NUMERIC_IS_NAN(num))
871                 PG_RETURN_NUMERIC(make_result(&const_nan));
872
873         init_var(&result);
874
875         set_var_from_num(num, &result);
876         floor_var(&result, &result);
877
878         res = make_result(&result);
879         free_var(&result);
880
881         PG_RETURN_NUMERIC(res);
882 }
883
884 /*
885  * Implements the numeric version of the width_bucket() function
886  * defined by SQL2003. See also width_bucket_float8().
887  *
888  * 'bound1' and 'bound2' are the lower and upper bounds of the
889  * histogram's range, respectively. 'count' is the number of buckets
890  * in the histogram. width_bucket() returns an integer indicating the
891  * bucket number that 'operand' belongs to in an equiwidth histogram
892  * with the specified characteristics. An operand smaller than the
893  * lower bound is assigned to bucket 0. An operand greater than the
894  * upper bound is assigned to an additional bucket (with number
895  * count+1). We don't allow "NaN" for any of the numeric arguments.
896  */
897 Datum
898 width_bucket_numeric(PG_FUNCTION_ARGS)
899 {
900         Numeric         operand = PG_GETARG_NUMERIC(0);
901         Numeric         bound1 = PG_GETARG_NUMERIC(1);
902         Numeric         bound2 = PG_GETARG_NUMERIC(2);
903         int32           count = PG_GETARG_INT32(3);
904         NumericVar      count_var;
905         NumericVar      result_var;
906         int32           result;
907
908         if (count <= 0)
909                 ereport(ERROR,
910                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
911                                  errmsg("count must be greater than zero")));
912
913         if (NUMERIC_IS_NAN(operand) ||
914                 NUMERIC_IS_NAN(bound1) ||
915                 NUMERIC_IS_NAN(bound2))
916                 ereport(ERROR,
917                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
918                           errmsg("operand, lower bound and upper bound cannot be NaN")));
919
920         init_var(&result_var);
921         init_var(&count_var);
922
923         /* Convert 'count' to a numeric, for ease of use later */
924         int8_to_numericvar((int64) count, &count_var);
925
926         switch (cmp_numerics(bound1, bound2))
927         {
928                 case 0:
929                         ereport(ERROR,
930                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
931                                  errmsg("lower bound cannot equal upper bound")));
932
933                         /* bound1 < bound2 */
934                 case -1:
935                         if (cmp_numerics(operand, bound1) < 0)
936                                 set_var_from_var(&const_zero, &result_var);
937                         else if (cmp_numerics(operand, bound2) >= 0)
938                                 add_var(&count_var, &const_one, &result_var);
939                         else
940                                 compute_bucket(operand, bound1, bound2,
941                                                            &count_var, &result_var);
942                         break;
943
944                         /* bound1 > bound2 */
945                 case 1:
946                         if (cmp_numerics(operand, bound1) > 0)
947                                 set_var_from_var(&const_zero, &result_var);
948                         else if (cmp_numerics(operand, bound2) <= 0)
949                                 add_var(&count_var, &const_one, &result_var);
950                         else
951                                 compute_bucket(operand, bound1, bound2,
952                                                            &count_var, &result_var);
953                         break;
954         }
955
956         /* if result exceeds the range of a legal int4, we ereport here */
957         result = numericvar_to_int4(&result_var);
958
959         free_var(&count_var);
960         free_var(&result_var);
961
962         PG_RETURN_INT32(result);
963 }
964
965 /*
966  * If 'operand' is not outside the bucket range, determine the correct
967  * bucket for it to go. The calculations performed by this function
968  * are derived directly from the SQL2003 spec.
969  */
970 static void
971 compute_bucket(Numeric operand, Numeric bound1, Numeric bound2,
972                            NumericVar *count_var, NumericVar *result_var)
973 {
974         NumericVar      bound1_var;
975         NumericVar      bound2_var;
976         NumericVar      operand_var;
977
978         init_var(&bound1_var);
979         init_var(&bound2_var);
980         init_var(&operand_var);
981
982         set_var_from_num(bound1, &bound1_var);
983         set_var_from_num(bound2, &bound2_var);
984         set_var_from_num(operand, &operand_var);
985
986         if (cmp_var(&bound1_var, &bound2_var) < 0)
987         {
988                 sub_var(&operand_var, &bound1_var, &operand_var);
989                 sub_var(&bound2_var, &bound1_var, &bound2_var);
990                 div_var(&operand_var, &bound2_var, result_var,
991                                 select_div_scale(&operand_var, &bound2_var), true);
992         }
993         else
994         {
995                 sub_var(&bound1_var, &operand_var, &operand_var);
996                 sub_var(&bound1_var, &bound2_var, &bound1_var);
997                 div_var(&operand_var, &bound1_var, result_var,
998                                 select_div_scale(&operand_var, &bound1_var), true);
999         }
1000
1001         mul_var(result_var, count_var, result_var,
1002                         result_var->dscale + count_var->dscale);
1003         add_var(result_var, &const_one, result_var);
1004         floor_var(result_var, result_var);
1005
1006         free_var(&bound1_var);
1007         free_var(&bound2_var);
1008         free_var(&operand_var);
1009 }
1010
1011 /* ----------------------------------------------------------------------
1012  *
1013  * Comparison functions
1014  *
1015  * Note: btree indexes need these routines not to leak memory; therefore,
1016  * be careful to free working copies of toasted datums.  Most places don't
1017  * need to be so careful.
1018  * ----------------------------------------------------------------------
1019  */
1020
1021
1022 Datum
1023 numeric_cmp(PG_FUNCTION_ARGS)
1024 {
1025         Numeric         num1 = PG_GETARG_NUMERIC(0);
1026         Numeric         num2 = PG_GETARG_NUMERIC(1);
1027         int                     result;
1028
1029         result = cmp_numerics(num1, num2);
1030
1031         PG_FREE_IF_COPY(num1, 0);
1032         PG_FREE_IF_COPY(num2, 1);
1033
1034         PG_RETURN_INT32(result);
1035 }
1036
1037
1038 Datum
1039 numeric_eq(PG_FUNCTION_ARGS)
1040 {
1041         Numeric         num1 = PG_GETARG_NUMERIC(0);
1042         Numeric         num2 = PG_GETARG_NUMERIC(1);
1043         bool            result;
1044
1045         result = cmp_numerics(num1, num2) == 0;
1046
1047         PG_FREE_IF_COPY(num1, 0);
1048         PG_FREE_IF_COPY(num2, 1);
1049
1050         PG_RETURN_BOOL(result);
1051 }
1052
1053 Datum
1054 numeric_ne(PG_FUNCTION_ARGS)
1055 {
1056         Numeric         num1 = PG_GETARG_NUMERIC(0);
1057         Numeric         num2 = PG_GETARG_NUMERIC(1);
1058         bool            result;
1059
1060         result = cmp_numerics(num1, num2) != 0;
1061
1062         PG_FREE_IF_COPY(num1, 0);
1063         PG_FREE_IF_COPY(num2, 1);
1064
1065         PG_RETURN_BOOL(result);
1066 }
1067
1068 Datum
1069 numeric_gt(PG_FUNCTION_ARGS)
1070 {
1071         Numeric         num1 = PG_GETARG_NUMERIC(0);
1072         Numeric         num2 = PG_GETARG_NUMERIC(1);
1073         bool            result;
1074
1075         result = cmp_numerics(num1, num2) > 0;
1076
1077         PG_FREE_IF_COPY(num1, 0);
1078         PG_FREE_IF_COPY(num2, 1);
1079
1080         PG_RETURN_BOOL(result);
1081 }
1082
1083 Datum
1084 numeric_ge(PG_FUNCTION_ARGS)
1085 {
1086         Numeric         num1 = PG_GETARG_NUMERIC(0);
1087         Numeric         num2 = PG_GETARG_NUMERIC(1);
1088         bool            result;
1089
1090         result = cmp_numerics(num1, num2) >= 0;
1091
1092         PG_FREE_IF_COPY(num1, 0);
1093         PG_FREE_IF_COPY(num2, 1);
1094
1095         PG_RETURN_BOOL(result);
1096 }
1097
1098 Datum
1099 numeric_lt(PG_FUNCTION_ARGS)
1100 {
1101         Numeric         num1 = PG_GETARG_NUMERIC(0);
1102         Numeric         num2 = PG_GETARG_NUMERIC(1);
1103         bool            result;
1104
1105         result = cmp_numerics(num1, num2) < 0;
1106
1107         PG_FREE_IF_COPY(num1, 0);
1108         PG_FREE_IF_COPY(num2, 1);
1109
1110         PG_RETURN_BOOL(result);
1111 }
1112
1113 Datum
1114 numeric_le(PG_FUNCTION_ARGS)
1115 {
1116         Numeric         num1 = PG_GETARG_NUMERIC(0);
1117         Numeric         num2 = PG_GETARG_NUMERIC(1);
1118         bool            result;
1119
1120         result = cmp_numerics(num1, num2) <= 0;
1121
1122         PG_FREE_IF_COPY(num1, 0);
1123         PG_FREE_IF_COPY(num2, 1);
1124
1125         PG_RETURN_BOOL(result);
1126 }
1127
1128 static int
1129 cmp_numerics(Numeric num1, Numeric num2)
1130 {
1131         int                     result;
1132
1133         /*
1134          * We consider all NANs to be equal and larger than any non-NAN. This is
1135          * somewhat arbitrary; the important thing is to have a consistent sort
1136          * order.
1137          */
1138         if (NUMERIC_IS_NAN(num1))
1139         {
1140                 if (NUMERIC_IS_NAN(num2))
1141                         result = 0;                     /* NAN = NAN */
1142                 else
1143                         result = 1;                     /* NAN > non-NAN */
1144         }
1145         else if (NUMERIC_IS_NAN(num2))
1146         {
1147                 result = -1;                    /* non-NAN < NAN */
1148         }
1149         else
1150         {
1151                 result = cmp_var_common(NUMERIC_DIGITS(num1), NUMERIC_NDIGITS(num1),
1152                                                                 num1->n_weight, NUMERIC_SIGN(num1),
1153                                                                 NUMERIC_DIGITS(num2), NUMERIC_NDIGITS(num2),
1154                                                                 num2->n_weight, NUMERIC_SIGN(num2));
1155         }
1156
1157         return result;
1158 }
1159
1160 Datum
1161 hash_numeric(PG_FUNCTION_ARGS)
1162 {
1163         Numeric         key = PG_GETARG_NUMERIC(0);
1164         Datum           digit_hash;
1165         Datum           result;
1166         int                     weight;
1167         int                     start_offset;
1168         int                     end_offset;
1169         int                     i;
1170         int                     hash_len;
1171
1172         /* If it's NaN, don't try to hash the rest of the fields */
1173         if (NUMERIC_IS_NAN(key))
1174                 PG_RETURN_UINT32(0);
1175
1176         weight = key->n_weight;
1177         start_offset = 0;
1178         end_offset = 0;
1179
1180         /*
1181          * Omit any leading or trailing zeros from the input to the hash. The
1182          * numeric implementation *should* guarantee that leading and trailing
1183          * zeros are suppressed, but we're paranoid. Note that we measure the
1184          * starting and ending offsets in units of NumericDigits, not bytes.
1185          */
1186         for (i = 0; i < NUMERIC_NDIGITS(key); i++)
1187         {
1188                 if (NUMERIC_DIGITS(key)[i] != (NumericDigit) 0)
1189                         break;
1190
1191                 start_offset++;
1192
1193                 /*
1194                  * The weight is effectively the # of digits before the decimal point,
1195                  * so decrement it for each leading zero we skip.
1196                  */
1197                 weight--;
1198         }
1199
1200         /*
1201          * If there are no non-zero digits, then the value of the number is zero,
1202          * regardless of any other fields.
1203          */
1204         if (NUMERIC_NDIGITS(key) == start_offset)
1205                 PG_RETURN_UINT32(-1);
1206
1207         for (i = NUMERIC_NDIGITS(key) - 1; i >= 0; i--)
1208         {
1209                 if (NUMERIC_DIGITS(key)[i] != (NumericDigit) 0)
1210                         break;
1211
1212                 end_offset++;
1213         }
1214
1215         /* If we get here, there should be at least one non-zero digit */
1216         Assert(start_offset + end_offset < NUMERIC_NDIGITS(key));
1217
1218         /*
1219          * Note that we don't hash on the Numeric's scale, since two numerics can
1220          * compare equal but have different scales. We also don't hash on the
1221          * sign, although we could: since a sign difference implies inequality,
1222          * this shouldn't affect correctness.
1223          */
1224         hash_len = NUMERIC_NDIGITS(key) - start_offset - end_offset;
1225         digit_hash = hash_any((unsigned char *) (NUMERIC_DIGITS(key) + start_offset),
1226                                                   hash_len * sizeof(NumericDigit));
1227
1228         /* Mix in the weight, via XOR */
1229         result = digit_hash ^ weight;
1230
1231         PG_RETURN_DATUM(result);
1232 }
1233
1234
1235 /* ----------------------------------------------------------------------
1236  *
1237  * Basic arithmetic functions
1238  *
1239  * ----------------------------------------------------------------------
1240  */
1241
1242
1243 /*
1244  * numeric_add() -
1245  *
1246  *      Add two numerics
1247  */
1248 Datum
1249 numeric_add(PG_FUNCTION_ARGS)
1250 {
1251         Numeric         num1 = PG_GETARG_NUMERIC(0);
1252         Numeric         num2 = PG_GETARG_NUMERIC(1);
1253         NumericVar      arg1;
1254         NumericVar      arg2;
1255         NumericVar      result;
1256         Numeric         res;
1257
1258         /*
1259          * Handle NaN
1260          */
1261         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1262                 PG_RETURN_NUMERIC(make_result(&const_nan));
1263
1264         /*
1265          * Unpack the values, let add_var() compute the result and return it.
1266          */
1267         init_var(&arg1);
1268         init_var(&arg2);
1269         init_var(&result);
1270
1271         set_var_from_num(num1, &arg1);
1272         set_var_from_num(num2, &arg2);
1273
1274         add_var(&arg1, &arg2, &result);
1275
1276         res = make_result(&result);
1277
1278         free_var(&arg1);
1279         free_var(&arg2);
1280         free_var(&result);
1281
1282         PG_RETURN_NUMERIC(res);
1283 }
1284
1285
1286 /*
1287  * numeric_sub() -
1288  *
1289  *      Subtract one numeric from another
1290  */
1291 Datum
1292 numeric_sub(PG_FUNCTION_ARGS)
1293 {
1294         Numeric         num1 = PG_GETARG_NUMERIC(0);
1295         Numeric         num2 = PG_GETARG_NUMERIC(1);
1296         NumericVar      arg1;
1297         NumericVar      arg2;
1298         NumericVar      result;
1299         Numeric         res;
1300
1301         /*
1302          * Handle NaN
1303          */
1304         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1305                 PG_RETURN_NUMERIC(make_result(&const_nan));
1306
1307         /*
1308          * Unpack the values, let sub_var() compute the result and return it.
1309          */
1310         init_var(&arg1);
1311         init_var(&arg2);
1312         init_var(&result);
1313
1314         set_var_from_num(num1, &arg1);
1315         set_var_from_num(num2, &arg2);
1316
1317         sub_var(&arg1, &arg2, &result);
1318
1319         res = make_result(&result);
1320
1321         free_var(&arg1);
1322         free_var(&arg2);
1323         free_var(&result);
1324
1325         PG_RETURN_NUMERIC(res);
1326 }
1327
1328
1329 /*
1330  * numeric_mul() -
1331  *
1332  *      Calculate the product of two numerics
1333  */
1334 Datum
1335 numeric_mul(PG_FUNCTION_ARGS)
1336 {
1337         Numeric         num1 = PG_GETARG_NUMERIC(0);
1338         Numeric         num2 = PG_GETARG_NUMERIC(1);
1339         NumericVar      arg1;
1340         NumericVar      arg2;
1341         NumericVar      result;
1342         Numeric         res;
1343
1344         /*
1345          * Handle NaN
1346          */
1347         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1348                 PG_RETURN_NUMERIC(make_result(&const_nan));
1349
1350         /*
1351          * Unpack the values, let mul_var() compute the result and return it.
1352          * Unlike add_var() and sub_var(), mul_var() will round its result. In the
1353          * case of numeric_mul(), which is invoked for the * operator on numerics,
1354          * we request exact representation for the product (rscale = sum(dscale of
1355          * arg1, dscale of arg2)).
1356          */
1357         init_var(&arg1);
1358         init_var(&arg2);
1359         init_var(&result);
1360
1361         set_var_from_num(num1, &arg1);
1362         set_var_from_num(num2, &arg2);
1363
1364         mul_var(&arg1, &arg2, &result, arg1.dscale + arg2.dscale);
1365
1366         res = make_result(&result);
1367
1368         free_var(&arg1);
1369         free_var(&arg2);
1370         free_var(&result);
1371
1372         PG_RETURN_NUMERIC(res);
1373 }
1374
1375
1376 /*
1377  * numeric_div() -
1378  *
1379  *      Divide one numeric into another
1380  */
1381 Datum
1382 numeric_div(PG_FUNCTION_ARGS)
1383 {
1384         Numeric         num1 = PG_GETARG_NUMERIC(0);
1385         Numeric         num2 = PG_GETARG_NUMERIC(1);
1386         NumericVar      arg1;
1387         NumericVar      arg2;
1388         NumericVar      result;
1389         Numeric         res;
1390         int                     rscale;
1391
1392         /*
1393          * Handle NaN
1394          */
1395         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1396                 PG_RETURN_NUMERIC(make_result(&const_nan));
1397
1398         /*
1399          * Unpack the arguments
1400          */
1401         init_var(&arg1);
1402         init_var(&arg2);
1403         init_var(&result);
1404
1405         set_var_from_num(num1, &arg1);
1406         set_var_from_num(num2, &arg2);
1407
1408         /*
1409          * Select scale for division result
1410          */
1411         rscale = select_div_scale(&arg1, &arg2);
1412
1413         /*
1414          * Do the divide and return the result
1415          */
1416         div_var(&arg1, &arg2, &result, rscale, true);
1417
1418         res = make_result(&result);
1419
1420         free_var(&arg1);
1421         free_var(&arg2);
1422         free_var(&result);
1423
1424         PG_RETURN_NUMERIC(res);
1425 }
1426
1427
1428 /*
1429  * numeric_div_trunc() -
1430  *
1431  *      Divide one numeric into another, truncating the result to an integer
1432  */
1433 Datum
1434 numeric_div_trunc(PG_FUNCTION_ARGS)
1435 {
1436         Numeric         num1 = PG_GETARG_NUMERIC(0);
1437         Numeric         num2 = PG_GETARG_NUMERIC(1);
1438         NumericVar      arg1;
1439         NumericVar      arg2;
1440         NumericVar      result;
1441         Numeric         res;
1442
1443         /*
1444          * Handle NaN
1445          */
1446         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1447                 PG_RETURN_NUMERIC(make_result(&const_nan));
1448
1449         /*
1450          * Unpack the arguments
1451          */
1452         init_var(&arg1);
1453         init_var(&arg2);
1454         init_var(&result);
1455
1456         set_var_from_num(num1, &arg1);
1457         set_var_from_num(num2, &arg2);
1458
1459         /*
1460          * Do the divide and return the result
1461          */
1462         div_var(&arg1, &arg2, &result, 0, false);
1463
1464         res = make_result(&result);
1465
1466         free_var(&arg1);
1467         free_var(&arg2);
1468         free_var(&result);
1469
1470         PG_RETURN_NUMERIC(res);
1471 }
1472
1473
1474 /*
1475  * numeric_mod() -
1476  *
1477  *      Calculate the modulo of two numerics
1478  */
1479 Datum
1480 numeric_mod(PG_FUNCTION_ARGS)
1481 {
1482         Numeric         num1 = PG_GETARG_NUMERIC(0);
1483         Numeric         num2 = PG_GETARG_NUMERIC(1);
1484         Numeric         res;
1485         NumericVar      arg1;
1486         NumericVar      arg2;
1487         NumericVar      result;
1488
1489         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1490                 PG_RETURN_NUMERIC(make_result(&const_nan));
1491
1492         init_var(&arg1);
1493         init_var(&arg2);
1494         init_var(&result);
1495
1496         set_var_from_num(num1, &arg1);
1497         set_var_from_num(num2, &arg2);
1498
1499         mod_var(&arg1, &arg2, &result);
1500
1501         res = make_result(&result);
1502
1503         free_var(&result);
1504         free_var(&arg2);
1505         free_var(&arg1);
1506
1507         PG_RETURN_NUMERIC(res);
1508 }
1509
1510
1511 /*
1512  * numeric_inc() -
1513  *
1514  *      Increment a number by one
1515  */
1516 Datum
1517 numeric_inc(PG_FUNCTION_ARGS)
1518 {
1519         Numeric         num = PG_GETARG_NUMERIC(0);
1520         NumericVar      arg;
1521         Numeric         res;
1522
1523         /*
1524          * Handle NaN
1525          */
1526         if (NUMERIC_IS_NAN(num))
1527                 PG_RETURN_NUMERIC(make_result(&const_nan));
1528
1529         /*
1530          * Compute the result and return it
1531          */
1532         init_var(&arg);
1533
1534         set_var_from_num(num, &arg);
1535
1536         add_var(&arg, &const_one, &arg);
1537
1538         res = make_result(&arg);
1539
1540         free_var(&arg);
1541
1542         PG_RETURN_NUMERIC(res);
1543 }
1544
1545
1546 /*
1547  * numeric_smaller() -
1548  *
1549  *      Return the smaller of two numbers
1550  */
1551 Datum
1552 numeric_smaller(PG_FUNCTION_ARGS)
1553 {
1554         Numeric         num1 = PG_GETARG_NUMERIC(0);
1555         Numeric         num2 = PG_GETARG_NUMERIC(1);
1556
1557         /*
1558          * Use cmp_numerics so that this will agree with the comparison operators,
1559          * particularly as regards comparisons involving NaN.
1560          */
1561         if (cmp_numerics(num1, num2) < 0)
1562                 PG_RETURN_NUMERIC(num1);
1563         else
1564                 PG_RETURN_NUMERIC(num2);
1565 }
1566
1567
1568 /*
1569  * numeric_larger() -
1570  *
1571  *      Return the larger of two numbers
1572  */
1573 Datum
1574 numeric_larger(PG_FUNCTION_ARGS)
1575 {
1576         Numeric         num1 = PG_GETARG_NUMERIC(0);
1577         Numeric         num2 = PG_GETARG_NUMERIC(1);
1578
1579         /*
1580          * Use cmp_numerics so that this will agree with the comparison operators,
1581          * particularly as regards comparisons involving NaN.
1582          */
1583         if (cmp_numerics(num1, num2) > 0)
1584                 PG_RETURN_NUMERIC(num1);
1585         else
1586                 PG_RETURN_NUMERIC(num2);
1587 }
1588
1589
1590 /* ----------------------------------------------------------------------
1591  *
1592  * Advanced math functions
1593  *
1594  * ----------------------------------------------------------------------
1595  */
1596
1597 /*
1598  * numeric_fac()
1599  *
1600  * Compute factorial
1601  */
1602 Datum
1603 numeric_fac(PG_FUNCTION_ARGS)
1604 {
1605         int64           num = PG_GETARG_INT64(0);
1606         Numeric         res;
1607         NumericVar      fact;
1608         NumericVar      result;
1609
1610         if (num <= 1)
1611         {
1612                 res = make_result(&const_one);
1613                 PG_RETURN_NUMERIC(res);
1614         }
1615         /* Fail immediately if the result would overflow */
1616         if (num > 32177)
1617                 ereport(ERROR,
1618                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1619                                  errmsg("value overflows numeric format")));
1620
1621         init_var(&fact);
1622         init_var(&result);
1623
1624         int8_to_numericvar(num, &result);
1625
1626         for (num = num - 1; num > 1; num--)
1627         {
1628                 /* this loop can take awhile, so allow it to be interrupted */
1629                 CHECK_FOR_INTERRUPTS();
1630
1631                 int8_to_numericvar(num, &fact);
1632
1633                 mul_var(&result, &fact, &result, 0);
1634         }
1635
1636         res = make_result(&result);
1637
1638         free_var(&fact);
1639         free_var(&result);
1640
1641         PG_RETURN_NUMERIC(res);
1642 }
1643
1644
1645 /*
1646  * numeric_sqrt() -
1647  *
1648  *      Compute the square root of a numeric.
1649  */
1650 Datum
1651 numeric_sqrt(PG_FUNCTION_ARGS)
1652 {
1653         Numeric         num = PG_GETARG_NUMERIC(0);
1654         Numeric         res;
1655         NumericVar      arg;
1656         NumericVar      result;
1657         int                     sweight;
1658         int                     rscale;
1659
1660         /*
1661          * Handle NaN
1662          */
1663         if (NUMERIC_IS_NAN(num))
1664                 PG_RETURN_NUMERIC(make_result(&const_nan));
1665
1666         /*
1667          * Unpack the argument and determine the result scale.  We choose a scale
1668          * to give at least NUMERIC_MIN_SIG_DIGITS significant digits; but in any
1669          * case not less than the input's dscale.
1670          */
1671         init_var(&arg);
1672         init_var(&result);
1673
1674         set_var_from_num(num, &arg);
1675
1676         /* Assume the input was normalized, so arg.weight is accurate */
1677         sweight = (arg.weight + 1) * DEC_DIGITS / 2 - 1;
1678
1679         rscale = NUMERIC_MIN_SIG_DIGITS - sweight;
1680         rscale = Max(rscale, arg.dscale);
1681         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
1682         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
1683
1684         /*
1685          * Let sqrt_var() do the calculation and return the result.
1686          */
1687         sqrt_var(&arg, &result, rscale);
1688
1689         res = make_result(&result);
1690
1691         free_var(&result);
1692         free_var(&arg);
1693
1694         PG_RETURN_NUMERIC(res);
1695 }
1696
1697
1698 /*
1699  * numeric_exp() -
1700  *
1701  *      Raise e to the power of x
1702  */
1703 Datum
1704 numeric_exp(PG_FUNCTION_ARGS)
1705 {
1706         Numeric         num = PG_GETARG_NUMERIC(0);
1707         Numeric         res;
1708         NumericVar      arg;
1709         NumericVar      result;
1710         int                     rscale;
1711         double          val;
1712
1713         /*
1714          * Handle NaN
1715          */
1716         if (NUMERIC_IS_NAN(num))
1717                 PG_RETURN_NUMERIC(make_result(&const_nan));
1718
1719         /*
1720          * Unpack the argument and determine the result scale.  We choose a scale
1721          * to give at least NUMERIC_MIN_SIG_DIGITS significant digits; but in any
1722          * case not less than the input's dscale.
1723          */
1724         init_var(&arg);
1725         init_var(&result);
1726
1727         set_var_from_num(num, &arg);
1728
1729         /* convert input to float8, ignoring overflow */
1730         val = numericvar_to_double_no_overflow(&arg);
1731
1732         /*
1733          * log10(result) = num * log10(e), so this is approximately the decimal
1734          * weight of the result:
1735          */
1736         val *= 0.434294481903252;
1737
1738         /* limit to something that won't cause integer overflow */
1739         val = Max(val, -NUMERIC_MAX_RESULT_SCALE);
1740         val = Min(val, NUMERIC_MAX_RESULT_SCALE);
1741
1742         rscale = NUMERIC_MIN_SIG_DIGITS - (int) val;
1743         rscale = Max(rscale, arg.dscale);
1744         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
1745         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
1746
1747         /*
1748          * Let exp_var() do the calculation and return the result.
1749          */
1750         exp_var(&arg, &result, rscale);
1751
1752         res = make_result(&result);
1753
1754         free_var(&result);
1755         free_var(&arg);
1756
1757         PG_RETURN_NUMERIC(res);
1758 }
1759
1760
1761 /*
1762  * numeric_ln() -
1763  *
1764  *      Compute the natural logarithm of x
1765  */
1766 Datum
1767 numeric_ln(PG_FUNCTION_ARGS)
1768 {
1769         Numeric         num = PG_GETARG_NUMERIC(0);
1770         Numeric         res;
1771         NumericVar      arg;
1772         NumericVar      result;
1773         int                     dec_digits;
1774         int                     rscale;
1775
1776         /*
1777          * Handle NaN
1778          */
1779         if (NUMERIC_IS_NAN(num))
1780                 PG_RETURN_NUMERIC(make_result(&const_nan));
1781
1782         init_var(&arg);
1783         init_var(&result);
1784
1785         set_var_from_num(num, &arg);
1786
1787         /* Approx decimal digits before decimal point */
1788         dec_digits = (arg.weight + 1) * DEC_DIGITS;
1789
1790         if (dec_digits > 1)
1791                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(dec_digits - 1);
1792         else if (dec_digits < 1)
1793                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(1 - dec_digits);
1794         else
1795                 rscale = NUMERIC_MIN_SIG_DIGITS;
1796
1797         rscale = Max(rscale, arg.dscale);
1798         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
1799         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
1800
1801         ln_var(&arg, &result, rscale);
1802
1803         res = make_result(&result);
1804
1805         free_var(&result);
1806         free_var(&arg);
1807
1808         PG_RETURN_NUMERIC(res);
1809 }
1810
1811
1812 /*
1813  * numeric_log() -
1814  *
1815  *      Compute the logarithm of x in a given base
1816  */
1817 Datum
1818 numeric_log(PG_FUNCTION_ARGS)
1819 {
1820         Numeric         num1 = PG_GETARG_NUMERIC(0);
1821         Numeric         num2 = PG_GETARG_NUMERIC(1);
1822         Numeric         res;
1823         NumericVar      arg1;
1824         NumericVar      arg2;
1825         NumericVar      result;
1826
1827         /*
1828          * Handle NaN
1829          */
1830         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1831                 PG_RETURN_NUMERIC(make_result(&const_nan));
1832
1833         /*
1834          * Initialize things
1835          */
1836         init_var(&arg1);
1837         init_var(&arg2);
1838         init_var(&result);
1839
1840         set_var_from_num(num1, &arg1);
1841         set_var_from_num(num2, &arg2);
1842
1843         /*
1844          * Call log_var() to compute and return the result; note it handles scale
1845          * selection itself.
1846          */
1847         log_var(&arg1, &arg2, &result);
1848
1849         res = make_result(&result);
1850
1851         free_var(&result);
1852         free_var(&arg2);
1853         free_var(&arg1);
1854
1855         PG_RETURN_NUMERIC(res);
1856 }
1857
1858
1859 /*
1860  * numeric_power() -
1861  *
1862  *      Raise b to the power of x
1863  */
1864 Datum
1865 numeric_power(PG_FUNCTION_ARGS)
1866 {
1867         Numeric         num1 = PG_GETARG_NUMERIC(0);
1868         Numeric         num2 = PG_GETARG_NUMERIC(1);
1869         Numeric         res;
1870         NumericVar      arg1;
1871         NumericVar      arg2;
1872         NumericVar      arg2_trunc;
1873         NumericVar      result;
1874
1875         /*
1876          * Handle NaN
1877          */
1878         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1879                 PG_RETURN_NUMERIC(make_result(&const_nan));
1880
1881         /*
1882          * Initialize things
1883          */
1884         init_var(&arg1);
1885         init_var(&arg2);
1886         init_var(&arg2_trunc);
1887         init_var(&result);
1888
1889         set_var_from_num(num1, &arg1);
1890         set_var_from_num(num2, &arg2);
1891         set_var_from_var(&arg2, &arg2_trunc);
1892
1893         trunc_var(&arg2_trunc, 0);
1894
1895         /*
1896          * The SQL spec requires that we emit a particular SQLSTATE error code for
1897          * certain error conditions.  Specifically, we don't return a divide-by-zero
1898          * error code for 0 ^ -1.
1899          */
1900         if ((cmp_var(&arg1, &const_zero) == 0 &&
1901                  cmp_var(&arg2, &const_zero) < 0) ||
1902                 (cmp_var(&arg1, &const_zero) < 0 &&
1903                  cmp_var(&arg2, &arg2_trunc) != 0))
1904                 ereport(ERROR,
1905                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION),
1906                                  errmsg("invalid argument for power function")));
1907
1908         /*
1909          * Call power_var() to compute and return the result; note it handles
1910          * scale selection itself.
1911          */
1912         power_var(&arg1, &arg2, &result);
1913
1914         res = make_result(&result);
1915
1916         free_var(&result);
1917         free_var(&arg2);
1918         free_var(&arg2_trunc);
1919         free_var(&arg1);
1920
1921         PG_RETURN_NUMERIC(res);
1922 }
1923
1924
1925 /* ----------------------------------------------------------------------
1926  *
1927  * Type conversion functions
1928  *
1929  * ----------------------------------------------------------------------
1930  */
1931
1932
1933 Datum
1934 int4_numeric(PG_FUNCTION_ARGS)
1935 {
1936         int32           val = PG_GETARG_INT32(0);
1937         Numeric         res;
1938         NumericVar      result;
1939
1940         init_var(&result);
1941
1942         int8_to_numericvar((int64) val, &result);
1943
1944         res = make_result(&result);
1945
1946         free_var(&result);
1947
1948         PG_RETURN_NUMERIC(res);
1949 }
1950
1951
1952 Datum
1953 numeric_int4(PG_FUNCTION_ARGS)
1954 {
1955         Numeric         num = PG_GETARG_NUMERIC(0);
1956         NumericVar      x;
1957         int32           result;
1958
1959         /* XXX would it be better to return NULL? */
1960         if (NUMERIC_IS_NAN(num))
1961                 ereport(ERROR,
1962                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1963                                  errmsg("cannot convert NaN to integer")));
1964
1965         /* Convert to variable format, then convert to int4 */
1966         init_var(&x);
1967         set_var_from_num(num, &x);
1968         result = numericvar_to_int4(&x);
1969         free_var(&x);
1970         PG_RETURN_INT32(result);
1971 }
1972
1973 /*
1974  * Given a NumericVar, convert it to an int32. If the NumericVar
1975  * exceeds the range of an int32, raise the appropriate error via
1976  * ereport(). The input NumericVar is *not* free'd.
1977  */
1978 static int32
1979 numericvar_to_int4(NumericVar *var)
1980 {
1981         int32           result;
1982         int64           val;
1983
1984         if (!numericvar_to_int8(var, &val))
1985                 ereport(ERROR,
1986                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1987                                  errmsg("integer out of range")));
1988
1989         /* Down-convert to int4 */
1990         result = (int32) val;
1991
1992         /* Test for overflow by reverse-conversion. */
1993         if ((int64) result != val)
1994                 ereport(ERROR,
1995                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1996                                  errmsg("integer out of range")));
1997
1998         return result;
1999 }
2000
2001 Datum
2002 int8_numeric(PG_FUNCTION_ARGS)
2003 {
2004         int64           val = PG_GETARG_INT64(0);
2005         Numeric         res;
2006         NumericVar      result;
2007
2008         init_var(&result);
2009
2010         int8_to_numericvar(val, &result);
2011
2012         res = make_result(&result);
2013
2014         free_var(&result);
2015
2016         PG_RETURN_NUMERIC(res);
2017 }
2018
2019
2020 Datum
2021 numeric_int8(PG_FUNCTION_ARGS)
2022 {
2023         Numeric         num = PG_GETARG_NUMERIC(0);
2024         NumericVar      x;
2025         int64           result;
2026
2027         /* XXX would it be better to return NULL? */
2028         if (NUMERIC_IS_NAN(num))
2029                 ereport(ERROR,
2030                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2031                                  errmsg("cannot convert NaN to bigint")));
2032
2033         /* Convert to variable format and thence to int8 */
2034         init_var(&x);
2035         set_var_from_num(num, &x);
2036
2037         if (!numericvar_to_int8(&x, &result))
2038                 ereport(ERROR,
2039                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
2040                                  errmsg("bigint out of range")));
2041
2042         free_var(&x);
2043
2044         PG_RETURN_INT64(result);
2045 }
2046
2047
2048 Datum
2049 int2_numeric(PG_FUNCTION_ARGS)
2050 {
2051         int16           val = PG_GETARG_INT16(0);
2052         Numeric         res;
2053         NumericVar      result;
2054
2055         init_var(&result);
2056
2057         int8_to_numericvar((int64) val, &result);
2058
2059         res = make_result(&result);
2060
2061         free_var(&result);
2062
2063         PG_RETURN_NUMERIC(res);
2064 }
2065
2066
2067 Datum
2068 numeric_int2(PG_FUNCTION_ARGS)
2069 {
2070         Numeric         num = PG_GETARG_NUMERIC(0);
2071         NumericVar      x;
2072         int64           val;
2073         int16           result;
2074
2075         /* XXX would it be better to return NULL? */
2076         if (NUMERIC_IS_NAN(num))
2077                 ereport(ERROR,
2078                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2079                                  errmsg("cannot convert NaN to smallint")));
2080
2081         /* Convert to variable format and thence to int8 */
2082         init_var(&x);
2083         set_var_from_num(num, &x);
2084
2085         if (!numericvar_to_int8(&x, &val))
2086                 ereport(ERROR,
2087                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
2088                                  errmsg("smallint out of range")));
2089
2090         free_var(&x);
2091
2092         /* Down-convert to int2 */
2093         result = (int16) val;
2094
2095         /* Test for overflow by reverse-conversion. */
2096         if ((int64) result != val)
2097                 ereport(ERROR,
2098                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
2099                                  errmsg("smallint out of range")));
2100
2101         PG_RETURN_INT16(result);
2102 }
2103
2104
2105 Datum
2106 float8_numeric(PG_FUNCTION_ARGS)
2107 {
2108         float8          val = PG_GETARG_FLOAT8(0);
2109         Numeric         res;
2110         NumericVar      result;
2111         char            buf[DBL_DIG + 100];
2112
2113         if (isnan(val))
2114                 PG_RETURN_NUMERIC(make_result(&const_nan));
2115
2116         sprintf(buf, "%.*g", DBL_DIG, val);
2117
2118         init_var(&result);
2119
2120         set_var_from_str(buf, &result);
2121         res = make_result(&result);
2122
2123         free_var(&result);
2124
2125         PG_RETURN_NUMERIC(res);
2126 }
2127
2128
2129 Datum
2130 numeric_float8(PG_FUNCTION_ARGS)
2131 {
2132         Numeric         num = PG_GETARG_NUMERIC(0);
2133         char       *tmp;
2134         Datum           result;
2135
2136         if (NUMERIC_IS_NAN(num))
2137                 PG_RETURN_FLOAT8(get_float8_nan());
2138
2139         tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
2140                                                                                           NumericGetDatum(num)));
2141
2142         result = DirectFunctionCall1(float8in, CStringGetDatum(tmp));
2143
2144         pfree(tmp);
2145
2146         PG_RETURN_DATUM(result);
2147 }
2148
2149
2150 /* Convert numeric to float8; if out of range, return +/- HUGE_VAL */
2151 Datum
2152 numeric_float8_no_overflow(PG_FUNCTION_ARGS)
2153 {
2154         Numeric         num = PG_GETARG_NUMERIC(0);
2155         double          val;
2156
2157         if (NUMERIC_IS_NAN(num))
2158                 PG_RETURN_FLOAT8(get_float8_nan());
2159
2160         val = numeric_to_double_no_overflow(num);
2161
2162         PG_RETURN_FLOAT8(val);
2163 }
2164
2165 Datum
2166 float4_numeric(PG_FUNCTION_ARGS)
2167 {
2168         float4          val = PG_GETARG_FLOAT4(0);
2169         Numeric         res;
2170         NumericVar      result;
2171         char            buf[FLT_DIG + 100];
2172
2173         if (isnan(val))
2174                 PG_RETURN_NUMERIC(make_result(&const_nan));
2175
2176         sprintf(buf, "%.*g", FLT_DIG, val);
2177
2178         init_var(&result);
2179
2180         set_var_from_str(buf, &result);
2181         res = make_result(&result);
2182
2183         free_var(&result);
2184
2185         PG_RETURN_NUMERIC(res);
2186 }
2187
2188
2189 Datum
2190 numeric_float4(PG_FUNCTION_ARGS)
2191 {
2192         Numeric         num = PG_GETARG_NUMERIC(0);
2193         char       *tmp;
2194         Datum           result;
2195
2196         if (NUMERIC_IS_NAN(num))
2197                 PG_RETURN_FLOAT4(get_float4_nan());
2198
2199         tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
2200                                                                                           NumericGetDatum(num)));
2201
2202         result = DirectFunctionCall1(float4in, CStringGetDatum(tmp));
2203
2204         pfree(tmp);
2205
2206         PG_RETURN_DATUM(result);
2207 }
2208
2209
2210 /* ----------------------------------------------------------------------
2211  *
2212  * Aggregate functions
2213  *
2214  * The transition datatype for all these aggregates is a 3-element array
2215  * of Numeric, holding the values N, sum(X), sum(X*X) in that order.
2216  *
2217  * We represent N as a numeric mainly to avoid having to build a special
2218  * datatype; it's unlikely it'd overflow an int4, but ...
2219  *
2220  * ----------------------------------------------------------------------
2221  */
2222
2223 static ArrayType *
2224 do_numeric_accum(ArrayType *transarray, Numeric newval)
2225 {
2226         Datum      *transdatums;
2227         int                     ndatums;
2228         Datum           N,
2229                                 sumX,
2230                                 sumX2;
2231         ArrayType  *result;
2232
2233         /* We assume the input is array of numeric */
2234         deconstruct_array(transarray,
2235                                           NUMERICOID, -1, false, 'i',
2236                                           &transdatums, NULL, &ndatums);
2237         if (ndatums != 3)
2238                 elog(ERROR, "expected 3-element numeric array");
2239         N = transdatums[0];
2240         sumX = transdatums[1];
2241         sumX2 = transdatums[2];
2242
2243         N = DirectFunctionCall1(numeric_inc, N);
2244         sumX = DirectFunctionCall2(numeric_add, sumX,
2245                                                            NumericGetDatum(newval));
2246         sumX2 = DirectFunctionCall2(numeric_add, sumX2,
2247                                                                 DirectFunctionCall2(numeric_mul,
2248                                                                                                         NumericGetDatum(newval),
2249                                                                                                         NumericGetDatum(newval)));
2250
2251         transdatums[0] = N;
2252         transdatums[1] = sumX;
2253         transdatums[2] = sumX2;
2254
2255         result = construct_array(transdatums, 3,
2256                                                          NUMERICOID, -1, false, 'i');
2257
2258         return result;
2259 }
2260
2261 /*
2262  * Improve avg performance by not caclulating sum(X*X).
2263  */
2264 static ArrayType *
2265 do_numeric_avg_accum(ArrayType *transarray, Numeric newval)
2266 {
2267         Datum      *transdatums;
2268         int                     ndatums;
2269         Datum           N,
2270                                 sumX;
2271         ArrayType  *result;
2272
2273         /* We assume the input is array of numeric */
2274         deconstruct_array(transarray,
2275                                           NUMERICOID, -1, false, 'i',
2276                                           &transdatums, NULL, &ndatums);
2277         if (ndatums != 2)
2278                 elog(ERROR, "expected 2-element numeric array");
2279         N = transdatums[0];
2280         sumX = transdatums[1];
2281
2282         N = DirectFunctionCall1(numeric_inc, N);
2283         sumX = DirectFunctionCall2(numeric_add, sumX,
2284                                                            NumericGetDatum(newval));
2285
2286         transdatums[0] = N;
2287         transdatums[1] = sumX;
2288
2289         result = construct_array(transdatums, 2,
2290                                                          NUMERICOID, -1, false, 'i');
2291
2292         return result;
2293 }
2294
2295 Datum
2296 numeric_accum(PG_FUNCTION_ARGS)
2297 {
2298         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2299         Numeric         newval = PG_GETARG_NUMERIC(1);
2300
2301         PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
2302 }
2303
2304 /*
2305  * Optimized case for average of numeric.
2306  */
2307 Datum
2308 numeric_avg_accum(PG_FUNCTION_ARGS)
2309 {
2310         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2311         Numeric         newval = PG_GETARG_NUMERIC(1);
2312
2313         PG_RETURN_ARRAYTYPE_P(do_numeric_avg_accum(transarray, newval));
2314 }
2315
2316 /*
2317  * Integer data types all use Numeric accumulators to share code and
2318  * avoid risk of overflow.      For int2 and int4 inputs, Numeric accumulation
2319  * is overkill for the N and sum(X) values, but definitely not overkill
2320  * for the sum(X*X) value.      Hence, we use int2_accum and int4_accum only
2321  * for stddev/variance --- there are faster special-purpose accumulator
2322  * routines for SUM and AVG of these datatypes.
2323  */
2324
2325 Datum
2326 int2_accum(PG_FUNCTION_ARGS)
2327 {
2328         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2329         Datum           newval2 = PG_GETARG_DATUM(1);
2330         Numeric         newval;
2331
2332         newval = DatumGetNumeric(DirectFunctionCall1(int2_numeric, newval2));
2333
2334         PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
2335 }
2336
2337 Datum
2338 int4_accum(PG_FUNCTION_ARGS)
2339 {
2340         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2341         Datum           newval4 = PG_GETARG_DATUM(1);
2342         Numeric         newval;
2343
2344         newval = DatumGetNumeric(DirectFunctionCall1(int4_numeric, newval4));
2345
2346         PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
2347 }
2348
2349 Datum
2350 int8_accum(PG_FUNCTION_ARGS)
2351 {
2352         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2353         Datum           newval8 = PG_GETARG_DATUM(1);
2354         Numeric         newval;
2355
2356         newval = DatumGetNumeric(DirectFunctionCall1(int8_numeric, newval8));
2357
2358         PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
2359 }
2360
2361 /*
2362  * Optimized case for average of int8.
2363  */
2364 Datum
2365 int8_avg_accum(PG_FUNCTION_ARGS)
2366 {
2367         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2368         Datum           newval8 = PG_GETARG_DATUM(1);
2369         Numeric         newval;
2370
2371         newval = DatumGetNumeric(DirectFunctionCall1(int8_numeric, newval8));
2372
2373         PG_RETURN_ARRAYTYPE_P(do_numeric_avg_accum(transarray, newval));
2374 }
2375
2376
2377 Datum
2378 numeric_avg(PG_FUNCTION_ARGS)
2379 {
2380         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2381         Datum      *transdatums;
2382         int                     ndatums;
2383         Numeric         N,
2384                                 sumX;
2385
2386         /* We assume the input is array of numeric */
2387         deconstruct_array(transarray,
2388                                           NUMERICOID, -1, false, 'i',
2389                                           &transdatums, NULL, &ndatums);
2390         if (ndatums != 2)
2391                 elog(ERROR, "expected 2-element numeric array");
2392         N = DatumGetNumeric(transdatums[0]);
2393         sumX = DatumGetNumeric(transdatums[1]);
2394
2395         /* SQL92 defines AVG of no values to be NULL */
2396         /* N is zero iff no digits (cf. numeric_uminus) */
2397         if (VARSIZE(N) == NUMERIC_HDRSZ)
2398                 PG_RETURN_NULL();
2399
2400         PG_RETURN_DATUM(DirectFunctionCall2(numeric_div,
2401                                                                                 NumericGetDatum(sumX),
2402                                                                                 NumericGetDatum(N)));
2403 }
2404
2405 /*
2406  * Workhorse routine for the standard deviance and variance
2407  * aggregates. 'transarray' is the aggregate's transition
2408  * array. 'variance' specifies whether we should calculate the
2409  * variance or the standard deviation. 'sample' indicates whether the
2410  * caller is interested in the sample or the population
2411  * variance/stddev.
2412  *
2413  * If appropriate variance statistic is undefined for the input,
2414  * *is_null is set to true and NULL is returned.
2415  */
2416 static Numeric
2417 numeric_stddev_internal(ArrayType *transarray,
2418                                                 bool variance, bool sample,
2419                                                 bool *is_null)
2420 {
2421         Datum      *transdatums;
2422         int                     ndatums;
2423         Numeric         N,
2424                                 sumX,
2425                                 sumX2,
2426                                 res;
2427         NumericVar      vN,
2428                                 vsumX,
2429                                 vsumX2,
2430                                 vNminus1;
2431         NumericVar *comp;
2432         int                     rscale;
2433
2434         *is_null = false;
2435
2436         /* We assume the input is array of numeric */
2437         deconstruct_array(transarray,
2438                                           NUMERICOID, -1, false, 'i',
2439                                           &transdatums, NULL, &ndatums);
2440         if (ndatums != 3)
2441                 elog(ERROR, "expected 3-element numeric array");
2442         N = DatumGetNumeric(transdatums[0]);
2443         sumX = DatumGetNumeric(transdatums[1]);
2444         sumX2 = DatumGetNumeric(transdatums[2]);
2445
2446         if (NUMERIC_IS_NAN(N) || NUMERIC_IS_NAN(sumX) || NUMERIC_IS_NAN(sumX2))
2447                 return make_result(&const_nan);
2448
2449         init_var(&vN);
2450         set_var_from_num(N, &vN);
2451
2452         /*
2453          * Sample stddev and variance are undefined when N <= 1; population stddev
2454          * is undefined when N == 0. Return NULL in either case.
2455          */
2456         if (sample)
2457                 comp = &const_one;
2458         else
2459                 comp = &const_zero;
2460
2461         if (cmp_var(&vN, comp) <= 0)
2462         {
2463                 free_var(&vN);
2464                 *is_null = true;
2465                 return NULL;
2466         }
2467
2468         init_var(&vNminus1);
2469         sub_var(&vN, &const_one, &vNminus1);
2470
2471         init_var(&vsumX);
2472         set_var_from_num(sumX, &vsumX);
2473         init_var(&vsumX2);
2474         set_var_from_num(sumX2, &vsumX2);
2475
2476         /* compute rscale for mul_var calls */
2477         rscale = vsumX.dscale * 2;
2478
2479         mul_var(&vsumX, &vsumX, &vsumX, rscale);        /* vsumX = sumX * sumX */
2480         mul_var(&vN, &vsumX2, &vsumX2, rscale);         /* vsumX2 = N * sumX2 */
2481         sub_var(&vsumX2, &vsumX, &vsumX2);      /* N * sumX2 - sumX * sumX */
2482
2483         if (cmp_var(&vsumX2, &const_zero) <= 0)
2484         {
2485                 /* Watch out for roundoff error producing a negative numerator */
2486                 res = make_result(&const_zero);
2487         }
2488         else
2489         {
2490                 if (sample)
2491                         mul_var(&vN, &vNminus1, &vNminus1, 0);          /* N * (N - 1) */
2492                 else
2493                         mul_var(&vN, &vN, &vNminus1, 0);        /* N * N */
2494                 rscale = select_div_scale(&vsumX2, &vNminus1);
2495                 div_var(&vsumX2, &vNminus1, &vsumX, rscale, true);              /* variance */
2496                 if (!variance)
2497                         sqrt_var(&vsumX, &vsumX, rscale);       /* stddev */
2498
2499                 res = make_result(&vsumX);
2500         }
2501
2502         free_var(&vN);
2503         free_var(&vNminus1);
2504         free_var(&vsumX);
2505         free_var(&vsumX2);
2506
2507         return res;
2508 }
2509
2510 Datum
2511 numeric_var_samp(PG_FUNCTION_ARGS)
2512 {
2513         Numeric         res;
2514         bool            is_null;
2515
2516         res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
2517                                                                   true, true, &is_null);
2518
2519         if (is_null)
2520                 PG_RETURN_NULL();
2521         else
2522                 PG_RETURN_NUMERIC(res);
2523 }
2524
2525 Datum
2526 numeric_stddev_samp(PG_FUNCTION_ARGS)
2527 {
2528         Numeric         res;
2529         bool            is_null;
2530
2531         res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
2532                                                                   false, true, &is_null);
2533
2534         if (is_null)
2535                 PG_RETURN_NULL();
2536         else
2537                 PG_RETURN_NUMERIC(res);
2538 }
2539
2540 Datum
2541 numeric_var_pop(PG_FUNCTION_ARGS)
2542 {
2543         Numeric         res;
2544         bool            is_null;
2545
2546         res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
2547                                                                   true, false, &is_null);
2548
2549         if (is_null)
2550                 PG_RETURN_NULL();
2551         else
2552                 PG_RETURN_NUMERIC(res);
2553 }
2554
2555 Datum
2556 numeric_stddev_pop(PG_FUNCTION_ARGS)
2557 {
2558         Numeric         res;
2559         bool            is_null;
2560
2561         res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
2562                                                                   false, false, &is_null);
2563
2564         if (is_null)
2565                 PG_RETURN_NULL();
2566         else
2567                 PG_RETURN_NUMERIC(res);
2568 }
2569
2570 /*
2571  * SUM transition functions for integer datatypes.
2572  *
2573  * To avoid overflow, we use accumulators wider than the input datatype.
2574  * A Numeric accumulator is needed for int8 input; for int4 and int2
2575  * inputs, we use int8 accumulators which should be sufficient for practical
2576  * purposes.  (The latter two therefore don't really belong in this file,
2577  * but we keep them here anyway.)
2578  *
2579  * Because SQL92 defines the SUM() of no values to be NULL, not zero,
2580  * the initial condition of the transition data value needs to be NULL. This
2581  * means we can't rely on ExecAgg to automatically insert the first non-null
2582  * data value into the transition data: it doesn't know how to do the type
2583  * conversion.  The upshot is that these routines have to be marked non-strict
2584  * and handle substitution of the first non-null input themselves.
2585  */
2586
2587 Datum
2588 int2_sum(PG_FUNCTION_ARGS)
2589 {
2590         int64           newval;
2591
2592         if (PG_ARGISNULL(0))
2593         {
2594                 /* No non-null input seen so far... */
2595                 if (PG_ARGISNULL(1))
2596                         PG_RETURN_NULL();       /* still no non-null */
2597                 /* This is the first non-null input. */
2598                 newval = (int64) PG_GETARG_INT16(1);
2599                 PG_RETURN_INT64(newval);
2600         }
2601
2602         /*
2603          * If we're invoked by nodeAgg, we can cheat and modify our first
2604          * parameter in-place to avoid palloc overhead. If not, we need to return
2605          * the new value of the transition variable.
2606          * (If int8 is pass-by-value, then of course this is useless as well
2607          * as incorrect, so just ifdef it out.)
2608          */
2609 #ifndef USE_FLOAT8_BYVAL                /* controls int8 too */
2610         if (fcinfo->context && IsA(fcinfo->context, AggState))
2611         {
2612                 int64      *oldsum = (int64 *) PG_GETARG_POINTER(0);
2613
2614                 /* Leave the running sum unchanged in the new input is null */
2615                 if (!PG_ARGISNULL(1))
2616                         *oldsum = *oldsum + (int64) PG_GETARG_INT16(1);
2617
2618                 PG_RETURN_POINTER(oldsum);
2619         }
2620         else
2621 #endif
2622         {
2623                 int64           oldsum = PG_GETARG_INT64(0);
2624
2625                 /* Leave sum unchanged if new input is null. */
2626                 if (PG_ARGISNULL(1))
2627                         PG_RETURN_INT64(oldsum);
2628
2629                 /* OK to do the addition. */
2630                 newval = oldsum + (int64) PG_GETARG_INT16(1);
2631
2632                 PG_RETURN_INT64(newval);
2633         }
2634 }
2635
2636 Datum
2637 int4_sum(PG_FUNCTION_ARGS)
2638 {
2639         int64           newval;
2640
2641         if (PG_ARGISNULL(0))
2642         {
2643                 /* No non-null input seen so far... */
2644                 if (PG_ARGISNULL(1))
2645                         PG_RETURN_NULL();       /* still no non-null */
2646                 /* This is the first non-null input. */
2647                 newval = (int64) PG_GETARG_INT32(1);
2648                 PG_RETURN_INT64(newval);
2649         }
2650
2651         /*
2652          * If we're invoked by nodeAgg, we can cheat and modify our first
2653          * parameter in-place to avoid palloc overhead. If not, we need to return
2654          * the new value of the transition variable.
2655          * (If int8 is pass-by-value, then of course this is useless as well
2656          * as incorrect, so just ifdef it out.)
2657          */
2658 #ifndef USE_FLOAT8_BYVAL                /* controls int8 too */
2659         if (fcinfo->context && IsA(fcinfo->context, AggState))
2660         {
2661                 int64      *oldsum = (int64 *) PG_GETARG_POINTER(0);
2662
2663                 /* Leave the running sum unchanged in the new input is null */
2664                 if (!PG_ARGISNULL(1))
2665                         *oldsum = *oldsum + (int64) PG_GETARG_INT32(1);
2666
2667                 PG_RETURN_POINTER(oldsum);
2668         }
2669         else
2670 #endif
2671         {
2672                 int64           oldsum = PG_GETARG_INT64(0);
2673
2674                 /* Leave sum unchanged if new input is null. */
2675                 if (PG_ARGISNULL(1))
2676                         PG_RETURN_INT64(oldsum);
2677
2678                 /* OK to do the addition. */
2679                 newval = oldsum + (int64) PG_GETARG_INT32(1);
2680
2681                 PG_RETURN_INT64(newval);
2682         }
2683 }
2684
2685 Datum
2686 int8_sum(PG_FUNCTION_ARGS)
2687 {
2688         Numeric         oldsum;
2689         Datum           newval;
2690
2691         if (PG_ARGISNULL(0))
2692         {
2693                 /* No non-null input seen so far... */
2694                 if (PG_ARGISNULL(1))
2695                         PG_RETURN_NULL();       /* still no non-null */
2696                 /* This is the first non-null input. */
2697                 newval = DirectFunctionCall1(int8_numeric, PG_GETARG_DATUM(1));
2698                 PG_RETURN_DATUM(newval);
2699         }
2700
2701         /*
2702          * Note that we cannot special-case the nodeAgg case here, as we do for
2703          * int2_sum and int4_sum: numeric is of variable size, so we cannot modify
2704          * our first parameter in-place.
2705          */
2706
2707         oldsum = PG_GETARG_NUMERIC(0);
2708
2709         /* Leave sum unchanged if new input is null. */
2710         if (PG_ARGISNULL(1))
2711                 PG_RETURN_NUMERIC(oldsum);
2712
2713         /* OK to do the addition. */
2714         newval = DirectFunctionCall1(int8_numeric, PG_GETARG_DATUM(1));
2715
2716         PG_RETURN_DATUM(DirectFunctionCall2(numeric_add,
2717                                                                                 NumericGetDatum(oldsum), newval));
2718 }
2719
2720
2721 /*
2722  * Routines for avg(int2) and avg(int4).  The transition datatype
2723  * is a two-element int8 array, holding count and sum.
2724  */
2725
2726 typedef struct Int8TransTypeData
2727 {
2728 #ifndef INT64_IS_BUSTED
2729         int64           count;
2730         int64           sum;
2731 #else
2732         /* "int64" isn't really 64 bits, so fake up properly-aligned fields */
2733         int32           count;
2734         int32           pad1;
2735         int32           sum;
2736         int32           pad2;
2737 #endif
2738 } Int8TransTypeData;
2739
2740 Datum
2741 int2_avg_accum(PG_FUNCTION_ARGS)
2742 {
2743         ArrayType  *transarray;
2744         int16           newval = PG_GETARG_INT16(1);
2745         Int8TransTypeData *transdata;
2746
2747         /*
2748          * If we're invoked by nodeAgg, we can cheat and modify our first
2749          * parameter in-place to reduce palloc overhead. Otherwise we need to make
2750          * a copy of it before scribbling on it.
2751          */
2752         if (fcinfo->context && IsA(fcinfo->context, AggState))
2753                 transarray = PG_GETARG_ARRAYTYPE_P(0);
2754         else
2755                 transarray = PG_GETARG_ARRAYTYPE_P_COPY(0);
2756
2757         if (ARR_HASNULL(transarray) ||
2758                 ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
2759                 elog(ERROR, "expected 2-element int8 array");
2760
2761         transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
2762         transdata->count++;
2763         transdata->sum += newval;
2764
2765         PG_RETURN_ARRAYTYPE_P(transarray);
2766 }
2767
2768 Datum
2769 int4_avg_accum(PG_FUNCTION_ARGS)
2770 {
2771         ArrayType  *transarray;
2772         int32           newval = PG_GETARG_INT32(1);
2773         Int8TransTypeData *transdata;
2774
2775         /*
2776          * If we're invoked by nodeAgg, we can cheat and modify our first
2777          * parameter in-place to reduce palloc overhead. Otherwise we need to make
2778          * a copy of it before scribbling on it.
2779          */
2780         if (fcinfo->context && IsA(fcinfo->context, AggState))
2781                 transarray = PG_GETARG_ARRAYTYPE_P(0);
2782         else
2783                 transarray = PG_GETARG_ARRAYTYPE_P_COPY(0);
2784
2785         if (ARR_HASNULL(transarray) ||
2786                 ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
2787                 elog(ERROR, "expected 2-element int8 array");
2788
2789         transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
2790         transdata->count++;
2791         transdata->sum += newval;
2792
2793         PG_RETURN_ARRAYTYPE_P(transarray);
2794 }
2795
2796 Datum
2797 int8_avg(PG_FUNCTION_ARGS)
2798 {
2799         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
2800         Int8TransTypeData *transdata;
2801         Datum           countd,
2802                                 sumd;
2803
2804         if (ARR_HASNULL(transarray) ||
2805                 ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
2806                 elog(ERROR, "expected 2-element int8 array");
2807         transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
2808
2809         /* SQL92 defines AVG of no values to be NULL */
2810         if (transdata->count == 0)
2811                 PG_RETURN_NULL();
2812
2813         countd = DirectFunctionCall1(int8_numeric,
2814                                                                  Int64GetDatumFast(transdata->count));
2815         sumd = DirectFunctionCall1(int8_numeric,
2816                                                            Int64GetDatumFast(transdata->sum));
2817
2818         PG_RETURN_DATUM(DirectFunctionCall2(numeric_div, sumd, countd));
2819 }
2820
2821
2822 /* ----------------------------------------------------------------------
2823  *
2824  * Debug support
2825  *
2826  * ----------------------------------------------------------------------
2827  */
2828
2829 #ifdef NUMERIC_DEBUG
2830
2831 /*
2832  * dump_numeric() - Dump a value in the db storage format for debugging
2833  */
2834 static void
2835 dump_numeric(const char *str, Numeric num)
2836 {
2837         NumericDigit *digits = NUMERIC_DIGITS(num);
2838         int                     ndigits;
2839         int                     i;
2840
2841         ndigits = NUMERIC_NDIGITS(num);
2842
2843         printf("%s: NUMERIC w=%d d=%d ", str, num->n_weight, NUMERIC_DSCALE(num));
2844         switch (NUMERIC_SIGN(num))
2845         {
2846                 case NUMERIC_POS:
2847                         printf("POS");
2848                         break;
2849                 case NUMERIC_NEG:
2850                         printf("NEG");
2851                         break;
2852                 case NUMERIC_NAN:
2853                         printf("NaN");
2854                         break;
2855                 default:
2856                         printf("SIGN=0x%x", NUMERIC_SIGN(num));
2857                         break;
2858         }
2859
2860         for (i = 0; i < ndigits; i++)
2861                 printf(" %0*d", DEC_DIGITS, digits[i]);
2862         printf("\n");
2863 }
2864
2865
2866 /*
2867  * dump_var() - Dump a value in the variable format for debugging
2868  */
2869 static void
2870 dump_var(const char *str, NumericVar *var)
2871 {
2872         int                     i;
2873
2874         printf("%s: VAR w=%d d=%d ", str, var->weight, var->dscale);
2875         switch (var->sign)
2876         {
2877                 case NUMERIC_POS:
2878                         printf("POS");
2879                         break;
2880                 case NUMERIC_NEG:
2881                         printf("NEG");
2882                         break;
2883                 case NUMERIC_NAN:
2884                         printf("NaN");
2885                         break;
2886                 default:
2887                         printf("SIGN=0x%x", var->sign);
2888                         break;
2889         }
2890
2891         for (i = 0; i < var->ndigits; i++)
2892                 printf(" %0*d", DEC_DIGITS, var->digits[i]);
2893
2894         printf("\n");
2895 }
2896 #endif   /* NUMERIC_DEBUG */
2897
2898
2899 /* ----------------------------------------------------------------------
2900  *
2901  * Local functions follow
2902  *
2903  * In general, these do not support NaNs --- callers must eliminate
2904  * the possibility of NaN first.  (make_result() is an exception.)
2905  *
2906  * ----------------------------------------------------------------------
2907  */
2908
2909
2910 /*
2911  * alloc_var() -
2912  *
2913  *      Allocate a digit buffer of ndigits digits (plus a spare digit for rounding)
2914  */
2915 static void
2916 alloc_var(NumericVar *var, int ndigits)
2917 {
2918         digitbuf_free(var->buf);
2919         var->buf = digitbuf_alloc(ndigits + 1);
2920         var->buf[0] = 0;                        /* spare digit for rounding */
2921         var->digits = var->buf + 1;
2922         var->ndigits = ndigits;
2923 }
2924
2925
2926 /*
2927  * free_var() -
2928  *
2929  *      Return the digit buffer of a variable to the free pool
2930  */
2931 static void
2932 free_var(NumericVar *var)
2933 {
2934         digitbuf_free(var->buf);
2935         var->buf = NULL;
2936         var->digits = NULL;
2937         var->sign = NUMERIC_NAN;
2938 }
2939
2940
2941 /*
2942  * zero_var() -
2943  *
2944  *      Set a variable to ZERO.
2945  *      Note: its dscale is not touched.
2946  */
2947 static void
2948 zero_var(NumericVar *var)
2949 {
2950         digitbuf_free(var->buf);
2951         var->buf = NULL;
2952         var->digits = NULL;
2953         var->ndigits = 0;
2954         var->weight = 0;                        /* by convention; doesn't really matter */
2955         var->sign = NUMERIC_POS;        /* anything but NAN... */
2956 }
2957
2958
2959 /*
2960  * set_var_from_str()
2961  *
2962  *      Parse a string and put the number into a variable
2963  */
2964 static void
2965 set_var_from_str(const char *str, NumericVar *dest)
2966 {
2967         const char *cp = str;
2968         bool            have_dp = FALSE;
2969         int                     i;
2970         unsigned char *decdigits;
2971         int                     sign = NUMERIC_POS;
2972         int                     dweight = -1;
2973         int                     ddigits;
2974         int                     dscale = 0;
2975         int                     weight;
2976         int                     ndigits;
2977         int                     offset;
2978         NumericDigit *digits;
2979
2980         /*
2981          * We first parse the string to extract decimal digits and determine the
2982          * correct decimal weight.      Then convert to NBASE representation.
2983          */
2984
2985         /* skip leading spaces */
2986         while (*cp)
2987         {
2988                 if (!isspace((unsigned char) *cp))
2989                         break;
2990                 cp++;
2991         }
2992
2993         switch (*cp)
2994         {
2995                 case '+':
2996                         sign = NUMERIC_POS;
2997                         cp++;
2998                         break;
2999
3000                 case '-':
3001                         sign = NUMERIC_NEG;
3002                         cp++;
3003                         break;
3004         }
3005
3006         if (*cp == '.')
3007         {
3008                 have_dp = TRUE;
3009                 cp++;
3010         }
3011
3012         if (!isdigit((unsigned char) *cp))
3013                 ereport(ERROR,
3014                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3015                           errmsg("invalid input syntax for type numeric: \"%s\"", str)));
3016
3017         decdigits = (unsigned char *) palloc(strlen(cp) + DEC_DIGITS * 2);
3018
3019         /* leading padding for digit alignment later */
3020         memset(decdigits, 0, DEC_DIGITS);
3021         i = DEC_DIGITS;
3022
3023         while (*cp)
3024         {
3025                 if (isdigit((unsigned char) *cp))
3026                 {
3027                         decdigits[i++] = *cp++ - '0';
3028                         if (!have_dp)
3029                                 dweight++;
3030                         else
3031                                 dscale++;
3032                 }
3033                 else if (*cp == '.')
3034                 {
3035                         if (have_dp)
3036                                 ereport(ERROR,
3037                                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3038                                           errmsg("invalid input syntax for type numeric: \"%s\"",
3039                                                          str)));
3040                         have_dp = TRUE;
3041                         cp++;
3042                 }
3043                 else
3044                         break;
3045         }
3046
3047         ddigits = i - DEC_DIGITS;
3048         /* trailing padding for digit alignment later */
3049         memset(decdigits + i, 0, DEC_DIGITS - 1);
3050
3051         /* Handle exponent, if any */
3052         if (*cp == 'e' || *cp == 'E')
3053         {
3054                 long            exponent;
3055                 char       *endptr;
3056
3057                 cp++;
3058                 exponent = strtol(cp, &endptr, 10);
3059                 if (endptr == cp)
3060                         ereport(ERROR,
3061                                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3062                                          errmsg("invalid input syntax for type numeric: \"%s\"",
3063                                                         str)));
3064                 cp = endptr;
3065                 if (exponent > NUMERIC_MAX_PRECISION ||
3066                         exponent < -NUMERIC_MAX_PRECISION)
3067                         ereport(ERROR,
3068                                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3069                                          errmsg("invalid input syntax for type numeric: \"%s\"",
3070                                                         str)));
3071                 dweight += (int) exponent;
3072                 dscale -= (int) exponent;
3073                 if (dscale < 0)
3074                         dscale = 0;
3075         }
3076
3077         /* Should be nothing left but spaces */
3078         while (*cp)
3079         {
3080                 if (!isspace((unsigned char) *cp))
3081                         ereport(ERROR,
3082                                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3083                                          errmsg("invalid input syntax for type numeric: \"%s\"",
3084                                                         str)));
3085                 cp++;
3086         }
3087
3088         /*
3089          * Okay, convert pure-decimal representation to base NBASE.  First we need
3090          * to determine the converted weight and ndigits.  offset is the number of
3091          * decimal zeroes to insert before the first given digit to have a
3092          * correctly aligned first NBASE digit.
3093          */
3094         if (dweight >= 0)
3095                 weight = (dweight + 1 + DEC_DIGITS - 1) / DEC_DIGITS - 1;
3096         else
3097                 weight = -((-dweight - 1) / DEC_DIGITS + 1);
3098         offset = (weight + 1) * DEC_DIGITS - (dweight + 1);
3099         ndigits = (ddigits + offset + DEC_DIGITS - 1) / DEC_DIGITS;
3100
3101         alloc_var(dest, ndigits);
3102         dest->sign = sign;
3103         dest->weight = weight;
3104         dest->dscale = dscale;
3105
3106         i = DEC_DIGITS - offset;
3107         digits = dest->digits;
3108
3109         while (ndigits-- > 0)
3110         {
3111 #if DEC_DIGITS == 4
3112                 *digits++ = ((decdigits[i] * 10 + decdigits[i + 1]) * 10 +
3113                                          decdigits[i + 2]) * 10 + decdigits[i + 3];
3114 #elif DEC_DIGITS == 2
3115                 *digits++ = decdigits[i] * 10 + decdigits[i + 1];
3116 #elif DEC_DIGITS == 1
3117                 *digits++ = decdigits[i];
3118 #else
3119 #error unsupported NBASE
3120 #endif
3121                 i += DEC_DIGITS;
3122         }
3123
3124         pfree(decdigits);
3125
3126         /* Strip any leading/trailing zeroes, and normalize weight if zero */
3127         strip_var(dest);
3128 }
3129
3130
3131 /*
3132  * set_var_from_num() -
3133  *
3134  *      Convert the packed db format into a variable
3135  */
3136 static void
3137 set_var_from_num(Numeric num, NumericVar *dest)
3138 {
3139         int                     ndigits;
3140
3141         ndigits = NUMERIC_NDIGITS(num);
3142
3143         alloc_var(dest, ndigits);
3144
3145         dest->weight = num->n_weight;
3146         dest->sign = NUMERIC_SIGN(num);
3147         dest->dscale = NUMERIC_DSCALE(num);
3148
3149         memcpy(dest->digits, num->n_data, ndigits * sizeof(NumericDigit));
3150 }
3151
3152
3153 /*
3154  * set_var_from_var() -
3155  *
3156  *      Copy one variable into another
3157  */
3158 static void
3159 set_var_from_var(NumericVar *value, NumericVar *dest)
3160 {
3161         NumericDigit *newbuf;
3162
3163         newbuf = digitbuf_alloc(value->ndigits + 1);
3164         newbuf[0] = 0;                          /* spare digit for rounding */
3165         memcpy(newbuf + 1, value->digits, value->ndigits * sizeof(NumericDigit));
3166
3167         digitbuf_free(dest->buf);
3168
3169         memmove(dest, value, sizeof(NumericVar));
3170         dest->buf = newbuf;
3171         dest->digits = newbuf + 1;
3172 }
3173
3174
3175 /*
3176  * get_str_from_var() -
3177  *
3178  *      Convert a var to text representation (guts of numeric_out).
3179  *      CAUTION: var's contents may be modified by rounding!
3180  *      Returns a palloc'd string.
3181  */
3182 static char *
3183 get_str_from_var(NumericVar *var, int dscale)
3184 {
3185         char       *str;
3186         char       *cp;
3187         char       *endcp;
3188         int                     i;
3189         int                     d;
3190         NumericDigit dig;
3191
3192 #if DEC_DIGITS > 1
3193         NumericDigit d1;
3194 #endif
3195
3196         if (dscale < 0)
3197                 dscale = 0;
3198
3199         /*
3200          * Check if we must round up before printing the value and do so.
3201          */
3202         round_var(var, dscale);
3203
3204         /*
3205          * Allocate space for the result.
3206          *
3207          * i is set to to # of decimal digits before decimal point. dscale is the
3208          * # of decimal digits we will print after decimal point. We may generate
3209          * as many as DEC_DIGITS-1 excess digits at the end, and in addition we
3210          * need room for sign, decimal point, null terminator.
3211          */
3212         i = (var->weight + 1) * DEC_DIGITS;
3213         if (i <= 0)
3214                 i = 1;
3215
3216         str = palloc(i + dscale + DEC_DIGITS + 2);
3217         cp = str;
3218
3219         /*
3220          * Output a dash for negative values
3221          */
3222         if (var->sign == NUMERIC_NEG)
3223                 *cp++ = '-';
3224
3225         /*
3226          * Output all digits before the decimal point
3227          */
3228         if (var->weight < 0)
3229         {
3230                 d = var->weight + 1;
3231                 *cp++ = '0';
3232         }
3233         else
3234         {
3235                 for (d = 0; d <= var->weight; d++)
3236                 {
3237                         dig = (d < var->ndigits) ? var->digits[d] : 0;
3238                         /* In the first digit, suppress extra leading decimal zeroes */
3239 #if DEC_DIGITS == 4
3240                         {
3241                                 bool            putit = (d > 0);
3242
3243                                 d1 = dig / 1000;
3244                                 dig -= d1 * 1000;
3245                                 putit |= (d1 > 0);
3246                                 if (putit)
3247                                         *cp++ = d1 + '0';
3248                                 d1 = dig / 100;
3249                                 dig -= d1 * 100;
3250                                 putit |= (d1 > 0);
3251                                 if (putit)
3252                                         *cp++ = d1 + '0';
3253                                 d1 = dig / 10;
3254                                 dig -= d1 * 10;
3255                                 putit |= (d1 > 0);
3256                                 if (putit)
3257                                         *cp++ = d1 + '0';
3258                                 *cp++ = dig + '0';
3259                         }
3260 #elif DEC_DIGITS == 2
3261                         d1 = dig / 10;
3262                         dig -= d1 * 10;
3263                         if (d1 > 0 || d > 0)
3264                                 *cp++ = d1 + '0';
3265                         *cp++ = dig + '0';
3266 #elif DEC_DIGITS == 1
3267                         *cp++ = dig + '0';
3268 #else
3269 #error unsupported NBASE
3270 #endif
3271                 }
3272         }
3273
3274         /*
3275          * If requested, output a decimal point and all the digits that follow it.
3276          * We initially put out a multiple of DEC_DIGITS digits, then truncate if
3277          * needed.
3278          */
3279         if (dscale > 0)
3280         {
3281                 *cp++ = '.';
3282                 endcp = cp + dscale;
3283                 for (i = 0; i < dscale; d++, i += DEC_DIGITS)
3284                 {
3285                         dig = (d >= 0 && d < var->ndigits) ? var->digits[d] : 0;
3286 #if DEC_DIGITS == 4
3287                         d1 = dig / 1000;
3288                         dig -= d1 * 1000;
3289                         *cp++ = d1 + '0';
3290                         d1 = dig / 100;
3291                         dig -= d1 * 100;
3292                         *cp++ = d1 + '0';
3293                         d1 = dig / 10;
3294                         dig -= d1 * 10;
3295                         *cp++ = d1 + '0';
3296                         *cp++ = dig + '0';
3297 #elif DEC_DIGITS == 2
3298                         d1 = dig / 10;
3299                         dig -= d1 * 10;
3300                         *cp++ = d1 + '0';
3301                         *cp++ = dig + '0';
3302 #elif DEC_DIGITS == 1
3303                         *cp++ = dig + '0';
3304 #else
3305 #error unsupported NBASE
3306 #endif
3307                 }
3308                 cp = endcp;
3309         }
3310
3311         /*
3312          * terminate the string and return it
3313          */
3314         *cp = '\0';
3315         return str;
3316 }
3317
3318
3319 /*
3320  * make_result() -
3321  *
3322  *      Create the packed db numeric format in palloc()'d memory from
3323  *      a variable.
3324  */
3325 static Numeric
3326 make_result(NumericVar *var)
3327 {
3328         Numeric         result;
3329         NumericDigit *digits = var->digits;
3330         int                     weight = var->weight;
3331         int                     sign = var->sign;
3332         int                     n;
3333         Size            len;
3334
3335         if (sign == NUMERIC_NAN)
3336         {
3337                 result = (Numeric) palloc(NUMERIC_HDRSZ);
3338
3339                 SET_VARSIZE(result, NUMERIC_HDRSZ);
3340                 result->n_weight = 0;
3341                 result->n_sign_dscale = NUMERIC_NAN;
3342
3343                 dump_numeric("make_result()", result);
3344                 return result;
3345         }
3346
3347         n = var->ndigits;
3348
3349         /* truncate leading zeroes */
3350         while (n > 0 && *digits == 0)
3351         {
3352                 digits++;
3353                 weight--;
3354                 n--;
3355         }
3356         /* truncate trailing zeroes */
3357         while (n > 0 && digits[n - 1] == 0)
3358                 n--;
3359
3360         /* If zero result, force to weight=0 and positive sign */
3361         if (n == 0)
3362         {
3363                 weight = 0;
3364                 sign = NUMERIC_POS;
3365         }
3366
3367         /* Build the result */
3368         len = NUMERIC_HDRSZ + n * sizeof(NumericDigit);
3369         result = (Numeric) palloc(len);
3370         SET_VARSIZE(result, len);
3371         result->n_weight = weight;
3372         result->n_sign_dscale = sign | (var->dscale & NUMERIC_DSCALE_MASK);
3373
3374         memcpy(result->n_data, digits, n * sizeof(NumericDigit));
3375
3376         /* Check for overflow of int16 fields */
3377         if (result->n_weight != weight ||
3378                 NUMERIC_DSCALE(result) != var->dscale)
3379                 ereport(ERROR,
3380                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
3381                                  errmsg("value overflows numeric format")));
3382
3383         dump_numeric("make_result()", result);
3384         return result;
3385 }
3386
3387
3388 /*
3389  * apply_typmod() -
3390  *
3391  *      Do bounds checking and rounding according to the attributes
3392  *      typmod field.
3393  */
3394 static void
3395 apply_typmod(NumericVar *var, int32 typmod)
3396 {
3397         int                     precision;
3398         int                     scale;
3399         int                     maxdigits;
3400         int                     ddigits;
3401         int                     i;
3402
3403         /* Do nothing if we have a default typmod (-1) */
3404         if (typmod < (int32) (VARHDRSZ))
3405                 return;
3406
3407         typmod -= VARHDRSZ;
3408         precision = (typmod >> 16) & 0xffff;
3409         scale = typmod & 0xffff;
3410         maxdigits = precision - scale;
3411
3412         /* Round to target scale (and set var->dscale) */
3413         round_var(var, scale);
3414
3415         /*
3416          * Check for overflow - note we can't do this before rounding, because
3417          * rounding could raise the weight.  Also note that the var's weight could
3418          * be inflated by leading zeroes, which will be stripped before storage
3419          * but perhaps might not have been yet. In any case, we must recognize a
3420          * true zero, whose weight doesn't mean anything.
3421          */
3422         ddigits = (var->weight + 1) * DEC_DIGITS;
3423         if (ddigits > maxdigits)
3424         {
3425                 /* Determine true weight; and check for all-zero result */
3426                 for (i = 0; i < var->ndigits; i++)
3427                 {
3428                         NumericDigit dig = var->digits[i];
3429
3430                         if (dig)
3431                         {
3432                                 /* Adjust for any high-order decimal zero digits */
3433 #if DEC_DIGITS == 4
3434                                 if (dig < 10)
3435                                         ddigits -= 3;
3436                                 else if (dig < 100)
3437                                         ddigits -= 2;
3438                                 else if (dig < 1000)
3439                                         ddigits -= 1;
3440 #elif DEC_DIGITS == 2
3441                                 if (dig < 10)
3442                                         ddigits -= 1;
3443 #elif DEC_DIGITS == 1
3444                                 /* no adjustment */
3445 #else
3446 #error unsupported NBASE
3447 #endif
3448                                 if (ddigits > maxdigits)
3449                                         ereport(ERROR,
3450                                                         (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
3451                                                          errmsg("numeric field overflow"),
3452                                                          errdetail("A field with precision %d, scale %d must round to an absolute value less than %s%d.",
3453                                                                            precision, scale,
3454                                         /* Display 10^0 as 1 */
3455                                                                            maxdigits ? "10^" : "",
3456                                                                            maxdigits ? maxdigits : 1
3457                                                                            )));
3458                                 break;
3459                         }
3460                         ddigits -= DEC_DIGITS;
3461                 }
3462         }
3463 }
3464
3465 /*
3466  * Convert numeric to int8, rounding if needed.
3467  *
3468  * If overflow, return FALSE (no error is raised).      Return TRUE if okay.
3469  *
3470  *      CAUTION: var's contents may be modified by rounding!
3471  */
3472 static bool
3473 numericvar_to_int8(NumericVar *var, int64 *result)
3474 {
3475         NumericDigit *digits;
3476         int                     ndigits;
3477         int                     weight;
3478         int                     i;
3479         int64           val,
3480                                 oldval;
3481         bool            neg;
3482
3483         /* Round to nearest integer */
3484         round_var(var, 0);
3485
3486         /* Check for zero input */
3487         strip_var(var);
3488         ndigits = var->ndigits;
3489         if (ndigits == 0)
3490         {
3491                 *result = 0;
3492                 return true;
3493         }
3494
3495         /*
3496          * For input like 10000000000, we must treat stripped digits as real. So
3497          * the loop assumes there are weight+1 digits before the decimal point.
3498          */
3499         weight = var->weight;
3500         Assert(weight >= 0 && ndigits <= weight + 1);
3501
3502         /* Construct the result */
3503         digits = var->digits;
3504         neg = (var->sign == NUMERIC_NEG);
3505         val = digits[0];
3506         for (i = 1; i <= weight; i++)
3507         {
3508                 oldval = val;
3509                 val *= NBASE;
3510                 if (i < ndigits)
3511                         val += digits[i];
3512
3513                 /*
3514                  * The overflow check is a bit tricky because we want to accept
3515                  * INT64_MIN, which will overflow the positive accumulator.  We can
3516                  * detect this case easily though because INT64_MIN is the only
3517                  * nonzero value for which -val == val (on a two's complement machine,
3518                  * anyway).
3519                  */
3520                 if ((val / NBASE) != oldval)    /* possible overflow? */
3521                 {
3522                         if (!neg || (-val) != val || val == 0 || oldval < 0)
3523                                 return false;
3524                 }
3525         }
3526
3527         *result = neg ? -val : val;
3528         return true;
3529 }
3530
3531 /*
3532  * Convert int8 value to numeric.
3533  */
3534 static void
3535 int8_to_numericvar(int64 val, NumericVar *var)
3536 {
3537         uint64          uval,
3538                                 newuval;
3539         NumericDigit *ptr;
3540         int                     ndigits;
3541
3542         /* int8 can require at most 19 decimal digits; add one for safety */
3543         alloc_var(var, 20 / DEC_DIGITS);
3544         if (val < 0)
3545         {
3546                 var->sign = NUMERIC_NEG;
3547                 uval = -val;
3548         }
3549         else
3550         {
3551                 var->sign = NUMERIC_POS;
3552                 uval = val;
3553         }
3554         var->dscale = 0;
3555         if (val == 0)
3556         {
3557                 var->ndigits = 0;
3558                 var->weight = 0;
3559                 return;
3560         }
3561         ptr = var->digits + var->ndigits;
3562         ndigits = 0;
3563         do
3564         {
3565                 ptr--;
3566                 ndigits++;
3567                 newuval = uval / NBASE;
3568                 *ptr = uval - newuval * NBASE;
3569                 uval = newuval;
3570         } while (uval);
3571         var->digits = ptr;
3572         var->ndigits = ndigits;
3573         var->weight = ndigits - 1;
3574 }
3575
3576 /*
3577  * Convert numeric to float8; if out of range, return +/- HUGE_VAL
3578  */
3579 static double
3580 numeric_to_double_no_overflow(Numeric num)
3581 {
3582         char       *tmp;
3583         double          val;
3584         char       *endptr;
3585
3586         tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
3587                                                                                           NumericGetDatum(num)));
3588
3589         /* unlike float8in, we ignore ERANGE from strtod */
3590         val = strtod(tmp, &endptr);
3591         if (*endptr != '\0')
3592         {
3593                 /* shouldn't happen ... */
3594                 ereport(ERROR,
3595                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3596                          errmsg("invalid input syntax for type double precision: \"%s\"",
3597                                         tmp)));
3598         }
3599
3600         pfree(tmp);
3601
3602         return val;
3603 }
3604
3605 /* As above, but work from a NumericVar */
3606 static double
3607 numericvar_to_double_no_overflow(NumericVar *var)
3608 {
3609         char       *tmp;
3610         double          val;
3611         char       *endptr;
3612
3613         tmp = get_str_from_var(var, var->dscale);
3614
3615         /* unlike float8in, we ignore ERANGE from strtod */
3616         val = strtod(tmp, &endptr);
3617         if (*endptr != '\0')
3618         {
3619                 /* shouldn't happen ... */
3620                 ereport(ERROR,
3621                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3622                          errmsg("invalid input syntax for type double precision: \"%s\"",
3623                                         tmp)));
3624         }
3625
3626         pfree(tmp);
3627
3628         return val;
3629 }
3630
3631
3632 /*
3633  * cmp_var() -
3634  *
3635  *      Compare two values on variable level.  We assume zeroes have been
3636  *      truncated to no digits.
3637  */
3638 static int
3639 cmp_var(NumericVar *var1, NumericVar *var2)
3640 {
3641         return cmp_var_common(var1->digits, var1->ndigits,
3642                                                   var1->weight, var1->sign,
3643                                                   var2->digits, var2->ndigits,
3644                                                   var2->weight, var2->sign);
3645 }
3646
3647 /*
3648  * cmp_var_common() -
3649  *
3650  *      Main routine of cmp_var(). This function can be used by both
3651  *      NumericVar and Numeric.
3652  */
3653 static int
3654 cmp_var_common(const NumericDigit *var1digits, int var1ndigits,
3655                            int var1weight, int var1sign,
3656                            const NumericDigit *var2digits, int var2ndigits,
3657                            int var2weight, int var2sign)
3658 {
3659         if (var1ndigits == 0)
3660         {
3661                 if (var2ndigits == 0)
3662                         return 0;
3663                 if (var2sign == NUMERIC_NEG)
3664                         return 1;
3665                 return -1;
3666         }
3667         if (var2ndigits == 0)
3668         {
3669                 if (var1sign == NUMERIC_POS)
3670                         return 1;
3671                 return -1;
3672         }
3673
3674         if (var1sign == NUMERIC_POS)
3675         {
3676                 if (var2sign == NUMERIC_NEG)
3677                         return 1;
3678                 return cmp_abs_common(var1digits, var1ndigits, var1weight,
3679                                                           var2digits, var2ndigits, var2weight);
3680         }
3681
3682         if (var2sign == NUMERIC_POS)
3683                 return -1;
3684
3685         return cmp_abs_common(var2digits, var2ndigits, var2weight,
3686                                                   var1digits, var1ndigits, var1weight);
3687 }
3688
3689
3690 /*
3691  * add_var() -
3692  *
3693  *      Full version of add functionality on variable level (handling signs).
3694  *      result might point to one of the operands too without danger.
3695  */
3696 static void
3697 add_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
3698 {
3699         /*
3700          * Decide on the signs of the two variables what to do
3701          */
3702         if (var1->sign == NUMERIC_POS)
3703         {
3704                 if (var2->sign == NUMERIC_POS)
3705                 {
3706                         /*
3707                          * Both are positive result = +(ABS(var1) + ABS(var2))
3708                          */
3709                         add_abs(var1, var2, result);
3710                         result->sign = NUMERIC_POS;
3711                 }
3712                 else
3713                 {
3714                         /*
3715                          * var1 is positive, var2 is negative Must compare absolute values
3716                          */
3717                         switch (cmp_abs(var1, var2))
3718                         {
3719                                 case 0:
3720                                         /* ----------
3721                                          * ABS(var1) == ABS(var2)
3722                                          * result = ZERO
3723                                          * ----------
3724                                          */
3725                                         zero_var(result);
3726                                         result->dscale = Max(var1->dscale, var2->dscale);
3727                                         break;
3728
3729                                 case 1:
3730                                         /* ----------
3731                                          * ABS(var1) > ABS(var2)
3732                                          * result = +(ABS(var1) - ABS(var2))
3733                                          * ----------
3734                                          */
3735                                         sub_abs(var1, var2, result);
3736                                         result->sign = NUMERIC_POS;
3737                                         break;
3738
3739                                 case -1:
3740                                         /* ----------
3741                                          * ABS(var1) < ABS(var2)
3742                                          * result = -(ABS(var2) - ABS(var1))
3743                                          * ----------
3744                                          */
3745                                         sub_abs(var2, var1, result);
3746                                         result->sign = NUMERIC_NEG;
3747                                         break;
3748                         }
3749                 }
3750         }
3751         else
3752         {
3753                 if (var2->sign == NUMERIC_POS)
3754                 {
3755                         /* ----------
3756                          * var1 is negative, var2 is positive
3757                          * Must compare absolute values
3758                          * ----------
3759                          */
3760                         switch (cmp_abs(var1, var2))
3761                         {
3762                                 case 0:
3763                                         /* ----------
3764                                          * ABS(var1) == ABS(var2)
3765                                          * result = ZERO
3766                                          * ----------
3767                                          */
3768                                         zero_var(result);
3769                                         result->dscale = Max(var1->dscale, var2->dscale);
3770                                         break;
3771
3772                                 case 1:
3773                                         /* ----------
3774                                          * ABS(var1) > ABS(var2)
3775                                          * result = -(ABS(var1) - ABS(var2))
3776                                          * ----------
3777                                          */
3778                                         sub_abs(var1, var2, result);
3779                                         result->sign = NUMERIC_NEG;
3780                                         break;
3781
3782                                 case -1:
3783                                         /* ----------
3784                                          * ABS(var1) < ABS(var2)
3785                                          * result = +(ABS(var2) - ABS(var1))
3786                                          * ----------
3787                                          */
3788                                         sub_abs(var2, var1, result);
3789                                         result->sign = NUMERIC_POS;
3790                                         break;
3791                         }
3792                 }
3793                 else
3794                 {
3795                         /* ----------
3796                          * Both are negative
3797                          * result = -(ABS(var1) + ABS(var2))
3798                          * ----------
3799                          */
3800                         add_abs(var1, var2, result);
3801                         result->sign = NUMERIC_NEG;
3802                 }
3803         }
3804 }
3805
3806
3807 /*
3808  * sub_var() -
3809  *
3810  *      Full version of sub functionality on variable level (handling signs).
3811  *      result might point to one of the operands too without danger.
3812  */
3813 static void
3814 sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
3815 {
3816         /*
3817          * Decide on the signs of the two variables what to do
3818          */
3819         if (var1->sign == NUMERIC_POS)
3820         {
3821                 if (var2->sign == NUMERIC_NEG)
3822                 {
3823                         /* ----------
3824                          * var1 is positive, var2 is negative
3825                          * result = +(ABS(var1) + ABS(var2))
3826                          * ----------
3827                          */
3828                         add_abs(var1, var2, result);
3829                         result->sign = NUMERIC_POS;
3830                 }
3831                 else
3832                 {
3833                         /* ----------
3834                          * Both are positive
3835                          * Must compare absolute values
3836                          * ----------
3837                          */
3838                         switch (cmp_abs(var1, var2))
3839                         {
3840                                 case 0:
3841                                         /* ----------
3842                                          * ABS(var1) == ABS(var2)
3843                                          * result = ZERO
3844                                          * ----------
3845                                          */
3846                                         zero_var(result);
3847                                         result->dscale = Max(var1->dscale, var2->dscale);
3848                                         break;
3849
3850                                 case 1:
3851                                         /* ----------
3852                                          * ABS(var1) > ABS(var2)
3853                                          * result = +(ABS(var1) - ABS(var2))
3854                                          * ----------
3855                                          */
3856                                         sub_abs(var1, var2, result);
3857                                         result->sign = NUMERIC_POS;
3858                                         break;
3859
3860                                 case -1:
3861                                         /* ----------
3862                                          * ABS(var1) < ABS(var2)
3863                                          * result = -(ABS(var2) - ABS(var1))
3864                                          * ----------
3865                                          */
3866                                         sub_abs(var2, var1, result);
3867                                         result->sign = NUMERIC_NEG;
3868                                         break;
3869                         }
3870                 }
3871         }
3872         else
3873         {
3874                 if (var2->sign == NUMERIC_NEG)
3875                 {
3876                         /* ----------
3877                          * Both are negative
3878                          * Must compare absolute values
3879                          * ----------
3880                          */
3881                         switch (cmp_abs(var1, var2))
3882                         {
3883                                 case 0:
3884                                         /* ----------
3885                                          * ABS(var1) == ABS(var2)
3886                                          * result = ZERO
3887                                          * ----------
3888                                          */
3889                                         zero_var(result);
3890                                         result->dscale = Max(var1->dscale, var2->dscale);
3891                                         break;
3892
3893                                 case 1:
3894                                         /* ----------
3895                                          * ABS(var1) > ABS(var2)
3896                                          * result = -(ABS(var1) - ABS(var2))
3897                                          * ----------
3898                                          */
3899                                         sub_abs(var1, var2, result);
3900                                         result->sign = NUMERIC_NEG;
3901                                         break;
3902
3903                                 case -1:
3904                                         /* ----------
3905                                          * ABS(var1) < ABS(var2)
3906                                          * result = +(ABS(var2) - ABS(var1))
3907                                          * ----------
3908                                          */
3909                                         sub_abs(var2, var1, result);
3910                                         result->sign = NUMERIC_POS;
3911                                         break;
3912                         }
3913                 }
3914                 else
3915                 {
3916                         /* ----------
3917                          * var1 is negative, var2 is positive
3918                          * result = -(ABS(var1) + ABS(var2))
3919                          * ----------
3920                          */
3921                         add_abs(var1, var2, result);
3922                         result->sign = NUMERIC_NEG;
3923                 }
3924         }
3925 }
3926
3927
3928 /*
3929  * mul_var() -
3930  *
3931  *      Multiplication on variable level. Product of var1 * var2 is stored
3932  *      in result.      Result is rounded to no more than rscale fractional digits.
3933  */
3934 static void
3935 mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
3936                 int rscale)
3937 {
3938         int                     res_ndigits;
3939         int                     res_sign;
3940         int                     res_weight;
3941         int                     maxdigits;
3942         int                *dig;
3943         int                     carry;
3944         int                     maxdig;
3945         int                     newdig;
3946         NumericDigit *res_digits;
3947         int                     i,
3948                                 ri,
3949                                 i1,
3950                                 i2;
3951
3952         /* copy these values into local vars for speed in inner loop */
3953         int                     var1ndigits = var1->ndigits;
3954         int                     var2ndigits = var2->ndigits;
3955         NumericDigit *var1digits = var1->digits;
3956         NumericDigit *var2digits = var2->digits;
3957
3958         if (var1ndigits == 0 || var2ndigits == 0)
3959         {
3960                 /* one or both inputs is zero; so is result */
3961                 zero_var(result);
3962                 result->dscale = rscale;
3963                 return;
3964         }
3965
3966         /* Determine result sign and (maximum possible) weight */
3967         if (var1->sign == var2->sign)
3968                 res_sign = NUMERIC_POS;
3969         else
3970                 res_sign = NUMERIC_NEG;
3971         res_weight = var1->weight + var2->weight + 2;
3972
3973         /*
3974          * Determine number of result digits to compute.  If the exact result
3975          * would have more than rscale fractional digits, truncate the computation
3976          * with MUL_GUARD_DIGITS guard digits.  We do that by pretending that one
3977          * or both inputs have fewer digits than they really do.
3978          */
3979         res_ndigits = var1ndigits + var2ndigits + 1;
3980         maxdigits = res_weight + 1 + (rscale * DEC_DIGITS) + MUL_GUARD_DIGITS;
3981         if (res_ndigits > maxdigits)
3982         {
3983                 if (maxdigits < 3)
3984                 {
3985                         /* no useful precision at all in the result... */
3986                         zero_var(result);
3987                         result->dscale = rscale;
3988                         return;
3989                 }
3990                 /* force maxdigits odd so that input ndigits can be equal */
3991                 if ((maxdigits & 1) == 0)
3992                         maxdigits++;
3993                 if (var1ndigits > var2ndigits)
3994                 {
3995                         var1ndigits -= res_ndigits - maxdigits;
3996                         if (var1ndigits < var2ndigits)
3997                                 var1ndigits = var2ndigits = (var1ndigits + var2ndigits) / 2;
3998                 }
3999                 else
4000                 {
4001                         var2ndigits -= res_ndigits - maxdigits;
4002                         if (var2ndigits < var1ndigits)
4003                                 var1ndigits = var2ndigits = (var1ndigits + var2ndigits) / 2;
4004                 }
4005                 res_ndigits = maxdigits;
4006                 Assert(res_ndigits == var1ndigits + var2ndigits + 1);
4007         }
4008
4009         /*
4010          * We do the arithmetic in an array "dig[]" of signed int's.  Since
4011          * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom
4012          * to avoid normalizing carries immediately.
4013          *
4014          * maxdig tracks the maximum possible value of any dig[] entry; when this
4015          * threatens to exceed INT_MAX, we take the time to propagate carries. To
4016          * avoid overflow in maxdig itself, it actually represents the max
4017          * possible value divided by NBASE-1.
4018          */
4019         dig = (int *) palloc0(res_ndigits * sizeof(int));
4020         maxdig = 0;
4021
4022         ri = res_ndigits - 1;
4023         for (i1 = var1ndigits - 1; i1 >= 0; ri--, i1--)
4024         {
4025                 int                     var1digit = var1digits[i1];
4026
4027                 if (var1digit == 0)
4028                         continue;
4029
4030                 /* Time to normalize? */
4031                 maxdig += var1digit;
4032                 if (maxdig > INT_MAX / (NBASE - 1))
4033                 {
4034                         /* Yes, do it */
4035                         carry = 0;
4036                         for (i = res_ndigits - 1; i >= 0; i--)
4037                         {
4038                                 newdig = dig[i] + carry;
4039                                 if (newdig >= NBASE)
4040                                 {
4041                                         carry = newdig / NBASE;
4042                                         newdig -= carry * NBASE;
4043                                 }
4044                                 else
4045                                         carry = 0;
4046                                 dig[i] = newdig;
4047                         }
4048                         Assert(carry == 0);
4049                         /* Reset maxdig to indicate new worst-case */
4050                         maxdig = 1 + var1digit;
4051                 }
4052
4053                 /* Add appropriate multiple of var2 into the accumulator */
4054                 i = ri;
4055                 for (i2 = var2ndigits - 1; i2 >= 0; i2--)
4056                         dig[i--] += var1digit * var2digits[i2];
4057         }
4058
4059         /*
4060          * Now we do a final carry propagation pass to normalize the result, which
4061          * we combine with storing the result digits into the output. Note that
4062          * this is still done at full precision w/guard digits.
4063          */
4064         alloc_var(result, res_ndigits);
4065         res_digits = result->digits;
4066         carry = 0;
4067         for (i = res_ndigits - 1; i >= 0; i--)
4068         {
4069                 newdig = dig[i] + carry;
4070                 if (newdig >= NBASE)
4071                 {
4072                         carry = newdig / NBASE;
4073                         newdig -= carry * NBASE;
4074                 }
4075                 else
4076                         carry = 0;
4077                 res_digits[i] = newdig;
4078         }
4079         Assert(carry == 0);
4080
4081         pfree(dig);
4082
4083         /*
4084          * Finally, round the result to the requested precision.
4085          */
4086         result->weight = res_weight;
4087         result->sign = res_sign;
4088
4089         /* Round to target rscale (and set result->dscale) */
4090         round_var(result, rscale);
4091
4092         /* Strip leading and trailing zeroes */
4093         strip_var(result);
4094 }
4095
4096
4097 /*
4098  * div_var() -
4099  *
4100  *      Division on variable level. Quotient of var1 / var2 is stored in result.
4101  *      The quotient is figured to exactly rscale fractional digits.
4102  *      If round is true, it is rounded at the rscale'th digit; if false, it
4103  *      is truncated (towards zero) at that digit.
4104  */
4105 static void
4106 div_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
4107                 int rscale, bool round)
4108 {
4109         int                     div_ndigits;
4110         int                     res_ndigits;
4111         int                     res_sign;
4112         int                     res_weight;
4113         int                     carry;
4114         int                     borrow;
4115         int                     divisor1;
4116         int                     divisor2;
4117         NumericDigit *dividend;
4118         NumericDigit *divisor;
4119         NumericDigit *res_digits;
4120         int                     i;
4121         int                     j;
4122
4123         /* copy these values into local vars for speed in inner loop */
4124         int                     var1ndigits = var1->ndigits;
4125         int                     var2ndigits = var2->ndigits;
4126
4127         /*
4128          * First of all division by zero check; we must not be handed an
4129          * unnormalized divisor.
4130          */
4131         if (var2ndigits == 0 || var2->digits[0] == 0)
4132                 ereport(ERROR,
4133                                 (errcode(ERRCODE_DIVISION_BY_ZERO),
4134                                  errmsg("division by zero")));
4135
4136         /*
4137          * Now result zero check
4138          */
4139         if (var1ndigits == 0)
4140         {
4141                 zero_var(result);
4142                 result->dscale = rscale;
4143                 return;
4144         }
4145
4146         /*
4147          * Determine the result sign, weight and number of digits to calculate.
4148          * The weight figured here is correct if the emitted quotient has no
4149          * leading zero digits; otherwise strip_var() will fix things up.
4150          */
4151         if (var1->sign == var2->sign)
4152                 res_sign = NUMERIC_POS;
4153         else
4154                 res_sign = NUMERIC_NEG;
4155         res_weight = var1->weight - var2->weight;
4156         /* The number of accurate result digits we need to produce: */
4157         res_ndigits = res_weight + 1 + (rscale + DEC_DIGITS - 1) / DEC_DIGITS;
4158         /* ... but always at least 1 */
4159         res_ndigits = Max(res_ndigits, 1);
4160         /* If rounding needed, figure one more digit to ensure correct result */
4161         if (round)
4162                 res_ndigits++;
4163         /*
4164          * The working dividend normally requires res_ndigits + var2ndigits
4165          * digits, but make it at least var1ndigits so we can load all of var1
4166          * into it.  (There will be an additional digit dividend[0] in the
4167          * dividend space, but for consistency with Knuth's notation we don't
4168          * count that in div_ndigits.)
4169          */
4170         div_ndigits = res_ndigits + var2ndigits;
4171         div_ndigits = Max(div_ndigits, var1ndigits);
4172
4173         /*
4174          * We need a workspace with room for the working dividend (div_ndigits+1
4175          * digits) plus room for the possibly-normalized divisor (var2ndigits
4176          * digits).  It is convenient also to have a zero at divisor[0] with
4177          * the actual divisor data in divisor[1 .. var2ndigits].  Transferring the
4178          * digits into the workspace also allows us to realloc the result (which
4179          * might be the same as either input var) before we begin the main loop.
4180          * Note that we use palloc0 to ensure that divisor[0], dividend[0], and
4181          * any additional dividend positions beyond var1ndigits, start out 0.
4182          */
4183         dividend = (NumericDigit *)
4184                 palloc0((div_ndigits + var2ndigits + 2) * sizeof(NumericDigit));
4185         divisor = dividend + (div_ndigits + 1);
4186         memcpy(dividend + 1, var1->digits, var1ndigits * sizeof(NumericDigit));
4187         memcpy(divisor + 1, var2->digits, var2ndigits * sizeof(NumericDigit));
4188
4189         /*
4190          * Now we can realloc the result to hold the generated quotient digits.
4191          */
4192         alloc_var(result, res_ndigits);
4193         res_digits = result->digits;
4194
4195         if (var2ndigits == 1)
4196         {
4197                 /*
4198                  * If there's only a single divisor digit, we can use a fast path
4199                  * (cf. Knuth section 4.3.1 exercise 16).
4200                  */
4201                 divisor1 = divisor[1];
4202                 carry = 0;
4203                 for (i = 0; i < res_ndigits; i++)
4204                 {
4205                         carry = carry * NBASE + dividend[i + 1];
4206                         res_digits[i] = carry / divisor1;
4207                         carry = carry % divisor1;
4208                 }
4209         }
4210         else
4211         {
4212                 /*
4213                  * The full multiple-place algorithm is taken from Knuth volume 2,
4214                  * Algorithm 4.3.1D.
4215                  *
4216                  * We need the first divisor digit to be >= NBASE/2.  If it isn't,
4217                  * make it so by scaling up both the divisor and dividend by the
4218                  * factor "d".  (The reason for allocating dividend[0] above is to
4219                  * leave room for possible carry here.)
4220                  */
4221                 if (divisor[1] < HALF_NBASE)
4222                 {
4223                         int             d = NBASE / (divisor[1] + 1);
4224
4225                         carry = 0;
4226                         for (i = var2ndigits; i > 0; i--)
4227                         {
4228                                 carry += divisor[i] * d;
4229                                 divisor[i] = carry % NBASE;
4230                                 carry = carry / NBASE;
4231                         }
4232                         Assert(carry == 0);
4233                         carry = 0;
4234                         /* at this point only var1ndigits of dividend can be nonzero */
4235                         for (i = var1ndigits; i >= 0; i--)
4236                         {
4237                                 carry += dividend[i] * d;
4238                                 dividend[i] = carry % NBASE;
4239                                 carry = carry / NBASE;
4240                         }
4241                         Assert(carry == 0);
4242                         Assert(divisor[1] >= HALF_NBASE);
4243                 }
4244                 /* First 2 divisor digits are used repeatedly in main loop */
4245                 divisor1 = divisor[1];
4246                 divisor2 = divisor[2];
4247
4248                 /*
4249                  * Begin the main loop.  Each iteration of this loop produces the
4250                  * j'th quotient digit by dividing dividend[j .. j + var2ndigits]
4251                  * by the divisor; this is essentially the same as the common manual
4252                  * procedure for long division.
4253                  */
4254                 for (j = 0; j < res_ndigits; j++)
4255                 {
4256                         /* Estimate quotient digit from the first two dividend digits */
4257                         int             next2digits = dividend[j] * NBASE + dividend[j+1];
4258                         int             qhat;
4259
4260                         /*
4261                          * If next2digits are 0, then quotient digit must be 0 and there's
4262                          * no need to adjust the working dividend.  It's worth testing
4263                          * here to fall out ASAP when processing trailing zeroes in
4264                          * a dividend.
4265                          */
4266                         if (next2digits == 0)
4267                         {
4268                                 res_digits[j] = 0;
4269                                 continue;
4270                         }
4271
4272                         if (dividend[j] == divisor1)
4273                                 qhat = NBASE - 1;
4274                         else
4275                                 qhat = next2digits / divisor1;
4276                         /*
4277                          * Adjust quotient digit if it's too large.  Knuth proves that
4278                          * after this step, the quotient digit will be either correct
4279                          * or just one too large.  (Note: it's OK to use dividend[j+2]
4280                          * here because we know the divisor length is at least 2.)
4281                          */
4282                         while (divisor2 * qhat >
4283                                    (next2digits - qhat * divisor1) * NBASE + dividend[j+2])
4284                                 qhat--;
4285
4286                         /* As above, need do nothing more when quotient digit is 0 */
4287                         if (qhat > 0)
4288                         {
4289                                 /*
4290                                  * Multiply the divisor by qhat, and subtract that from the
4291                                  * working dividend.  "carry" tracks the multiplication,
4292                                  * "borrow" the subtraction (could we fold these together?)
4293                                  */
4294                                 carry = 0;
4295                                 borrow = 0;
4296                                 for (i = var2ndigits; i >= 0; i--)
4297                                 {
4298                                         carry += divisor[i] * qhat;
4299                                         borrow -= carry % NBASE;
4300                                         carry = carry / NBASE;
4301                                         borrow += dividend[j + i];
4302                                         if (borrow < 0)
4303                                         {
4304                                                 dividend[j + i] = borrow + NBASE;
4305                                                 borrow = -1;
4306                                         }
4307                                         else
4308                                         {
4309                                                 dividend[j + i] = borrow;
4310                                                 borrow = 0;
4311                                         }
4312                                 }
4313                                 Assert(carry == 0);
4314
4315                                 /*
4316                                  * If we got a borrow out of the top dividend digit, then
4317                                  * indeed qhat was one too large.  Fix it, and add back the
4318                                  * divisor to correct the working dividend.  (Knuth proves
4319                                  * that this will occur only about 3/NBASE of the time; hence,
4320                                  * it's a good idea to test this code with small NBASE to be
4321                                  * sure this section gets exercised.)
4322                                  */
4323                                 if (borrow)
4324                                 {
4325                                         qhat--;
4326                                         carry = 0;
4327                                         for (i = var2ndigits; i >= 0; i--)
4328                                         {
4329                                                 carry += dividend[j + i] + divisor[i];
4330                                                 if (carry >= NBASE)
4331                                                 {
4332                                                         dividend[j + i] = carry - NBASE;
4333                                                         carry = 1;
4334                                                 }
4335                                                 else
4336                                                 {
4337                                                         dividend[j + i] = carry;
4338                                                         carry = 0;
4339                                                 }
4340                                         }
4341                                         /* A carry should occur here to cancel the borrow above */
4342                                         Assert(carry == 1);
4343                                 }
4344                         }
4345
4346                         /* And we're done with this quotient digit */
4347                         res_digits[j] = qhat;
4348                 }
4349         }
4350
4351         pfree(dividend);
4352
4353         /*
4354          * Finally, round or truncate the result to the requested precision.
4355          */
4356         result->weight = res_weight;
4357         result->sign = res_sign;
4358
4359         /* Round or truncate to target rscale (and set result->dscale) */
4360         if (round)
4361                 round_var(result, rscale);
4362         else
4363                 trunc_var(result, rscale);
4364
4365         /* Strip leading and trailing zeroes */
4366         strip_var(result);
4367 }
4368
4369
4370 /*
4371  * div_var_fast() -
4372  *
4373  *      This has the same API as div_var, but is implemented using the division
4374  *      algorithm from the "FM" library, rather than Knuth's schoolbook-division
4375  *      approach.  This is significantly faster but can produce inaccurate
4376  *      results, because it sometimes has to propagate rounding to the left,
4377  *      and so we can never be entirely sure that we know the requested digits
4378  *      exactly.  We compute DIV_GUARD_DIGITS extra digits, but there is
4379  *      no certainty that that's enough.  We use this only in the transcendental
4380  *      function calculation routines, where everything is approximate anyway.
4381  */
4382 static void
4383 div_var_fast(NumericVar *var1, NumericVar *var2, NumericVar *result,
4384                          int rscale, bool round)
4385 {
4386         int                     div_ndigits;
4387         int                     res_sign;
4388         int                     res_weight;
4389         int                *div;
4390         int                     qdigit;
4391         int                     carry;
4392         int                     maxdiv;
4393         int                     newdig;
4394         NumericDigit *res_digits;
4395         double          fdividend,
4396                                 fdivisor,
4397                                 fdivisorinverse,
4398                                 fquotient;
4399         int                     qi;
4400         int                     i;
4401
4402         /* copy these values into local vars for speed in inner loop */
4403         int                     var1ndigits = var1->ndigits;
4404         int                     var2ndigits = var2->ndigits;
4405         NumericDigit *var1digits = var1->digits;
4406         NumericDigit *var2digits = var2->digits;
4407
4408         /*
4409          * First of all division by zero check; we must not be handed an
4410          * unnormalized divisor.
4411          */
4412         if (var2ndigits == 0 || var2digits[0] == 0)
4413                 ereport(ERROR,
4414                                 (errcode(ERRCODE_DIVISION_BY_ZERO),
4415                                  errmsg("division by zero")));
4416
4417         /*
4418          * Now result zero check
4419          */
4420         if (var1ndigits == 0)
4421         {
4422                 zero_var(result);
4423                 result->dscale = rscale;
4424                 return;
4425         }
4426
4427         /*
4428          * Determine the result sign, weight and number of digits to calculate
4429          */
4430         if (var1->sign == var2->sign)
4431                 res_sign = NUMERIC_POS;
4432         else
4433                 res_sign = NUMERIC_NEG;
4434         res_weight = var1->weight - var2->weight + 1;
4435         /* The number of accurate result digits we need to produce: */
4436         div_ndigits = res_weight + 1 + (rscale + DEC_DIGITS - 1) / DEC_DIGITS;
4437         /* Add guard digits for roundoff error */
4438         div_ndigits += DIV_GUARD_DIGITS;
4439         if (div_ndigits < DIV_GUARD_DIGITS)
4440                 div_ndigits = DIV_GUARD_DIGITS;
4441         /* Must be at least var1ndigits, too, to simplify data-loading loop */
4442         if (div_ndigits < var1ndigits)
4443                 div_ndigits = var1ndigits;
4444
4445         /*
4446          * We do the arithmetic in an array "div[]" of signed int's.  Since
4447          * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom
4448          * to avoid normalizing carries immediately.
4449          *
4450          * We start with div[] containing one zero digit followed by the
4451          * dividend's digits (plus appended zeroes to reach the desired precision
4452          * including guard digits).  Each step of the main loop computes an
4453          * (approximate) quotient digit and stores it into div[], removing one
4454          * position of dividend space.  A final pass of carry propagation takes
4455          * care of any mistaken quotient digits.
4456          */
4457         div = (int *) palloc0((div_ndigits + 1) * sizeof(int));
4458         for (i = 0; i < var1ndigits; i++)
4459                 div[i + 1] = var1digits[i];
4460
4461         /*
4462          * We estimate each quotient digit using floating-point arithmetic, taking
4463          * the first four digits of the (current) dividend and divisor. This must
4464          * be float to avoid overflow.
4465          */
4466         fdivisor = (double) var2digits[0];
4467         for (i = 1; i < 4; i++)
4468         {
4469                 fdivisor *= NBASE;
4470                 if (i < var2ndigits)
4471                         fdivisor += (double) var2digits[i];
4472         }
4473         fdivisorinverse = 1.0 / fdivisor;
4474
4475         /*
4476          * maxdiv tracks the maximum possible absolute value of any div[] entry;
4477          * when this threatens to exceed INT_MAX, we take the time to propagate
4478          * carries.  To avoid overflow in maxdiv itself, it actually represents
4479          * the max possible abs. value divided by NBASE-1.
4480          */
4481         maxdiv = 1;
4482
4483         /*
4484          * Outer loop computes next quotient digit, which will go into div[qi]
4485          */
4486         for (qi = 0; qi < div_ndigits; qi++)
4487         {
4488                 /* Approximate the current dividend value */
4489                 fdividend = (double) div[qi];
4490                 for (i = 1; i < 4; i++)
4491                 {
4492                         fdividend *= NBASE;
4493                         if (qi + i <= div_ndigits)
4494                                 fdividend += (double) div[qi + i];
4495                 }
4496                 /* Compute the (approximate) quotient digit */
4497                 fquotient = fdividend * fdivisorinverse;
4498                 qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
4499                         (((int) fquotient) - 1);        /* truncate towards -infinity */
4500
4501                 if (qdigit != 0)
4502                 {
4503                         /* Do we need to normalize now? */
4504                         maxdiv += Abs(qdigit);
4505                         if (maxdiv > INT_MAX / (NBASE - 1))
4506                         {
4507                                 /* Yes, do it */
4508                                 carry = 0;
4509                                 for (i = div_ndigits; i > qi; i--)
4510                                 {
4511                                         newdig = div[i] + carry;
4512                                         if (newdig < 0)
4513                                         {
4514                                                 carry = -((-newdig - 1) / NBASE) - 1;
4515                                                 newdig -= carry * NBASE;
4516                                         }
4517                                         else if (newdig >= NBASE)
4518                                         {
4519                                                 carry = newdig / NBASE;
4520                                                 newdig -= carry * NBASE;
4521                                         }
4522                                         else
4523                                                 carry = 0;
4524                                         div[i] = newdig;
4525                                 }
4526                                 newdig = div[qi] + carry;
4527                                 div[qi] = newdig;
4528
4529                                 /*
4530                                  * All the div[] digits except possibly div[qi] are now in the
4531                                  * range 0..NBASE-1.
4532                                  */
4533                                 maxdiv = Abs(newdig) / (NBASE - 1);
4534                                 maxdiv = Max(maxdiv, 1);
4535
4536                                 /*
4537                                  * Recompute the quotient digit since new info may have
4538                                  * propagated into the top four dividend digits
4539                                  */
4540                                 fdividend = (double) div[qi];
4541                                 for (i = 1; i < 4; i++)
4542                                 {
4543                                         fdividend *= NBASE;
4544                                         if (qi + i <= div_ndigits)
4545                                                 fdividend += (double) div[qi + i];
4546                                 }
4547                                 /* Compute the (approximate) quotient digit */
4548                                 fquotient = fdividend * fdivisorinverse;
4549                                 qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
4550                                         (((int) fquotient) - 1);        /* truncate towards -infinity */
4551                                 maxdiv += Abs(qdigit);
4552                         }
4553
4554                         /* Subtract off the appropriate multiple of the divisor */
4555                         if (qdigit != 0)
4556                         {
4557                                 int                     istop = Min(var2ndigits, div_ndigits - qi + 1);
4558
4559                                 for (i = 0; i < istop; i++)
4560                                         div[qi + i] -= qdigit * var2digits[i];
4561                         }
4562                 }
4563
4564                 /*
4565                  * The dividend digit we are about to replace might still be nonzero.
4566                  * Fold it into the next digit position.  We don't need to worry about
4567                  * overflow here since this should nearly cancel with the subtraction
4568                  * of the divisor.
4569                  */
4570                 div[qi + 1] += div[qi] * NBASE;
4571
4572                 div[qi] = qdigit;
4573         }
4574
4575         /*
4576          * Approximate and store the last quotient digit (div[div_ndigits])
4577          */
4578         fdividend = (double) div[qi];
4579         for (i = 1; i < 4; i++)
4580                 fdividend *= NBASE;
4581         fquotient = fdividend * fdivisorinverse;
4582         qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
4583                 (((int) fquotient) - 1);        /* truncate towards -infinity */
4584         div[qi] = qdigit;
4585
4586         /*
4587          * Now we do a final carry propagation pass to normalize the result, which
4588          * we combine with storing the result digits into the output. Note that
4589          * this is still done at full precision w/guard digits.
4590          */
4591         alloc_var(result, div_ndigits + 1);
4592         res_digits = result->digits;
4593         carry = 0;
4594         for (i = div_ndigits; i >= 0; i--)
4595         {
4596                 newdig = div[i] + carry;
4597                 if (newdig < 0)
4598                 {
4599                         carry = -((-newdig - 1) / NBASE) - 1;
4600                         newdig -= carry * NBASE;
4601                 }
4602                 else if (newdig >= NBASE)
4603                 {
4604                         carry = newdig / NBASE;
4605                         newdig -= carry * NBASE;
4606                 }
4607                 else
4608                         carry = 0;
4609                 res_digits[i] = newdig;
4610         }
4611         Assert(carry == 0);
4612
4613         pfree(div);
4614
4615         /*
4616          * Finally, round the result to the requested precision.
4617          */
4618         result->weight = res_weight;
4619         result->sign = res_sign;
4620
4621         /* Round to target rscale (and set result->dscale) */
4622         if (round)
4623                 round_var(result, rscale);
4624         else
4625                 trunc_var(result, rscale);
4626
4627         /* Strip leading and trailing zeroes */
4628         strip_var(result);
4629 }
4630
4631
4632 /*
4633  * Default scale selection for division
4634  *
4635  * Returns the appropriate result scale for the division result.
4636  */
4637 static int
4638 select_div_scale(NumericVar *var1, NumericVar *var2)
4639 {
4640         int                     weight1,
4641                                 weight2,
4642                                 qweight,
4643                                 i;
4644         NumericDigit firstdigit1,
4645                                 firstdigit2;
4646         int                     rscale;
4647
4648         /*
4649          * The result scale of a division isn't specified in any SQL standard. For
4650          * PostgreSQL we select a result scale that will give at least
4651          * NUMERIC_MIN_SIG_DIGITS significant digits, so that numeric gives a
4652          * result no less accurate than float8; but use a scale not less than
4653          * either input's display scale.
4654          */
4655
4656         /* Get the actual (normalized) weight and first digit of each input */
4657
4658         weight1 = 0;                            /* values to use if var1 is zero */
4659         firstdigit1 = 0;
4660         for (i = 0; i < var1->ndigits; i++)
4661         {
4662                 firstdigit1 = var1->digits[i];
4663                 if (firstdigit1 != 0)
4664                 {
4665                         weight1 = var1->weight - i;
4666                         break;
4667                 }
4668         }
4669
4670         weight2 = 0;                            /* values to use if var2 is zero */
4671         firstdigit2 = 0;
4672         for (i = 0; i < var2->ndigits; i++)
4673         {
4674                 firstdigit2 = var2->digits[i];
4675                 if (firstdigit2 != 0)
4676                 {
4677                         weight2 = var2->weight - i;
4678                         break;
4679                 }
4680         }
4681
4682         /*
4683          * Estimate weight of quotient.  If the two first digits are equal, we
4684          * can't be sure, but assume that var1 is less than var2.
4685          */
4686         qweight = weight1 - weight2;
4687         if (firstdigit1 <= firstdigit2)
4688                 qweight--;
4689
4690         /* Select result scale */
4691         rscale = NUMERIC_MIN_SIG_DIGITS - qweight * DEC_DIGITS;
4692         rscale = Max(rscale, var1->dscale);
4693         rscale = Max(rscale, var2->dscale);
4694         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
4695         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
4696
4697         return rscale;
4698 }
4699
4700
4701 /*
4702  * mod_var() -
4703  *
4704  *      Calculate the modulo of two numerics at variable level
4705  */
4706 static void
4707 mod_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
4708 {
4709         NumericVar      tmp;
4710
4711         init_var(&tmp);
4712
4713         /* ---------
4714          * We do this using the equation
4715          *              mod(x,y) = x - trunc(x/y)*y
4716          * div_var can be persuaded to give us trunc(x/y) directly.
4717          * ----------
4718          */
4719         div_var(var1, var2, &tmp, 0, false);
4720
4721         mul_var(var2, &tmp, &tmp, var2->dscale);
4722
4723         sub_var(var1, &tmp, result);
4724
4725         free_var(&tmp);
4726 }
4727
4728
4729 /*
4730  * ceil_var() -
4731  *
4732  *      Return the smallest integer greater than or equal to the argument
4733  *      on variable level
4734  */
4735 static void
4736 ceil_var(NumericVar *var, NumericVar *result)
4737 {
4738         NumericVar      tmp;
4739
4740         init_var(&tmp);
4741         set_var_from_var(var, &tmp);
4742
4743         trunc_var(&tmp, 0);
4744
4745         if (var->sign == NUMERIC_POS && cmp_var(var, &tmp) != 0)
4746                 add_var(&tmp, &const_one, &tmp);
4747
4748         set_var_from_var(&tmp, result);
4749         free_var(&tmp);
4750 }
4751
4752
4753 /*
4754  * floor_var() -
4755  *
4756  *      Return the largest integer equal to or less than the argument
4757  *      on variable level
4758  */
4759 static void
4760 floor_var(NumericVar *var, NumericVar *result)
4761 {
4762         NumericVar      tmp;
4763
4764         init_var(&tmp);
4765         set_var_from_var(var, &tmp);
4766
4767         trunc_var(&tmp, 0);
4768
4769         if (var->sign == NUMERIC_NEG && cmp_var(var, &tmp) != 0)
4770                 sub_var(&tmp, &const_one, &tmp);
4771
4772         set_var_from_var(&tmp, result);
4773         free_var(&tmp);
4774 }
4775
4776
4777 /*
4778  * sqrt_var() -
4779  *
4780  *      Compute the square root of x using Newton's algorithm
4781  */
4782 static void
4783 sqrt_var(NumericVar *arg, NumericVar *result, int rscale)
4784 {
4785         NumericVar      tmp_arg;
4786         NumericVar      tmp_val;
4787         NumericVar      last_val;
4788         int                     local_rscale;
4789         int                     stat;
4790
4791         local_rscale = rscale + 8;
4792
4793         stat = cmp_var(arg, &const_zero);
4794         if (stat == 0)
4795         {
4796                 zero_var(result);
4797                 result->dscale = rscale;
4798                 return;
4799         }
4800
4801         /*
4802          * SQL2003 defines sqrt() in terms of power, so we need to emit the right
4803          * SQLSTATE error code if the operand is negative.
4804          */
4805         if (stat < 0)
4806                 ereport(ERROR,
4807                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION),
4808                                  errmsg("cannot take square root of a negative number")));
4809
4810         init_var(&tmp_arg);
4811         init_var(&tmp_val);
4812         init_var(&last_val);
4813
4814         /* Copy arg in case it is the same var as result */
4815         set_var_from_var(arg, &tmp_arg);
4816
4817         /*
4818          * Initialize the result to the first guess
4819          */
4820         alloc_var(result, 1);
4821         result->digits[0] = tmp_arg.digits[0] / 2;
4822         if (result->digits[0] == 0)
4823                 result->digits[0] = 1;
4824         result->weight = tmp_arg.weight / 2;
4825         result->sign = NUMERIC_POS;
4826
4827         set_var_from_var(result, &last_val);
4828
4829         for (;;)
4830         {
4831                 div_var_fast(&tmp_arg, result, &tmp_val, local_rscale, true);
4832
4833                 add_var(result, &tmp_val, result);
4834                 mul_var(result, &const_zero_point_five, result, local_rscale);
4835
4836                 if (cmp_var(&last_val, result) == 0)
4837                         break;
4838                 set_var_from_var(result, &last_val);
4839         }
4840
4841         free_var(&last_val);
4842         free_var(&tmp_val);
4843         free_var(&tmp_arg);
4844
4845         /* Round to requested precision */
4846         round_var(result, rscale);
4847 }
4848
4849
4850 /*
4851  * exp_var() -
4852  *
4853  *      Raise e to the power of x
4854  */
4855 static void
4856 exp_var(NumericVar *arg, NumericVar *result, int rscale)
4857 {
4858         NumericVar      x;
4859         int                     xintval;
4860         bool            xneg = FALSE;
4861         int                     local_rscale;
4862
4863         /*----------
4864          * We separate the integral and fraction parts of x, then compute
4865          *              e^x = e^xint * e^xfrac
4866          * where e = exp(1) and e^xfrac = exp(xfrac) are computed by
4867          * exp_var_internal; the limited range of inputs allows that routine
4868          * to do a good job with a simple Taylor series.  Raising e^xint is
4869          * done by repeated multiplications in power_var_int.
4870          *----------
4871          */
4872         init_var(&x);
4873
4874         set_var_from_var(arg, &x);
4875
4876         if (x.sign == NUMERIC_NEG)
4877         {
4878                 xneg = TRUE;
4879                 x.sign = NUMERIC_POS;
4880         }
4881
4882         /* Extract the integer part, remove it from x */
4883         xintval = 0;
4884         while (x.weight >= 0)
4885         {
4886                 xintval *= NBASE;
4887                 if (x.ndigits > 0)
4888                 {
4889                         xintval += x.digits[0];
4890                         x.digits++;
4891                         x.ndigits--;
4892                 }
4893                 x.weight--;
4894                 /* Guard against overflow */
4895                 if (xintval >= NUMERIC_MAX_RESULT_SCALE * 3)
4896                         ereport(ERROR,
4897                                         (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
4898                                          errmsg("argument for function \"exp\" too big")));
4899         }
4900
4901         /* Select an appropriate scale for internal calculation */
4902         local_rscale = rscale + MUL_GUARD_DIGITS * 2;
4903
4904         /* Compute e^xfrac */
4905         exp_var_internal(&x, result, local_rscale);
4906
4907         /* If there's an integer part, multiply by e^xint */
4908         if (xintval > 0)
4909         {
4910                 NumericVar      e;
4911
4912                 init_var(&e);
4913                 exp_var_internal(&const_one, &e, local_rscale);
4914                 power_var_int(&e, xintval, &e, local_rscale);
4915                 mul_var(&e, result, result, local_rscale);
4916                 free_var(&e);
4917         }
4918
4919         /* Compensate for input sign, and round to requested rscale */
4920         if (xneg)
4921                 div_var_fast(&const_one, result, result, rscale, true);
4922         else
4923                 round_var(result, rscale);
4924
4925         free_var(&x);
4926 }
4927
4928
4929 /*
4930  * exp_var_internal() -
4931  *
4932  *      Raise e to the power of x, where 0 <= x <= 1
4933  *
4934  * NB: the result should be good to at least rscale digits, but it has
4935  * *not* been rounded off; the caller must do that if wanted.
4936  */
4937 static void
4938 exp_var_internal(NumericVar *arg, NumericVar *result, int rscale)
4939 {
4940         NumericVar      x;
4941         NumericVar      xpow;
4942         NumericVar      ifac;
4943         NumericVar      elem;
4944         NumericVar      ni;
4945         int                     ndiv2 = 0;
4946         int                     local_rscale;
4947
4948         init_var(&x);
4949         init_var(&xpow);
4950         init_var(&ifac);
4951         init_var(&elem);
4952         init_var(&ni);
4953
4954         set_var_from_var(arg, &x);
4955
4956         Assert(x.sign == NUMERIC_POS);
4957
4958         local_rscale = rscale + 8;
4959
4960         /* Reduce input into range 0 <= x <= 0.01 */
4961         while (cmp_var(&x, &const_zero_point_01) > 0)
4962         {
4963                 ndiv2++;
4964                 local_rscale++;
4965                 mul_var(&x, &const_zero_point_five, &x, x.dscale + 1);
4966         }
4967
4968         /*
4969          * Use the Taylor series
4970          *
4971          * exp(x) = 1 + x + x^2/2! + x^3/3! + ...
4972          *
4973          * Given the limited range of x, this should converge reasonably quickly.
4974          * We run the series until the terms fall below the local_rscale limit.
4975          */
4976         add_var(&const_one, &x, result);
4977         set_var_from_var(&x, &xpow);
4978         set_var_from_var(&const_one, &ifac);
4979         set_var_from_var(&const_one, &ni);
4980
4981         for (;;)
4982         {
4983                 add_var(&ni, &const_one, &ni);
4984                 mul_var(&xpow, &x, &xpow, local_rscale);
4985                 mul_var(&ifac, &ni, &ifac, 0);
4986                 div_var_fast(&xpow, &ifac, &elem, local_rscale, true);
4987
4988                 if (elem.ndigits == 0)
4989                         break;
4990
4991                 add_var(result, &elem, result);
4992         }
4993
4994         /* Compensate for argument range reduction */
4995         while (ndiv2-- > 0)
4996                 mul_var(result, result, result, local_rscale);
4997
4998         free_var(&x);
4999         free_var(&xpow);
5000         free_var(&ifac);
5001         free_var(&elem);
5002         free_var(&ni);
5003 }
5004
5005
5006 /*
5007  * ln_var() -
5008  *
5009  *      Compute the natural log of x
5010  */
5011 static void
5012 ln_var(NumericVar *arg, NumericVar *result, int rscale)
5013 {
5014         NumericVar      x;
5015         NumericVar      xx;
5016         NumericVar      ni;
5017         NumericVar      elem;
5018         NumericVar      fact;
5019         int                     local_rscale;
5020         int                     cmp;
5021
5022         cmp = cmp_var(arg, &const_zero);
5023         if (cmp == 0)
5024                 ereport(ERROR,
5025                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_LOG),
5026                                  errmsg("cannot take logarithm of zero")));
5027         else if (cmp < 0)
5028                 ereport(ERROR,
5029                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_LOG),
5030                                  errmsg("cannot take logarithm of a negative number")));
5031
5032         local_rscale = rscale + 8;
5033
5034         init_var(&x);
5035         init_var(&xx);
5036         init_var(&ni);
5037         init_var(&elem);
5038         init_var(&fact);
5039
5040         set_var_from_var(arg, &x);
5041         set_var_from_var(&const_two, &fact);
5042
5043         /* Reduce input into range 0.9 < x < 1.1 */
5044         while (cmp_var(&x, &const_zero_point_nine) <= 0)
5045         {
5046                 local_rscale++;
5047                 sqrt_var(&x, &x, local_rscale);
5048                 mul_var(&fact, &const_two, &fact, 0);
5049         }
5050         while (cmp_var(&x, &const_one_point_one) >= 0)
5051         {
5052                 local_rscale++;
5053                 sqrt_var(&x, &x, local_rscale);
5054                 mul_var(&fact, &const_two, &fact, 0);
5055         }
5056
5057         /*
5058          * We use the Taylor series for 0.5 * ln((1+z)/(1-z)),
5059          *
5060          * z + z^3/3 + z^5/5 + ...
5061          *
5062          * where z = (x-1)/(x+1) is in the range (approximately) -0.053 .. 0.048
5063          * due to the above range-reduction of x.
5064          *
5065          * The convergence of this is not as fast as one would like, but is
5066          * tolerable given that z is small.
5067          */
5068         sub_var(&x, &const_one, result);
5069         add_var(&x, &const_one, &elem);
5070         div_var_fast(result, &elem, result, local_rscale, true);
5071         set_var_from_var(result, &xx);
5072         mul_var(result, result, &x, local_rscale);
5073
5074         set_var_from_var(&const_one, &ni);
5075
5076         for (;;)
5077         {
5078                 add_var(&ni, &const_two, &ni);
5079                 mul_var(&xx, &x, &xx, local_rscale);
5080                 div_var_fast(&xx, &ni, &elem, local_rscale, true);
5081
5082                 if (elem.ndigits == 0)
5083                         break;
5084
5085                 add_var(result, &elem, result);
5086
5087                 if (elem.weight < (result->weight - local_rscale * 2 / DEC_DIGITS))
5088                         break;
5089         }
5090
5091         /* Compensate for argument range reduction, round to requested rscale */
5092         mul_var(result, &fact, result, rscale);
5093
5094         free_var(&x);
5095         free_var(&xx);
5096         free_var(&ni);
5097         free_var(&elem);
5098         free_var(&fact);
5099 }
5100
5101
5102 /*
5103  * log_var() -
5104  *
5105  *      Compute the logarithm of num in a given base.
5106  *
5107  *      Note: this routine chooses dscale of the result.
5108  */
5109 static void
5110 log_var(NumericVar *base, NumericVar *num, NumericVar *result)
5111 {
5112         NumericVar      ln_base;
5113         NumericVar      ln_num;
5114         int                     dec_digits;
5115         int                     rscale;
5116         int                     local_rscale;
5117
5118         init_var(&ln_base);
5119         init_var(&ln_num);
5120
5121         /* Set scale for ln() calculations --- compare numeric_ln() */
5122
5123         /* Approx decimal digits before decimal point */
5124         dec_digits = (num->weight + 1) * DEC_DIGITS;
5125
5126         if (dec_digits > 1)
5127                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(dec_digits - 1);
5128         else if (dec_digits < 1)
5129                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(1 - dec_digits);
5130         else
5131                 rscale = NUMERIC_MIN_SIG_DIGITS;
5132
5133         rscale = Max(rscale, base->dscale);
5134         rscale = Max(rscale, num->dscale);
5135         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
5136         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
5137
5138         local_rscale = rscale + 8;
5139
5140         /* Form natural logarithms */
5141         ln_var(base, &ln_base, local_rscale);
5142         ln_var(num, &ln_num, local_rscale);
5143
5144         ln_base.dscale = rscale;
5145         ln_num.dscale = rscale;
5146
5147         /* Select scale for division result */
5148         rscale = select_div_scale(&ln_num, &ln_base);
5149
5150         div_var_fast(&ln_num, &ln_base, result, rscale, true);
5151
5152         free_var(&ln_num);
5153         free_var(&ln_base);
5154 }
5155
5156
5157 /*
5158  * power_var() -
5159  *
5160  *      Raise base to the power of exp
5161  *
5162  *      Note: this routine chooses dscale of the result.
5163  */
5164 static void
5165 power_var(NumericVar *base, NumericVar *exp, NumericVar *result)
5166 {
5167         NumericVar      ln_base;
5168         NumericVar      ln_num;
5169         int                     dec_digits;
5170         int                     rscale;
5171         int                     local_rscale;
5172         double          val;
5173
5174         /* If exp can be represented as an integer, use power_var_int */
5175         if (exp->ndigits == 0 || exp->ndigits <= exp->weight + 1)
5176         {
5177                 /* exact integer, but does it fit in int? */
5178                 NumericVar      x;
5179                 int64           expval64;
5180
5181                 /* must copy because numericvar_to_int8() scribbles on input */
5182                 init_var(&x);
5183                 set_var_from_var(exp, &x);
5184                 if (numericvar_to_int8(&x, &expval64))
5185                 {
5186                         int                     expval = (int) expval64;
5187
5188                         /* Test for overflow by reverse-conversion. */
5189                         if ((int64) expval == expval64)
5190                         {
5191                                 /* Okay, select rscale */
5192                                 rscale = NUMERIC_MIN_SIG_DIGITS;
5193                                 rscale = Max(rscale, base->dscale);
5194                                 rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
5195                                 rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
5196
5197                                 power_var_int(base, expval, result, rscale);
5198
5199                                 free_var(&x);
5200                                 return;
5201                         }
5202                 }
5203                 free_var(&x);
5204         }
5205
5206         /*
5207          *      This avoids log(0) for cases of 0 raised to a non-integer.
5208          *      0 ^ 0 handled by power_var_int().
5209          */
5210         if (cmp_var(base, &const_zero) == 0)
5211         {
5212                 set_var_from_var(&const_zero, result);
5213                 result->dscale = NUMERIC_MIN_SIG_DIGITS;        /* no need to round */
5214                 return;
5215         }
5216         
5217         init_var(&ln_base);
5218         init_var(&ln_num);
5219
5220         /* Set scale for ln() calculation --- need extra accuracy here */
5221
5222         /* Approx decimal digits before decimal point */
5223         dec_digits = (base->weight + 1) * DEC_DIGITS;
5224
5225         if (dec_digits > 1)
5226                 rscale = NUMERIC_MIN_SIG_DIGITS * 2 - (int) log10(dec_digits - 1);
5227         else if (dec_digits < 1)
5228                 rscale = NUMERIC_MIN_SIG_DIGITS * 2 - (int) log10(1 - dec_digits);
5229         else
5230                 rscale = NUMERIC_MIN_SIG_DIGITS * 2;
5231
5232         rscale = Max(rscale, base->dscale * 2);
5233         rscale = Max(rscale, exp->dscale * 2);
5234         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE * 2);
5235         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE * 2);
5236
5237         local_rscale = rscale + 8;
5238
5239         ln_var(base, &ln_base, local_rscale);
5240
5241         mul_var(&ln_base, exp, &ln_num, local_rscale);
5242
5243         /* Set scale for exp() -- compare numeric_exp() */
5244
5245         /* convert input to float8, ignoring overflow */
5246         val = numericvar_to_double_no_overflow(&ln_num);
5247
5248         /*
5249          * log10(result) = num * log10(e), so this is approximately the weight:
5250          */
5251         val *= 0.434294481903252;
5252
5253         /* limit to something that won't cause integer overflow */
5254         val = Max(val, -NUMERIC_MAX_RESULT_SCALE);
5255         val = Min(val, NUMERIC_MAX_RESULT_SCALE);
5256
5257         rscale = NUMERIC_MIN_SIG_DIGITS - (int) val;
5258         rscale = Max(rscale, base->dscale);
5259         rscale = Max(rscale, exp->dscale);
5260         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
5261         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
5262
5263         exp_var(&ln_num, result, rscale);
5264
5265         free_var(&ln_num);
5266         free_var(&ln_base);
5267 }
5268
5269 /*
5270  * power_var_int() -
5271  *
5272  *      Raise base to the power of exp, where exp is an integer.
5273  */
5274 static void
5275 power_var_int(NumericVar *base, int exp, NumericVar *result, int rscale)
5276 {
5277         bool            neg;
5278         NumericVar      base_prod;
5279         int                     local_rscale;
5280
5281         switch (exp)
5282         {
5283                 case 0:
5284                         /*
5285                          *      While 0 ^ 0 can be either 1 or indeterminate (error), we
5286                          *      treat it as 1 because most programming languages do this.
5287                          *      SQL:2003 also requires a return value of 1.
5288                          *      http://en.wikipedia.org/wiki/Exponentiation#Zero_to_the_zero_power
5289                          */
5290                         set_var_from_var(&const_one, result);
5291                         result->dscale = rscale;        /* no need to round */
5292                         return;
5293                 case 1:
5294                         set_var_from_var(base, result);
5295                         round_var(result, rscale);
5296                         return;
5297                 case -1:
5298                         div_var(&const_one, base, result, rscale, true);
5299                         return;
5300                 case 2:
5301                         mul_var(base, base, result, rscale);
5302                         return;
5303                 default:
5304                         break;
5305         }
5306
5307         /*
5308          * The general case repeatedly multiplies base according to the bit
5309          * pattern of exp.      We do the multiplications with some extra precision.
5310          */
5311         neg = (exp < 0);
5312         exp = Abs(exp);
5313
5314         local_rscale = rscale + MUL_GUARD_DIGITS * 2;
5315
5316         init_var(&base_prod);
5317         set_var_from_var(base, &base_prod);
5318
5319         if (exp & 1)
5320                 set_var_from_var(base, result);
5321         else
5322                 set_var_from_var(&const_one, result);
5323
5324         while ((exp >>= 1) > 0)
5325         {
5326                 mul_var(&base_prod, &base_prod, &base_prod, local_rscale);
5327                 if (exp & 1)
5328                         mul_var(&base_prod, result, result, local_rscale);
5329         }
5330
5331         free_var(&base_prod);
5332
5333         /* Compensate for input sign, and round to requested rscale */
5334         if (neg)
5335                 div_var_fast(&const_one, result, result, rscale, true);
5336         else
5337                 round_var(result, rscale);
5338 }
5339
5340
5341 /* ----------------------------------------------------------------------
5342  *
5343  * Following are the lowest level functions that operate unsigned
5344  * on the variable level
5345  *
5346  * ----------------------------------------------------------------------
5347  */
5348
5349
5350 /* ----------
5351  * cmp_abs() -
5352  *
5353  *      Compare the absolute values of var1 and var2
5354  *      Returns:        -1 for ABS(var1) < ABS(var2)
5355  *                              0  for ABS(var1) == ABS(var2)
5356  *                              1  for ABS(var1) > ABS(var2)
5357  * ----------
5358  */
5359 static int
5360 cmp_abs(NumericVar *var1, NumericVar *var2)
5361 {
5362         return cmp_abs_common(var1->digits, var1->ndigits, var1->weight,
5363                                                   var2->digits, var2->ndigits, var2->weight);
5364 }
5365
5366 /* ----------
5367  * cmp_abs_common() -
5368  *
5369  *      Main routine of cmp_abs(). This function can be used by both
5370  *      NumericVar and Numeric.
5371  * ----------
5372  */
5373 static int
5374 cmp_abs_common(const NumericDigit *var1digits, int var1ndigits, int var1weight,
5375                          const NumericDigit *var2digits, int var2ndigits, int var2weight)
5376 {
5377         int                     i1 = 0;
5378         int                     i2 = 0;
5379
5380         /* Check any digits before the first common digit */
5381
5382         while (var1weight > var2weight && i1 < var1ndigits)
5383         {
5384                 if (var1digits[i1++] != 0)
5385                         return 1;
5386                 var1weight--;
5387         }
5388         while (var2weight > var1weight && i2 < var2ndigits)
5389         {
5390                 if (var2digits[i2++] != 0)
5391                         return -1;
5392                 var2weight--;
5393         }
5394
5395         /* At this point, either w1 == w2 or we've run out of digits */
5396
5397         if (var1weight == var2weight)
5398         {
5399                 while (i1 < var1ndigits && i2 < var2ndigits)
5400                 {
5401                         int                     stat = var1digits[i1++] - var2digits[i2++];
5402
5403                         if (stat)
5404                         {
5405                                 if (stat > 0)
5406                                         return 1;
5407                                 return -1;
5408                         }
5409                 }
5410         }
5411
5412         /*
5413          * At this point, we've run out of digits on one side or the other; so any
5414          * remaining nonzero digits imply that side is larger
5415          */
5416         while (i1 < var1ndigits)
5417         {
5418                 if (var1digits[i1++] != 0)
5419                         return 1;
5420         }
5421         while (i2 < var2ndigits)
5422         {
5423                 if (var2digits[i2++] != 0)
5424                         return -1;
5425         }
5426
5427         return 0;
5428 }
5429
5430
5431 /*
5432  * add_abs() -
5433  *
5434  *      Add the absolute values of two variables into result.
5435  *      result might point to one of the operands without danger.
5436  */
5437 static void
5438 add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
5439 {
5440         NumericDigit *res_buf;
5441         NumericDigit *res_digits;
5442         int                     res_ndigits;
5443         int                     res_weight;
5444         int                     res_rscale,
5445                                 rscale1,
5446                                 rscale2;
5447         int                     res_dscale;
5448         int                     i,
5449                                 i1,
5450                                 i2;
5451         int                     carry = 0;
5452
5453         /* copy these values into local vars for speed in inner loop */
5454         int                     var1ndigits = var1->ndigits;
5455         int                     var2ndigits = var2->ndigits;
5456         NumericDigit *var1digits = var1->digits;
5457         NumericDigit *var2digits = var2->digits;
5458
5459         res_weight = Max(var1->weight, var2->weight) + 1;
5460
5461         res_dscale = Max(var1->dscale, var2->dscale);
5462
5463         /* Note: here we are figuring rscale in base-NBASE digits */
5464         rscale1 = var1->ndigits - var1->weight - 1;
5465         rscale2 = var2->ndigits - var2->weight - 1;
5466         res_rscale = Max(rscale1, rscale2);
5467
5468         res_ndigits = res_rscale + res_weight + 1;
5469         if (res_ndigits <= 0)
5470                 res_ndigits = 1;
5471
5472         res_buf = digitbuf_alloc(res_ndigits + 1);
5473         res_buf[0] = 0;                         /* spare digit for later rounding */
5474         res_digits = res_buf + 1;
5475
5476         i1 = res_rscale + var1->weight + 1;
5477         i2 = res_rscale + var2->weight + 1;
5478         for (i = res_ndigits - 1; i >= 0; i--)
5479         {
5480                 i1--;
5481                 i2--;
5482                 if (i1 >= 0 && i1 < var1ndigits)
5483                         carry += var1digits[i1];
5484                 if (i2 >= 0 && i2 < var2ndigits)
5485                         carry += var2digits[i2];
5486
5487                 if (carry >= NBASE)
5488                 {
5489                         res_digits[i] = carry - NBASE;
5490                         carry = 1;
5491                 }
5492                 else
5493                 {
5494                         res_digits[i] = carry;
5495                         carry = 0;
5496                 }
5497         }
5498
5499         Assert(carry == 0);                     /* else we failed to allow for carry out */
5500
5501         digitbuf_free(result->buf);
5502         result->ndigits = res_ndigits;
5503         result->buf = res_buf;
5504         result->digits = res_digits;
5505         result->weight = res_weight;
5506         result->dscale = res_dscale;
5507
5508         /* Remove leading/trailing zeroes */
5509         strip_var(result);
5510 }
5511
5512
5513 /*
5514  * sub_abs()
5515  *
5516  *      Subtract the absolute value of var2 from the absolute value of var1
5517  *      and store in result. result might point to one of the operands
5518  *      without danger.
5519  *
5520  *      ABS(var1) MUST BE GREATER OR EQUAL ABS(var2) !!!
5521  */
5522 static void
5523 sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
5524 {
5525         NumericDigit *res_buf;
5526         NumericDigit *res_digits;
5527         int                     res_ndigits;
5528         int                     res_weight;
5529         int                     res_rscale,
5530                                 rscale1,
5531                                 rscale2;
5532         int                     res_dscale;
5533         int                     i,
5534                                 i1,
5535                                 i2;
5536         int                     borrow = 0;
5537
5538         /* copy these values into local vars for speed in inner loop */
5539         int                     var1ndigits = var1->ndigits;
5540         int                     var2ndigits = var2->ndigits;
5541         NumericDigit *var1digits = var1->digits;
5542         NumericDigit *var2digits = var2->digits;
5543
5544         res_weight = var1->weight;
5545
5546         res_dscale = Max(var1->dscale, var2->dscale);
5547
5548         /* Note: here we are figuring rscale in base-NBASE digits */
5549         rscale1 = var1->ndigits - var1->weight - 1;
5550         rscale2 = var2->ndigits - var2->weight - 1;
5551         res_rscale = Max(rscale1, rscale2);
5552
5553         res_ndigits = res_rscale + res_weight + 1;
5554         if (res_ndigits <= 0)
5555                 res_ndigits = 1;
5556
5557         res_buf = digitbuf_alloc(res_ndigits + 1);
5558         res_buf[0] = 0;                         /* spare digit for later rounding */
5559         res_digits = res_buf + 1;
5560
5561         i1 = res_rscale + var1->weight + 1;
5562         i2 = res_rscale + var2->weight + 1;
5563         for (i = res_ndigits - 1; i >= 0; i--)
5564         {
5565                 i1--;
5566                 i2--;
5567                 if (i1 >= 0 && i1 < var1ndigits)
5568                         borrow += var1digits[i1];
5569                 if (i2 >= 0 && i2 < var2ndigits)
5570                         borrow -= var2digits[i2];
5571
5572                 if (borrow < 0)
5573                 {
5574                         res_digits[i] = borrow + NBASE;
5575                         borrow = -1;
5576                 }
5577                 else
5578                 {
5579                         res_digits[i] = borrow;
5580                         borrow = 0;
5581                 }
5582         }
5583
5584         Assert(borrow == 0);            /* else caller gave us var1 < var2 */
5585
5586         digitbuf_free(result->buf);
5587         result->ndigits = res_ndigits;
5588         result->buf = res_buf;
5589         result->digits = res_digits;
5590         result->weight = res_weight;
5591         result->dscale = res_dscale;
5592
5593         /* Remove leading/trailing zeroes */
5594         strip_var(result);
5595 }
5596
5597 /*
5598  * round_var
5599  *
5600  * Round the value of a variable to no more than rscale decimal digits
5601  * after the decimal point.  NOTE: we allow rscale < 0 here, implying
5602  * rounding before the decimal point.
5603  */
5604 static void
5605 round_var(NumericVar *var, int rscale)
5606 {
5607         NumericDigit *digits = var->digits;
5608         int                     di;
5609         int                     ndigits;
5610         int                     carry;
5611
5612         var->dscale = rscale;
5613
5614         /* decimal digits wanted */
5615         di = (var->weight + 1) * DEC_DIGITS + rscale;
5616
5617         /*
5618          * If di = 0, the value loses all digits, but could round up to 1 if its
5619          * first extra digit is >= 5.  If di < 0 the result must be 0.
5620          */
5621         if (di < 0)
5622         {
5623                 var->ndigits = 0;
5624                 var->weight = 0;
5625                 var->sign = NUMERIC_POS;
5626         }
5627         else
5628         {
5629                 /* NBASE digits wanted */
5630                 ndigits = (di + DEC_DIGITS - 1) / DEC_DIGITS;
5631
5632                 /* 0, or number of decimal digits to keep in last NBASE digit */
5633                 di %= DEC_DIGITS;
5634
5635                 if (ndigits < var->ndigits ||
5636                         (ndigits == var->ndigits && di > 0))
5637                 {
5638                         var->ndigits = ndigits;
5639
5640 #if DEC_DIGITS == 1
5641                         /* di must be zero */
5642                         carry = (digits[ndigits] >= HALF_NBASE) ? 1 : 0;
5643 #else
5644                         if (di == 0)
5645                                 carry = (digits[ndigits] >= HALF_NBASE) ? 1 : 0;
5646                         else
5647                         {
5648                                 /* Must round within last NBASE digit */
5649                                 int                     extra,
5650                                                         pow10;
5651
5652 #if DEC_DIGITS == 4
5653                                 pow10 = round_powers[di];
5654 #elif DEC_DIGITS == 2
5655                                 pow10 = 10;
5656 #else
5657 #error unsupported NBASE
5658 #endif
5659                                 extra = digits[--ndigits] % pow10;
5660                                 digits[ndigits] -= extra;
5661                                 carry = 0;
5662                                 if (extra >= pow10 / 2)
5663                                 {
5664                                         pow10 += digits[ndigits];
5665                                         if (pow10 >= NBASE)
5666                                         {
5667                                                 pow10 -= NBASE;
5668                                                 carry = 1;
5669                                         }
5670                                         digits[ndigits] = pow10;
5671                                 }
5672                         }
5673 #endif
5674
5675                         /* Propagate carry if needed */
5676                         while (carry)
5677                         {
5678                                 carry += digits[--ndigits];
5679                                 if (carry >= NBASE)
5680                                 {
5681                                         digits[ndigits] = carry - NBASE;
5682                                         carry = 1;
5683                                 }
5684                                 else
5685                                 {
5686                                         digits[ndigits] = carry;
5687                                         carry = 0;
5688                                 }
5689                         }
5690
5691                         if (ndigits < 0)
5692                         {
5693                                 Assert(ndigits == -1);  /* better not have added > 1 digit */
5694                                 Assert(var->digits > var->buf);
5695                                 var->digits--;
5696                                 var->ndigits++;
5697                                 var->weight++;
5698                         }
5699                 }
5700         }
5701 }
5702
5703 /*
5704  * trunc_var
5705  *
5706  * Truncate (towards zero) the value of a variable at rscale decimal digits
5707  * after the decimal point.  NOTE: we allow rscale < 0 here, implying
5708  * truncation before the decimal point.
5709  */
5710 static void
5711 trunc_var(NumericVar *var, int rscale)
5712 {
5713         int                     di;
5714         int                     ndigits;
5715
5716         var->dscale = rscale;
5717
5718         /* decimal digits wanted */
5719         di = (var->weight + 1) * DEC_DIGITS + rscale;
5720
5721         /*
5722          * If di <= 0, the value loses all digits.
5723          */
5724         if (di <= 0)
5725         {
5726                 var->ndigits = 0;
5727                 var->weight = 0;
5728                 var->sign = NUMERIC_POS;
5729         }
5730         else
5731         {
5732                 /* NBASE digits wanted */
5733                 ndigits = (di + DEC_DIGITS - 1) / DEC_DIGITS;
5734
5735                 if (ndigits <= var->ndigits)
5736                 {
5737                         var->ndigits = ndigits;
5738
5739 #if DEC_DIGITS == 1
5740                         /* no within-digit stuff to worry about */
5741 #else
5742                         /* 0, or number of decimal digits to keep in last NBASE digit */
5743                         di %= DEC_DIGITS;
5744
5745                         if (di > 0)
5746                         {
5747                                 /* Must truncate within last NBASE digit */
5748                                 NumericDigit *digits = var->digits;
5749                                 int                     extra,
5750                                                         pow10;
5751
5752 #if DEC_DIGITS == 4
5753                                 pow10 = round_powers[di];
5754 #elif DEC_DIGITS == 2
5755                                 pow10 = 10;
5756 #else
5757 #error unsupported NBASE
5758 #endif
5759                                 extra = digits[--ndigits] % pow10;
5760                                 digits[ndigits] -= extra;
5761                         }
5762 #endif
5763                 }
5764         }
5765 }
5766
5767 /*
5768  * strip_var
5769  *
5770  * Strip any leading and trailing zeroes from a numeric variable
5771  */
5772 static void
5773 strip_var(NumericVar *var)
5774 {
5775         NumericDigit *digits = var->digits;
5776         int                     ndigits = var->ndigits;
5777
5778         /* Strip leading zeroes */
5779         while (ndigits > 0 && *digits == 0)
5780         {
5781                 digits++;
5782                 var->weight--;
5783                 ndigits--;
5784         }
5785
5786         /* Strip trailing zeroes */
5787         while (ndigits > 0 && digits[ndigits - 1] == 0)
5788                 ndigits--;
5789
5790         /* If it's zero, normalize the sign and weight */
5791         if (ndigits == 0)
5792         {
5793                 var->sign = NUMERIC_POS;
5794                 var->weight = 0;
5795         }
5796
5797         var->digits = digits;
5798         var->ndigits = ndigits;
5799 }