]> granicus.if.org Git - postgresql/blob - src/backend/utils/adt/numeric.c
Silence compiler warnings about possibly unset variables.
[postgresql] / src / backend / utils / adt / numeric.c
1 /*-------------------------------------------------------------------------
2  *
3  * numeric.c
4  *        An exact numeric data type for the Postgres database system
5  *
6  * Original coding 1998, Jan Wieck.  Heavily revised 2003, Tom Lane.
7  *
8  * Many of the algorithmic ideas are borrowed from David M. Smith's "FM"
9  * multiple-precision math library, most recently published as Algorithm
10  * 786: Multiple-Precision Complex Arithmetic and Functions, ACM
11  * Transactions on Mathematical Software, Vol. 24, No. 4, December 1998,
12  * pages 359-367.
13  *
14  * Copyright (c) 1998-2014, PostgreSQL Global Development Group
15  *
16  * IDENTIFICATION
17  *        src/backend/utils/adt/numeric.c
18  *
19  *-------------------------------------------------------------------------
20  */
21
22 #include "postgres.h"
23
24 #include <ctype.h>
25 #include <float.h>
26 #include <limits.h>
27 #include <math.h>
28
29 #include "access/hash.h"
30 #include "catalog/pg_type.h"
31 #include "libpq/pqformat.h"
32 #include "miscadmin.h"
33 #include "nodes/nodeFuncs.h"
34 #include "utils/array.h"
35 #include "utils/builtins.h"
36 #include "utils/int8.h"
37 #include "utils/numeric.h"
38
39 /* ----------
40  * Uncomment the following to enable compilation of dump_numeric()
41  * and dump_var() and to get a dump of any result produced by make_result().
42  * ----------
43 #define NUMERIC_DEBUG
44  */
45
46
47 /* ----------
48  * Local data types
49  *
50  * Numeric values are represented in a base-NBASE floating point format.
51  * Each "digit" ranges from 0 to NBASE-1.  The type NumericDigit is signed
52  * and wide enough to store a digit.  We assume that NBASE*NBASE can fit in
53  * an int.      Although the purely calculational routines could handle any even
54  * NBASE that's less than sqrt(INT_MAX), in practice we are only interested
55  * in NBASE a power of ten, so that I/O conversions and decimal rounding
56  * are easy.  Also, it's actually more efficient if NBASE is rather less than
57  * sqrt(INT_MAX), so that there is "headroom" for mul_var and div_var_fast to
58  * postpone processing carries.
59  * ----------
60  */
61
62 #if 0
63 #define NBASE           10
64 #define HALF_NBASE      5
65 #define DEC_DIGITS      1                       /* decimal digits per NBASE digit */
66 #define MUL_GUARD_DIGITS        4       /* these are measured in NBASE digits */
67 #define DIV_GUARD_DIGITS        8
68
69 typedef signed char NumericDigit;
70 #endif
71
72 #if 0
73 #define NBASE           100
74 #define HALF_NBASE      50
75 #define DEC_DIGITS      2                       /* decimal digits per NBASE digit */
76 #define MUL_GUARD_DIGITS        3       /* these are measured in NBASE digits */
77 #define DIV_GUARD_DIGITS        6
78
79 typedef signed char NumericDigit;
80 #endif
81
82 #if 1
83 #define NBASE           10000
84 #define HALF_NBASE      5000
85 #define DEC_DIGITS      4                       /* decimal digits per NBASE digit */
86 #define MUL_GUARD_DIGITS        2       /* these are measured in NBASE digits */
87 #define DIV_GUARD_DIGITS        4
88
89 typedef int16 NumericDigit;
90 #endif
91
92 /*
93  * The Numeric type as stored on disk.
94  *
95  * If the high bits of the first word of a NumericChoice (n_header, or
96  * n_short.n_header, or n_long.n_sign_dscale) are NUMERIC_SHORT, then the
97  * numeric follows the NumericShort format; if they are NUMERIC_POS or
98  * NUMERIC_NEG, it follows the NumericLong format.      If they are NUMERIC_NAN,
99  * it is a NaN.  We currently always store a NaN using just two bytes (i.e.
100  * only n_header), but previous releases used only the NumericLong format,
101  * so we might find 4-byte NaNs on disk if a database has been migrated using
102  * pg_upgrade.  In either case, when the high bits indicate a NaN, the
103  * remaining bits are never examined.  Currently, we always initialize these
104  * to zero, but it might be possible to use them for some other purpose in
105  * the future.
106  *
107  * In the NumericShort format, the remaining 14 bits of the header word
108  * (n_short.n_header) are allocated as follows: 1 for sign (positive or
109  * negative), 6 for dynamic scale, and 7 for weight.  In practice, most
110  * commonly-encountered values can be represented this way.
111  *
112  * In the NumericLong format, the remaining 14 bits of the header word
113  * (n_long.n_sign_dscale) represent the display scale; and the weight is
114  * stored separately in n_weight.
115  *
116  * NOTE: by convention, values in the packed form have been stripped of
117  * all leading and trailing zero digits (where a "digit" is of base NBASE).
118  * In particular, if the value is zero, there will be no digits at all!
119  * The weight is arbitrary in that case, but we normally set it to zero.
120  */
121
122 struct NumericShort
123 {
124         uint16          n_header;               /* Sign + display scale + weight */
125         NumericDigit n_data[1];         /* Digits */
126 };
127
128 struct NumericLong
129 {
130         uint16          n_sign_dscale;  /* Sign + display scale */
131         int16           n_weight;               /* Weight of 1st digit  */
132         NumericDigit n_data[1];         /* Digits */
133 };
134
135 union NumericChoice
136 {
137         uint16          n_header;               /* Header word */
138         struct NumericLong n_long;      /* Long form (4-byte header) */
139         struct NumericShort n_short;    /* Short form (2-byte header) */
140 };
141
142 struct NumericData
143 {
144         int32           vl_len_;                /* varlena header (do not touch directly!) */
145         union NumericChoice choice; /* choice of format */
146 };
147
148
149 /*
150  * Interpretation of high bits.
151  */
152
153 #define NUMERIC_SIGN_MASK       0xC000
154 #define NUMERIC_POS                     0x0000
155 #define NUMERIC_NEG                     0x4000
156 #define NUMERIC_SHORT           0x8000
157 #define NUMERIC_NAN                     0xC000
158
159 #define NUMERIC_FLAGBITS(n) ((n)->choice.n_header & NUMERIC_SIGN_MASK)
160 #define NUMERIC_IS_NAN(n)               (NUMERIC_FLAGBITS(n) == NUMERIC_NAN)
161 #define NUMERIC_IS_SHORT(n)             (NUMERIC_FLAGBITS(n) == NUMERIC_SHORT)
162
163 #define NUMERIC_HDRSZ   (VARHDRSZ + sizeof(uint16) + sizeof(int16))
164 #define NUMERIC_HDRSZ_SHORT (VARHDRSZ + sizeof(uint16))
165
166 /*
167  * If the flag bits are NUMERIC_SHORT or NUMERIC_NAN, we want the short header;
168  * otherwise, we want the long one.  Instead of testing against each value, we
169  * can just look at the high bit, for a slight efficiency gain.
170  */
171 #define NUMERIC_HEADER_SIZE(n) \
172         (VARHDRSZ + sizeof(uint16) + \
173                 (((NUMERIC_FLAGBITS(n) & 0x8000) == 0) ? sizeof(int16) : 0))
174
175 /*
176  * Short format definitions.
177  */
178
179 #define NUMERIC_SHORT_SIGN_MASK                 0x2000
180 #define NUMERIC_SHORT_DSCALE_MASK               0x1F80
181 #define NUMERIC_SHORT_DSCALE_SHIFT              7
182 #define NUMERIC_SHORT_DSCALE_MAX                \
183         (NUMERIC_SHORT_DSCALE_MASK >> NUMERIC_SHORT_DSCALE_SHIFT)
184 #define NUMERIC_SHORT_WEIGHT_SIGN_MASK  0x0040
185 #define NUMERIC_SHORT_WEIGHT_MASK               0x003F
186 #define NUMERIC_SHORT_WEIGHT_MAX                NUMERIC_SHORT_WEIGHT_MASK
187 #define NUMERIC_SHORT_WEIGHT_MIN                (-(NUMERIC_SHORT_WEIGHT_MASK+1))
188
189 /*
190  * Extract sign, display scale, weight.
191  */
192
193 #define NUMERIC_DSCALE_MASK                     0x3FFF
194
195 #define NUMERIC_SIGN(n) \
196         (NUMERIC_IS_SHORT(n) ? \
197                 (((n)->choice.n_short.n_header & NUMERIC_SHORT_SIGN_MASK) ? \
198                 NUMERIC_NEG : NUMERIC_POS) : NUMERIC_FLAGBITS(n))
199 #define NUMERIC_DSCALE(n)       (NUMERIC_IS_SHORT((n)) ? \
200         ((n)->choice.n_short.n_header & NUMERIC_SHORT_DSCALE_MASK) \
201                 >> NUMERIC_SHORT_DSCALE_SHIFT \
202         : ((n)->choice.n_long.n_sign_dscale & NUMERIC_DSCALE_MASK))
203 #define NUMERIC_WEIGHT(n)       (NUMERIC_IS_SHORT((n)) ? \
204         (((n)->choice.n_short.n_header & NUMERIC_SHORT_WEIGHT_SIGN_MASK ? \
205                 ~NUMERIC_SHORT_WEIGHT_MASK : 0) \
206          | ((n)->choice.n_short.n_header & NUMERIC_SHORT_WEIGHT_MASK)) \
207         : ((n)->choice.n_long.n_weight))
208
209 /* ----------
210  * NumericVar is the format we use for arithmetic.      The digit-array part
211  * is the same as the NumericData storage format, but the header is more
212  * complex.
213  *
214  * The value represented by a NumericVar is determined by the sign, weight,
215  * ndigits, and digits[] array.
216  * Note: the first digit of a NumericVar's value is assumed to be multiplied
217  * by NBASE ** weight.  Another way to say it is that there are weight+1
218  * digits before the decimal point.  It is possible to have weight < 0.
219  *
220  * buf points at the physical start of the palloc'd digit buffer for the
221  * NumericVar.  digits points at the first digit in actual use (the one
222  * with the specified weight).  We normally leave an unused digit or two
223  * (preset to zeroes) between buf and digits, so that there is room to store
224  * a carry out of the top digit without reallocating space.  We just need to
225  * decrement digits (and increment weight) to make room for the carry digit.
226  * (There is no such extra space in a numeric value stored in the database,
227  * only in a NumericVar in memory.)
228  *
229  * If buf is NULL then the digit buffer isn't actually palloc'd and should
230  * not be freed --- see the constants below for an example.
231  *
232  * dscale, or display scale, is the nominal precision expressed as number
233  * of digits after the decimal point (it must always be >= 0 at present).
234  * dscale may be more than the number of physically stored fractional digits,
235  * implying that we have suppressed storage of significant trailing zeroes.
236  * It should never be less than the number of stored digits, since that would
237  * imply hiding digits that are present.  NOTE that dscale is always expressed
238  * in *decimal* digits, and so it may correspond to a fractional number of
239  * base-NBASE digits --- divide by DEC_DIGITS to convert to NBASE digits.
240  *
241  * rscale, or result scale, is the target precision for a computation.
242  * Like dscale it is expressed as number of *decimal* digits after the decimal
243  * point, and is always >= 0 at present.
244  * Note that rscale is not stored in variables --- it's figured on-the-fly
245  * from the dscales of the inputs.
246  *
247  * NB: All the variable-level functions are written in a style that makes it
248  * possible to give one and the same variable as argument and destination.
249  * This is feasible because the digit buffer is separate from the variable.
250  * ----------
251  */
252 typedef struct NumericVar
253 {
254         int                     ndigits;                /* # of digits in digits[] - can be 0! */
255         int                     weight;                 /* weight of first digit */
256         int                     sign;                   /* NUMERIC_POS, NUMERIC_NEG, or NUMERIC_NAN */
257         int                     dscale;                 /* display scale */
258         NumericDigit *buf;                      /* start of palloc'd space for digits[] */
259         NumericDigit *digits;           /* base-NBASE digits */
260 } NumericVar;
261
262
263 /* ----------
264  * Some preinitialized constants
265  * ----------
266  */
267 static NumericDigit const_zero_data[1] = {0};
268 static NumericVar const_zero =
269 {0, 0, NUMERIC_POS, 0, NULL, const_zero_data};
270
271 static NumericDigit const_one_data[1] = {1};
272 static NumericVar const_one =
273 {1, 0, NUMERIC_POS, 0, NULL, const_one_data};
274
275 static NumericDigit const_two_data[1] = {2};
276 static NumericVar const_two =
277 {1, 0, NUMERIC_POS, 0, NULL, const_two_data};
278
279 #if DEC_DIGITS == 4 || DEC_DIGITS == 2
280 static NumericDigit const_ten_data[1] = {10};
281 static NumericVar const_ten =
282 {1, 0, NUMERIC_POS, 0, NULL, const_ten_data};
283 #elif DEC_DIGITS == 1
284 static NumericDigit const_ten_data[1] = {1};
285 static NumericVar const_ten =
286 {1, 1, NUMERIC_POS, 0, NULL, const_ten_data};
287 #endif
288
289 #if DEC_DIGITS == 4
290 static NumericDigit const_zero_point_five_data[1] = {5000};
291 #elif DEC_DIGITS == 2
292 static NumericDigit const_zero_point_five_data[1] = {50};
293 #elif DEC_DIGITS == 1
294 static NumericDigit const_zero_point_five_data[1] = {5};
295 #endif
296 static NumericVar const_zero_point_five =
297 {1, -1, NUMERIC_POS, 1, NULL, const_zero_point_five_data};
298
299 #if DEC_DIGITS == 4
300 static NumericDigit const_zero_point_nine_data[1] = {9000};
301 #elif DEC_DIGITS == 2
302 static NumericDigit const_zero_point_nine_data[1] = {90};
303 #elif DEC_DIGITS == 1
304 static NumericDigit const_zero_point_nine_data[1] = {9};
305 #endif
306 static NumericVar const_zero_point_nine =
307 {1, -1, NUMERIC_POS, 1, NULL, const_zero_point_nine_data};
308
309 #if DEC_DIGITS == 4
310 static NumericDigit const_zero_point_01_data[1] = {100};
311 static NumericVar const_zero_point_01 =
312 {1, -1, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
313 #elif DEC_DIGITS == 2
314 static NumericDigit const_zero_point_01_data[1] = {1};
315 static NumericVar const_zero_point_01 =
316 {1, -1, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
317 #elif DEC_DIGITS == 1
318 static NumericDigit const_zero_point_01_data[1] = {1};
319 static NumericVar const_zero_point_01 =
320 {1, -2, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
321 #endif
322
323 #if DEC_DIGITS == 4
324 static NumericDigit const_one_point_one_data[2] = {1, 1000};
325 #elif DEC_DIGITS == 2
326 static NumericDigit const_one_point_one_data[2] = {1, 10};
327 #elif DEC_DIGITS == 1
328 static NumericDigit const_one_point_one_data[2] = {1, 1};
329 #endif
330 static NumericVar const_one_point_one =
331 {2, 0, NUMERIC_POS, 1, NULL, const_one_point_one_data};
332
333 static NumericVar const_nan =
334 {0, 0, NUMERIC_NAN, 0, NULL, NULL};
335
336 #if DEC_DIGITS == 4
337 static const int round_powers[4] = {0, 1000, 100, 10};
338 #endif
339
340
341 /* ----------
342  * Local functions
343  * ----------
344  */
345
346 #ifdef NUMERIC_DEBUG
347 static void dump_numeric(const char *str, Numeric num);
348 static void dump_var(const char *str, NumericVar *var);
349 #else
350 #define dump_numeric(s,n)
351 #define dump_var(s,v)
352 #endif
353
354 #define digitbuf_alloc(ndigits)  \
355         ((NumericDigit *) palloc((ndigits) * sizeof(NumericDigit)))
356 #define digitbuf_free(buf)      \
357         do { \
358                  if ((buf) != NULL) \
359                          pfree(buf); \
360         } while (0)
361
362 #define init_var(v)             MemSetAligned(v, 0, sizeof(NumericVar))
363
364 #define NUMERIC_DIGITS(num) (NUMERIC_IS_SHORT(num) ? \
365         (num)->choice.n_short.n_data : (num)->choice.n_long.n_data)
366 #define NUMERIC_NDIGITS(num) \
367         ((VARSIZE(num) - NUMERIC_HEADER_SIZE(num)) / sizeof(NumericDigit))
368 #define NUMERIC_CAN_BE_SHORT(scale,weight) \
369         ((scale) <= NUMERIC_SHORT_DSCALE_MAX && \
370         (weight) <= NUMERIC_SHORT_WEIGHT_MAX && \
371         (weight) >= NUMERIC_SHORT_WEIGHT_MIN)
372
373 static void alloc_var(NumericVar *var, int ndigits);
374 static void free_var(NumericVar *var);
375 static void zero_var(NumericVar *var);
376
377 static const char *set_var_from_str(const char *str, const char *cp,
378                                  NumericVar *dest);
379 static void set_var_from_num(Numeric value, NumericVar *dest);
380 static void init_var_from_num(Numeric num, NumericVar *dest);
381 static void set_var_from_var(NumericVar *value, NumericVar *dest);
382 static char *get_str_from_var(NumericVar *var);
383 static char *get_str_from_var_sci(NumericVar *var, int rscale);
384
385 static Numeric make_result(NumericVar *var);
386
387 static void apply_typmod(NumericVar *var, int32 typmod);
388
389 static int32 numericvar_to_int4(NumericVar *var);
390 static bool numericvar_to_int8(NumericVar *var, int64 *result);
391 static void int8_to_numericvar(int64 val, NumericVar *var);
392 static double numeric_to_double_no_overflow(Numeric num);
393 static double numericvar_to_double_no_overflow(NumericVar *var);
394
395 static int      cmp_numerics(Numeric num1, Numeric num2);
396 static int      cmp_var(NumericVar *var1, NumericVar *var2);
397 static int cmp_var_common(const NumericDigit *var1digits, int var1ndigits,
398                            int var1weight, int var1sign,
399                            const NumericDigit *var2digits, int var2ndigits,
400                            int var2weight, int var2sign);
401 static void add_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
402 static void sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
403 static void mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
404                 int rscale);
405 static void div_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
406                 int rscale, bool round);
407 static void div_var_fast(NumericVar *var1, NumericVar *var2, NumericVar *result,
408                          int rscale, bool round);
409 static int      select_div_scale(NumericVar *var1, NumericVar *var2);
410 static void mod_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
411 static void ceil_var(NumericVar *var, NumericVar *result);
412 static void floor_var(NumericVar *var, NumericVar *result);
413
414 static void sqrt_var(NumericVar *arg, NumericVar *result, int rscale);
415 static void exp_var(NumericVar *arg, NumericVar *result, int rscale);
416 static void exp_var_internal(NumericVar *arg, NumericVar *result, int rscale);
417 static void ln_var(NumericVar *arg, NumericVar *result, int rscale);
418 static void log_var(NumericVar *base, NumericVar *num, NumericVar *result);
419 static void power_var(NumericVar *base, NumericVar *exp, NumericVar *result);
420 static void power_var_int(NumericVar *base, int exp, NumericVar *result,
421                           int rscale);
422
423 static int      cmp_abs(NumericVar *var1, NumericVar *var2);
424 static int cmp_abs_common(const NumericDigit *var1digits, int var1ndigits,
425                            int var1weight,
426                            const NumericDigit *var2digits, int var2ndigits,
427                            int var2weight);
428 static void add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result);
429 static void sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result);
430 static void round_var(NumericVar *var, int rscale);
431 static void trunc_var(NumericVar *var, int rscale);
432 static void strip_var(NumericVar *var);
433 static void compute_bucket(Numeric operand, Numeric bound1, Numeric bound2,
434                            NumericVar *count_var, NumericVar *result_var);
435
436
437 /* ----------------------------------------------------------------------
438  *
439  * Input-, output- and rounding-functions
440  *
441  * ----------------------------------------------------------------------
442  */
443
444
445 /*
446  * numeric_in() -
447  *
448  *      Input function for numeric data type
449  */
450 Datum
451 numeric_in(PG_FUNCTION_ARGS)
452 {
453         char       *str = PG_GETARG_CSTRING(0);
454
455 #ifdef NOT_USED
456         Oid                     typelem = PG_GETARG_OID(1);
457 #endif
458         int32           typmod = PG_GETARG_INT32(2);
459         Numeric         res;
460         const char *cp;
461
462         /* Skip leading spaces */
463         cp = str;
464         while (*cp)
465         {
466                 if (!isspace((unsigned char) *cp))
467                         break;
468                 cp++;
469         }
470
471         /*
472          * Check for NaN
473          */
474         if (pg_strncasecmp(cp, "NaN", 3) == 0)
475         {
476                 res = make_result(&const_nan);
477
478                 /* Should be nothing left but spaces */
479                 cp += 3;
480                 while (*cp)
481                 {
482                         if (!isspace((unsigned char) *cp))
483                                 ereport(ERROR,
484                                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
485                                           errmsg("invalid input syntax for type numeric: \"%s\"",
486                                                          str)));
487                         cp++;
488                 }
489         }
490         else
491         {
492                 /*
493                  * Use set_var_from_str() to parse a normal numeric value
494                  */
495                 NumericVar      value;
496
497                 init_var(&value);
498
499                 cp = set_var_from_str(str, cp, &value);
500
501                 /*
502                  * We duplicate a few lines of code here because we would like to
503                  * throw any trailing-junk syntax error before any semantic error
504                  * resulting from apply_typmod.  We can't easily fold the two cases
505                  * together because we mustn't apply apply_typmod to a NaN.
506                  */
507                 while (*cp)
508                 {
509                         if (!isspace((unsigned char) *cp))
510                                 ereport(ERROR,
511                                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
512                                           errmsg("invalid input syntax for type numeric: \"%s\"",
513                                                          str)));
514                         cp++;
515                 }
516
517                 apply_typmod(&value, typmod);
518
519                 res = make_result(&value);
520                 free_var(&value);
521         }
522
523         PG_RETURN_NUMERIC(res);
524 }
525
526
527 /*
528  * numeric_out() -
529  *
530  *      Output function for numeric data type
531  */
532 Datum
533 numeric_out(PG_FUNCTION_ARGS)
534 {
535         Numeric         num = PG_GETARG_NUMERIC(0);
536         NumericVar      x;
537         char       *str;
538
539         /*
540          * Handle NaN
541          */
542         if (NUMERIC_IS_NAN(num))
543                 PG_RETURN_CSTRING(pstrdup("NaN"));
544
545         /*
546          * Get the number in the variable format.
547          */
548         init_var_from_num(num, &x);
549
550         str = get_str_from_var(&x);
551
552         PG_RETURN_CSTRING(str);
553 }
554
555 /*
556  * numeric_is_nan() -
557  *
558  *      Is Numeric value a NaN?
559  */
560 bool
561 numeric_is_nan(Numeric num)
562 {
563         return NUMERIC_IS_NAN(num);
564 }
565
566 /*
567  * numeric_maximum_size() -
568  *
569  *      Maximum size of a numeric with given typmod, or -1 if unlimited/unknown.
570  */
571 int32
572 numeric_maximum_size(int32 typmod)
573 {
574         int                     precision;
575         int                     numeric_digits;
576
577         if (typmod < (int32) (VARHDRSZ))
578                 return -1;
579
580         /* precision (ie, max # of digits) is in upper bits of typmod */
581         precision = ((typmod - VARHDRSZ) >> 16) & 0xffff;
582
583         /*
584          * This formula computes the maximum number of NumericDigits we could need
585          * in order to store the specified number of decimal digits. Because the
586          * weight is stored as a number of NumericDigits rather than a number of
587          * decimal digits, it's possible that the first NumericDigit will contain
588          * only a single decimal digit.  Thus, the first two decimal digits can
589          * require two NumericDigits to store, but it isn't until we reach
590          * DEC_DIGITS + 2 decimal digits that we potentially need a third
591          * NumericDigit.
592          */
593         numeric_digits = (precision + 2 * (DEC_DIGITS - 1)) / DEC_DIGITS;
594
595         /*
596          * In most cases, the size of a numeric will be smaller than the value
597          * computed below, because the varlena header will typically get toasted
598          * down to a single byte before being stored on disk, and it may also be
599          * possible to use a short numeric header.      But our job here is to compute
600          * the worst case.
601          */
602         return NUMERIC_HDRSZ + (numeric_digits * sizeof(NumericDigit));
603 }
604
605 /*
606  * numeric_out_sci() -
607  *
608  *      Output function for numeric data type in scientific notation.
609  */
610 char *
611 numeric_out_sci(Numeric num, int scale)
612 {
613         NumericVar      x;
614         char       *str;
615
616         /*
617          * Handle NaN
618          */
619         if (NUMERIC_IS_NAN(num))
620                 return pstrdup("NaN");
621
622         init_var_from_num(num, &x);
623
624         str = get_str_from_var_sci(&x, scale);
625
626         return str;
627 }
628
629 /*
630  *              numeric_recv                    - converts external binary format to numeric
631  *
632  * External format is a sequence of int16's:
633  * ndigits, weight, sign, dscale, NumericDigits.
634  */
635 Datum
636 numeric_recv(PG_FUNCTION_ARGS)
637 {
638         StringInfo      buf = (StringInfo) PG_GETARG_POINTER(0);
639
640 #ifdef NOT_USED
641         Oid                     typelem = PG_GETARG_OID(1);
642 #endif
643         int32           typmod = PG_GETARG_INT32(2);
644         NumericVar      value;
645         Numeric         res;
646         int                     len,
647                                 i;
648
649         init_var(&value);
650
651         len = (uint16) pq_getmsgint(buf, sizeof(uint16));
652         if (len < 0 || len > NUMERIC_MAX_PRECISION + NUMERIC_MAX_RESULT_SCALE)
653                 ereport(ERROR,
654                                 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
655                                  errmsg("invalid length in external \"numeric\" value")));
656
657         alloc_var(&value, len);
658
659         value.weight = (int16) pq_getmsgint(buf, sizeof(int16));
660         value.sign = (uint16) pq_getmsgint(buf, sizeof(uint16));
661         if (!(value.sign == NUMERIC_POS ||
662                   value.sign == NUMERIC_NEG ||
663                   value.sign == NUMERIC_NAN))
664                 ereport(ERROR,
665                                 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
666                                  errmsg("invalid sign in external \"numeric\" value")));
667
668         value.dscale = (uint16) pq_getmsgint(buf, sizeof(uint16));
669         for (i = 0; i < len; i++)
670         {
671                 NumericDigit d = pq_getmsgint(buf, sizeof(NumericDigit));
672
673                 if (d < 0 || d >= NBASE)
674                         ereport(ERROR,
675                                         (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
676                                          errmsg("invalid digit in external \"numeric\" value")));
677                 value.digits[i] = d;
678         }
679
680         apply_typmod(&value, typmod);
681
682         res = make_result(&value);
683         free_var(&value);
684
685         PG_RETURN_NUMERIC(res);
686 }
687
688 /*
689  *              numeric_send                    - converts numeric to binary format
690  */
691 Datum
692 numeric_send(PG_FUNCTION_ARGS)
693 {
694         Numeric         num = PG_GETARG_NUMERIC(0);
695         NumericVar      x;
696         StringInfoData buf;
697         int                     i;
698
699         init_var_from_num(num, &x);
700
701         pq_begintypsend(&buf);
702
703         pq_sendint(&buf, x.ndigits, sizeof(int16));
704         pq_sendint(&buf, x.weight, sizeof(int16));
705         pq_sendint(&buf, x.sign, sizeof(int16));
706         pq_sendint(&buf, x.dscale, sizeof(int16));
707         for (i = 0; i < x.ndigits; i++)
708                 pq_sendint(&buf, x.digits[i], sizeof(NumericDigit));
709
710         PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
711 }
712
713
714 /*
715  * numeric_transform() -
716  *
717  * Flatten calls to numeric's length coercion function that solely represent
718  * increases in allowable precision.  Scale changes mutate every datum, so
719  * they are unoptimizable.      Some values, e.g. 1E-1001, can only fit into an
720  * unconstrained numeric, so a change from an unconstrained numeric to any
721  * constrained numeric is also unoptimizable.
722  */
723 Datum
724 numeric_transform(PG_FUNCTION_ARGS)
725 {
726         FuncExpr   *expr = (FuncExpr *) PG_GETARG_POINTER(0);
727         Node       *ret = NULL;
728         Node       *typmod;
729
730         Assert(IsA(expr, FuncExpr));
731         Assert(list_length(expr->args) >= 2);
732
733         typmod = (Node *) lsecond(expr->args);
734
735         if (IsA(typmod, Const) &&!((Const *) typmod)->constisnull)
736         {
737                 Node       *source = (Node *) linitial(expr->args);
738                 int32           old_typmod = exprTypmod(source);
739                 int32           new_typmod = DatumGetInt32(((Const *) typmod)->constvalue);
740                 int32           old_scale = (old_typmod - VARHDRSZ) & 0xffff;
741                 int32           new_scale = (new_typmod - VARHDRSZ) & 0xffff;
742                 int32           old_precision = (old_typmod - VARHDRSZ) >> 16 & 0xffff;
743                 int32           new_precision = (new_typmod - VARHDRSZ) >> 16 & 0xffff;
744
745                 /*
746                  * If new_typmod < VARHDRSZ, the destination is unconstrained; that's
747                  * always OK.  If old_typmod >= VARHDRSZ, the source is constrained,
748                  * and we're OK if the scale is unchanged and the precision is not
749                  * decreasing.  See further notes in function header comment.
750                  */
751                 if (new_typmod < (int32) VARHDRSZ ||
752                         (old_typmod >= (int32) VARHDRSZ &&
753                          new_scale == old_scale && new_precision >= old_precision))
754                         ret = relabel_to_typmod(source, new_typmod);
755         }
756
757         PG_RETURN_POINTER(ret);
758 }
759
760 /*
761  * numeric() -
762  *
763  *      This is a special function called by the Postgres database system
764  *      before a value is stored in a tuple's attribute. The precision and
765  *      scale of the attribute have to be applied on the value.
766  */
767 Datum
768 numeric         (PG_FUNCTION_ARGS)
769 {
770         Numeric         num = PG_GETARG_NUMERIC(0);
771         int32           typmod = PG_GETARG_INT32(1);
772         Numeric         new;
773         int32           tmp_typmod;
774         int                     precision;
775         int                     scale;
776         int                     ddigits;
777         int                     maxdigits;
778         NumericVar      var;
779
780         /*
781          * Handle NaN
782          */
783         if (NUMERIC_IS_NAN(num))
784                 PG_RETURN_NUMERIC(make_result(&const_nan));
785
786         /*
787          * If the value isn't a valid type modifier, simply return a copy of the
788          * input value
789          */
790         if (typmod < (int32) (VARHDRSZ))
791         {
792                 new = (Numeric) palloc(VARSIZE(num));
793                 memcpy(new, num, VARSIZE(num));
794                 PG_RETURN_NUMERIC(new);
795         }
796
797         /*
798          * Get the precision and scale out of the typmod value
799          */
800         tmp_typmod = typmod - VARHDRSZ;
801         precision = (tmp_typmod >> 16) & 0xffff;
802         scale = tmp_typmod & 0xffff;
803         maxdigits = precision - scale;
804
805         /*
806          * If the number is certainly in bounds and due to the target scale no
807          * rounding could be necessary, just make a copy of the input and modify
808          * its scale fields, unless the larger scale forces us to abandon the
809          * short representation.  (Note we assume the existing dscale is
810          * honest...)
811          */
812         ddigits = (NUMERIC_WEIGHT(num) + 1) * DEC_DIGITS;
813         if (ddigits <= maxdigits && scale >= NUMERIC_DSCALE(num)
814                 && (NUMERIC_CAN_BE_SHORT(scale, NUMERIC_WEIGHT(num))
815                         || !NUMERIC_IS_SHORT(num)))
816         {
817                 new = (Numeric) palloc(VARSIZE(num));
818                 memcpy(new, num, VARSIZE(num));
819                 if (NUMERIC_IS_SHORT(num))
820                         new->choice.n_short.n_header =
821                                 (num->choice.n_short.n_header & ~NUMERIC_SHORT_DSCALE_MASK)
822                                 | (scale << NUMERIC_SHORT_DSCALE_SHIFT);
823                 else
824                         new->choice.n_long.n_sign_dscale = NUMERIC_SIGN(new) |
825                                 ((uint16) scale & NUMERIC_DSCALE_MASK);
826                 PG_RETURN_NUMERIC(new);
827         }
828
829         /*
830          * We really need to fiddle with things - unpack the number into a
831          * variable and let apply_typmod() do it.
832          */
833         init_var(&var);
834
835         set_var_from_num(num, &var);
836         apply_typmod(&var, typmod);
837         new = make_result(&var);
838
839         free_var(&var);
840
841         PG_RETURN_NUMERIC(new);
842 }
843
844 Datum
845 numerictypmodin(PG_FUNCTION_ARGS)
846 {
847         ArrayType  *ta = PG_GETARG_ARRAYTYPE_P(0);
848         int32      *tl;
849         int                     n;
850         int32           typmod;
851
852         tl = ArrayGetIntegerTypmods(ta, &n);
853
854         if (n == 2)
855         {
856                 if (tl[0] < 1 || tl[0] > NUMERIC_MAX_PRECISION)
857                         ereport(ERROR,
858                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
859                                          errmsg("NUMERIC precision %d must be between 1 and %d",
860                                                         tl[0], NUMERIC_MAX_PRECISION)));
861                 if (tl[1] < 0 || tl[1] > tl[0])
862                         ereport(ERROR,
863                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
864                                 errmsg("NUMERIC scale %d must be between 0 and precision %d",
865                                            tl[1], tl[0])));
866                 typmod = ((tl[0] << 16) | tl[1]) + VARHDRSZ;
867         }
868         else if (n == 1)
869         {
870                 if (tl[0] < 1 || tl[0] > NUMERIC_MAX_PRECISION)
871                         ereport(ERROR,
872                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
873                                          errmsg("NUMERIC precision %d must be between 1 and %d",
874                                                         tl[0], NUMERIC_MAX_PRECISION)));
875                 /* scale defaults to zero */
876                 typmod = (tl[0] << 16) + VARHDRSZ;
877         }
878         else
879         {
880                 ereport(ERROR,
881                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
882                                  errmsg("invalid NUMERIC type modifier")));
883                 typmod = 0;                             /* keep compiler quiet */
884         }
885
886         PG_RETURN_INT32(typmod);
887 }
888
889 Datum
890 numerictypmodout(PG_FUNCTION_ARGS)
891 {
892         int32           typmod = PG_GETARG_INT32(0);
893         char       *res = (char *) palloc(64);
894
895         if (typmod >= 0)
896                 snprintf(res, 64, "(%d,%d)",
897                                  ((typmod - VARHDRSZ) >> 16) & 0xffff,
898                                  (typmod - VARHDRSZ) & 0xffff);
899         else
900                 *res = '\0';
901
902         PG_RETURN_CSTRING(res);
903 }
904
905
906 /* ----------------------------------------------------------------------
907  *
908  * Sign manipulation, rounding and the like
909  *
910  * ----------------------------------------------------------------------
911  */
912
913 Datum
914 numeric_abs(PG_FUNCTION_ARGS)
915 {
916         Numeric         num = PG_GETARG_NUMERIC(0);
917         Numeric         res;
918
919         /*
920          * Handle NaN
921          */
922         if (NUMERIC_IS_NAN(num))
923                 PG_RETURN_NUMERIC(make_result(&const_nan));
924
925         /*
926          * Do it the easy way directly on the packed format
927          */
928         res = (Numeric) palloc(VARSIZE(num));
929         memcpy(res, num, VARSIZE(num));
930
931         if (NUMERIC_IS_SHORT(num))
932                 res->choice.n_short.n_header =
933                         num->choice.n_short.n_header & ~NUMERIC_SHORT_SIGN_MASK;
934         else
935                 res->choice.n_long.n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num);
936
937         PG_RETURN_NUMERIC(res);
938 }
939
940
941 Datum
942 numeric_uminus(PG_FUNCTION_ARGS)
943 {
944         Numeric         num = PG_GETARG_NUMERIC(0);
945         Numeric         res;
946
947         /*
948          * Handle NaN
949          */
950         if (NUMERIC_IS_NAN(num))
951                 PG_RETURN_NUMERIC(make_result(&const_nan));
952
953         /*
954          * Do it the easy way directly on the packed format
955          */
956         res = (Numeric) palloc(VARSIZE(num));
957         memcpy(res, num, VARSIZE(num));
958
959         /*
960          * The packed format is known to be totally zero digit trimmed always. So
961          * we can identify a ZERO by the fact that there are no digits at all.  Do
962          * nothing to a zero.
963          */
964         if (NUMERIC_NDIGITS(num) != 0)
965         {
966                 /* Else, flip the sign */
967                 if (NUMERIC_IS_SHORT(num))
968                         res->choice.n_short.n_header =
969                                 num->choice.n_short.n_header ^ NUMERIC_SHORT_SIGN_MASK;
970                 else if (NUMERIC_SIGN(num) == NUMERIC_POS)
971                         res->choice.n_long.n_sign_dscale =
972                                 NUMERIC_NEG | NUMERIC_DSCALE(num);
973                 else
974                         res->choice.n_long.n_sign_dscale =
975                                 NUMERIC_POS | NUMERIC_DSCALE(num);
976         }
977
978         PG_RETURN_NUMERIC(res);
979 }
980
981
982 Datum
983 numeric_uplus(PG_FUNCTION_ARGS)
984 {
985         Numeric         num = PG_GETARG_NUMERIC(0);
986         Numeric         res;
987
988         res = (Numeric) palloc(VARSIZE(num));
989         memcpy(res, num, VARSIZE(num));
990
991         PG_RETURN_NUMERIC(res);
992 }
993
994 /*
995  * numeric_sign() -
996  *
997  * returns -1 if the argument is less than 0, 0 if the argument is equal
998  * to 0, and 1 if the argument is greater than zero.
999  */
1000 Datum
1001 numeric_sign(PG_FUNCTION_ARGS)
1002 {
1003         Numeric         num = PG_GETARG_NUMERIC(0);
1004         Numeric         res;
1005         NumericVar      result;
1006
1007         /*
1008          * Handle NaN
1009          */
1010         if (NUMERIC_IS_NAN(num))
1011                 PG_RETURN_NUMERIC(make_result(&const_nan));
1012
1013         init_var(&result);
1014
1015         /*
1016          * The packed format is known to be totally zero digit trimmed always. So
1017          * we can identify a ZERO by the fact that there are no digits at all.
1018          */
1019         if (NUMERIC_NDIGITS(num) == 0)
1020                 set_var_from_var(&const_zero, &result);
1021         else
1022         {
1023                 /*
1024                  * And if there are some, we return a copy of ONE with the sign of our
1025                  * argument
1026                  */
1027                 set_var_from_var(&const_one, &result);
1028                 result.sign = NUMERIC_SIGN(num);
1029         }
1030
1031         res = make_result(&result);
1032         free_var(&result);
1033
1034         PG_RETURN_NUMERIC(res);
1035 }
1036
1037
1038 /*
1039  * numeric_round() -
1040  *
1041  *      Round a value to have 'scale' digits after the decimal point.
1042  *      We allow negative 'scale', implying rounding before the decimal
1043  *      point --- Oracle interprets rounding that way.
1044  */
1045 Datum
1046 numeric_round(PG_FUNCTION_ARGS)
1047 {
1048         Numeric         num = PG_GETARG_NUMERIC(0);
1049         int32           scale = PG_GETARG_INT32(1);
1050         Numeric         res;
1051         NumericVar      arg;
1052
1053         /*
1054          * Handle NaN
1055          */
1056         if (NUMERIC_IS_NAN(num))
1057                 PG_RETURN_NUMERIC(make_result(&const_nan));
1058
1059         /*
1060          * Limit the scale value to avoid possible overflow in calculations
1061          */
1062         scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE);
1063         scale = Min(scale, NUMERIC_MAX_RESULT_SCALE);
1064
1065         /*
1066          * Unpack the argument and round it at the proper digit position
1067          */
1068         init_var(&arg);
1069         set_var_from_num(num, &arg);
1070
1071         round_var(&arg, scale);
1072
1073         /* We don't allow negative output dscale */
1074         if (scale < 0)
1075                 arg.dscale = 0;
1076
1077         /*
1078          * Return the rounded result
1079          */
1080         res = make_result(&arg);
1081
1082         free_var(&arg);
1083         PG_RETURN_NUMERIC(res);
1084 }
1085
1086
1087 /*
1088  * numeric_trunc() -
1089  *
1090  *      Truncate a value to have 'scale' digits after the decimal point.
1091  *      We allow negative 'scale', implying a truncation before the decimal
1092  *      point --- Oracle interprets truncation that way.
1093  */
1094 Datum
1095 numeric_trunc(PG_FUNCTION_ARGS)
1096 {
1097         Numeric         num = PG_GETARG_NUMERIC(0);
1098         int32           scale = PG_GETARG_INT32(1);
1099         Numeric         res;
1100         NumericVar      arg;
1101
1102         /*
1103          * Handle NaN
1104          */
1105         if (NUMERIC_IS_NAN(num))
1106                 PG_RETURN_NUMERIC(make_result(&const_nan));
1107
1108         /*
1109          * Limit the scale value to avoid possible overflow in calculations
1110          */
1111         scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE);
1112         scale = Min(scale, NUMERIC_MAX_RESULT_SCALE);
1113
1114         /*
1115          * Unpack the argument and truncate it at the proper digit position
1116          */
1117         init_var(&arg);
1118         set_var_from_num(num, &arg);
1119
1120         trunc_var(&arg, scale);
1121
1122         /* We don't allow negative output dscale */
1123         if (scale < 0)
1124                 arg.dscale = 0;
1125
1126         /*
1127          * Return the truncated result
1128          */
1129         res = make_result(&arg);
1130
1131         free_var(&arg);
1132         PG_RETURN_NUMERIC(res);
1133 }
1134
1135
1136 /*
1137  * numeric_ceil() -
1138  *
1139  *      Return the smallest integer greater than or equal to the argument
1140  */
1141 Datum
1142 numeric_ceil(PG_FUNCTION_ARGS)
1143 {
1144         Numeric         num = PG_GETARG_NUMERIC(0);
1145         Numeric         res;
1146         NumericVar      result;
1147
1148         if (NUMERIC_IS_NAN(num))
1149                 PG_RETURN_NUMERIC(make_result(&const_nan));
1150
1151         init_var_from_num(num, &result);
1152         ceil_var(&result, &result);
1153
1154         res = make_result(&result);
1155         free_var(&result);
1156
1157         PG_RETURN_NUMERIC(res);
1158 }
1159
1160
1161 /*
1162  * numeric_floor() -
1163  *
1164  *      Return the largest integer equal to or less than the argument
1165  */
1166 Datum
1167 numeric_floor(PG_FUNCTION_ARGS)
1168 {
1169         Numeric         num = PG_GETARG_NUMERIC(0);
1170         Numeric         res;
1171         NumericVar      result;
1172
1173         if (NUMERIC_IS_NAN(num))
1174                 PG_RETURN_NUMERIC(make_result(&const_nan));
1175
1176         init_var_from_num(num, &result);
1177         floor_var(&result, &result);
1178
1179         res = make_result(&result);
1180         free_var(&result);
1181
1182         PG_RETURN_NUMERIC(res);
1183 }
1184
1185 /*
1186  * Implements the numeric version of the width_bucket() function
1187  * defined by SQL2003. See also width_bucket_float8().
1188  *
1189  * 'bound1' and 'bound2' are the lower and upper bounds of the
1190  * histogram's range, respectively. 'count' is the number of buckets
1191  * in the histogram. width_bucket() returns an integer indicating the
1192  * bucket number that 'operand' belongs to in an equiwidth histogram
1193  * with the specified characteristics. An operand smaller than the
1194  * lower bound is assigned to bucket 0. An operand greater than the
1195  * upper bound is assigned to an additional bucket (with number
1196  * count+1). We don't allow "NaN" for any of the numeric arguments.
1197  */
1198 Datum
1199 width_bucket_numeric(PG_FUNCTION_ARGS)
1200 {
1201         Numeric         operand = PG_GETARG_NUMERIC(0);
1202         Numeric         bound1 = PG_GETARG_NUMERIC(1);
1203         Numeric         bound2 = PG_GETARG_NUMERIC(2);
1204         int32           count = PG_GETARG_INT32(3);
1205         NumericVar      count_var;
1206         NumericVar      result_var;
1207         int32           result;
1208
1209         if (count <= 0)
1210                 ereport(ERROR,
1211                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
1212                                  errmsg("count must be greater than zero")));
1213
1214         if (NUMERIC_IS_NAN(operand) ||
1215                 NUMERIC_IS_NAN(bound1) ||
1216                 NUMERIC_IS_NAN(bound2))
1217                 ereport(ERROR,
1218                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
1219                          errmsg("operand, lower bound, and upper bound cannot be NaN")));
1220
1221         init_var(&result_var);
1222         init_var(&count_var);
1223
1224         /* Convert 'count' to a numeric, for ease of use later */
1225         int8_to_numericvar((int64) count, &count_var);
1226
1227         switch (cmp_numerics(bound1, bound2))
1228         {
1229                 case 0:
1230                         ereport(ERROR,
1231                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
1232                                  errmsg("lower bound cannot equal upper bound")));
1233
1234                         /* bound1 < bound2 */
1235                 case -1:
1236                         if (cmp_numerics(operand, bound1) < 0)
1237                                 set_var_from_var(&const_zero, &result_var);
1238                         else if (cmp_numerics(operand, bound2) >= 0)
1239                                 add_var(&count_var, &const_one, &result_var);
1240                         else
1241                                 compute_bucket(operand, bound1, bound2,
1242                                                            &count_var, &result_var);
1243                         break;
1244
1245                         /* bound1 > bound2 */
1246                 case 1:
1247                         if (cmp_numerics(operand, bound1) > 0)
1248                                 set_var_from_var(&const_zero, &result_var);
1249                         else if (cmp_numerics(operand, bound2) <= 0)
1250                                 add_var(&count_var, &const_one, &result_var);
1251                         else
1252                                 compute_bucket(operand, bound1, bound2,
1253                                                            &count_var, &result_var);
1254                         break;
1255         }
1256
1257         /* if result exceeds the range of a legal int4, we ereport here */
1258         result = numericvar_to_int4(&result_var);
1259
1260         free_var(&count_var);
1261         free_var(&result_var);
1262
1263         PG_RETURN_INT32(result);
1264 }
1265
1266 /*
1267  * If 'operand' is not outside the bucket range, determine the correct
1268  * bucket for it to go. The calculations performed by this function
1269  * are derived directly from the SQL2003 spec.
1270  */
1271 static void
1272 compute_bucket(Numeric operand, Numeric bound1, Numeric bound2,
1273                            NumericVar *count_var, NumericVar *result_var)
1274 {
1275         NumericVar      bound1_var;
1276         NumericVar      bound2_var;
1277         NumericVar      operand_var;
1278
1279         init_var_from_num(bound1, &bound1_var);
1280         init_var_from_num(bound2, &bound2_var);
1281         init_var_from_num(operand, &operand_var);
1282
1283         if (cmp_var(&bound1_var, &bound2_var) < 0)
1284         {
1285                 sub_var(&operand_var, &bound1_var, &operand_var);
1286                 sub_var(&bound2_var, &bound1_var, &bound2_var);
1287                 div_var(&operand_var, &bound2_var, result_var,
1288                                 select_div_scale(&operand_var, &bound2_var), true);
1289         }
1290         else
1291         {
1292                 sub_var(&bound1_var, &operand_var, &operand_var);
1293                 sub_var(&bound1_var, &bound2_var, &bound1_var);
1294                 div_var(&operand_var, &bound1_var, result_var,
1295                                 select_div_scale(&operand_var, &bound1_var), true);
1296         }
1297
1298         mul_var(result_var, count_var, result_var,
1299                         result_var->dscale + count_var->dscale);
1300         add_var(result_var, &const_one, result_var);
1301         floor_var(result_var, result_var);
1302
1303         free_var(&bound1_var);
1304         free_var(&bound2_var);
1305         free_var(&operand_var);
1306 }
1307
1308 /* ----------------------------------------------------------------------
1309  *
1310  * Comparison functions
1311  *
1312  * Note: btree indexes need these routines not to leak memory; therefore,
1313  * be careful to free working copies of toasted datums.  Most places don't
1314  * need to be so careful.
1315  * ----------------------------------------------------------------------
1316  */
1317
1318
1319 Datum
1320 numeric_cmp(PG_FUNCTION_ARGS)
1321 {
1322         Numeric         num1 = PG_GETARG_NUMERIC(0);
1323         Numeric         num2 = PG_GETARG_NUMERIC(1);
1324         int                     result;
1325
1326         result = cmp_numerics(num1, num2);
1327
1328         PG_FREE_IF_COPY(num1, 0);
1329         PG_FREE_IF_COPY(num2, 1);
1330
1331         PG_RETURN_INT32(result);
1332 }
1333
1334
1335 Datum
1336 numeric_eq(PG_FUNCTION_ARGS)
1337 {
1338         Numeric         num1 = PG_GETARG_NUMERIC(0);
1339         Numeric         num2 = PG_GETARG_NUMERIC(1);
1340         bool            result;
1341
1342         result = cmp_numerics(num1, num2) == 0;
1343
1344         PG_FREE_IF_COPY(num1, 0);
1345         PG_FREE_IF_COPY(num2, 1);
1346
1347         PG_RETURN_BOOL(result);
1348 }
1349
1350 Datum
1351 numeric_ne(PG_FUNCTION_ARGS)
1352 {
1353         Numeric         num1 = PG_GETARG_NUMERIC(0);
1354         Numeric         num2 = PG_GETARG_NUMERIC(1);
1355         bool            result;
1356
1357         result = cmp_numerics(num1, num2) != 0;
1358
1359         PG_FREE_IF_COPY(num1, 0);
1360         PG_FREE_IF_COPY(num2, 1);
1361
1362         PG_RETURN_BOOL(result);
1363 }
1364
1365 Datum
1366 numeric_gt(PG_FUNCTION_ARGS)
1367 {
1368         Numeric         num1 = PG_GETARG_NUMERIC(0);
1369         Numeric         num2 = PG_GETARG_NUMERIC(1);
1370         bool            result;
1371
1372         result = cmp_numerics(num1, num2) > 0;
1373
1374         PG_FREE_IF_COPY(num1, 0);
1375         PG_FREE_IF_COPY(num2, 1);
1376
1377         PG_RETURN_BOOL(result);
1378 }
1379
1380 Datum
1381 numeric_ge(PG_FUNCTION_ARGS)
1382 {
1383         Numeric         num1 = PG_GETARG_NUMERIC(0);
1384         Numeric         num2 = PG_GETARG_NUMERIC(1);
1385         bool            result;
1386
1387         result = cmp_numerics(num1, num2) >= 0;
1388
1389         PG_FREE_IF_COPY(num1, 0);
1390         PG_FREE_IF_COPY(num2, 1);
1391
1392         PG_RETURN_BOOL(result);
1393 }
1394
1395 Datum
1396 numeric_lt(PG_FUNCTION_ARGS)
1397 {
1398         Numeric         num1 = PG_GETARG_NUMERIC(0);
1399         Numeric         num2 = PG_GETARG_NUMERIC(1);
1400         bool            result;
1401
1402         result = cmp_numerics(num1, num2) < 0;
1403
1404         PG_FREE_IF_COPY(num1, 0);
1405         PG_FREE_IF_COPY(num2, 1);
1406
1407         PG_RETURN_BOOL(result);
1408 }
1409
1410 Datum
1411 numeric_le(PG_FUNCTION_ARGS)
1412 {
1413         Numeric         num1 = PG_GETARG_NUMERIC(0);
1414         Numeric         num2 = PG_GETARG_NUMERIC(1);
1415         bool            result;
1416
1417         result = cmp_numerics(num1, num2) <= 0;
1418
1419         PG_FREE_IF_COPY(num1, 0);
1420         PG_FREE_IF_COPY(num2, 1);
1421
1422         PG_RETURN_BOOL(result);
1423 }
1424
1425 static int
1426 cmp_numerics(Numeric num1, Numeric num2)
1427 {
1428         int                     result;
1429
1430         /*
1431          * We consider all NANs to be equal and larger than any non-NAN. This is
1432          * somewhat arbitrary; the important thing is to have a consistent sort
1433          * order.
1434          */
1435         if (NUMERIC_IS_NAN(num1))
1436         {
1437                 if (NUMERIC_IS_NAN(num2))
1438                         result = 0;                     /* NAN = NAN */
1439                 else
1440                         result = 1;                     /* NAN > non-NAN */
1441         }
1442         else if (NUMERIC_IS_NAN(num2))
1443         {
1444                 result = -1;                    /* non-NAN < NAN */
1445         }
1446         else
1447         {
1448                 result = cmp_var_common(NUMERIC_DIGITS(num1), NUMERIC_NDIGITS(num1),
1449                                                                 NUMERIC_WEIGHT(num1), NUMERIC_SIGN(num1),
1450                                                                 NUMERIC_DIGITS(num2), NUMERIC_NDIGITS(num2),
1451                                                                 NUMERIC_WEIGHT(num2), NUMERIC_SIGN(num2));
1452         }
1453
1454         return result;
1455 }
1456
1457 Datum
1458 hash_numeric(PG_FUNCTION_ARGS)
1459 {
1460         Numeric         key = PG_GETARG_NUMERIC(0);
1461         Datum           digit_hash;
1462         Datum           result;
1463         int                     weight;
1464         int                     start_offset;
1465         int                     end_offset;
1466         int                     i;
1467         int                     hash_len;
1468         NumericDigit *digits;
1469
1470         /* If it's NaN, don't try to hash the rest of the fields */
1471         if (NUMERIC_IS_NAN(key))
1472                 PG_RETURN_UINT32(0);
1473
1474         weight = NUMERIC_WEIGHT(key);
1475         start_offset = 0;
1476         end_offset = 0;
1477
1478         /*
1479          * Omit any leading or trailing zeros from the input to the hash. The
1480          * numeric implementation *should* guarantee that leading and trailing
1481          * zeros are suppressed, but we're paranoid. Note that we measure the
1482          * starting and ending offsets in units of NumericDigits, not bytes.
1483          */
1484         digits = NUMERIC_DIGITS(key);
1485         for (i = 0; i < NUMERIC_NDIGITS(key); i++)
1486         {
1487                 if (digits[i] != (NumericDigit) 0)
1488                         break;
1489
1490                 start_offset++;
1491
1492                 /*
1493                  * The weight is effectively the # of digits before the decimal point,
1494                  * so decrement it for each leading zero we skip.
1495                  */
1496                 weight--;
1497         }
1498
1499         /*
1500          * If there are no non-zero digits, then the value of the number is zero,
1501          * regardless of any other fields.
1502          */
1503         if (NUMERIC_NDIGITS(key) == start_offset)
1504                 PG_RETURN_UINT32(-1);
1505
1506         for (i = NUMERIC_NDIGITS(key) - 1; i >= 0; i--)
1507         {
1508                 if (digits[i] != (NumericDigit) 0)
1509                         break;
1510
1511                 end_offset++;
1512         }
1513
1514         /* If we get here, there should be at least one non-zero digit */
1515         Assert(start_offset + end_offset < NUMERIC_NDIGITS(key));
1516
1517         /*
1518          * Note that we don't hash on the Numeric's scale, since two numerics can
1519          * compare equal but have different scales. We also don't hash on the
1520          * sign, although we could: since a sign difference implies inequality,
1521          * this shouldn't affect correctness.
1522          */
1523         hash_len = NUMERIC_NDIGITS(key) - start_offset - end_offset;
1524         digit_hash = hash_any((unsigned char *) (NUMERIC_DIGITS(key) + start_offset),
1525                                                   hash_len * sizeof(NumericDigit));
1526
1527         /* Mix in the weight, via XOR */
1528         result = digit_hash ^ weight;
1529
1530         PG_RETURN_DATUM(result);
1531 }
1532
1533
1534 /* ----------------------------------------------------------------------
1535  *
1536  * Basic arithmetic functions
1537  *
1538  * ----------------------------------------------------------------------
1539  */
1540
1541
1542 /*
1543  * numeric_add() -
1544  *
1545  *      Add two numerics
1546  */
1547 Datum
1548 numeric_add(PG_FUNCTION_ARGS)
1549 {
1550         Numeric         num1 = PG_GETARG_NUMERIC(0);
1551         Numeric         num2 = PG_GETARG_NUMERIC(1);
1552         NumericVar      arg1;
1553         NumericVar      arg2;
1554         NumericVar      result;
1555         Numeric         res;
1556
1557         /*
1558          * Handle NaN
1559          */
1560         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1561                 PG_RETURN_NUMERIC(make_result(&const_nan));
1562
1563         /*
1564          * Unpack the values, let add_var() compute the result and return it.
1565          */
1566         init_var_from_num(num1, &arg1);
1567         init_var_from_num(num2, &arg2);
1568
1569         init_var(&result);
1570         add_var(&arg1, &arg2, &result);
1571
1572         res = make_result(&result);
1573
1574         free_var(&result);
1575
1576         PG_RETURN_NUMERIC(res);
1577 }
1578
1579
1580 /*
1581  * numeric_sub() -
1582  *
1583  *      Subtract one numeric from another
1584  */
1585 Datum
1586 numeric_sub(PG_FUNCTION_ARGS)
1587 {
1588         Numeric         num1 = PG_GETARG_NUMERIC(0);
1589         Numeric         num2 = PG_GETARG_NUMERIC(1);
1590         NumericVar      arg1;
1591         NumericVar      arg2;
1592         NumericVar      result;
1593         Numeric         res;
1594
1595         /*
1596          * Handle NaN
1597          */
1598         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1599                 PG_RETURN_NUMERIC(make_result(&const_nan));
1600
1601         /*
1602          * Unpack the values, let sub_var() compute the result and return it.
1603          */
1604         init_var_from_num(num1, &arg1);
1605         init_var_from_num(num2, &arg2);
1606
1607         init_var(&result);
1608         sub_var(&arg1, &arg2, &result);
1609
1610         res = make_result(&result);
1611
1612         free_var(&result);
1613
1614         PG_RETURN_NUMERIC(res);
1615 }
1616
1617
1618 /*
1619  * numeric_mul() -
1620  *
1621  *      Calculate the product of two numerics
1622  */
1623 Datum
1624 numeric_mul(PG_FUNCTION_ARGS)
1625 {
1626         Numeric         num1 = PG_GETARG_NUMERIC(0);
1627         Numeric         num2 = PG_GETARG_NUMERIC(1);
1628         NumericVar      arg1;
1629         NumericVar      arg2;
1630         NumericVar      result;
1631         Numeric         res;
1632
1633         /*
1634          * Handle NaN
1635          */
1636         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1637                 PG_RETURN_NUMERIC(make_result(&const_nan));
1638
1639         /*
1640          * Unpack the values, let mul_var() compute the result and return it.
1641          * Unlike add_var() and sub_var(), mul_var() will round its result. In the
1642          * case of numeric_mul(), which is invoked for the * operator on numerics,
1643          * we request exact representation for the product (rscale = sum(dscale of
1644          * arg1, dscale of arg2)).
1645          */
1646         init_var_from_num(num1, &arg1);
1647         init_var_from_num(num2, &arg2);
1648
1649         init_var(&result);
1650         mul_var(&arg1, &arg2, &result, arg1.dscale + arg2.dscale);
1651
1652         res = make_result(&result);
1653
1654         free_var(&result);
1655
1656         PG_RETURN_NUMERIC(res);
1657 }
1658
1659
1660 /*
1661  * numeric_div() -
1662  *
1663  *      Divide one numeric into another
1664  */
1665 Datum
1666 numeric_div(PG_FUNCTION_ARGS)
1667 {
1668         Numeric         num1 = PG_GETARG_NUMERIC(0);
1669         Numeric         num2 = PG_GETARG_NUMERIC(1);
1670         NumericVar      arg1;
1671         NumericVar      arg2;
1672         NumericVar      result;
1673         Numeric         res;
1674         int                     rscale;
1675
1676         /*
1677          * Handle NaN
1678          */
1679         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1680                 PG_RETURN_NUMERIC(make_result(&const_nan));
1681
1682         /*
1683          * Unpack the arguments
1684          */
1685         init_var_from_num(num1, &arg1);
1686         init_var_from_num(num2, &arg2);
1687
1688         init_var(&result);
1689
1690         /*
1691          * Select scale for division result
1692          */
1693         rscale = select_div_scale(&arg1, &arg2);
1694
1695         /*
1696          * Do the divide and return the result
1697          */
1698         div_var(&arg1, &arg2, &result, rscale, true);
1699
1700         res = make_result(&result);
1701
1702         free_var(&result);
1703
1704         PG_RETURN_NUMERIC(res);
1705 }
1706
1707
1708 /*
1709  * numeric_div_trunc() -
1710  *
1711  *      Divide one numeric into another, truncating the result to an integer
1712  */
1713 Datum
1714 numeric_div_trunc(PG_FUNCTION_ARGS)
1715 {
1716         Numeric         num1 = PG_GETARG_NUMERIC(0);
1717         Numeric         num2 = PG_GETARG_NUMERIC(1);
1718         NumericVar      arg1;
1719         NumericVar      arg2;
1720         NumericVar      result;
1721         Numeric         res;
1722
1723         /*
1724          * Handle NaN
1725          */
1726         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1727                 PG_RETURN_NUMERIC(make_result(&const_nan));
1728
1729         /*
1730          * Unpack the arguments
1731          */
1732         init_var_from_num(num1, &arg1);
1733         init_var_from_num(num2, &arg2);
1734
1735         init_var(&result);
1736
1737         /*
1738          * Do the divide and return the result
1739          */
1740         div_var(&arg1, &arg2, &result, 0, false);
1741
1742         res = make_result(&result);
1743
1744         free_var(&result);
1745
1746         PG_RETURN_NUMERIC(res);
1747 }
1748
1749
1750 /*
1751  * numeric_mod() -
1752  *
1753  *      Calculate the modulo of two numerics
1754  */
1755 Datum
1756 numeric_mod(PG_FUNCTION_ARGS)
1757 {
1758         Numeric         num1 = PG_GETARG_NUMERIC(0);
1759         Numeric         num2 = PG_GETARG_NUMERIC(1);
1760         Numeric         res;
1761         NumericVar      arg1;
1762         NumericVar      arg2;
1763         NumericVar      result;
1764
1765         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
1766                 PG_RETURN_NUMERIC(make_result(&const_nan));
1767
1768         init_var_from_num(num1, &arg1);
1769         init_var_from_num(num2, &arg2);
1770
1771         init_var(&result);
1772
1773         mod_var(&arg1, &arg2, &result);
1774
1775         res = make_result(&result);
1776
1777         free_var(&result);
1778
1779         PG_RETURN_NUMERIC(res);
1780 }
1781
1782
1783 /*
1784  * numeric_inc() -
1785  *
1786  *      Increment a number by one
1787  */
1788 Datum
1789 numeric_inc(PG_FUNCTION_ARGS)
1790 {
1791         Numeric         num = PG_GETARG_NUMERIC(0);
1792         NumericVar      arg;
1793         Numeric         res;
1794
1795         /*
1796          * Handle NaN
1797          */
1798         if (NUMERIC_IS_NAN(num))
1799                 PG_RETURN_NUMERIC(make_result(&const_nan));
1800
1801         /*
1802          * Compute the result and return it
1803          */
1804         init_var_from_num(num, &arg);
1805
1806         add_var(&arg, &const_one, &arg);
1807
1808         res = make_result(&arg);
1809
1810         free_var(&arg);
1811
1812         PG_RETURN_NUMERIC(res);
1813 }
1814
1815
1816 /*
1817  * numeric_smaller() -
1818  *
1819  *      Return the smaller of two numbers
1820  */
1821 Datum
1822 numeric_smaller(PG_FUNCTION_ARGS)
1823 {
1824         Numeric         num1 = PG_GETARG_NUMERIC(0);
1825         Numeric         num2 = PG_GETARG_NUMERIC(1);
1826
1827         /*
1828          * Use cmp_numerics so that this will agree with the comparison operators,
1829          * particularly as regards comparisons involving NaN.
1830          */
1831         if (cmp_numerics(num1, num2) < 0)
1832                 PG_RETURN_NUMERIC(num1);
1833         else
1834                 PG_RETURN_NUMERIC(num2);
1835 }
1836
1837
1838 /*
1839  * numeric_larger() -
1840  *
1841  *      Return the larger of two numbers
1842  */
1843 Datum
1844 numeric_larger(PG_FUNCTION_ARGS)
1845 {
1846         Numeric         num1 = PG_GETARG_NUMERIC(0);
1847         Numeric         num2 = PG_GETARG_NUMERIC(1);
1848
1849         /*
1850          * Use cmp_numerics so that this will agree with the comparison operators,
1851          * particularly as regards comparisons involving NaN.
1852          */
1853         if (cmp_numerics(num1, num2) > 0)
1854                 PG_RETURN_NUMERIC(num1);
1855         else
1856                 PG_RETURN_NUMERIC(num2);
1857 }
1858
1859
1860 /* ----------------------------------------------------------------------
1861  *
1862  * Advanced math functions
1863  *
1864  * ----------------------------------------------------------------------
1865  */
1866
1867 /*
1868  * numeric_fac()
1869  *
1870  * Compute factorial
1871  */
1872 Datum
1873 numeric_fac(PG_FUNCTION_ARGS)
1874 {
1875         int64           num = PG_GETARG_INT64(0);
1876         Numeric         res;
1877         NumericVar      fact;
1878         NumericVar      result;
1879
1880         if (num <= 1)
1881         {
1882                 res = make_result(&const_one);
1883                 PG_RETURN_NUMERIC(res);
1884         }
1885         /* Fail immediately if the result would overflow */
1886         if (num > 32177)
1887                 ereport(ERROR,
1888                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1889                                  errmsg("value overflows numeric format")));
1890
1891         init_var(&fact);
1892         init_var(&result);
1893
1894         int8_to_numericvar(num, &result);
1895
1896         for (num = num - 1; num > 1; num--)
1897         {
1898                 /* this loop can take awhile, so allow it to be interrupted */
1899                 CHECK_FOR_INTERRUPTS();
1900
1901                 int8_to_numericvar(num, &fact);
1902
1903                 mul_var(&result, &fact, &result, 0);
1904         }
1905
1906         res = make_result(&result);
1907
1908         free_var(&fact);
1909         free_var(&result);
1910
1911         PG_RETURN_NUMERIC(res);
1912 }
1913
1914
1915 /*
1916  * numeric_sqrt() -
1917  *
1918  *      Compute the square root of a numeric.
1919  */
1920 Datum
1921 numeric_sqrt(PG_FUNCTION_ARGS)
1922 {
1923         Numeric         num = PG_GETARG_NUMERIC(0);
1924         Numeric         res;
1925         NumericVar      arg;
1926         NumericVar      result;
1927         int                     sweight;
1928         int                     rscale;
1929
1930         /*
1931          * Handle NaN
1932          */
1933         if (NUMERIC_IS_NAN(num))
1934                 PG_RETURN_NUMERIC(make_result(&const_nan));
1935
1936         /*
1937          * Unpack the argument and determine the result scale.  We choose a scale
1938          * to give at least NUMERIC_MIN_SIG_DIGITS significant digits; but in any
1939          * case not less than the input's dscale.
1940          */
1941         init_var_from_num(num, &arg);
1942
1943         init_var(&result);
1944
1945         /* Assume the input was normalized, so arg.weight is accurate */
1946         sweight = (arg.weight + 1) * DEC_DIGITS / 2 - 1;
1947
1948         rscale = NUMERIC_MIN_SIG_DIGITS - sweight;
1949         rscale = Max(rscale, arg.dscale);
1950         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
1951         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
1952
1953         /*
1954          * Let sqrt_var() do the calculation and return the result.
1955          */
1956         sqrt_var(&arg, &result, rscale);
1957
1958         res = make_result(&result);
1959
1960         free_var(&result);
1961
1962         PG_RETURN_NUMERIC(res);
1963 }
1964
1965
1966 /*
1967  * numeric_exp() -
1968  *
1969  *      Raise e to the power of x
1970  */
1971 Datum
1972 numeric_exp(PG_FUNCTION_ARGS)
1973 {
1974         Numeric         num = PG_GETARG_NUMERIC(0);
1975         Numeric         res;
1976         NumericVar      arg;
1977         NumericVar      result;
1978         int                     rscale;
1979         double          val;
1980
1981         /*
1982          * Handle NaN
1983          */
1984         if (NUMERIC_IS_NAN(num))
1985                 PG_RETURN_NUMERIC(make_result(&const_nan));
1986
1987         /*
1988          * Unpack the argument and determine the result scale.  We choose a scale
1989          * to give at least NUMERIC_MIN_SIG_DIGITS significant digits; but in any
1990          * case not less than the input's dscale.
1991          */
1992         init_var_from_num(num, &arg);
1993
1994         init_var(&result);
1995
1996         /* convert input to float8, ignoring overflow */
1997         val = numericvar_to_double_no_overflow(&arg);
1998
1999         /*
2000          * log10(result) = num * log10(e), so this is approximately the decimal
2001          * weight of the result:
2002          */
2003         val *= 0.434294481903252;
2004
2005         /* limit to something that won't cause integer overflow */
2006         val = Max(val, -NUMERIC_MAX_RESULT_SCALE);
2007         val = Min(val, NUMERIC_MAX_RESULT_SCALE);
2008
2009         rscale = NUMERIC_MIN_SIG_DIGITS - (int) val;
2010         rscale = Max(rscale, arg.dscale);
2011         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
2012         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
2013
2014         /*
2015          * Let exp_var() do the calculation and return the result.
2016          */
2017         exp_var(&arg, &result, rscale);
2018
2019         res = make_result(&result);
2020
2021         free_var(&result);
2022
2023         PG_RETURN_NUMERIC(res);
2024 }
2025
2026
2027 /*
2028  * numeric_ln() -
2029  *
2030  *      Compute the natural logarithm of x
2031  */
2032 Datum
2033 numeric_ln(PG_FUNCTION_ARGS)
2034 {
2035         Numeric         num = PG_GETARG_NUMERIC(0);
2036         Numeric         res;
2037         NumericVar      arg;
2038         NumericVar      result;
2039         int                     dec_digits;
2040         int                     rscale;
2041
2042         /*
2043          * Handle NaN
2044          */
2045         if (NUMERIC_IS_NAN(num))
2046                 PG_RETURN_NUMERIC(make_result(&const_nan));
2047
2048         init_var_from_num(num, &arg);
2049         init_var(&result);
2050
2051         /* Approx decimal digits before decimal point */
2052         dec_digits = (arg.weight + 1) * DEC_DIGITS;
2053
2054         if (dec_digits > 1)
2055                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(dec_digits - 1);
2056         else if (dec_digits < 1)
2057                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(1 - dec_digits);
2058         else
2059                 rscale = NUMERIC_MIN_SIG_DIGITS;
2060
2061         rscale = Max(rscale, arg.dscale);
2062         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
2063         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
2064
2065         ln_var(&arg, &result, rscale);
2066
2067         res = make_result(&result);
2068
2069         free_var(&result);
2070
2071         PG_RETURN_NUMERIC(res);
2072 }
2073
2074
2075 /*
2076  * numeric_log() -
2077  *
2078  *      Compute the logarithm of x in a given base
2079  */
2080 Datum
2081 numeric_log(PG_FUNCTION_ARGS)
2082 {
2083         Numeric         num1 = PG_GETARG_NUMERIC(0);
2084         Numeric         num2 = PG_GETARG_NUMERIC(1);
2085         Numeric         res;
2086         NumericVar      arg1;
2087         NumericVar      arg2;
2088         NumericVar      result;
2089
2090         /*
2091          * Handle NaN
2092          */
2093         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
2094                 PG_RETURN_NUMERIC(make_result(&const_nan));
2095
2096         /*
2097          * Initialize things
2098          */
2099         init_var_from_num(num1, &arg1);
2100         init_var_from_num(num2, &arg2);
2101         init_var(&result);
2102
2103         /*
2104          * Call log_var() to compute and return the result; note it handles scale
2105          * selection itself.
2106          */
2107         log_var(&arg1, &arg2, &result);
2108
2109         res = make_result(&result);
2110
2111         free_var(&result);
2112
2113         PG_RETURN_NUMERIC(res);
2114 }
2115
2116
2117 /*
2118  * numeric_power() -
2119  *
2120  *      Raise b to the power of x
2121  */
2122 Datum
2123 numeric_power(PG_FUNCTION_ARGS)
2124 {
2125         Numeric         num1 = PG_GETARG_NUMERIC(0);
2126         Numeric         num2 = PG_GETARG_NUMERIC(1);
2127         Numeric         res;
2128         NumericVar      arg1;
2129         NumericVar      arg2;
2130         NumericVar      arg2_trunc;
2131         NumericVar      result;
2132
2133         /*
2134          * Handle NaN
2135          */
2136         if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
2137                 PG_RETURN_NUMERIC(make_result(&const_nan));
2138
2139         /*
2140          * Initialize things
2141          */
2142         init_var(&arg2_trunc);
2143         init_var(&result);
2144         init_var_from_num(num1, &arg1);
2145         init_var_from_num(num2, &arg2);
2146
2147         set_var_from_var(&arg2, &arg2_trunc);
2148         trunc_var(&arg2_trunc, 0);
2149
2150         /*
2151          * The SQL spec requires that we emit a particular SQLSTATE error code for
2152          * certain error conditions.  Specifically, we don't return a
2153          * divide-by-zero error code for 0 ^ -1.
2154          */
2155         if (cmp_var(&arg1, &const_zero) == 0 &&
2156                 cmp_var(&arg2, &const_zero) < 0)
2157                 ereport(ERROR,
2158                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION),
2159                                  errmsg("zero raised to a negative power is undefined")));
2160
2161         if (cmp_var(&arg1, &const_zero) < 0 &&
2162                 cmp_var(&arg2, &arg2_trunc) != 0)
2163                 ereport(ERROR,
2164                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION),
2165                                  errmsg("a negative number raised to a non-integer power yields a complex result")));
2166
2167         /*
2168          * Call power_var() to compute and return the result; note it handles
2169          * scale selection itself.
2170          */
2171         power_var(&arg1, &arg2, &result);
2172
2173         res = make_result(&result);
2174
2175         free_var(&result);
2176         free_var(&arg2_trunc);
2177
2178         PG_RETURN_NUMERIC(res);
2179 }
2180
2181
2182 /* ----------------------------------------------------------------------
2183  *
2184  * Type conversion functions
2185  *
2186  * ----------------------------------------------------------------------
2187  */
2188
2189
2190 Datum
2191 int4_numeric(PG_FUNCTION_ARGS)
2192 {
2193         int32           val = PG_GETARG_INT32(0);
2194         Numeric         res;
2195         NumericVar      result;
2196
2197         init_var(&result);
2198
2199         int8_to_numericvar((int64) val, &result);
2200
2201         res = make_result(&result);
2202
2203         free_var(&result);
2204
2205         PG_RETURN_NUMERIC(res);
2206 }
2207
2208
2209 Datum
2210 numeric_int4(PG_FUNCTION_ARGS)
2211 {
2212         Numeric         num = PG_GETARG_NUMERIC(0);
2213         NumericVar      x;
2214         int32           result;
2215
2216         /* XXX would it be better to return NULL? */
2217         if (NUMERIC_IS_NAN(num))
2218                 ereport(ERROR,
2219                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2220                                  errmsg("cannot convert NaN to integer")));
2221
2222         /* Convert to variable format, then convert to int4 */
2223         init_var_from_num(num, &x);
2224         result = numericvar_to_int4(&x);
2225         PG_RETURN_INT32(result);
2226 }
2227
2228 /*
2229  * Given a NumericVar, convert it to an int32. If the NumericVar
2230  * exceeds the range of an int32, raise the appropriate error via
2231  * ereport(). The input NumericVar is *not* free'd.
2232  */
2233 static int32
2234 numericvar_to_int4(NumericVar *var)
2235 {
2236         int32           result;
2237         int64           val;
2238
2239         if (!numericvar_to_int8(var, &val))
2240                 ereport(ERROR,
2241                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
2242                                  errmsg("integer out of range")));
2243
2244         /* Down-convert to int4 */
2245         result = (int32) val;
2246
2247         /* Test for overflow by reverse-conversion. */
2248         if ((int64) result != val)
2249                 ereport(ERROR,
2250                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
2251                                  errmsg("integer out of range")));
2252
2253         return result;
2254 }
2255
2256 Datum
2257 int8_numeric(PG_FUNCTION_ARGS)
2258 {
2259         int64           val = PG_GETARG_INT64(0);
2260         Numeric         res;
2261         NumericVar      result;
2262
2263         init_var(&result);
2264
2265         int8_to_numericvar(val, &result);
2266
2267         res = make_result(&result);
2268
2269         free_var(&result);
2270
2271         PG_RETURN_NUMERIC(res);
2272 }
2273
2274
2275 Datum
2276 numeric_int8(PG_FUNCTION_ARGS)
2277 {
2278         Numeric         num = PG_GETARG_NUMERIC(0);
2279         NumericVar      x;
2280         int64           result;
2281
2282         /* XXX would it be better to return NULL? */
2283         if (NUMERIC_IS_NAN(num))
2284                 ereport(ERROR,
2285                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2286                                  errmsg("cannot convert NaN to bigint")));
2287
2288         /* Convert to variable format and thence to int8 */
2289         init_var_from_num(num, &x);
2290
2291         if (!numericvar_to_int8(&x, &result))
2292                 ereport(ERROR,
2293                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
2294                                  errmsg("bigint out of range")));
2295
2296         PG_RETURN_INT64(result);
2297 }
2298
2299
2300 Datum
2301 int2_numeric(PG_FUNCTION_ARGS)
2302 {
2303         int16           val = PG_GETARG_INT16(0);
2304         Numeric         res;
2305         NumericVar      result;
2306
2307         init_var(&result);
2308
2309         int8_to_numericvar((int64) val, &result);
2310
2311         res = make_result(&result);
2312
2313         free_var(&result);
2314
2315         PG_RETURN_NUMERIC(res);
2316 }
2317
2318
2319 Datum
2320 numeric_int2(PG_FUNCTION_ARGS)
2321 {
2322         Numeric         num = PG_GETARG_NUMERIC(0);
2323         NumericVar      x;
2324         int64           val;
2325         int16           result;
2326
2327         /* XXX would it be better to return NULL? */
2328         if (NUMERIC_IS_NAN(num))
2329                 ereport(ERROR,
2330                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2331                                  errmsg("cannot convert NaN to smallint")));
2332
2333         /* Convert to variable format and thence to int8 */
2334         init_var_from_num(num, &x);
2335
2336         if (!numericvar_to_int8(&x, &val))
2337                 ereport(ERROR,
2338                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
2339                                  errmsg("smallint out of range")));
2340
2341         /* Down-convert to int2 */
2342         result = (int16) val;
2343
2344         /* Test for overflow by reverse-conversion. */
2345         if ((int64) result != val)
2346                 ereport(ERROR,
2347                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
2348                                  errmsg("smallint out of range")));
2349
2350         PG_RETURN_INT16(result);
2351 }
2352
2353
2354 Datum
2355 float8_numeric(PG_FUNCTION_ARGS)
2356 {
2357         float8          val = PG_GETARG_FLOAT8(0);
2358         Numeric         res;
2359         NumericVar      result;
2360         char            buf[DBL_DIG + 100];
2361
2362         if (isnan(val))
2363                 PG_RETURN_NUMERIC(make_result(&const_nan));
2364
2365         sprintf(buf, "%.*g", DBL_DIG, val);
2366
2367         init_var(&result);
2368
2369         /* Assume we need not worry about leading/trailing spaces */
2370         (void) set_var_from_str(buf, buf, &result);
2371
2372         res = make_result(&result);
2373
2374         free_var(&result);
2375
2376         PG_RETURN_NUMERIC(res);
2377 }
2378
2379
2380 Datum
2381 numeric_float8(PG_FUNCTION_ARGS)
2382 {
2383         Numeric         num = PG_GETARG_NUMERIC(0);
2384         char       *tmp;
2385         Datum           result;
2386
2387         if (NUMERIC_IS_NAN(num))
2388                 PG_RETURN_FLOAT8(get_float8_nan());
2389
2390         tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
2391                                                                                           NumericGetDatum(num)));
2392
2393         result = DirectFunctionCall1(float8in, CStringGetDatum(tmp));
2394
2395         pfree(tmp);
2396
2397         PG_RETURN_DATUM(result);
2398 }
2399
2400
2401 /* Convert numeric to float8; if out of range, return +/- HUGE_VAL */
2402 Datum
2403 numeric_float8_no_overflow(PG_FUNCTION_ARGS)
2404 {
2405         Numeric         num = PG_GETARG_NUMERIC(0);
2406         double          val;
2407
2408         if (NUMERIC_IS_NAN(num))
2409                 PG_RETURN_FLOAT8(get_float8_nan());
2410
2411         val = numeric_to_double_no_overflow(num);
2412
2413         PG_RETURN_FLOAT8(val);
2414 }
2415
2416 Datum
2417 float4_numeric(PG_FUNCTION_ARGS)
2418 {
2419         float4          val = PG_GETARG_FLOAT4(0);
2420         Numeric         res;
2421         NumericVar      result;
2422         char            buf[FLT_DIG + 100];
2423
2424         if (isnan(val))
2425                 PG_RETURN_NUMERIC(make_result(&const_nan));
2426
2427         sprintf(buf, "%.*g", FLT_DIG, val);
2428
2429         init_var(&result);
2430
2431         /* Assume we need not worry about leading/trailing spaces */
2432         (void) set_var_from_str(buf, buf, &result);
2433
2434         res = make_result(&result);
2435
2436         free_var(&result);
2437
2438         PG_RETURN_NUMERIC(res);
2439 }
2440
2441
2442 Datum
2443 numeric_float4(PG_FUNCTION_ARGS)
2444 {
2445         Numeric         num = PG_GETARG_NUMERIC(0);
2446         char       *tmp;
2447         Datum           result;
2448
2449         if (NUMERIC_IS_NAN(num))
2450                 PG_RETURN_FLOAT4(get_float4_nan());
2451
2452         tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
2453                                                                                           NumericGetDatum(num)));
2454
2455         result = DirectFunctionCall1(float4in, CStringGetDatum(tmp));
2456
2457         pfree(tmp);
2458
2459         PG_RETURN_DATUM(result);
2460 }
2461
2462
2463 /* ----------------------------------------------------------------------
2464  *
2465  * Aggregate functions
2466  *
2467  * The transition datatype for all these aggregates is declared as INTERNAL.
2468  * Actually, it's a pointer to a NumericAggState allocated in the aggregate
2469  * context.  The digit buffers for the NumericVars will be there too.
2470  *
2471  * Note that the transition functions don't bother to create a NumericAggState
2472  * until they see the first non-null input value; therefore, the final
2473  * functions will never see N == 0.  (The case is represented as a NULL
2474  * state pointer, instead.)
2475  *
2476  * ----------------------------------------------------------------------
2477  */
2478
2479 typedef struct NumericAggState
2480 {
2481         bool            calcSumX2;              /* if true, calculate sumX2 */
2482         bool            isNaN;                  /* true if any processed number was NaN */
2483         MemoryContext agg_context;      /* context we're calculating in */
2484         int64           N;                              /* count of processed numbers */
2485         NumericVar      sumX;                   /* sum of processed numbers */
2486         NumericVar      sumX2;                  /* sum of squares of processed numbers */
2487 } NumericAggState;
2488
2489 /*
2490  * Prepare state data for a numeric aggregate function that needs to compute
2491  * sum, count and optionally sum of squares of the input.
2492  */
2493 static NumericAggState *
2494 makeNumericAggState(FunctionCallInfo fcinfo, bool calcSumX2)
2495 {
2496         NumericAggState *state;
2497         MemoryContext agg_context;
2498         MemoryContext old_context;
2499
2500         if (!AggCheckCallContext(fcinfo, &agg_context))
2501                 elog(ERROR, "aggregate function called in non-aggregate context");
2502
2503         old_context = MemoryContextSwitchTo(agg_context);
2504
2505         state = (NumericAggState *) palloc0(sizeof(NumericAggState));
2506         state->calcSumX2 = calcSumX2;
2507         state->agg_context = agg_context;
2508
2509         MemoryContextSwitchTo(old_context);
2510
2511         return state;
2512 }
2513
2514 /*
2515  * Accumulate a new input value for numeric aggregate functions.
2516  */
2517 static void
2518 do_numeric_accum(NumericAggState *state, Numeric newval)
2519 {
2520         NumericVar      X;
2521         NumericVar      X2;
2522         MemoryContext old_context;
2523
2524         /* result is NaN if any processed number is NaN */
2525         if (state->isNaN || NUMERIC_IS_NAN(newval))
2526         {
2527                 state->isNaN = true;
2528                 return;
2529         }
2530
2531         /* load processed number in short-lived context */
2532         init_var_from_num(newval, &X);
2533
2534         /* if we need X^2, calculate that in short-lived context */
2535         if (state->calcSumX2)
2536         {
2537                 init_var(&X2);
2538                 mul_var(&X, &X, &X2, X.dscale * 2);
2539         }
2540
2541         /* The rest of this needs to work in the aggregate context */
2542         old_context = MemoryContextSwitchTo(state->agg_context);
2543
2544         if (state->N++ > 0)
2545         {
2546                 /* Accumulate sums */
2547                 add_var(&X, &(state->sumX), &(state->sumX));
2548
2549                 if (state->calcSumX2)
2550                         add_var(&X2, &(state->sumX2), &(state->sumX2));
2551         }
2552         else
2553         {
2554                 /* First input, so initialize sums */
2555                 set_var_from_var(&X, &(state->sumX));
2556
2557                 if (state->calcSumX2)
2558                         set_var_from_var(&X2, &(state->sumX2));
2559         }
2560
2561         MemoryContextSwitchTo(old_context);
2562 }
2563
2564 /*
2565  * Generic transition function for numeric aggregates that require sumX2.
2566  */
2567 Datum
2568 numeric_accum(PG_FUNCTION_ARGS)
2569 {
2570         NumericAggState *state;
2571
2572         state = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
2573
2574         if (!PG_ARGISNULL(1))
2575         {
2576                 /* Create the state data when we see the first non-null input. */
2577                 if (state == NULL)
2578                         state = makeNumericAggState(fcinfo, true);
2579
2580                 do_numeric_accum(state, PG_GETARG_NUMERIC(1));
2581         }
2582
2583         PG_RETURN_POINTER(state);
2584 }
2585
2586 /*
2587  * Generic transition function for numeric aggregates that don't require sumX2.
2588  */
2589 Datum
2590 numeric_avg_accum(PG_FUNCTION_ARGS)
2591 {
2592         NumericAggState *state;
2593
2594         state = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
2595
2596         if (!PG_ARGISNULL(1))
2597         {
2598                 /* Create the state data when we see the first non-null input. */
2599                 if (state == NULL)
2600                         state = makeNumericAggState(fcinfo, false);
2601
2602                 do_numeric_accum(state, PG_GETARG_NUMERIC(1));
2603         }
2604
2605         PG_RETURN_POINTER(state);
2606 }
2607
2608 /*
2609  * Integer data types all use Numeric accumulators to share code and
2610  * avoid risk of overflow.      For int2 and int4 inputs, Numeric accumulation
2611  * is overkill for the N and sum(X) values, but definitely not overkill
2612  * for the sum(X*X) value.      Hence, we use int2_accum and int4_accum only
2613  * for stddev/variance --- there are faster special-purpose accumulator
2614  * routines for SUM and AVG of these datatypes.
2615  */
2616
2617 Datum
2618 int2_accum(PG_FUNCTION_ARGS)
2619 {
2620         NumericAggState *state;
2621
2622         state = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
2623
2624         if (!PG_ARGISNULL(1))
2625         {
2626                 Numeric         newval;
2627
2628                 newval = DatumGetNumeric(DirectFunctionCall1(int2_numeric,
2629                                                                                                          PG_GETARG_DATUM(1)));
2630
2631                 /* Create the state data when we see the first non-null input. */
2632                 if (state == NULL)
2633                         state = makeNumericAggState(fcinfo, true);
2634
2635                 do_numeric_accum(state, newval);
2636         }
2637
2638         PG_RETURN_POINTER(state);
2639 }
2640
2641 Datum
2642 int4_accum(PG_FUNCTION_ARGS)
2643 {
2644         NumericAggState *state;
2645
2646         state = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
2647
2648         if (!PG_ARGISNULL(1))
2649         {
2650                 Numeric         newval;
2651
2652                 newval = DatumGetNumeric(DirectFunctionCall1(int4_numeric,
2653                                                                                                          PG_GETARG_DATUM(1)));
2654
2655                 /* Create the state data when we see the first non-null input. */
2656                 if (state == NULL)
2657                         state = makeNumericAggState(fcinfo, true);
2658
2659                 do_numeric_accum(state, newval);
2660         }
2661
2662         PG_RETURN_POINTER(state);
2663 }
2664
2665 Datum
2666 int8_accum(PG_FUNCTION_ARGS)
2667 {
2668         NumericAggState *state;
2669
2670         state = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
2671
2672         if (!PG_ARGISNULL(1))
2673         {
2674                 Numeric         newval;
2675
2676                 newval = DatumGetNumeric(DirectFunctionCall1(int8_numeric,
2677                                                                                                          PG_GETARG_DATUM(1)));
2678
2679                 /* Create the state data when we see the first non-null input. */
2680                 if (state == NULL)
2681                         state = makeNumericAggState(fcinfo, true);
2682
2683                 do_numeric_accum(state, newval);
2684         }
2685
2686         PG_RETURN_POINTER(state);
2687 }
2688
2689 /*
2690  * Transition function for int8 input when we don't need sumX2.
2691  */
2692 Datum
2693 int8_avg_accum(PG_FUNCTION_ARGS)
2694 {
2695         NumericAggState *state;
2696
2697         state = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
2698
2699         if (!PG_ARGISNULL(1))
2700         {
2701                 Numeric         newval;
2702
2703                 newval = DatumGetNumeric(DirectFunctionCall1(int8_numeric,
2704                                                                                                          PG_GETARG_DATUM(1)));
2705
2706                 /* Create the state data when we see the first non-null input. */
2707                 if (state == NULL)
2708                         state = makeNumericAggState(fcinfo, false);
2709
2710                 do_numeric_accum(state, newval);
2711         }
2712
2713         PG_RETURN_POINTER(state);
2714 }
2715
2716
2717 Datum
2718 numeric_avg(PG_FUNCTION_ARGS)
2719 {
2720         NumericAggState *state;
2721         Datum           N_datum;
2722         Datum           sumX_datum;
2723
2724         state = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
2725         if (state == NULL)                      /* there were no non-null inputs */
2726                 PG_RETURN_NULL();
2727
2728         if (state->isNaN)                       /* there was at least one NaN input */
2729                 PG_RETURN_NUMERIC(make_result(&const_nan));
2730
2731         N_datum = DirectFunctionCall1(int8_numeric, Int64GetDatum(state->N));
2732         sumX_datum = NumericGetDatum(make_result(&state->sumX));
2733
2734         PG_RETURN_DATUM(DirectFunctionCall2(numeric_div, sumX_datum, N_datum));
2735 }
2736
2737 Datum
2738 numeric_sum(PG_FUNCTION_ARGS)
2739 {
2740         NumericAggState *state;
2741
2742         state = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
2743         if (state == NULL)                      /* there were no non-null inputs */
2744                 PG_RETURN_NULL();
2745
2746         if (state->isNaN)                       /* there was at least one NaN input */
2747                 PG_RETURN_NUMERIC(make_result(&const_nan));
2748
2749         PG_RETURN_NUMERIC(make_result(&(state->sumX)));
2750 }
2751
2752 /*
2753  * Workhorse routine for the standard deviance and variance
2754  * aggregates. 'state' is aggregate's transition state.
2755  * 'variance' specifies whether we should calculate the
2756  * variance or the standard deviation. 'sample' indicates whether the
2757  * caller is interested in the sample or the population
2758  * variance/stddev.
2759  *
2760  * If appropriate variance statistic is undefined for the input,
2761  * *is_null is set to true and NULL is returned.
2762  */
2763 static Numeric
2764 numeric_stddev_internal(NumericAggState *state,
2765                                                 bool variance, bool sample,
2766                                                 bool *is_null)
2767 {
2768         Numeric         res;
2769         NumericVar      vN,
2770                                 vsumX,
2771                                 vsumX2,
2772                                 vNminus1;
2773         NumericVar *comp;
2774         int                     rscale;
2775
2776         /* Deal with empty input and NaN-input cases */
2777         if (state == NULL)
2778         {
2779                 *is_null = true;
2780                 return NULL;
2781         }
2782
2783         *is_null = false;
2784
2785         if (state->isNaN)
2786                 return make_result(&const_nan);
2787
2788         init_var(&vN);
2789         init_var(&vsumX);
2790         init_var(&vsumX2);
2791
2792         int8_to_numericvar(state->N, &vN);
2793         set_var_from_var(&(state->sumX), &vsumX);
2794         set_var_from_var(&(state->sumX2), &vsumX2);
2795
2796         /*
2797          * Sample stddev and variance are undefined when N <= 1; population stddev
2798          * is undefined when N == 0. Return NULL in either case.
2799          */
2800         if (sample)
2801                 comp = &const_one;
2802         else
2803                 comp = &const_zero;
2804
2805         if (cmp_var(&vN, comp) <= 0)
2806         {
2807                 *is_null = true;
2808                 return NULL;
2809         }
2810
2811         init_var(&vNminus1);
2812         sub_var(&vN, &const_one, &vNminus1);
2813
2814         /* compute rscale for mul_var calls */
2815         rscale = vsumX.dscale * 2;
2816
2817         mul_var(&vsumX, &vsumX, &vsumX, rscale);        /* vsumX = sumX * sumX */
2818         mul_var(&vN, &vsumX2, &vsumX2, rscale);         /* vsumX2 = N * sumX2 */
2819         sub_var(&vsumX2, &vsumX, &vsumX2);      /* N * sumX2 - sumX * sumX */
2820
2821         if (cmp_var(&vsumX2, &const_zero) <= 0)
2822         {
2823                 /* Watch out for roundoff error producing a negative numerator */
2824                 res = make_result(&const_zero);
2825         }
2826         else
2827         {
2828                 if (sample)
2829                         mul_var(&vN, &vNminus1, &vNminus1, 0);          /* N * (N - 1) */
2830                 else
2831                         mul_var(&vN, &vN, &vNminus1, 0);        /* N * N */
2832                 rscale = select_div_scale(&vsumX2, &vNminus1);
2833                 div_var(&vsumX2, &vNminus1, &vsumX, rscale, true);              /* variance */
2834                 if (!variance)
2835                         sqrt_var(&vsumX, &vsumX, rscale);       /* stddev */
2836
2837                 res = make_result(&vsumX);
2838         }
2839
2840         free_var(&vNminus1);
2841         free_var(&vsumX);
2842         free_var(&vsumX2);
2843
2844         return res;
2845 }
2846
2847 Datum
2848 numeric_var_samp(PG_FUNCTION_ARGS)
2849 {
2850         NumericAggState *state;
2851         Numeric         res;
2852         bool            is_null;
2853
2854         state = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
2855
2856         res = numeric_stddev_internal(state, true, true, &is_null);
2857
2858         if (is_null)
2859                 PG_RETURN_NULL();
2860         else
2861                 PG_RETURN_NUMERIC(res);
2862 }
2863
2864 Datum
2865 numeric_stddev_samp(PG_FUNCTION_ARGS)
2866 {
2867         NumericAggState *state;
2868         Numeric         res;
2869         bool            is_null;
2870
2871         state = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
2872
2873         res = numeric_stddev_internal(state, false, true, &is_null);
2874
2875         if (is_null)
2876                 PG_RETURN_NULL();
2877         else
2878                 PG_RETURN_NUMERIC(res);
2879 }
2880
2881 Datum
2882 numeric_var_pop(PG_FUNCTION_ARGS)
2883 {
2884         NumericAggState *state;
2885         Numeric         res;
2886         bool            is_null;
2887
2888         state = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
2889
2890         res = numeric_stddev_internal(state, true, false, &is_null);
2891
2892         if (is_null)
2893                 PG_RETURN_NULL();
2894         else
2895                 PG_RETURN_NUMERIC(res);
2896 }
2897
2898 Datum
2899 numeric_stddev_pop(PG_FUNCTION_ARGS)
2900 {
2901         NumericAggState *state;
2902         Numeric         res;
2903         bool            is_null;
2904
2905         state = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
2906
2907         res = numeric_stddev_internal(state, false, false, &is_null);
2908
2909         if (is_null)
2910                 PG_RETURN_NULL();
2911         else
2912                 PG_RETURN_NUMERIC(res);
2913 }
2914
2915 /*
2916  * SUM transition functions for integer datatypes.
2917  *
2918  * To avoid overflow, we use accumulators wider than the input datatype.
2919  * A Numeric accumulator is needed for int8 input; for int4 and int2
2920  * inputs, we use int8 accumulators which should be sufficient for practical
2921  * purposes.  (The latter two therefore don't really belong in this file,
2922  * but we keep them here anyway.)
2923  *
2924  * Because SQL defines the SUM() of no values to be NULL, not zero,
2925  * the initial condition of the transition data value needs to be NULL. This
2926  * means we can't rely on ExecAgg to automatically insert the first non-null
2927  * data value into the transition data: it doesn't know how to do the type
2928  * conversion.  The upshot is that these routines have to be marked non-strict
2929  * and handle substitution of the first non-null input themselves.
2930  */
2931
2932 Datum
2933 int2_sum(PG_FUNCTION_ARGS)
2934 {
2935         int64           newval;
2936
2937         if (PG_ARGISNULL(0))
2938         {
2939                 /* No non-null input seen so far... */
2940                 if (PG_ARGISNULL(1))
2941                         PG_RETURN_NULL();       /* still no non-null */
2942                 /* This is the first non-null input. */
2943                 newval = (int64) PG_GETARG_INT16(1);
2944                 PG_RETURN_INT64(newval);
2945         }
2946
2947         /*
2948          * If we're invoked as an aggregate, we can cheat and modify our first
2949          * parameter in-place to avoid palloc overhead. If not, we need to return
2950          * the new value of the transition variable. (If int8 is pass-by-value,
2951          * then of course this is useless as well as incorrect, so just ifdef it
2952          * out.)
2953          */
2954 #ifndef USE_FLOAT8_BYVAL                /* controls int8 too */
2955         if (AggCheckCallContext(fcinfo, NULL))
2956         {
2957                 int64      *oldsum = (int64 *) PG_GETARG_POINTER(0);
2958
2959                 /* Leave the running sum unchanged in the new input is null */
2960                 if (!PG_ARGISNULL(1))
2961                         *oldsum = *oldsum + (int64) PG_GETARG_INT16(1);
2962
2963                 PG_RETURN_POINTER(oldsum);
2964         }
2965         else
2966 #endif
2967         {
2968                 int64           oldsum = PG_GETARG_INT64(0);
2969
2970                 /* Leave sum unchanged if new input is null. */
2971                 if (PG_ARGISNULL(1))
2972                         PG_RETURN_INT64(oldsum);
2973
2974                 /* OK to do the addition. */
2975                 newval = oldsum + (int64) PG_GETARG_INT16(1);
2976
2977                 PG_RETURN_INT64(newval);
2978         }
2979 }
2980
2981 Datum
2982 int4_sum(PG_FUNCTION_ARGS)
2983 {
2984         int64           newval;
2985
2986         if (PG_ARGISNULL(0))
2987         {
2988                 /* No non-null input seen so far... */
2989                 if (PG_ARGISNULL(1))
2990                         PG_RETURN_NULL();       /* still no non-null */
2991                 /* This is the first non-null input. */
2992                 newval = (int64) PG_GETARG_INT32(1);
2993                 PG_RETURN_INT64(newval);
2994         }
2995
2996         /*
2997          * If we're invoked as an aggregate, we can cheat and modify our first
2998          * parameter in-place to avoid palloc overhead. If not, we need to return
2999          * the new value of the transition variable. (If int8 is pass-by-value,
3000          * then of course this is useless as well as incorrect, so just ifdef it
3001          * out.)
3002          */
3003 #ifndef USE_FLOAT8_BYVAL                /* controls int8 too */
3004         if (AggCheckCallContext(fcinfo, NULL))
3005         {
3006                 int64      *oldsum = (int64 *) PG_GETARG_POINTER(0);
3007
3008                 /* Leave the running sum unchanged in the new input is null */
3009                 if (!PG_ARGISNULL(1))
3010                         *oldsum = *oldsum + (int64) PG_GETARG_INT32(1);
3011
3012                 PG_RETURN_POINTER(oldsum);
3013         }
3014         else
3015 #endif
3016         {
3017                 int64           oldsum = PG_GETARG_INT64(0);
3018
3019                 /* Leave sum unchanged if new input is null. */
3020                 if (PG_ARGISNULL(1))
3021                         PG_RETURN_INT64(oldsum);
3022
3023                 /* OK to do the addition. */
3024                 newval = oldsum + (int64) PG_GETARG_INT32(1);
3025
3026                 PG_RETURN_INT64(newval);
3027         }
3028 }
3029
3030 /*
3031  * Note: this function is obsolete, it's no longer used for SUM(int8).
3032  */
3033 Datum
3034 int8_sum(PG_FUNCTION_ARGS)
3035 {
3036         Numeric         oldsum;
3037         Datum           newval;
3038
3039         if (PG_ARGISNULL(0))
3040         {
3041                 /* No non-null input seen so far... */
3042                 if (PG_ARGISNULL(1))
3043                         PG_RETURN_NULL();       /* still no non-null */
3044                 /* This is the first non-null input. */
3045                 newval = DirectFunctionCall1(int8_numeric, PG_GETARG_DATUM(1));
3046                 PG_RETURN_DATUM(newval);
3047         }
3048
3049         /*
3050          * Note that we cannot special-case the aggregate case here, as we do for
3051          * int2_sum and int4_sum: numeric is of variable size, so we cannot modify
3052          * our first parameter in-place.
3053          */
3054
3055         oldsum = PG_GETARG_NUMERIC(0);
3056
3057         /* Leave sum unchanged if new input is null. */
3058         if (PG_ARGISNULL(1))
3059                 PG_RETURN_NUMERIC(oldsum);
3060
3061         /* OK to do the addition. */
3062         newval = DirectFunctionCall1(int8_numeric, PG_GETARG_DATUM(1));
3063
3064         PG_RETURN_DATUM(DirectFunctionCall2(numeric_add,
3065                                                                                 NumericGetDatum(oldsum), newval));
3066 }
3067
3068
3069 /*
3070  * Routines for avg(int2) and avg(int4).  The transition datatype
3071  * is a two-element int8 array, holding count and sum.
3072  */
3073
3074 typedef struct Int8TransTypeData
3075 {
3076         int64           count;
3077         int64           sum;
3078 } Int8TransTypeData;
3079
3080 Datum
3081 int2_avg_accum(PG_FUNCTION_ARGS)
3082 {
3083         ArrayType  *transarray;
3084         int16           newval = PG_GETARG_INT16(1);
3085         Int8TransTypeData *transdata;
3086
3087         /*
3088          * If we're invoked as an aggregate, we can cheat and modify our first
3089          * parameter in-place to reduce palloc overhead. Otherwise we need to make
3090          * a copy of it before scribbling on it.
3091          */
3092         if (AggCheckCallContext(fcinfo, NULL))
3093                 transarray = PG_GETARG_ARRAYTYPE_P(0);
3094         else
3095                 transarray = PG_GETARG_ARRAYTYPE_P_COPY(0);
3096
3097         if (ARR_HASNULL(transarray) ||
3098                 ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
3099                 elog(ERROR, "expected 2-element int8 array");
3100
3101         transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
3102         transdata->count++;
3103         transdata->sum += newval;
3104
3105         PG_RETURN_ARRAYTYPE_P(transarray);
3106 }
3107
3108 Datum
3109 int4_avg_accum(PG_FUNCTION_ARGS)
3110 {
3111         ArrayType  *transarray;
3112         int32           newval = PG_GETARG_INT32(1);
3113         Int8TransTypeData *transdata;
3114
3115         /*
3116          * If we're invoked as an aggregate, we can cheat and modify our first
3117          * parameter in-place to reduce palloc overhead. Otherwise we need to make
3118          * a copy of it before scribbling on it.
3119          */
3120         if (AggCheckCallContext(fcinfo, NULL))
3121                 transarray = PG_GETARG_ARRAYTYPE_P(0);
3122         else
3123                 transarray = PG_GETARG_ARRAYTYPE_P_COPY(0);
3124
3125         if (ARR_HASNULL(transarray) ||
3126                 ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
3127                 elog(ERROR, "expected 2-element int8 array");
3128
3129         transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
3130         transdata->count++;
3131         transdata->sum += newval;
3132
3133         PG_RETURN_ARRAYTYPE_P(transarray);
3134 }
3135
3136 Datum
3137 int8_avg(PG_FUNCTION_ARGS)
3138 {
3139         ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
3140         Int8TransTypeData *transdata;
3141         Datum           countd,
3142                                 sumd;
3143
3144         if (ARR_HASNULL(transarray) ||
3145                 ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
3146                 elog(ERROR, "expected 2-element int8 array");
3147         transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
3148
3149         /* SQL defines AVG of no values to be NULL */
3150         if (transdata->count == 0)
3151                 PG_RETURN_NULL();
3152
3153         countd = DirectFunctionCall1(int8_numeric,
3154                                                                  Int64GetDatumFast(transdata->count));
3155         sumd = DirectFunctionCall1(int8_numeric,
3156                                                            Int64GetDatumFast(transdata->sum));
3157
3158         PG_RETURN_DATUM(DirectFunctionCall2(numeric_div, sumd, countd));
3159 }
3160
3161
3162 /* ----------------------------------------------------------------------
3163  *
3164  * Debug support
3165  *
3166  * ----------------------------------------------------------------------
3167  */
3168
3169 #ifdef NUMERIC_DEBUG
3170
3171 /*
3172  * dump_numeric() - Dump a value in the db storage format for debugging
3173  */
3174 static void
3175 dump_numeric(const char *str, Numeric num)
3176 {
3177         NumericDigit *digits = NUMERIC_DIGITS(num);
3178         int                     ndigits;
3179         int                     i;
3180
3181         ndigits = NUMERIC_NDIGITS(num);
3182
3183         printf("%s: NUMERIC w=%d d=%d ", str,
3184                    NUMERIC_WEIGHT(num), NUMERIC_DSCALE(num));
3185         switch (NUMERIC_SIGN(num))
3186         {
3187                 case NUMERIC_POS:
3188                         printf("POS");
3189                         break;
3190                 case NUMERIC_NEG:
3191                         printf("NEG");
3192                         break;
3193                 case NUMERIC_NAN:
3194                         printf("NaN");
3195                         break;
3196                 default:
3197                         printf("SIGN=0x%x", NUMERIC_SIGN(num));
3198                         break;
3199         }
3200
3201         for (i = 0; i < ndigits; i++)
3202                 printf(" %0*d", DEC_DIGITS, digits[i]);
3203         printf("\n");
3204 }
3205
3206
3207 /*
3208  * dump_var() - Dump a value in the variable format for debugging
3209  */
3210 static void
3211 dump_var(const char *str, NumericVar *var)
3212 {
3213         int                     i;
3214
3215         printf("%s: VAR w=%d d=%d ", str, var->weight, var->dscale);
3216         switch (var->sign)
3217         {
3218                 case NUMERIC_POS:
3219                         printf("POS");
3220                         break;
3221                 case NUMERIC_NEG:
3222                         printf("NEG");
3223                         break;
3224                 case NUMERIC_NAN:
3225                         printf("NaN");
3226                         break;
3227                 default:
3228                         printf("SIGN=0x%x", var->sign);
3229                         break;
3230         }
3231
3232         for (i = 0; i < var->ndigits; i++)
3233                 printf(" %0*d", DEC_DIGITS, var->digits[i]);
3234
3235         printf("\n");
3236 }
3237 #endif   /* NUMERIC_DEBUG */
3238
3239
3240 /* ----------------------------------------------------------------------
3241  *
3242  * Local functions follow
3243  *
3244  * In general, these do not support NaNs --- callers must eliminate
3245  * the possibility of NaN first.  (make_result() is an exception.)
3246  *
3247  * ----------------------------------------------------------------------
3248  */
3249
3250
3251 /*
3252  * alloc_var() -
3253  *
3254  *      Allocate a digit buffer of ndigits digits (plus a spare digit for rounding)
3255  */
3256 static void
3257 alloc_var(NumericVar *var, int ndigits)
3258 {
3259         digitbuf_free(var->buf);
3260         var->buf = digitbuf_alloc(ndigits + 1);
3261         var->buf[0] = 0;                        /* spare digit for rounding */
3262         var->digits = var->buf + 1;
3263         var->ndigits = ndigits;
3264 }
3265
3266
3267 /*
3268  * free_var() -
3269  *
3270  *      Return the digit buffer of a variable to the free pool
3271  */
3272 static void
3273 free_var(NumericVar *var)
3274 {
3275         digitbuf_free(var->buf);
3276         var->buf = NULL;
3277         var->digits = NULL;
3278         var->sign = NUMERIC_NAN;
3279 }
3280
3281
3282 /*
3283  * zero_var() -
3284  *
3285  *      Set a variable to ZERO.
3286  *      Note: its dscale is not touched.
3287  */
3288 static void
3289 zero_var(NumericVar *var)
3290 {
3291         digitbuf_free(var->buf);
3292         var->buf = NULL;
3293         var->digits = NULL;
3294         var->ndigits = 0;
3295         var->weight = 0;                        /* by convention; doesn't really matter */
3296         var->sign = NUMERIC_POS;        /* anything but NAN... */
3297 }
3298
3299
3300 /*
3301  * set_var_from_str()
3302  *
3303  *      Parse a string and put the number into a variable
3304  *
3305  * This function does not handle leading or trailing spaces, and it doesn't
3306  * accept "NaN" either.  It returns the end+1 position so that caller can
3307  * check for trailing spaces/garbage if deemed necessary.
3308  *
3309  * cp is the place to actually start parsing; str is what to use in error
3310  * reports.  (Typically cp would be the same except advanced over spaces.)
3311  */
3312 static const char *
3313 set_var_from_str(const char *str, const char *cp, NumericVar *dest)
3314 {
3315         bool            have_dp = FALSE;
3316         int                     i;
3317         unsigned char *decdigits;
3318         int                     sign = NUMERIC_POS;
3319         int                     dweight = -1;
3320         int                     ddigits;
3321         int                     dscale = 0;
3322         int                     weight;
3323         int                     ndigits;
3324         int                     offset;
3325         NumericDigit *digits;
3326
3327         /*
3328          * We first parse the string to extract decimal digits and determine the
3329          * correct decimal weight.      Then convert to NBASE representation.
3330          */
3331         switch (*cp)
3332         {
3333                 case '+':
3334                         sign = NUMERIC_POS;
3335                         cp++;
3336                         break;
3337
3338                 case '-':
3339                         sign = NUMERIC_NEG;
3340                         cp++;
3341                         break;
3342         }
3343
3344         if (*cp == '.')
3345         {
3346                 have_dp = TRUE;
3347                 cp++;
3348         }
3349
3350         if (!isdigit((unsigned char) *cp))
3351                 ereport(ERROR,
3352                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3353                           errmsg("invalid input syntax for type numeric: \"%s\"", str)));
3354
3355         decdigits = (unsigned char *) palloc(strlen(cp) + DEC_DIGITS * 2);
3356
3357         /* leading padding for digit alignment later */
3358         memset(decdigits, 0, DEC_DIGITS);
3359         i = DEC_DIGITS;
3360
3361         while (*cp)
3362         {
3363                 if (isdigit((unsigned char) *cp))
3364                 {
3365                         decdigits[i++] = *cp++ - '0';
3366                         if (!have_dp)
3367                                 dweight++;
3368                         else
3369                                 dscale++;
3370                 }
3371                 else if (*cp == '.')
3372                 {
3373                         if (have_dp)
3374                                 ereport(ERROR,
3375                                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3376                                           errmsg("invalid input syntax for type numeric: \"%s\"",
3377                                                          str)));
3378                         have_dp = TRUE;
3379                         cp++;
3380                 }
3381                 else
3382                         break;
3383         }
3384
3385         ddigits = i - DEC_DIGITS;
3386         /* trailing padding for digit alignment later */
3387         memset(decdigits + i, 0, DEC_DIGITS - 1);
3388
3389         /* Handle exponent, if any */
3390         if (*cp == 'e' || *cp == 'E')
3391         {
3392                 long            exponent;
3393                 char       *endptr;
3394
3395                 cp++;
3396                 exponent = strtol(cp, &endptr, 10);
3397                 if (endptr == cp)
3398                         ereport(ERROR,
3399                                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3400                                          errmsg("invalid input syntax for type numeric: \"%s\"",
3401                                                         str)));
3402                 cp = endptr;
3403                 if (exponent > NUMERIC_MAX_PRECISION ||
3404                         exponent < -NUMERIC_MAX_PRECISION)
3405                         ereport(ERROR,
3406                                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
3407                                          errmsg("invalid input syntax for type numeric: \"%s\"",
3408                                                         str)));
3409                 dweight += (int) exponent;
3410                 dscale -= (int) exponent;
3411                 if (dscale < 0)
3412                         dscale = 0;
3413         }
3414
3415         /*
3416          * Okay, convert pure-decimal representation to base NBASE.  First we need
3417          * to determine the converted weight and ndigits.  offset is the number of
3418          * decimal zeroes to insert before the first given digit to have a
3419          * correctly aligned first NBASE digit.
3420          */
3421         if (dweight >= 0)
3422                 weight = (dweight + 1 + DEC_DIGITS - 1) / DEC_DIGITS - 1;
3423         else
3424                 weight = -((-dweight - 1) / DEC_DIGITS + 1);
3425         offset = (weight + 1) * DEC_DIGITS - (dweight + 1);
3426         ndigits = (ddigits + offset + DEC_DIGITS - 1) / DEC_DIGITS;
3427
3428         alloc_var(dest, ndigits);
3429         dest->sign = sign;
3430         dest->weight = weight;
3431         dest->dscale = dscale;
3432
3433         i = DEC_DIGITS - offset;
3434         digits = dest->digits;
3435
3436         while (ndigits-- > 0)
3437         {
3438 #if DEC_DIGITS == 4
3439                 *digits++ = ((decdigits[i] * 10 + decdigits[i + 1]) * 10 +
3440                                          decdigits[i + 2]) * 10 + decdigits[i + 3];
3441 #elif DEC_DIGITS == 2
3442                 *digits++ = decdigits[i] * 10 + decdigits[i + 1];
3443 #elif DEC_DIGITS == 1
3444                 *digits++ = decdigits[i];
3445 #else
3446 #error unsupported NBASE
3447 #endif
3448                 i += DEC_DIGITS;
3449         }
3450
3451         pfree(decdigits);
3452
3453         /* Strip any leading/trailing zeroes, and normalize weight if zero */
3454         strip_var(dest);
3455
3456         /* Return end+1 position for caller */
3457         return cp;
3458 }
3459
3460
3461 /*
3462  * set_var_from_num() -
3463  *
3464  *      Convert the packed db format into a variable
3465  */
3466 static void
3467 set_var_from_num(Numeric num, NumericVar *dest)
3468 {
3469         int                     ndigits;
3470
3471         ndigits = NUMERIC_NDIGITS(num);
3472
3473         alloc_var(dest, ndigits);
3474
3475         dest->weight = NUMERIC_WEIGHT(num);
3476         dest->sign = NUMERIC_SIGN(num);
3477         dest->dscale = NUMERIC_DSCALE(num);
3478
3479         memcpy(dest->digits, NUMERIC_DIGITS(num), ndigits * sizeof(NumericDigit));
3480 }
3481
3482
3483 /*
3484  * init_var_from_num() -
3485  *
3486  *      Initialize a variable from packed db format. The digits array is not
3487  *      copied, which saves some cycles when the resulting var is not modified.
3488  *      Also, there's no need to call free_var(), as long as you don't assign any
3489  *      other value to it (with set_var_* functions, or by using the var as the
3490  *      destination of a function like add_var())
3491  *
3492  *      CAUTION: Do not modify the digits buffer of a var initialized with this
3493  *      function, e.g by calling round_var() or trunc_var(), as the changes will
3494  *      propagate to the original Numeric! It's OK to use it as the destination
3495  *      argument of one of the calculational functions, though.
3496  */
3497 static void
3498 init_var_from_num(Numeric num, NumericVar *dest)
3499 {
3500         dest->ndigits = NUMERIC_NDIGITS(num);
3501         dest->weight = NUMERIC_WEIGHT(num);
3502         dest->sign = NUMERIC_SIGN(num);
3503         dest->dscale = NUMERIC_DSCALE(num);
3504         dest->digits = NUMERIC_DIGITS(num);
3505         dest->buf = NULL;                       /* digits array is not palloc'd */
3506 }
3507
3508
3509 /*
3510  * set_var_from_var() -
3511  *
3512  *      Copy one variable into another
3513  */
3514 static void
3515 set_var_from_var(NumericVar *value, NumericVar *dest)
3516 {
3517         NumericDigit *newbuf;
3518
3519         newbuf = digitbuf_alloc(value->ndigits + 1);
3520         newbuf[0] = 0;                          /* spare digit for rounding */
3521         memcpy(newbuf + 1, value->digits, value->ndigits * sizeof(NumericDigit));
3522
3523         digitbuf_free(dest->buf);
3524
3525         memmove(dest, value, sizeof(NumericVar));
3526         dest->buf = newbuf;
3527         dest->digits = newbuf + 1;
3528 }
3529
3530
3531 /*
3532  * get_str_from_var() -
3533  *
3534  *      Convert a var to text representation (guts of numeric_out).
3535  *      The var is displayed to the number of digits indicated by its dscale.
3536  *      Returns a palloc'd string.
3537  */
3538 static char *
3539 get_str_from_var(NumericVar *var)
3540 {
3541         int                     dscale;
3542         char       *str;
3543         char       *cp;
3544         char       *endcp;
3545         int                     i;
3546         int                     d;
3547         NumericDigit dig;
3548
3549 #if DEC_DIGITS > 1
3550         NumericDigit d1;
3551 #endif
3552
3553         dscale = var->dscale;
3554
3555         /*
3556          * Allocate space for the result.
3557          *
3558          * i is set to the # of decimal digits before decimal point. dscale is the
3559          * # of decimal digits we will print after decimal point. We may generate
3560          * as many as DEC_DIGITS-1 excess digits at the end, and in addition we
3561          * need room for sign, decimal point, null terminator.
3562          */
3563         i = (var->weight + 1) * DEC_DIGITS;
3564         if (i <= 0)
3565                 i = 1;
3566
3567         str = palloc(i + dscale + DEC_DIGITS + 2);
3568         cp = str;
3569
3570         /*
3571          * Output a dash for negative values
3572          */
3573         if (var->sign == NUMERIC_NEG)
3574                 *cp++ = '-';
3575
3576         /*
3577          * Output all digits before the decimal point
3578          */
3579         if (var->weight < 0)
3580         {
3581                 d = var->weight + 1;
3582                 *cp++ = '0';
3583         }
3584         else
3585         {
3586                 for (d = 0; d <= var->weight; d++)
3587                 {
3588                         dig = (d < var->ndigits) ? var->digits[d] : 0;
3589                         /* In the first digit, suppress extra leading decimal zeroes */
3590 #if DEC_DIGITS == 4
3591                         {
3592                                 bool            putit = (d > 0);
3593
3594                                 d1 = dig / 1000;
3595                                 dig -= d1 * 1000;
3596                                 putit |= (d1 > 0);
3597                                 if (putit)
3598                                         *cp++ = d1 + '0';
3599                                 d1 = dig / 100;
3600                                 dig -= d1 * 100;
3601                                 putit |= (d1 > 0);
3602                                 if (putit)
3603                                         *cp++ = d1 + '0';
3604                                 d1 = dig / 10;
3605                                 dig -= d1 * 10;
3606                                 putit |= (d1 > 0);
3607                                 if (putit)
3608                                         *cp++ = d1 + '0';
3609                                 *cp++ = dig + '0';
3610                         }
3611 #elif DEC_DIGITS == 2
3612                         d1 = dig / 10;
3613                         dig -= d1 * 10;
3614                         if (d1 > 0 || d > 0)
3615                                 *cp++ = d1 + '0';
3616                         *cp++ = dig + '0';
3617 #elif DEC_DIGITS == 1
3618                         *cp++ = dig + '0';
3619 #else
3620 #error unsupported NBASE
3621 #endif
3622                 }
3623         }
3624
3625         /*
3626          * If requested, output a decimal point and all the digits that follow it.
3627          * We initially put out a multiple of DEC_DIGITS digits, then truncate if
3628          * needed.
3629          */
3630         if (dscale > 0)
3631         {
3632                 *cp++ = '.';
3633                 endcp = cp + dscale;
3634                 for (i = 0; i < dscale; d++, i += DEC_DIGITS)
3635                 {
3636                         dig = (d >= 0 && d < var->ndigits) ? var->digits[d] : 0;
3637 #if DEC_DIGITS == 4
3638                         d1 = dig / 1000;
3639                         dig -= d1 * 1000;
3640                         *cp++ = d1 + '0';
3641                         d1 = dig / 100;
3642                         dig -= d1 * 100;
3643                         *cp++ = d1 + '0';
3644                         d1 = dig / 10;
3645                         dig -= d1 * 10;
3646                         *cp++ = d1 + '0';
3647                         *cp++ = dig + '0';
3648 #elif DEC_DIGITS == 2
3649                         d1 = dig / 10;
3650                         dig -= d1 * 10;
3651                         *cp++ = d1 + '0';
3652                         *cp++ = dig + '0';
3653 #elif DEC_DIGITS == 1
3654                         *cp++ = dig + '0';
3655 #else
3656 #error unsupported NBASE
3657 #endif
3658                 }
3659                 cp = endcp;
3660         }
3661
3662         /*
3663          * terminate the string and return it
3664          */
3665         *cp = '\0';
3666         return str;
3667 }
3668
3669 /*
3670  * get_str_from_var_sci() -
3671  *
3672  *      Convert a var to a normalised scientific notation text representation.
3673  *      This function does the heavy lifting for numeric_out_sci().
3674  *
3675  *      This notation has the general form a * 10^b, where a is known as the
3676  *      "significand" and b is known as the "exponent".
3677  *
3678  *      Because we can't do superscript in ASCII (and because we want to copy
3679  *      printf's behaviour) we display the exponent using E notation, with a
3680  *      minimum of two exponent digits.
3681  *
3682  *      For example, the value 1234 could be output as 1.2e+03.
3683  *
3684  *      We assume that the exponent can fit into an int32.
3685  *
3686  *      rscale is the number of decimal digits desired after the decimal point in
3687  *      the output, negative values will be treated as meaning zero.
3688  *
3689  *      Returns a palloc'd string.
3690  */
3691 static char *
3692 get_str_from_var_sci(NumericVar *var, int rscale)
3693 {
3694         int32           exponent;
3695         NumericVar      denominator;
3696         NumericVar      significand;
3697         int                     denom_scale;
3698         size_t          len;
3699         char       *str;
3700         char       *sig_out;
3701
3702         if (rscale < 0)
3703                 rscale = 0;
3704
3705         /*
3706          * Determine the exponent of this number in normalised form.
3707          *
3708          * This is the exponent required to represent the number with only one
3709          * significant digit before the decimal place.
3710          */
3711         if (var->ndigits > 0)
3712         {
3713                 exponent = (var->weight + 1) * DEC_DIGITS;
3714
3715                 /*
3716                  * Compensate for leading decimal zeroes in the first numeric digit by
3717                  * decrementing the exponent.
3718                  */
3719                 exponent -= DEC_DIGITS - (int) log10(var->digits[0]);
3720         }
3721         else
3722         {
3723                 /*
3724                  * If var has no digits, then it must be zero.
3725                  *
3726                  * Zero doesn't technically have a meaningful exponent in normalised
3727                  * notation, but we just display the exponent as zero for consistency
3728                  * of output.
3729                  */
3730                 exponent = 0;
3731         }
3732
3733         /*
3734          * The denominator is set to 10 raised to the power of the exponent.
3735          *
3736          * We then divide var by the denominator to get the significand, rounding
3737          * to rscale decimal digits in the process.
3738          */
3739         if (exponent < 0)
3740                 denom_scale = -exponent;
3741         else
3742                 denom_scale = 0;
3743
3744         init_var(&denominator);
3745         init_var(&significand);
3746
3747         power_var_int(&const_ten, exponent, &denominator, denom_scale);
3748         div_var(var, &denominator, &significand, rscale, true);
3749         sig_out = get_str_from_var(&significand);
3750
3751         free_var(&denominator);
3752         free_var(&significand);
3753
3754         /*
3755          * Allocate space for the result.
3756          *
3757          * In addition to the significand, we need room for the exponent
3758          * decoration ("e"), the sign of the exponent, up to 10 digits for the
3759          * exponent itself, and of course the null terminator.
3760          */
3761         len = strlen(sig_out) + 13;
3762         str = palloc(len);
3763         snprintf(str, len, "%se%+03d", sig_out, exponent);
3764
3765         pfree(sig_out);
3766
3767         return str;
3768 }
3769
3770
3771 /*
3772  * make_result() -
3773  *
3774  *      Create the packed db numeric format in palloc()'d memory from
3775  *      a variable.
3776  */
3777 static Numeric
3778 make_result(NumericVar *var)
3779 {
3780         Numeric         result;
3781         NumericDigit *digits = var->digits;
3782         int                     weight = var->weight;
3783         int                     sign = var->sign;
3784         int                     n;
3785         Size            len;
3786
3787         if (sign == NUMERIC_NAN)
3788         {
3789                 result = (Numeric) palloc(NUMERIC_HDRSZ_SHORT);
3790
3791                 SET_VARSIZE(result, NUMERIC_HDRSZ_SHORT);
3792                 result->choice.n_header = NUMERIC_NAN;
3793                 /* the header word is all we need */
3794
3795                 dump_numeric("make_result()", result);
3796                 return result;
3797         }
3798
3799         n = var->ndigits;
3800
3801         /* truncate leading zeroes */
3802         while (n > 0 && *digits == 0)
3803         {
3804                 digits++;
3805                 weight--;
3806                 n--;
3807         }
3808         /* truncate trailing zeroes */
3809         while (n > 0 && digits[n - 1] == 0)
3810                 n--;
3811
3812         /* If zero result, force to weight=0 and positive sign */
3813         if (n == 0)
3814         {
3815                 weight = 0;
3816                 sign = NUMERIC_POS;
3817         }
3818
3819         /* Build the result */
3820         if (NUMERIC_CAN_BE_SHORT(var->dscale, weight))
3821         {
3822                 len = NUMERIC_HDRSZ_SHORT + n * sizeof(NumericDigit);
3823                 result = (Numeric) palloc(len);
3824                 SET_VARSIZE(result, len);
3825                 result->choice.n_short.n_header =
3826                         (sign == NUMERIC_NEG ? (NUMERIC_SHORT | NUMERIC_SHORT_SIGN_MASK)
3827                          : NUMERIC_SHORT)
3828                         | (var->dscale << NUMERIC_SHORT_DSCALE_SHIFT)
3829                         | (weight < 0 ? NUMERIC_SHORT_WEIGHT_SIGN_MASK : 0)
3830                         | (weight & NUMERIC_SHORT_WEIGHT_MASK);
3831         }
3832         else
3833         {
3834                 len = NUMERIC_HDRSZ + n * sizeof(NumericDigit);
3835                 result = (Numeric) palloc(len);
3836                 SET_VARSIZE(result, len);
3837                 result->choice.n_long.n_sign_dscale =
3838                         sign | (var->dscale & NUMERIC_DSCALE_MASK);
3839                 result->choice.n_long.n_weight = weight;
3840         }
3841
3842         memcpy(NUMERIC_DIGITS(result), digits, n * sizeof(NumericDigit));
3843         Assert(NUMERIC_NDIGITS(result) == n);
3844
3845         /* Check for overflow of int16 fields */
3846         if (NUMERIC_WEIGHT(result) != weight ||
3847                 NUMERIC_DSCALE(result) != var->dscale)
3848                 ereport(ERROR,
3849                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
3850                                  errmsg("value overflows numeric format")));
3851
3852         dump_numeric("make_result()", result);
3853         return result;
3854 }
3855
3856
3857 /*
3858  * apply_typmod() -
3859  *
3860  *      Do bounds checking and rounding according to the attributes
3861  *      typmod field.
3862  */
3863 static void
3864 apply_typmod(NumericVar *var, int32 typmod)
3865 {
3866         int                     precision;
3867         int                     scale;
3868         int                     maxdigits;
3869         int                     ddigits;
3870         int                     i;
3871
3872         /* Do nothing if we have a default typmod (-1) */
3873         if (typmod < (int32) (VARHDRSZ))
3874                 return;
3875
3876         typmod -= VARHDRSZ;
3877         precision = (typmod >> 16) & 0xffff;
3878         scale = typmod & 0xffff;
3879         maxdigits = precision - scale;
3880
3881         /* Round to target scale (and set var->dscale) */
3882         round_var(var, scale);
3883
3884         /*
3885          * Check for overflow - note we can't do this before rounding, because
3886          * rounding could raise the weight.  Also note that the var's weight could
3887          * be inflated by leading zeroes, which will be stripped before storage
3888          * but perhaps might not have been yet. In any case, we must recognize a
3889          * true zero, whose weight doesn't mean anything.
3890          */
3891         ddigits = (var->weight + 1) * DEC_DIGITS;
3892         if (ddigits > maxdigits)
3893         {
3894                 /* Determine true weight; and check for all-zero result */
3895                 for (i = 0; i < var->ndigits; i++)
3896                 {
3897                         NumericDigit dig = var->digits[i];
3898
3899                         if (dig)
3900                         {
3901                                 /* Adjust for any high-order decimal zero digits */
3902 #if DEC_DIGITS == 4
3903                                 if (dig < 10)
3904                                         ddigits -= 3;
3905                                 else if (dig < 100)
3906                                         ddigits -= 2;
3907                                 else if (dig < 1000)
3908                                         ddigits -= 1;
3909 #elif DEC_DIGITS == 2
3910                                 if (dig < 10)
3911                                         ddigits -= 1;
3912 #elif DEC_DIGITS == 1
3913                                 /* no adjustment */
3914 #else
3915 #error unsupported NBASE
3916 #endif
3917                                 if (ddigits > maxdigits)
3918                                         ereport(ERROR,
3919                                                         (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
3920                                                          errmsg("numeric field overflow"),
3921                                                          errdetail("A field with precision %d, scale %d must round to an absolute value less than %s%d.",
3922                                                                            precision, scale,
3923                                         /* Display 10^0 as 1 */
3924                                                                            maxdigits ? "10^" : "",
3925                                                                            maxdigits ? maxdigits : 1
3926                                                                            )));
3927                                 break;
3928                         }
3929                         ddigits -= DEC_DIGITS;
3930                 }
3931         }
3932 }
3933
3934 /*
3935  * Convert numeric to int8, rounding if needed.
3936  *
3937  * If overflow, return FALSE (no error is raised).      Return TRUE if okay.
3938  */
3939 static bool
3940 numericvar_to_int8(NumericVar *var, int64 *result)
3941 {
3942         NumericDigit *digits;
3943         int                     ndigits;
3944         int                     weight;
3945         int                     i;
3946         int64           val,
3947                                 oldval;
3948         bool            neg;
3949         NumericVar      rounded;
3950
3951         /* Round to nearest integer */
3952         init_var(&rounded);
3953         set_var_from_var(var, &rounded);
3954         round_var(&rounded, 0);
3955
3956         /* Check for zero input */
3957         strip_var(&rounded);
3958         ndigits = rounded.ndigits;
3959         if (ndigits == 0)
3960         {
3961                 *result = 0;
3962                 free_var(&rounded);
3963                 return true;
3964         }
3965
3966         /*
3967          * For input like 10000000000, we must treat stripped digits as real. So
3968          * the loop assumes there are weight+1 digits before the decimal point.
3969          */
3970         weight = rounded.weight;
3971         Assert(weight >= 0 && ndigits <= weight + 1);
3972
3973         /* Construct the result */
3974         digits = rounded.digits;
3975         neg = (rounded.sign == NUMERIC_NEG);
3976         val = digits[0];
3977         for (i = 1; i <= weight; i++)
3978         {
3979                 oldval = val;
3980                 val *= NBASE;
3981                 if (i < ndigits)
3982                         val += digits[i];
3983
3984                 /*
3985                  * The overflow check is a bit tricky because we want to accept
3986                  * INT64_MIN, which will overflow the positive accumulator.  We can
3987                  * detect this case easily though because INT64_MIN is the only
3988                  * nonzero value for which -val == val (on a two's complement machine,
3989                  * anyway).
3990                  */
3991                 if ((val / NBASE) != oldval)    /* possible overflow? */
3992                 {
3993                         if (!neg || (-val) != val || val == 0 || oldval < 0)
3994                         {
3995                                 free_var(&rounded);
3996                                 return false;
3997                         }
3998                 }
3999         }
4000
4001         free_var(&rounded);
4002
4003         *result = neg ? -val : val;
4004         return true;
4005 }
4006
4007 /*
4008  * Convert int8 value to numeric.
4009  */
4010 static void
4011 int8_to_numericvar(int64 val, NumericVar *var)
4012 {
4013         uint64          uval,
4014                                 newuval;
4015         NumericDigit *ptr;
4016         int                     ndigits;
4017
4018         /* int8 can require at most 19 decimal digits; add one for safety */
4019         alloc_var(var, 20 / DEC_DIGITS);
4020         if (val < 0)
4021         {
4022                 var->sign = NUMERIC_NEG;
4023                 uval = -val;
4024         }
4025         else
4026         {
4027                 var->sign = NUMERIC_POS;
4028                 uval = val;
4029         }
4030         var->dscale = 0;
4031         if (val == 0)
4032         {
4033                 var->ndigits = 0;
4034                 var->weight = 0;
4035                 return;
4036         }
4037         ptr = var->digits + var->ndigits;
4038         ndigits = 0;
4039         do
4040         {
4041                 ptr--;
4042                 ndigits++;
4043                 newuval = uval / NBASE;
4044                 *ptr = uval - newuval * NBASE;
4045                 uval = newuval;
4046         } while (uval);
4047         var->digits = ptr;
4048         var->ndigits = ndigits;
4049         var->weight = ndigits - 1;
4050 }
4051
4052 /*
4053  * Convert numeric to float8; if out of range, return +/- HUGE_VAL
4054  */
4055 static double
4056 numeric_to_double_no_overflow(Numeric num)
4057 {
4058         char       *tmp;
4059         double          val;
4060         char       *endptr;
4061
4062         tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
4063                                                                                           NumericGetDatum(num)));
4064
4065         /* unlike float8in, we ignore ERANGE from strtod */
4066         val = strtod(tmp, &endptr);
4067         if (*endptr != '\0')
4068         {
4069                 /* shouldn't happen ... */
4070                 ereport(ERROR,
4071                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
4072                          errmsg("invalid input syntax for type double precision: \"%s\"",
4073                                         tmp)));
4074         }
4075
4076         pfree(tmp);
4077
4078         return val;
4079 }
4080
4081 /* As above, but work from a NumericVar */
4082 static double
4083 numericvar_to_double_no_overflow(NumericVar *var)
4084 {
4085         char       *tmp;
4086         double          val;
4087         char       *endptr;
4088
4089         tmp = get_str_from_var(var);
4090
4091         /* unlike float8in, we ignore ERANGE from strtod */
4092         val = strtod(tmp, &endptr);
4093         if (*endptr != '\0')
4094         {
4095                 /* shouldn't happen ... */
4096                 ereport(ERROR,
4097                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
4098                          errmsg("invalid input syntax for type double precision: \"%s\"",
4099                                         tmp)));
4100         }
4101
4102         pfree(tmp);
4103
4104         return val;
4105 }
4106
4107
4108 /*
4109  * cmp_var() -
4110  *
4111  *      Compare two values on variable level.  We assume zeroes have been
4112  *      truncated to no digits.
4113  */
4114 static int
4115 cmp_var(NumericVar *var1, NumericVar *var2)
4116 {
4117         return cmp_var_common(var1->digits, var1->ndigits,
4118                                                   var1->weight, var1->sign,
4119                                                   var2->digits, var2->ndigits,
4120                                                   var2->weight, var2->sign);
4121 }
4122
4123 /*
4124  * cmp_var_common() -
4125  *
4126  *      Main routine of cmp_var(). This function can be used by both
4127  *      NumericVar and Numeric.
4128  */
4129 static int
4130 cmp_var_common(const NumericDigit *var1digits, int var1ndigits,
4131                            int var1weight, int var1sign,
4132                            const NumericDigit *var2digits, int var2ndigits,
4133                            int var2weight, int var2sign)
4134 {
4135         if (var1ndigits == 0)
4136         {
4137                 if (var2ndigits == 0)
4138                         return 0;
4139                 if (var2sign == NUMERIC_NEG)
4140                         return 1;
4141                 return -1;
4142         }
4143         if (var2ndigits == 0)
4144         {
4145                 if (var1sign == NUMERIC_POS)
4146                         return 1;
4147                 return -1;
4148         }
4149
4150         if (var1sign == NUMERIC_POS)
4151         {
4152                 if (var2sign == NUMERIC_NEG)
4153                         return 1;
4154                 return cmp_abs_common(var1digits, var1ndigits, var1weight,
4155                                                           var2digits, var2ndigits, var2weight);
4156         }
4157
4158         if (var2sign == NUMERIC_POS)
4159                 return -1;
4160
4161         return cmp_abs_common(var2digits, var2ndigits, var2weight,
4162                                                   var1digits, var1ndigits, var1weight);
4163 }
4164
4165
4166 /*
4167  * add_var() -
4168  *
4169  *      Full version of add functionality on variable level (handling signs).
4170  *      result might point to one of the operands too without danger.
4171  */
4172 static void
4173 add_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
4174 {
4175         /*
4176          * Decide on the signs of the two variables what to do
4177          */
4178         if (var1->sign == NUMERIC_POS)
4179         {
4180                 if (var2->sign == NUMERIC_POS)
4181                 {
4182                         /*
4183                          * Both are positive result = +(ABS(var1) + ABS(var2))
4184                          */
4185                         add_abs(var1, var2, result);
4186                         result->sign = NUMERIC_POS;
4187                 }
4188                 else
4189                 {
4190                         /*
4191                          * var1 is positive, var2 is negative Must compare absolute values
4192                          */
4193                         switch (cmp_abs(var1, var2))
4194                         {
4195                                 case 0:
4196                                         /* ----------
4197                                          * ABS(var1) == ABS(var2)
4198                                          * result = ZERO
4199                                          * ----------
4200                                          */
4201                                         zero_var(result);
4202                                         result->dscale = Max(var1->dscale, var2->dscale);
4203                                         break;
4204
4205                                 case 1:
4206                                         /* ----------
4207                                          * ABS(var1) > ABS(var2)
4208                                          * result = +(ABS(var1) - ABS(var2))
4209                                          * ----------
4210                                          */
4211                                         sub_abs(var1, var2, result);
4212                                         result->sign = NUMERIC_POS;
4213                                         break;
4214
4215                                 case -1:
4216                                         /* ----------
4217                                          * ABS(var1) < ABS(var2)
4218                                          * result = -(ABS(var2) - ABS(var1))
4219                                          * ----------
4220                                          */
4221                                         sub_abs(var2, var1, result);
4222                                         result->sign = NUMERIC_NEG;
4223                                         break;
4224                         }
4225                 }
4226         }
4227         else
4228         {
4229                 if (var2->sign == NUMERIC_POS)
4230                 {
4231                         /* ----------
4232                          * var1 is negative, var2 is positive
4233                          * Must compare absolute values
4234                          * ----------
4235                          */
4236                         switch (cmp_abs(var1, var2))
4237                         {
4238                                 case 0:
4239                                         /* ----------
4240                                          * ABS(var1) == ABS(var2)
4241                                          * result = ZERO
4242                                          * ----------
4243                                          */
4244                                         zero_var(result);
4245                                         result->dscale = Max(var1->dscale, var2->dscale);
4246                                         break;
4247
4248                                 case 1:
4249                                         /* ----------
4250                                          * ABS(var1) > ABS(var2)
4251                                          * result = -(ABS(var1) - ABS(var2))
4252                                          * ----------
4253                                          */
4254                                         sub_abs(var1, var2, result);
4255                                         result->sign = NUMERIC_NEG;
4256                                         break;
4257
4258                                 case -1:
4259                                         /* ----------
4260                                          * ABS(var1) < ABS(var2)
4261                                          * result = +(ABS(var2) - ABS(var1))
4262                                          * ----------
4263                                          */
4264                                         sub_abs(var2, var1, result);
4265                                         result->sign = NUMERIC_POS;
4266                                         break;
4267                         }
4268                 }
4269                 else
4270                 {
4271                         /* ----------
4272                          * Both are negative
4273                          * result = -(ABS(var1) + ABS(var2))
4274                          * ----------
4275                          */
4276                         add_abs(var1, var2, result);
4277                         result->sign = NUMERIC_NEG;
4278                 }
4279         }
4280 }
4281
4282
4283 /*
4284  * sub_var() -
4285  *
4286  *      Full version of sub functionality on variable level (handling signs).
4287  *      result might point to one of the operands too without danger.
4288  */
4289 static void
4290 sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
4291 {
4292         /*
4293          * Decide on the signs of the two variables what to do
4294          */
4295         if (var1->sign == NUMERIC_POS)
4296         {
4297                 if (var2->sign == NUMERIC_NEG)
4298                 {
4299                         /* ----------
4300                          * var1 is positive, var2 is negative
4301                          * result = +(ABS(var1) + ABS(var2))
4302                          * ----------
4303                          */
4304                         add_abs(var1, var2, result);
4305                         result->sign = NUMERIC_POS;
4306                 }
4307                 else
4308                 {
4309                         /* ----------
4310                          * Both are positive
4311                          * Must compare absolute values
4312                          * ----------
4313                          */
4314                         switch (cmp_abs(var1, var2))
4315                         {
4316                                 case 0:
4317                                         /* ----------
4318                                          * ABS(var1) == ABS(var2)
4319                                          * result = ZERO
4320                                          * ----------
4321                                          */
4322                                         zero_var(result);
4323                                         result->dscale = Max(var1->dscale, var2->dscale);
4324                                         break;
4325
4326                                 case 1:
4327                                         /* ----------
4328                                          * ABS(var1) > ABS(var2)
4329                                          * result = +(ABS(var1) - ABS(var2))
4330                                          * ----------
4331                                          */
4332                                         sub_abs(var1, var2, result);
4333                                         result->sign = NUMERIC_POS;
4334                                         break;
4335
4336                                 case -1:
4337                                         /* ----------
4338                                          * ABS(var1) < ABS(var2)
4339                                          * result = -(ABS(var2) - ABS(var1))
4340                                          * ----------
4341                                          */
4342                                         sub_abs(var2, var1, result);
4343                                         result->sign = NUMERIC_NEG;
4344                                         break;
4345                         }
4346                 }
4347         }
4348         else
4349         {
4350                 if (var2->sign == NUMERIC_NEG)
4351                 {
4352                         /* ----------
4353                          * Both are negative
4354                          * Must compare absolute values
4355                          * ----------
4356                          */
4357                         switch (cmp_abs(var1, var2))
4358                         {
4359                                 case 0:
4360                                         /* ----------
4361                                          * ABS(var1) == ABS(var2)
4362                                          * result = ZERO
4363                                          * ----------
4364                                          */
4365                                         zero_var(result);
4366                                         result->dscale = Max(var1->dscale, var2->dscale);
4367                                         break;
4368
4369                                 case 1:
4370                                         /* ----------
4371                                          * ABS(var1) > ABS(var2)
4372                                          * result = -(ABS(var1) - ABS(var2))
4373                                          * ----------
4374                                          */
4375                                         sub_abs(var1, var2, result);
4376                                         result->sign = NUMERIC_NEG;
4377                                         break;
4378
4379                                 case -1:
4380                                         /* ----------
4381                                          * ABS(var1) < ABS(var2)
4382                                          * result = +(ABS(var2) - ABS(var1))
4383                                          * ----------
4384                                          */
4385                                         sub_abs(var2, var1, result);
4386                                         result->sign = NUMERIC_POS;
4387                                         break;
4388                         }
4389                 }
4390                 else
4391                 {
4392                         /* ----------
4393                          * var1 is negative, var2 is positive
4394                          * result = -(ABS(var1) + ABS(var2))
4395                          * ----------
4396                          */
4397                         add_abs(var1, var2, result);
4398                         result->sign = NUMERIC_NEG;
4399                 }
4400         }
4401 }
4402
4403
4404 /*
4405  * mul_var() -
4406  *
4407  *      Multiplication on variable level. Product of var1 * var2 is stored
4408  *      in result.      Result is rounded to no more than rscale fractional digits.
4409  */
4410 static void
4411 mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
4412                 int rscale)
4413 {
4414         int                     res_ndigits;
4415         int                     res_sign;
4416         int                     res_weight;
4417         int                     maxdigits;
4418         int                *dig;
4419         int                     carry;
4420         int                     maxdig;
4421         int                     newdig;
4422         NumericDigit *res_digits;
4423         int                     i,
4424                                 ri,
4425                                 i1,
4426                                 i2;
4427
4428         /* copy these values into local vars for speed in inner loop */
4429         int                     var1ndigits = var1->ndigits;
4430         int                     var2ndigits = var2->ndigits;
4431         NumericDigit *var1digits = var1->digits;
4432         NumericDigit *var2digits = var2->digits;
4433
4434         if (var1ndigits == 0 || var2ndigits == 0)
4435         {
4436                 /* one or both inputs is zero; so is result */
4437                 zero_var(result);
4438                 result->dscale = rscale;
4439                 return;
4440         }
4441
4442         /* Determine result sign and (maximum possible) weight */
4443         if (var1->sign == var2->sign)
4444                 res_sign = NUMERIC_POS;
4445         else
4446                 res_sign = NUMERIC_NEG;
4447         res_weight = var1->weight + var2->weight + 2;
4448
4449         /*
4450          * Determine number of result digits to compute.  If the exact result
4451          * would have more than rscale fractional digits, truncate the computation
4452          * with MUL_GUARD_DIGITS guard digits.  We do that by pretending that one
4453          * or both inputs have fewer digits than they really do.
4454          */
4455         res_ndigits = var1ndigits + var2ndigits + 1;
4456         maxdigits = res_weight + 1 + (rscale * DEC_DIGITS) + MUL_GUARD_DIGITS;
4457         if (res_ndigits > maxdigits)
4458         {
4459                 if (maxdigits < 3)
4460                 {
4461                         /* no useful precision at all in the result... */
4462                         zero_var(result);
4463                         result->dscale = rscale;
4464                         return;
4465                 }
4466                 /* force maxdigits odd so that input ndigits can be equal */
4467                 if ((maxdigits & 1) == 0)
4468                         maxdigits++;
4469                 if (var1ndigits > var2ndigits)
4470                 {
4471                         var1ndigits -= res_ndigits - maxdigits;
4472                         if (var1ndigits < var2ndigits)
4473                                 var1ndigits = var2ndigits = (var1ndigits + var2ndigits) / 2;
4474                 }
4475                 else
4476                 {
4477                         var2ndigits -= res_ndigits - maxdigits;
4478                         if (var2ndigits < var1ndigits)
4479                                 var1ndigits = var2ndigits = (var1ndigits + var2ndigits) / 2;
4480                 }
4481                 res_ndigits = maxdigits;
4482                 Assert(res_ndigits == var1ndigits + var2ndigits + 1);
4483         }
4484
4485         /*
4486          * We do the arithmetic in an array "dig[]" of signed int's.  Since
4487          * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom
4488          * to avoid normalizing carries immediately.
4489          *
4490          * maxdig tracks the maximum possible value of any dig[] entry; when this
4491          * threatens to exceed INT_MAX, we take the time to propagate carries. To
4492          * avoid overflow in maxdig itself, it actually represents the max
4493          * possible value divided by NBASE-1.
4494          */
4495         dig = (int *) palloc0(res_ndigits * sizeof(int));
4496         maxdig = 0;
4497
4498         ri = res_ndigits - 1;
4499         for (i1 = var1ndigits - 1; i1 >= 0; ri--, i1--)
4500         {
4501                 int                     var1digit = var1digits[i1];
4502
4503                 if (var1digit == 0)
4504                         continue;
4505
4506                 /* Time to normalize? */
4507                 maxdig += var1digit;
4508                 if (maxdig > INT_MAX / (NBASE - 1))
4509                 {
4510                         /* Yes, do it */
4511                         carry = 0;
4512                         for (i = res_ndigits - 1; i >= 0; i--)
4513                         {
4514                                 newdig = dig[i] + carry;
4515                                 if (newdig >= NBASE)
4516                                 {
4517                                         carry = newdig / NBASE;
4518                                         newdig -= carry * NBASE;
4519                                 }
4520                                 else
4521                                         carry = 0;
4522                                 dig[i] = newdig;
4523                         }
4524                         Assert(carry == 0);
4525                         /* Reset maxdig to indicate new worst-case */
4526                         maxdig = 1 + var1digit;
4527                 }
4528
4529                 /* Add appropriate multiple of var2 into the accumulator */
4530                 i = ri;
4531                 for (i2 = var2ndigits - 1; i2 >= 0; i2--)
4532                         dig[i--] += var1digit * var2digits[i2];
4533         }
4534
4535         /*
4536          * Now we do a final carry propagation pass to normalize the result, which
4537          * we combine with storing the result digits into the output. Note that
4538          * this is still done at full precision w/guard digits.
4539          */
4540         alloc_var(result, res_ndigits);
4541         res_digits = result->digits;
4542         carry = 0;
4543         for (i = res_ndigits - 1; i >= 0; i--)
4544         {
4545                 newdig = dig[i] + carry;
4546                 if (newdig >= NBASE)
4547                 {
4548                         carry = newdig / NBASE;
4549                         newdig -= carry * NBASE;
4550                 }
4551                 else
4552                         carry = 0;
4553                 res_digits[i] = newdig;
4554         }
4555         Assert(carry == 0);
4556
4557         pfree(dig);
4558
4559         /*
4560          * Finally, round the result to the requested precision.
4561          */
4562         result->weight = res_weight;
4563         result->sign = res_sign;
4564
4565         /* Round to target rscale (and set result->dscale) */
4566         round_var(result, rscale);
4567
4568         /* Strip leading and trailing zeroes */
4569         strip_var(result);
4570 }
4571
4572
4573 /*
4574  * div_var() -
4575  *
4576  *      Division on variable level. Quotient of var1 / var2 is stored in result.
4577  *      The quotient is figured to exactly rscale fractional digits.
4578  *      If round is true, it is rounded at the rscale'th digit; if false, it
4579  *      is truncated (towards zero) at that digit.
4580  */
4581 static void
4582 div_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
4583                 int rscale, bool round)
4584 {
4585         int                     div_ndigits;
4586         int                     res_ndigits;
4587         int                     res_sign;
4588         int                     res_weight;
4589         int                     carry;
4590         int                     borrow;
4591         int                     divisor1;
4592         int                     divisor2;
4593         NumericDigit *dividend;
4594         NumericDigit *divisor;
4595         NumericDigit *res_digits;
4596         int                     i;
4597         int                     j;
4598
4599         /* copy these values into local vars for speed in inner loop */
4600         int                     var1ndigits = var1->ndigits;
4601         int                     var2ndigits = var2->ndigits;
4602
4603         /*
4604          * First of all division by zero check; we must not be handed an
4605          * unnormalized divisor.
4606          */
4607         if (var2ndigits == 0 || var2->digits[0] == 0)
4608                 ereport(ERROR,
4609                                 (errcode(ERRCODE_DIVISION_BY_ZERO),
4610                                  errmsg("division by zero")));
4611
4612         /*
4613          * Now result zero check
4614          */
4615         if (var1ndigits == 0)
4616         {
4617                 zero_var(result);
4618                 result->dscale = rscale;
4619                 return;
4620         }
4621
4622         /*
4623          * Determine the result sign, weight and number of digits to calculate.
4624          * The weight figured here is correct if the emitted quotient has no
4625          * leading zero digits; otherwise strip_var() will fix things up.
4626          */
4627         if (var1->sign == var2->sign)
4628                 res_sign = NUMERIC_POS;
4629         else
4630                 res_sign = NUMERIC_NEG;
4631         res_weight = var1->weight - var2->weight;
4632         /* The number of accurate result digits we need to produce: */
4633         res_ndigits = res_weight + 1 + (rscale + DEC_DIGITS - 1) / DEC_DIGITS;
4634         /* ... but always at least 1 */
4635         res_ndigits = Max(res_ndigits, 1);
4636         /* If rounding needed, figure one more digit to ensure correct result */
4637         if (round)
4638                 res_ndigits++;
4639
4640         /*
4641          * The working dividend normally requires res_ndigits + var2ndigits
4642          * digits, but make it at least var1ndigits so we can load all of var1
4643          * into it.  (There will be an additional digit dividend[0] in the
4644          * dividend space, but for consistency with Knuth's notation we don't
4645          * count that in div_ndigits.)
4646          */
4647         div_ndigits = res_ndigits + var2ndigits;
4648         div_ndigits = Max(div_ndigits, var1ndigits);
4649
4650         /*
4651          * We need a workspace with room for the working dividend (div_ndigits+1
4652          * digits) plus room for the possibly-normalized divisor (var2ndigits
4653          * digits).  It is convenient also to have a zero at divisor[0] with the
4654          * actual divisor data in divisor[1 .. var2ndigits].  Transferring the
4655          * digits into the workspace also allows us to realloc the result (which
4656          * might be the same as either input var) before we begin the main loop.
4657          * Note that we use palloc0 to ensure that divisor[0], dividend[0], and
4658          * any additional dividend positions beyond var1ndigits, start out 0.
4659          */
4660         dividend = (NumericDigit *)
4661                 palloc0((div_ndigits + var2ndigits + 2) * sizeof(NumericDigit));
4662         divisor = dividend + (div_ndigits + 1);
4663         memcpy(dividend + 1, var1->digits, var1ndigits * sizeof(NumericDigit));
4664         memcpy(divisor + 1, var2->digits, var2ndigits * sizeof(NumericDigit));
4665
4666         /*
4667          * Now we can realloc the result to hold the generated quotient digits.
4668          */
4669         alloc_var(result, res_ndigits);
4670         res_digits = result->digits;
4671
4672         if (var2ndigits == 1)
4673         {
4674                 /*
4675                  * If there's only a single divisor digit, we can use a fast path (cf.
4676                  * Knuth section 4.3.1 exercise 16).
4677                  */
4678                 divisor1 = divisor[1];
4679                 carry = 0;
4680                 for (i = 0; i < res_ndigits; i++)
4681                 {
4682                         carry = carry * NBASE + dividend[i + 1];
4683                         res_digits[i] = carry / divisor1;
4684                         carry = carry % divisor1;
4685                 }
4686         }
4687         else
4688         {
4689                 /*
4690                  * The full multiple-place algorithm is taken from Knuth volume 2,
4691                  * Algorithm 4.3.1D.
4692                  *
4693                  * We need the first divisor digit to be >= NBASE/2.  If it isn't,
4694                  * make it so by scaling up both the divisor and dividend by the
4695                  * factor "d".  (The reason for allocating dividend[0] above is to
4696                  * leave room for possible carry here.)
4697                  */
4698                 if (divisor[1] < HALF_NBASE)
4699                 {
4700                         int                     d = NBASE / (divisor[1] + 1);
4701
4702                         carry = 0;
4703                         for (i = var2ndigits; i > 0; i--)
4704                         {
4705                                 carry += divisor[i] * d;
4706                                 divisor[i] = carry % NBASE;
4707                                 carry = carry / NBASE;
4708                         }
4709                         Assert(carry == 0);
4710                         carry = 0;
4711                         /* at this point only var1ndigits of dividend can be nonzero */
4712                         for (i = var1ndigits; i >= 0; i--)
4713                         {
4714                                 carry += dividend[i] * d;
4715                                 dividend[i] = carry % NBASE;
4716                                 carry = carry / NBASE;
4717                         }
4718                         Assert(carry == 0);
4719                         Assert(divisor[1] >= HALF_NBASE);
4720                 }
4721                 /* First 2 divisor digits are used repeatedly in main loop */
4722                 divisor1 = divisor[1];
4723                 divisor2 = divisor[2];
4724
4725                 /*
4726                  * Begin the main loop.  Each iteration of this loop produces the j'th
4727                  * quotient digit by dividing dividend[j .. j + var2ndigits] by the
4728                  * divisor; this is essentially the same as the common manual
4729                  * procedure for long division.
4730                  */
4731                 for (j = 0; j < res_ndigits; j++)
4732                 {
4733                         /* Estimate quotient digit from the first two dividend digits */
4734                         int                     next2digits = dividend[j] * NBASE + dividend[j + 1];
4735                         int                     qhat;
4736
4737                         /*
4738                          * If next2digits are 0, then quotient digit must be 0 and there's
4739                          * no need to adjust the working dividend.      It's worth testing
4740                          * here to fall out ASAP when processing trailing zeroes in a
4741                          * dividend.
4742                          */
4743                         if (next2digits == 0)
4744                         {
4745                                 res_digits[j] = 0;
4746                                 continue;
4747                         }
4748
4749                         if (dividend[j] == divisor1)
4750                                 qhat = NBASE - 1;
4751                         else
4752                                 qhat = next2digits / divisor1;
4753
4754                         /*
4755                          * Adjust quotient digit if it's too large.  Knuth proves that
4756                          * after this step, the quotient digit will be either correct or
4757                          * just one too large.  (Note: it's OK to use dividend[j+2] here
4758                          * because we know the divisor length is at least 2.)
4759                          */
4760                         while (divisor2 * qhat >
4761                                    (next2digits - qhat * divisor1) * NBASE + dividend[j + 2])
4762                                 qhat--;
4763
4764                         /* As above, need do nothing more when quotient digit is 0 */
4765                         if (qhat > 0)
4766                         {
4767                                 /*
4768                                  * Multiply the divisor by qhat, and subtract that from the
4769                                  * working dividend.  "carry" tracks the multiplication,
4770                                  * "borrow" the subtraction (could we fold these together?)
4771                                  */
4772                                 carry = 0;
4773                                 borrow = 0;
4774                                 for (i = var2ndigits; i >= 0; i--)
4775                                 {
4776                                         carry += divisor[i] * qhat;
4777                                         borrow -= carry % NBASE;
4778                                         carry = carry / NBASE;
4779                                         borrow += dividend[j + i];
4780                                         if (borrow < 0)
4781                                         {
4782                                                 dividend[j + i] = borrow + NBASE;
4783                                                 borrow = -1;
4784                                         }
4785                                         else
4786                                         {
4787                                                 dividend[j + i] = borrow;
4788                                                 borrow = 0;
4789                                         }
4790                                 }
4791                                 Assert(carry == 0);
4792
4793                                 /*
4794                                  * If we got a borrow out of the top dividend digit, then
4795                                  * indeed qhat was one too large.  Fix it, and add back the
4796                                  * divisor to correct the working dividend.  (Knuth proves
4797                                  * that this will occur only about 3/NBASE of the time; hence,
4798                                  * it's a good idea to test this code with small NBASE to be
4799                                  * sure this section gets exercised.)
4800                                  */
4801                                 if (borrow)
4802                                 {
4803                                         qhat--;
4804                                         carry = 0;
4805                                         for (i = var2ndigits; i >= 0; i--)
4806                                         {
4807                                                 carry += dividend[j + i] + divisor[i];
4808                                                 if (carry >= NBASE)
4809                                                 {
4810                                                         dividend[j + i] = carry - NBASE;
4811                                                         carry = 1;
4812                                                 }
4813                                                 else
4814                                                 {
4815                                                         dividend[j + i] = carry;
4816                                                         carry = 0;
4817                                                 }
4818                                         }
4819                                         /* A carry should occur here to cancel the borrow above */
4820                                         Assert(carry == 1);
4821                                 }
4822                         }
4823
4824                         /* And we're done with this quotient digit */
4825                         res_digits[j] = qhat;
4826                 }
4827         }
4828
4829         pfree(dividend);
4830
4831         /*
4832          * Finally, round or truncate the result to the requested precision.
4833          */
4834         result->weight = res_weight;
4835         result->sign = res_sign;
4836
4837         /* Round or truncate to target rscale (and set result->dscale) */
4838         if (round)
4839                 round_var(result, rscale);
4840         else
4841                 trunc_var(result, rscale);
4842
4843         /* Strip leading and trailing zeroes */
4844         strip_var(result);
4845 }
4846
4847
4848 /*
4849  * div_var_fast() -
4850  *
4851  *      This has the same API as div_var, but is implemented using the division
4852  *      algorithm from the "FM" library, rather than Knuth's schoolbook-division
4853  *      approach.  This is significantly faster but can produce inaccurate
4854  *      results, because it sometimes has to propagate rounding to the left,
4855  *      and so we can never be entirely sure that we know the requested digits
4856  *      exactly.  We compute DIV_GUARD_DIGITS extra digits, but there is
4857  *      no certainty that that's enough.  We use this only in the transcendental
4858  *      function calculation routines, where everything is approximate anyway.
4859  */
4860 static void
4861 div_var_fast(NumericVar *var1, NumericVar *var2, NumericVar *result,
4862                          int rscale, bool round)
4863 {
4864         int                     div_ndigits;
4865         int                     res_sign;
4866         int                     res_weight;
4867         int                *div;
4868         int                     qdigit;
4869         int                     carry;
4870         int                     maxdiv;
4871         int                     newdig;
4872         NumericDigit *res_digits;
4873         double          fdividend,
4874                                 fdivisor,
4875                                 fdivisorinverse,
4876                                 fquotient;
4877         int                     qi;
4878         int                     i;
4879
4880         /* copy these values into local vars for speed in inner loop */
4881         int                     var1ndigits = var1->ndigits;
4882         int                     var2ndigits = var2->ndigits;
4883         NumericDigit *var1digits = var1->digits;
4884         NumericDigit *var2digits = var2->digits;
4885
4886         /*
4887          * First of all division by zero check; we must not be handed an
4888          * unnormalized divisor.
4889          */
4890         if (var2ndigits == 0 || var2digits[0] == 0)
4891                 ereport(ERROR,
4892                                 (errcode(ERRCODE_DIVISION_BY_ZERO),
4893                                  errmsg("division by zero")));
4894
4895         /*
4896          * Now result zero check
4897          */
4898         if (var1ndigits == 0)
4899         {
4900                 zero_var(result);
4901                 result->dscale = rscale;
4902                 return;
4903         }
4904
4905         /*
4906          * Determine the result sign, weight and number of digits to calculate
4907          */
4908         if (var1->sign == var2->sign)
4909                 res_sign = NUMERIC_POS;
4910         else
4911                 res_sign = NUMERIC_NEG;
4912         res_weight = var1->weight - var2->weight + 1;
4913         /* The number of accurate result digits we need to produce: */
4914         div_ndigits = res_weight + 1 + (rscale + DEC_DIGITS - 1) / DEC_DIGITS;
4915         /* Add guard digits for roundoff error */
4916         div_ndigits += DIV_GUARD_DIGITS;
4917         if (div_ndigits < DIV_GUARD_DIGITS)
4918                 div_ndigits = DIV_GUARD_DIGITS;
4919         /* Must be at least var1ndigits, too, to simplify data-loading loop */
4920         if (div_ndigits < var1ndigits)
4921                 div_ndigits = var1ndigits;
4922
4923         /*
4924          * We do the arithmetic in an array "div[]" of signed int's.  Since
4925          * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom
4926          * to avoid normalizing carries immediately.
4927          *
4928          * We start with div[] containing one zero digit followed by the
4929          * dividend's digits (plus appended zeroes to reach the desired precision
4930          * including guard digits).  Each step of the main loop computes an
4931          * (approximate) quotient digit and stores it into div[], removing one
4932          * position of dividend space.  A final pass of carry propagation takes
4933          * care of any mistaken quotient digits.
4934          */
4935         div = (int *) palloc0((div_ndigits + 1) * sizeof(int));
4936         for (i = 0; i < var1ndigits; i++)
4937                 div[i + 1] = var1digits[i];
4938
4939         /*
4940          * We estimate each quotient digit using floating-point arithmetic, taking
4941          * the first four digits of the (current) dividend and divisor. This must
4942          * be float to avoid overflow.
4943          */
4944         fdivisor = (double) var2digits[0];
4945         for (i = 1; i < 4; i++)
4946         {
4947                 fdivisor *= NBASE;
4948                 if (i < var2ndigits)
4949                         fdivisor += (double) var2digits[i];
4950         }
4951         fdivisorinverse = 1.0 / fdivisor;
4952
4953         /*
4954          * maxdiv tracks the maximum possible absolute value of any div[] entry;
4955          * when this threatens to exceed INT_MAX, we take the time to propagate
4956          * carries.  To avoid overflow in maxdiv itself, it actually represents
4957          * the max possible abs. value divided by NBASE-1.
4958          */
4959         maxdiv = 1;
4960
4961         /*
4962          * Outer loop computes next quotient digit, which will go into div[qi]
4963          */
4964         for (qi = 0; qi < div_ndigits; qi++)
4965         {
4966                 /* Approximate the current dividend value */
4967                 fdividend = (double) div[qi];
4968                 for (i = 1; i < 4; i++)
4969                 {
4970                         fdividend *= NBASE;
4971                         if (qi + i <= div_ndigits)
4972                                 fdividend += (double) div[qi + i];
4973                 }
4974                 /* Compute the (approximate) quotient digit */
4975                 fquotient = fdividend * fdivisorinverse;
4976                 qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
4977                         (((int) fquotient) - 1);        /* truncate towards -infinity */
4978
4979                 if (qdigit != 0)
4980                 {
4981                         /* Do we need to normalize now? */
4982                         maxdiv += Abs(qdigit);
4983                         if (maxdiv > INT_MAX / (NBASE - 1))
4984                         {
4985                                 /* Yes, do it */
4986                                 carry = 0;
4987                                 for (i = div_ndigits; i > qi; i--)
4988                                 {
4989                                         newdig = div[i] + carry;
4990                                         if (newdig < 0)
4991                                         {
4992                                                 carry = -((-newdig - 1) / NBASE) - 1;
4993                                                 newdig -= carry * NBASE;
4994                                         }
4995                                         else if (newdig >= NBASE)
4996                                         {
4997                                                 carry = newdig / NBASE;
4998                                                 newdig -= carry * NBASE;
4999                                         }
5000                                         else
5001                                                 carry = 0;
5002                                         div[i] = newdig;
5003                                 }
5004                                 newdig = div[qi] + carry;
5005                                 div[qi] = newdig;
5006
5007                                 /*
5008                                  * All the div[] digits except possibly div[qi] are now in the
5009                                  * range 0..NBASE-1.
5010                                  */
5011                                 maxdiv = Abs(newdig) / (NBASE - 1);
5012                                 maxdiv = Max(maxdiv, 1);
5013
5014                                 /*
5015                                  * Recompute the quotient digit since new info may have
5016                                  * propagated into the top four dividend digits
5017                                  */
5018                                 fdividend = (double) div[qi];
5019                                 for (i = 1; i < 4; i++)
5020                                 {
5021                                         fdividend *= NBASE;
5022                                         if (qi + i <= div_ndigits)
5023                                                 fdividend += (double) div[qi + i];
5024                                 }
5025                                 /* Compute the (approximate) quotient digit */
5026                                 fquotient = fdividend * fdivisorinverse;
5027                                 qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
5028                                         (((int) fquotient) - 1);        /* truncate towards -infinity */
5029                                 maxdiv += Abs(qdigit);
5030                         }
5031
5032                         /* Subtract off the appropriate multiple of the divisor */
5033                         if (qdigit != 0)
5034                         {
5035                                 int                     istop = Min(var2ndigits, div_ndigits - qi + 1);
5036
5037                                 for (i = 0; i < istop; i++)
5038                                         div[qi + i] -= qdigit * var2digits[i];
5039                         }
5040                 }
5041
5042                 /*
5043                  * The dividend digit we are about to replace might still be nonzero.
5044                  * Fold it into the next digit position.  We don't need to worry about
5045                  * overflow here since this should nearly cancel with the subtraction
5046                  * of the divisor.
5047                  */
5048                 div[qi + 1] += div[qi] * NBASE;
5049
5050                 div[qi] = qdigit;
5051         }
5052
5053         /*
5054          * Approximate and store the last quotient digit (div[div_ndigits])
5055          */
5056         fdividend = (double) div[qi];
5057         for (i = 1; i < 4; i++)
5058                 fdividend *= NBASE;
5059         fquotient = fdividend * fdivisorinverse;
5060         qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
5061                 (((int) fquotient) - 1);        /* truncate towards -infinity */
5062         div[qi] = qdigit;
5063
5064         /*
5065          * Now we do a final carry propagation pass to normalize the result, which
5066          * we combine with storing the result digits into the output. Note that
5067          * this is still done at full precision w/guard digits.
5068          */
5069         alloc_var(result, div_ndigits + 1);
5070         res_digits = result->digits;
5071         carry = 0;
5072         for (i = div_ndigits; i >= 0; i--)
5073         {
5074                 newdig = div[i] + carry;
5075                 if (newdig < 0)
5076                 {
5077                         carry = -((-newdig - 1) / NBASE) - 1;
5078                         newdig -= carry * NBASE;
5079                 }
5080                 else if (newdig >= NBASE)
5081                 {
5082                         carry = newdig / NBASE;
5083                         newdig -= carry * NBASE;
5084                 }
5085                 else
5086                         carry = 0;
5087                 res_digits[i] = newdig;
5088         }
5089         Assert(carry == 0);
5090
5091         pfree(div);
5092
5093         /*
5094          * Finally, round the result to the requested precision.
5095          */
5096         result->weight = res_weight;
5097         result->sign = res_sign;
5098
5099         /* Round to target rscale (and set result->dscale) */
5100         if (round)
5101                 round_var(result, rscale);
5102         else
5103                 trunc_var(result, rscale);
5104
5105         /* Strip leading and trailing zeroes */
5106         strip_var(result);
5107 }
5108
5109
5110 /*
5111  * Default scale selection for division
5112  *
5113  * Returns the appropriate result scale for the division result.
5114  */
5115 static int
5116 select_div_scale(NumericVar *var1, NumericVar *var2)
5117 {
5118         int                     weight1,
5119                                 weight2,
5120                                 qweight,
5121                                 i;
5122         NumericDigit firstdigit1,
5123                                 firstdigit2;
5124         int                     rscale;
5125
5126         /*
5127          * The result scale of a division isn't specified in any SQL standard. For
5128          * PostgreSQL we select a result scale that will give at least
5129          * NUMERIC_MIN_SIG_DIGITS significant digits, so that numeric gives a
5130          * result no less accurate than float8; but use a scale not less than
5131          * either input's display scale.
5132          */
5133
5134         /* Get the actual (normalized) weight and first digit of each input */
5135
5136         weight1 = 0;                            /* values to use if var1 is zero */
5137         firstdigit1 = 0;
5138         for (i = 0; i < var1->ndigits; i++)
5139         {
5140                 firstdigit1 = var1->digits[i];
5141                 if (firstdigit1 != 0)
5142                 {
5143                         weight1 = var1->weight - i;
5144                         break;
5145                 }
5146         }
5147
5148         weight2 = 0;                            /* values to use if var2 is zero */
5149         firstdigit2 = 0;
5150         for (i = 0; i < var2->ndigits; i++)
5151         {
5152                 firstdigit2 = var2->digits[i];
5153                 if (firstdigit2 != 0)
5154                 {
5155                         weight2 = var2->weight - i;
5156                         break;
5157                 }
5158         }
5159
5160         /*
5161          * Estimate weight of quotient.  If the two first digits are equal, we
5162          * can't be sure, but assume that var1 is less than var2.
5163          */
5164         qweight = weight1 - weight2;
5165         if (firstdigit1 <= firstdigit2)
5166                 qweight--;
5167
5168         /* Select result scale */
5169         rscale = NUMERIC_MIN_SIG_DIGITS - qweight * DEC_DIGITS;
5170         rscale = Max(rscale, var1->dscale);
5171         rscale = Max(rscale, var2->dscale);
5172         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
5173         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
5174
5175         return rscale;
5176 }
5177
5178
5179 /*
5180  * mod_var() -
5181  *
5182  *      Calculate the modulo of two numerics at variable level
5183  */
5184 static void
5185 mod_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
5186 {
5187         NumericVar      tmp;
5188
5189         init_var(&tmp);
5190
5191         /* ---------
5192          * We do this using the equation
5193          *              mod(x,y) = x - trunc(x/y)*y
5194          * div_var can be persuaded to give us trunc(x/y) directly.
5195          * ----------
5196          */
5197         div_var(var1, var2, &tmp, 0, false);
5198
5199         mul_var(var2, &tmp, &tmp, var2->dscale);
5200
5201         sub_var(var1, &tmp, result);
5202
5203         free_var(&tmp);
5204 }
5205
5206
5207 /*
5208  * ceil_var() -
5209  *
5210  *      Return the smallest integer greater than or equal to the argument
5211  *      on variable level
5212  */
5213 static void
5214 ceil_var(NumericVar *var, NumericVar *result)
5215 {
5216         NumericVar      tmp;
5217
5218         init_var(&tmp);
5219         set_var_from_var(var, &tmp);
5220
5221         trunc_var(&tmp, 0);
5222
5223         if (var->sign == NUMERIC_POS && cmp_var(var, &tmp) != 0)
5224                 add_var(&tmp, &const_one, &tmp);
5225
5226         set_var_from_var(&tmp, result);
5227         free_var(&tmp);
5228 }
5229
5230
5231 /*
5232  * floor_var() -
5233  *
5234  *      Return the largest integer equal to or less than the argument
5235  *      on variable level
5236  */
5237 static void
5238 floor_var(NumericVar *var, NumericVar *result)
5239 {
5240         NumericVar      tmp;
5241
5242         init_var(&tmp);
5243         set_var_from_var(var, &tmp);
5244
5245         trunc_var(&tmp, 0);
5246
5247         if (var->sign == NUMERIC_NEG && cmp_var(var, &tmp) != 0)
5248                 sub_var(&tmp, &const_one, &tmp);
5249
5250         set_var_from_var(&tmp, result);
5251         free_var(&tmp);
5252 }
5253
5254
5255 /*
5256  * sqrt_var() -
5257  *
5258  *      Compute the square root of x using Newton's algorithm
5259  */
5260 static void
5261 sqrt_var(NumericVar *arg, NumericVar *result, int rscale)
5262 {
5263         NumericVar      tmp_arg;
5264         NumericVar      tmp_val;
5265         NumericVar      last_val;
5266         int                     local_rscale;
5267         int                     stat;
5268
5269         local_rscale = rscale + 8;
5270
5271         stat = cmp_var(arg, &const_zero);
5272         if (stat == 0)
5273         {
5274                 zero_var(result);
5275                 result->dscale = rscale;
5276                 return;
5277         }
5278
5279         /*
5280          * SQL2003 defines sqrt() in terms of power, so we need to emit the right
5281          * SQLSTATE error code if the operand is negative.
5282          */
5283         if (stat < 0)
5284                 ereport(ERROR,
5285                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION),
5286                                  errmsg("cannot take square root of a negative number")));
5287
5288         init_var(&tmp_arg);
5289         init_var(&tmp_val);
5290         init_var(&last_val);
5291
5292         /* Copy arg in case it is the same var as result */
5293         set_var_from_var(arg, &tmp_arg);
5294
5295         /*
5296          * Initialize the result to the first guess
5297          */
5298         alloc_var(result, 1);
5299         result->digits[0] = tmp_arg.digits[0] / 2;
5300         if (result->digits[0] == 0)
5301                 result->digits[0] = 1;
5302         result->weight = tmp_arg.weight / 2;
5303         result->sign = NUMERIC_POS;
5304
5305         set_var_from_var(result, &last_val);
5306
5307         for (;;)
5308         {
5309                 div_var_fast(&tmp_arg, result, &tmp_val, local_rscale, true);
5310
5311                 add_var(result, &tmp_val, result);
5312                 mul_var(result, &const_zero_point_five, result, local_rscale);
5313
5314                 if (cmp_var(&last_val, result) == 0)
5315                         break;
5316                 set_var_from_var(result, &last_val);
5317         }
5318
5319         free_var(&last_val);
5320         free_var(&tmp_val);
5321         free_var(&tmp_arg);
5322
5323         /* Round to requested precision */
5324         round_var(result, rscale);
5325 }
5326
5327
5328 /*
5329  * exp_var() -
5330  *
5331  *      Raise e to the power of x
5332  */
5333 static void
5334 exp_var(NumericVar *arg, NumericVar *result, int rscale)
5335 {
5336         NumericVar      x;
5337         int                     xintval;
5338         bool            xneg = FALSE;
5339         int                     local_rscale;
5340
5341         /*----------
5342          * We separate the integral and fraction parts of x, then compute
5343          *              e^x = e^xint * e^xfrac
5344          * where e = exp(1) and e^xfrac = exp(xfrac) are computed by
5345          * exp_var_internal; the limited range of inputs allows that routine
5346          * to do a good job with a simple Taylor series.  Raising e^xint is
5347          * done by repeated multiplications in power_var_int.
5348          *----------
5349          */
5350         init_var(&x);
5351
5352         set_var_from_var(arg, &x);
5353
5354         if (x.sign == NUMERIC_NEG)
5355         {
5356                 xneg = TRUE;
5357                 x.sign = NUMERIC_POS;
5358         }
5359
5360         /* Extract the integer part, remove it from x */
5361         xintval = 0;
5362         while (x.weight >= 0)
5363         {
5364                 xintval *= NBASE;
5365                 if (x.ndigits > 0)
5366                 {
5367                         xintval += x.digits[0];
5368                         x.digits++;
5369                         x.ndigits--;
5370                 }
5371                 x.weight--;
5372                 /* Guard against overflow */
5373                 if (xintval >= NUMERIC_MAX_RESULT_SCALE * 3)
5374                         ereport(ERROR,
5375                                         (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
5376                                          errmsg("argument for function \"exp\" too big")));
5377         }
5378
5379         /* Select an appropriate scale for internal calculation */
5380         local_rscale = rscale + MUL_GUARD_DIGITS * 2;
5381
5382         /* Compute e^xfrac */
5383         exp_var_internal(&x, result, local_rscale);
5384
5385         /* If there's an integer part, multiply by e^xint */
5386         if (xintval > 0)
5387         {
5388                 NumericVar      e;
5389
5390                 init_var(&e);
5391                 exp_var_internal(&const_one, &e, local_rscale);
5392                 power_var_int(&e, xintval, &e, local_rscale);
5393                 mul_var(&e, result, result, local_rscale);
5394                 free_var(&e);
5395         }
5396
5397         /* Compensate for input sign, and round to requested rscale */
5398         if (xneg)
5399                 div_var_fast(&const_one, result, result, rscale, true);
5400         else
5401                 round_var(result, rscale);
5402
5403         free_var(&x);
5404 }
5405
5406
5407 /*
5408  * exp_var_internal() -
5409  *
5410  *      Raise e to the power of x, where 0 <= x <= 1
5411  *
5412  * NB: the result should be good to at least rscale digits, but it has
5413  * *not* been rounded off; the caller must do that if wanted.
5414  */
5415 static void
5416 exp_var_internal(NumericVar *arg, NumericVar *result, int rscale)
5417 {
5418         NumericVar      x;
5419         NumericVar      xpow;
5420         NumericVar      ifac;
5421         NumericVar      elem;
5422         NumericVar      ni;
5423         int                     ndiv2 = 0;
5424         int                     local_rscale;
5425
5426         init_var(&x);
5427         init_var(&xpow);
5428         init_var(&ifac);
5429         init_var(&elem);
5430         init_var(&ni);
5431
5432         set_var_from_var(arg, &x);
5433
5434         Assert(x.sign == NUMERIC_POS);
5435
5436         local_rscale = rscale + 8;
5437
5438         /* Reduce input into range 0 <= x <= 0.01 */
5439         while (cmp_var(&x, &const_zero_point_01) > 0)
5440         {
5441                 ndiv2++;
5442                 local_rscale++;
5443                 mul_var(&x, &const_zero_point_five, &x, x.dscale + 1);
5444         }
5445
5446         /*
5447          * Use the Taylor series
5448          *
5449          * exp(x) = 1 + x + x^2/2! + x^3/3! + ...
5450          *
5451          * Given the limited range of x, this should converge reasonably quickly.
5452          * We run the series until the terms fall below the local_rscale limit.
5453          */
5454         add_var(&const_one, &x, result);
5455         set_var_from_var(&x, &xpow);
5456         set_var_from_var(&const_one, &ifac);
5457         set_var_from_var(&const_one, &ni);
5458
5459         for (;;)
5460         {
5461                 add_var(&ni, &const_one, &ni);
5462                 mul_var(&xpow, &x, &xpow, local_rscale);
5463                 mul_var(&ifac, &ni, &ifac, 0);
5464                 div_var_fast(&xpow, &ifac, &elem, local_rscale, true);
5465
5466                 if (elem.ndigits == 0)
5467                         break;
5468
5469                 add_var(result, &elem, result);
5470         }
5471
5472         /* Compensate for argument range reduction */
5473         while (ndiv2-- > 0)
5474                 mul_var(result, result, result, local_rscale);
5475
5476         free_var(&x);
5477         free_var(&xpow);
5478         free_var(&ifac);
5479         free_var(&elem);
5480         free_var(&ni);
5481 }
5482
5483
5484 /*
5485  * ln_var() -
5486  *
5487  *      Compute the natural log of x
5488  */
5489 static void
5490 ln_var(NumericVar *arg, NumericVar *result, int rscale)
5491 {
5492         NumericVar      x;
5493         NumericVar      xx;
5494         NumericVar      ni;
5495         NumericVar      elem;
5496         NumericVar      fact;
5497         int                     local_rscale;
5498         int                     cmp;
5499
5500         cmp = cmp_var(arg, &const_zero);
5501         if (cmp == 0)
5502                 ereport(ERROR,
5503                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_LOG),
5504                                  errmsg("cannot take logarithm of zero")));
5505         else if (cmp < 0)
5506                 ereport(ERROR,
5507                                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_LOG),
5508                                  errmsg("cannot take logarithm of a negative number")));
5509
5510         local_rscale = rscale + 8;
5511
5512         init_var(&x);
5513         init_var(&xx);
5514         init_var(&ni);
5515         init_var(&elem);
5516         init_var(&fact);
5517
5518         set_var_from_var(arg, &x);
5519         set_var_from_var(&const_two, &fact);
5520
5521         /* Reduce input into range 0.9 < x < 1.1 */
5522         while (cmp_var(&x, &const_zero_point_nine) <= 0)
5523         {
5524                 local_rscale++;
5525                 sqrt_var(&x, &x, local_rscale);
5526                 mul_var(&fact, &const_two, &fact, 0);
5527         }
5528         while (cmp_var(&x, &const_one_point_one) >= 0)
5529         {
5530                 local_rscale++;
5531                 sqrt_var(&x, &x, local_rscale);
5532                 mul_var(&fact, &const_two, &fact, 0);
5533         }
5534
5535         /*
5536          * We use the Taylor series for 0.5 * ln((1+z)/(1-z)),
5537          *
5538          * z + z^3/3 + z^5/5 + ...
5539          *
5540          * where z = (x-1)/(x+1) is in the range (approximately) -0.053 .. 0.048
5541          * due to the above range-reduction of x.
5542          *
5543          * The convergence of this is not as fast as one would like, but is
5544          * tolerable given that z is small.
5545          */
5546         sub_var(&x, &const_one, result);
5547         add_var(&x, &const_one, &elem);
5548         div_var_fast(result, &elem, result, local_rscale, true);
5549         set_var_from_var(result, &xx);
5550         mul_var(result, result, &x, local_rscale);
5551
5552         set_var_from_var(&const_one, &ni);
5553
5554         for (;;)
5555         {
5556                 add_var(&ni, &const_two, &ni);
5557                 mul_var(&xx, &x, &xx, local_rscale);
5558                 div_var_fast(&xx, &ni, &elem, local_rscale, true);
5559
5560                 if (elem.ndigits == 0)
5561                         break;
5562
5563                 add_var(result, &elem, result);
5564
5565                 if (elem.weight < (result->weight - local_rscale * 2 / DEC_DIGITS))
5566                         break;
5567         }
5568
5569         /* Compensate for argument range reduction, round to requested rscale */
5570         mul_var(result, &fact, result, rscale);
5571
5572         free_var(&x);
5573         free_var(&xx);
5574         free_var(&ni);
5575         free_var(&elem);
5576         free_var(&fact);
5577 }
5578
5579
5580 /*
5581  * log_var() -
5582  *
5583  *      Compute the logarithm of num in a given base.
5584  *
5585  *      Note: this routine chooses dscale of the result.
5586  */
5587 static void
5588 log_var(NumericVar *base, NumericVar *num, NumericVar *result)
5589 {
5590         NumericVar      ln_base;
5591         NumericVar      ln_num;
5592         int                     dec_digits;
5593         int                     rscale;
5594         int                     local_rscale;
5595
5596         init_var(&ln_base);
5597         init_var(&ln_num);
5598
5599         /* Set scale for ln() calculations --- compare numeric_ln() */
5600
5601         /* Approx decimal digits before decimal point */
5602         dec_digits = (num->weight + 1) * DEC_DIGITS;
5603
5604         if (dec_digits > 1)
5605                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(dec_digits - 1);
5606         else if (dec_digits < 1)
5607                 rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(1 - dec_digits);
5608         else
5609                 rscale = NUMERIC_MIN_SIG_DIGITS;
5610
5611         rscale = Max(rscale, base->dscale);
5612         rscale = Max(rscale, num->dscale);
5613         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
5614         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
5615
5616         local_rscale = rscale + 8;
5617
5618         /* Form natural logarithms */
5619         ln_var(base, &ln_base, local_rscale);
5620         ln_var(num, &ln_num, local_rscale);
5621
5622         ln_base.dscale = rscale;
5623         ln_num.dscale = rscale;
5624
5625         /* Select scale for division result */
5626         rscale = select_div_scale(&ln_num, &ln_base);
5627
5628         div_var_fast(&ln_num, &ln_base, result, rscale, true);
5629
5630         free_var(&ln_num);
5631         free_var(&ln_base);
5632 }
5633
5634
5635 /*
5636  * power_var() -
5637  *
5638  *      Raise base to the power of exp
5639  *
5640  *      Note: this routine chooses dscale of the result.
5641  */
5642 static void
5643 power_var(NumericVar *base, NumericVar *exp, NumericVar *result)
5644 {
5645         NumericVar      ln_base;
5646         NumericVar      ln_num;
5647         int                     dec_digits;
5648         int                     rscale;
5649         int                     local_rscale;
5650         double          val;
5651
5652         /* If exp can be represented as an integer, use power_var_int */
5653         if (exp->ndigits == 0 || exp->ndigits <= exp->weight + 1)
5654         {
5655                 /* exact integer, but does it fit in int? */
5656                 int64           expval64;
5657
5658                 if (numericvar_to_int8(exp, &expval64))
5659                 {
5660                         int                     expval = (int) expval64;
5661
5662                         /* Test for overflow by reverse-conversion. */
5663                         if ((int64) expval == expval64)
5664                         {
5665                                 /* Okay, select rscale */
5666                                 rscale = NUMERIC_MIN_SIG_DIGITS;
5667                                 rscale = Max(rscale, base->dscale);
5668                                 rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
5669                                 rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
5670
5671                                 power_var_int(base, expval, result, rscale);
5672                                 return;
5673                         }
5674                 }
5675         }
5676
5677         /*
5678          * This avoids log(0) for cases of 0 raised to a non-integer. 0 ^ 0
5679          * handled by power_var_int().
5680          */
5681         if (cmp_var(base, &const_zero) == 0)
5682         {
5683                 set_var_from_var(&const_zero, result);
5684                 result->dscale = NUMERIC_MIN_SIG_DIGITS;                /* no need to round */
5685                 return;
5686         }
5687
5688         init_var(&ln_base);
5689         init_var(&ln_num);
5690
5691         /* Set scale for ln() calculation --- need extra accuracy here */
5692
5693         /* Approx decimal digits before decimal point */
5694         dec_digits = (base->weight + 1) * DEC_DIGITS;
5695
5696         if (dec_digits > 1)
5697                 rscale = NUMERIC_MIN_SIG_DIGITS * 2 - (int) log10(dec_digits - 1);
5698         else if (dec_digits < 1)
5699                 rscale = NUMERIC_MIN_SIG_DIGITS * 2 - (int) log10(1 - dec_digits);
5700         else
5701                 rscale = NUMERIC_MIN_SIG_DIGITS * 2;
5702
5703         rscale = Max(rscale, base->dscale * 2);
5704         rscale = Max(rscale, exp->dscale * 2);
5705         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE * 2);
5706         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE * 2);
5707
5708         local_rscale = rscale + 8;
5709
5710         ln_var(base, &ln_base, local_rscale);
5711
5712         mul_var(&ln_base, exp, &ln_num, local_rscale);
5713
5714         /* Set scale for exp() -- compare numeric_exp() */
5715
5716         /* convert input to float8, ignoring overflow */
5717         val = numericvar_to_double_no_overflow(&ln_num);
5718
5719         /*
5720          * log10(result) = num * log10(e), so this is approximately the weight:
5721          */
5722         val *= 0.434294481903252;
5723
5724         /* limit to something that won't cause integer overflow */
5725         val = Max(val, -NUMERIC_MAX_RESULT_SCALE);
5726         val = Min(val, NUMERIC_MAX_RESULT_SCALE);
5727
5728         rscale = NUMERIC_MIN_SIG_DIGITS - (int) val;
5729         rscale = Max(rscale, base->dscale);
5730         rscale = Max(rscale, exp->dscale);
5731         rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
5732         rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
5733
5734         exp_var(&ln_num, result, rscale);
5735
5736         free_var(&ln_num);
5737         free_var(&ln_base);
5738 }
5739
5740 /*
5741  * power_var_int() -
5742  *
5743  *      Raise base to the power of exp, where exp is an integer.
5744  */
5745 static void
5746 power_var_int(NumericVar *base, int exp, NumericVar *result, int rscale)
5747 {
5748         bool            neg;
5749         NumericVar      base_prod;
5750         int                     local_rscale;
5751
5752         switch (exp)
5753         {
5754                 case 0:
5755
5756                         /*
5757                          * While 0 ^ 0 can be either 1 or indeterminate (error), we treat
5758                          * it as 1 because most programming languages do this. SQL:2003
5759                          * also requires a return value of 1.
5760                          * http://en.wikipedia.org/wiki/Exponentiation#Zero_to_the_zero_pow
5761                          * er
5762                          */
5763                         set_var_from_var(&const_one, result);
5764                         result->dscale = rscale;        /* no need to round */
5765                         return;
5766                 case 1:
5767                         set_var_from_var(base, result);
5768                         round_var(result, rscale);
5769                         return;
5770                 case -1:
5771                         div_var(&const_one, base, result, rscale, true);
5772                         return;
5773                 case 2:
5774                         mul_var(base, base, result, rscale);
5775                         return;
5776                 default:
5777                         break;
5778         }
5779
5780         /*
5781          * The general case repeatedly multiplies base according to the bit
5782          * pattern of exp.      We do the multiplications with some extra precision.
5783          */
5784         neg = (exp < 0);
5785         exp = Abs(exp);
5786
5787         local_rscale = rscale + MUL_GUARD_DIGITS * 2;
5788
5789         init_var(&base_prod);
5790         set_var_from_var(base, &base_prod);
5791
5792         if (exp & 1)
5793                 set_var_from_var(base, result);
5794         else
5795                 set_var_from_var(&const_one, result);
5796
5797         while ((exp >>= 1) > 0)
5798         {
5799                 mul_var(&base_prod, &base_prod, &base_prod, local_rscale);
5800                 if (exp & 1)
5801                         mul_var(&base_prod, result, result, local_rscale);
5802         }
5803
5804         free_var(&base_prod);
5805
5806         /* Compensate for input sign, and round to requested rscale */
5807         if (neg)
5808                 div_var_fast(&const_one, result, result, rscale, true);
5809         else
5810                 round_var(result, rscale);
5811 }
5812
5813
5814 /* ----------------------------------------------------------------------
5815  *
5816  * Following are the lowest level functions that operate unsigned
5817  * on the variable level
5818  *
5819  * ----------------------------------------------------------------------
5820  */
5821
5822
5823 /* ----------
5824  * cmp_abs() -
5825  *
5826  *      Compare the absolute values of var1 and var2
5827  *      Returns:        -1 for ABS(var1) < ABS(var2)
5828  *                              0  for ABS(var1) == ABS(var2)
5829  *                              1  for ABS(var1) > ABS(var2)
5830  * ----------
5831  */
5832 static int
5833 cmp_abs(NumericVar *var1, NumericVar *var2)
5834 {
5835         return cmp_abs_common(var1->digits, var1->ndigits, var1->weight,
5836                                                   var2->digits, var2->ndigits, var2->weight);
5837 }
5838
5839 /* ----------
5840  * cmp_abs_common() -
5841  *
5842  *      Main routine of cmp_abs(). This function can be used by both
5843  *      NumericVar and Numeric.
5844  * ----------
5845  */
5846 static int
5847 cmp_abs_common(const NumericDigit *var1digits, int var1ndigits, int var1weight,
5848                          const NumericDigit *var2digits, int var2ndigits, int var2weight)
5849 {
5850         int                     i1 = 0;
5851         int                     i2 = 0;
5852
5853         /* Check any digits before the first common digit */
5854
5855         while (var1weight > var2weight && i1 < var1ndigits)
5856         {
5857                 if (var1digits[i1++] != 0)
5858                         return 1;
5859                 var1weight--;
5860         }
5861         while (var2weight > var1weight && i2 < var2ndigits)
5862         {
5863                 if (var2digits[i2++] != 0)
5864                         return -1;
5865                 var2weight--;
5866         }
5867
5868         /* At this point, either w1 == w2 or we've run out of digits */
5869
5870         if (var1weight == var2weight)
5871         {
5872                 while (i1 < var1ndigits && i2 < var2ndigits)
5873                 {
5874                         int                     stat = var1digits[i1++] - var2digits[i2++];
5875
5876                         if (stat)
5877                         {
5878                                 if (stat > 0)
5879                                         return 1;
5880                                 return -1;
5881                         }
5882                 }
5883         }
5884
5885         /*
5886          * At this point, we've run out of digits on one side or the other; so any
5887          * remaining nonzero digits imply that side is larger
5888          */
5889         while (i1 < var1ndigits)
5890         {
5891                 if (var1digits[i1++] != 0)
5892                         return 1;
5893         }
5894         while (i2 < var2ndigits)
5895         {
5896                 if (var2digits[i2++] != 0)
5897                         return -1;
5898         }
5899
5900         return 0;
5901 }
5902
5903
5904 /*
5905  * add_abs() -
5906  *
5907  *      Add the absolute values of two variables into result.
5908  *      result might point to one of the operands without danger.
5909  */
5910 static void
5911 add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
5912 {
5913         NumericDigit *res_buf;
5914         NumericDigit *res_digits;
5915         int                     res_ndigits;
5916         int                     res_weight;
5917         int                     res_rscale,
5918                                 rscale1,
5919                                 rscale2;
5920         int                     res_dscale;
5921         int                     i,
5922                                 i1,
5923                                 i2;
5924         int                     carry = 0;
5925
5926         /* copy these values into local vars for speed in inner loop */
5927         int                     var1ndigits = var1->ndigits;
5928         int                     var2ndigits = var2->ndigits;
5929         NumericDigit *var1digits = var1->digits;
5930         NumericDigit *var2digits = var2->digits;
5931
5932         res_weight = Max(var1->weight, var2->weight) + 1;
5933
5934         res_dscale = Max(var1->dscale, var2->dscale);
5935
5936         /* Note: here we are figuring rscale in base-NBASE digits */
5937         rscale1 = var1->ndigits - var1->weight - 1;
5938         rscale2 = var2->ndigits - var2->weight - 1;
5939         res_rscale = Max(rscale1, rscale2);
5940
5941         res_ndigits = res_rscale + res_weight + 1;
5942         if (res_ndigits <= 0)
5943                 res_ndigits = 1;
5944
5945         res_buf = digitbuf_alloc(res_ndigits + 1);
5946         res_buf[0] = 0;                         /* spare digit for later rounding */
5947         res_digits = res_buf + 1;
5948
5949         i1 = res_rscale + var1->weight + 1;
5950         i2 = res_rscale + var2->weight + 1;
5951         for (i = res_ndigits - 1; i >= 0; i--)
5952         {
5953                 i1--;
5954                 i2--;
5955                 if (i1 >= 0 && i1 < var1ndigits)
5956                         carry += var1digits[i1];
5957                 if (i2 >= 0 && i2 < var2ndigits)
5958                         carry += var2digits[i2];
5959
5960                 if (carry >= NBASE)
5961                 {
5962                         res_digits[i] = carry - NBASE;
5963                         carry = 1;
5964                 }
5965                 else
5966                 {
5967                         res_digits[i] = carry;
5968                         carry = 0;
5969                 }
5970         }
5971
5972         Assert(carry == 0);                     /* else we failed to allow for carry out */
5973
5974         digitbuf_free(result->buf);
5975         result->ndigits = res_ndigits;
5976         result->buf = res_buf;
5977         result->digits = res_digits;
5978         result->weight = res_weight;
5979         result->dscale = res_dscale;
5980
5981         /* Remove leading/trailing zeroes */
5982         strip_var(result);
5983 }
5984
5985
5986 /*
5987  * sub_abs()
5988  *
5989  *      Subtract the absolute value of var2 from the absolute value of var1
5990  *      and store in result. result might point to one of the operands
5991  *      without danger.
5992  *
5993  *      ABS(var1) MUST BE GREATER OR EQUAL ABS(var2) !!!
5994  */
5995 static void
5996 sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
5997 {
5998         NumericDigit *res_buf;
5999         NumericDigit *res_digits;
6000         int                     res_ndigits;
6001         int                     res_weight;
6002         int                     res_rscale,
6003                                 rscale1,
6004                                 rscale2;
6005         int                     res_dscale;
6006         int                     i,
6007                                 i1,
6008                                 i2;
6009         int                     borrow = 0;
6010
6011         /* copy these values into local vars for speed in inner loop */
6012         int                     var1ndigits = var1->ndigits;
6013         int                     var2ndigits = var2->ndigits;
6014         NumericDigit *var1digits = var1->digits;
6015         NumericDigit *var2digits = var2->digits;
6016
6017         res_weight = var1->weight;
6018
6019         res_dscale = Max(var1->dscale, var2->dscale);
6020
6021         /* Note: here we are figuring rscale in base-NBASE digits */
6022         rscale1 = var1->ndigits - var1->weight - 1;
6023         rscale2 = var2->ndigits - var2->weight - 1;
6024         res_rscale = Max(rscale1, rscale2);
6025
6026         res_ndigits = res_rscale + res_weight + 1;
6027         if (res_ndigits <= 0)
6028                 res_ndigits = 1;
6029
6030         res_buf = digitbuf_alloc(res_ndigits + 1);
6031         res_buf[0] = 0;                         /* spare digit for later rounding */
6032         res_digits = res_buf + 1;
6033
6034         i1 = res_rscale + var1->weight + 1;
6035         i2 = res_rscale + var2->weight + 1;
6036         for (i = res_ndigits - 1; i >= 0; i--)
6037         {
6038                 i1--;
6039                 i2--;
6040                 if (i1 >= 0 && i1 < var1ndigits)
6041                         borrow += var1digits[i1];
6042                 if (i2 >= 0 && i2 < var2ndigits)
6043                         borrow -= var2digits[i2];
6044
6045                 if (borrow < 0)
6046                 {
6047                         res_digits[i] = borrow + NBASE;
6048                         borrow = -1;
6049                 }
6050                 else
6051                 {
6052                         res_digits[i] = borrow;
6053                         borrow = 0;
6054                 }
6055         }
6056
6057         Assert(borrow == 0);            /* else caller gave us var1 < var2 */
6058
6059         digitbuf_free(result->buf);
6060         result->ndigits = res_ndigits;
6061         result->buf = res_buf;
6062         result->digits = res_digits;
6063         result->weight = res_weight;
6064         result->dscale = res_dscale;
6065
6066         /* Remove leading/trailing zeroes */
6067         strip_var(result);
6068 }
6069
6070 /*
6071  * round_var
6072  *
6073  * Round the value of a variable to no more than rscale decimal digits
6074  * after the decimal point.  NOTE: we allow rscale < 0 here, implying
6075  * rounding before the decimal point.
6076  */
6077 static void
6078 round_var(NumericVar *var, int rscale)
6079 {
6080         NumericDigit *digits = var->digits;
6081         int                     di;
6082         int                     ndigits;
6083         int                     carry;
6084
6085         var->dscale = rscale;
6086
6087         /* decimal digits wanted */
6088         di = (var->weight + 1) * DEC_DIGITS + rscale;
6089
6090         /*
6091          * If di = 0, the value loses all digits, but could round up to 1 if its
6092          * first extra digit is >= 5.  If di < 0 the result must be 0.
6093          */
6094         if (di < 0)
6095         {
6096                 var->ndigits = 0;
6097                 var->weight = 0;
6098                 var->sign = NUMERIC_POS;
6099         }
6100         else
6101         {
6102                 /* NBASE digits wanted */
6103                 ndigits = (di + DEC_DIGITS - 1) / DEC_DIGITS;
6104
6105                 /* 0, or number of decimal digits to keep in last NBASE digit */
6106                 di %= DEC_DIGITS;
6107
6108                 if (ndigits < var->ndigits ||
6109                         (ndigits == var->ndigits && di > 0))
6110                 {
6111                         var->ndigits = ndigits;
6112
6113 #if DEC_DIGITS == 1
6114                         /* di must be zero */
6115                         carry = (digits[ndigits] >= HALF_NBASE) ? 1 : 0;
6116 #else
6117                         if (di == 0)
6118                                 carry = (digits[ndigits] >= HALF_NBASE) ? 1 : 0;
6119                         else
6120                         {
6121                                 /* Must round within last NBASE digit */
6122                                 int                     extra,
6123                                                         pow10;
6124
6125 #if DEC_DIGITS == 4
6126                                 pow10 = round_powers[di];
6127 #elif DEC_DIGITS == 2
6128                                 pow10 = 10;
6129 #else
6130 #error unsupported NBASE
6131 #endif
6132                                 extra = digits[--ndigits] % pow10;
6133                                 digits[ndigits] -= extra;
6134                                 carry = 0;
6135                                 if (extra >= pow10 / 2)
6136                                 {
6137                                         pow10 += digits[ndigits];
6138                                         if (pow10 >= NBASE)
6139                                         {
6140                                                 pow10 -= NBASE;
6141                                                 carry = 1;
6142                                         }
6143                                         digits[ndigits] = pow10;
6144                                 }
6145                         }
6146 #endif
6147
6148                         /* Propagate carry if needed */
6149                         while (carry)
6150                         {
6151                                 carry += digits[--ndigits];
6152                                 if (carry >= NBASE)
6153                                 {
6154                                         digits[ndigits] = carry - NBASE;
6155                                         carry = 1;
6156                                 }
6157                                 else
6158                                 {
6159                                         digits[ndigits] = carry;
6160                                         carry = 0;
6161                                 }
6162                         }
6163
6164                         if (ndigits < 0)
6165                         {
6166                                 Assert(ndigits == -1);  /* better not have added > 1 digit */
6167                                 Assert(var->digits > var->buf);
6168                                 var->digits--;
6169                                 var->ndigits++;
6170                                 var->weight++;
6171                         }
6172                 }
6173         }
6174 }
6175
6176 /*
6177  * trunc_var
6178  *
6179  * Truncate (towards zero) the value of a variable at rscale decimal digits
6180  * after the decimal point.  NOTE: we allow rscale < 0 here, implying
6181  * truncation before the decimal point.
6182  */
6183 static void
6184 trunc_var(NumericVar *var, int rscale)
6185 {
6186         int                     di;
6187         int                     ndigits;
6188
6189         var->dscale = rscale;
6190
6191         /* decimal digits wanted */
6192         di = (var->weight + 1) * DEC_DIGITS + rscale;
6193
6194         /*
6195          * If di <= 0, the value loses all digits.
6196          */
6197         if (di <= 0)
6198         {
6199                 var->ndigits = 0;
6200                 var->weight = 0;
6201                 var->sign = NUMERIC_POS;
6202         }
6203         else
6204         {
6205                 /* NBASE digits wanted */
6206                 ndigits = (di + DEC_DIGITS - 1) / DEC_DIGITS;
6207
6208                 if (ndigits <= var->ndigits)
6209                 {
6210                         var->ndigits = ndigits;
6211
6212 #if DEC_DIGITS == 1
6213                         /* no within-digit stuff to worry about */
6214 #else
6215                         /* 0, or number of decimal digits to keep in last NBASE digit */
6216                         di %= DEC_DIGITS;
6217
6218                         if (di > 0)
6219                         {
6220                                 /* Must truncate within last NBASE digit */
6221                                 NumericDigit *digits = var->digits;
6222                                 int                     extra,
6223                                                         pow10;
6224
6225 #if DEC_DIGITS == 4
6226                                 pow10 = round_powers[di];
6227 #elif DEC_DIGITS == 2
6228                                 pow10 = 10;
6229 #else
6230 #error unsupported NBASE
6231 #endif
6232                                 extra = digits[--ndigits] % pow10;
6233                                 digits[ndigits] -= extra;
6234                         }
6235 #endif
6236                 }
6237         }
6238 }
6239
6240 /*
6241  * strip_var
6242  *
6243  * Strip any leading and trailing zeroes from a numeric variable
6244  */
6245 static void
6246 strip_var(NumericVar *var)
6247 {
6248         NumericDigit *digits = var->digits;
6249         int                     ndigits = var->ndigits;
6250
6251         /* Strip leading zeroes */
6252         while (ndigits > 0 && *digits == 0)
6253         {
6254                 digits++;
6255                 var->weight--;
6256                 ndigits--;
6257         }
6258
6259         /* Strip trailing zeroes */
6260         while (ndigits > 0 && digits[ndigits - 1] == 0)
6261                 ndigits--;
6262
6263         /* If it's zero, normalize the sign and weight */
6264         if (ndigits == 0)
6265         {
6266                 var->sign = NUMERIC_POS;
6267                 var->weight = 0;
6268         }
6269
6270         var->digits = digits;
6271         var->ndigits = ndigits;
6272 }