]> granicus.if.org Git - postgresql/commitdiff
Allow numeric to use a more compact, 2-byte header in many cases.
authorRobert Haas <rhaas@postgresql.org>
Tue, 3 Aug 2010 23:09:29 +0000 (23:09 +0000)
committerRobert Haas <rhaas@postgresql.org>
Tue, 3 Aug 2010 23:09:29 +0000 (23:09 +0000)
Review by Brendan Jurd and Tom Lane.

src/backend/utils/adt/numeric.c

index 327d5f0ddf7488e2bfa9179e5b027a1017e883b9..8398bb90e4f1e9550d8126fdfab142d1eb16b264 100644 (file)
@@ -14,7 +14,7 @@
  * Copyright (c) 1998-2010, PostgreSQL Global Development Group
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/utils/adt/numeric.c,v 1.124 2010/07/30 04:30:23 rhaas Exp $
+ *       $PostgreSQL: pgsql/src/backend/utils/adt/numeric.c,v 1.125 2010/08/03 23:09:29 rhaas Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "utils/int8.h"
 #include "utils/numeric.h"
 
-/*
- * Sign values and macros to deal with packing/unpacking n_sign_dscale
- */
-#define NUMERIC_SIGN_MASK      0xC000
-#define NUMERIC_POS                    0x0000
-#define NUMERIC_NEG                    0x4000
-#define NUMERIC_NAN                    0xC000
-#define NUMERIC_DSCALE_MASK 0x3FFF
-#define NUMERIC_SIGN(n)                ((n)->n_sign_dscale & NUMERIC_SIGN_MASK)
-#define NUMERIC_DSCALE(n)      ((n)->n_sign_dscale & NUMERIC_DSCALE_MASK)
-#define NUMERIC_IS_NAN(n)      (NUMERIC_SIGN(n) != NUMERIC_POS &&      \
-                                                        NUMERIC_SIGN(n) != NUMERIC_NEG)
-#define NUMERIC_HDRSZ  (VARHDRSZ + sizeof(uint16) + sizeof(int16))
-
-
-/*
- * The Numeric data type stored in the database
- *
- * NOTE: by convention, values in the packed form have been stripped of
- * all leading and trailing zero digits (where a "digit" is of base NBASE).
- * In particular, if the value is zero, there will be no digits at all!
- * The weight is arbitrary in that case, but we normally set it to zero.
- */
-struct NumericData
-{
-       int32           vl_len_;                /* varlena header (do not touch directly!) */
-       uint16          n_sign_dscale;  /* Sign + display scale */
-       int16           n_weight;               /* Weight of 1st digit  */
-       char            n_data[1];              /* Digits (really array of NumericDigit) */
-};
-
-
 /* ----------
  * Uncomment the following to enable compilation of dump_numeric()
  * and dump_var() and to get a dump of any result produced by make_result().
@@ -120,6 +88,122 @@ typedef signed char NumericDigit;
 typedef int16 NumericDigit;
 #endif
 
+/*
+ * The Numeric type as stored on disk.
+ *
+ * If the high bits of the first word of a NumericChoice (n_header, or
+ * n_short.n_header, or n_long.n_sign_dscale) are NUMERIC_SHORT, then the
+ * numeric follows the NumericShort format; if they are NUMERIC_POS or
+ * NUMERIC_NEG, it follows the NumericLong format.  If they are NUMERIC_NAN,
+ * it is a NaN.  We currently always store a NaN using just two bytes (i.e.
+ * only n_header), but previous releases used only the NumericLong format,
+ * so we might find 4-byte NaNs on disk if a database has been migrated using
+ * pg_upgrade.  In either case, when the high bits indicate a NaN, the
+ * remaining bits are never examined.  Currently, we always initialize these
+ * to zero, but it might be possible to use them for some other purpose in
+ * the future.
+ * 
+ * In the NumericShort format, the remaining 14 bits of the header word
+ * (n_short.n_header) are allocated as follows: 1 for sign (positive or
+ * negative), 6 for dynamic scale, and 7 for weight.  In practice, most
+ * commonly-encountered values can be represented this way.
+ *
+ * In the NumericLong format, the remaining 14 bits of the header word
+ * (n_long.n_sign_dscale) represent the display scale; and the weight is
+ * stored separately in n_weight.
+ *
+ * NOTE: by convention, values in the packed form have been stripped of
+ * all leading and trailing zero digits (where a "digit" is of base NBASE).
+ * In particular, if the value is zero, there will be no digits at all!
+ * The weight is arbitrary in that case, but we normally set it to zero.
+ */
+
+struct NumericShort
+{
+       uint16          n_header;               /* Sign + display scale + weight */
+       NumericDigit n_data[1];         /* Digits */
+};
+
+struct NumericLong
+{
+       uint16          n_sign_dscale;  /* Sign + display scale */
+       int16           n_weight;               /* Weight of 1st digit  */
+       NumericDigit n_data[1];         /* Digits */
+};
+
+union NumericChoice
+{
+       uint16          n_header;               /* Header word */
+       struct NumericLong      n_long; /* Long form (4-byte header) */
+       struct NumericShort     n_short;        /* Short form (2-byte header) */
+};
+
+struct NumericData
+{
+       int32           vl_len_;                /* varlena header (do not touch directly!) */
+       union NumericChoice     choice; /* choice of format */
+};
+
+
+/*
+ * Interpretation of high bits.
+ */
+
+#define NUMERIC_SIGN_MASK      0xC000
+#define NUMERIC_POS                    0x0000
+#define NUMERIC_NEG                    0x4000
+#define NUMERIC_SHORT          0x8000
+#define NUMERIC_NAN                    0xC000
+
+#define NUMERIC_FLAGBITS(n) ((n)->choice.n_header & NUMERIC_SIGN_MASK)
+#define NUMERIC_IS_NAN(n)              (NUMERIC_FLAGBITS(n) == NUMERIC_NAN)
+#define NUMERIC_IS_SHORT(n)            (NUMERIC_FLAGBITS(n) == NUMERIC_SHORT)
+
+#define NUMERIC_HDRSZ  (VARHDRSZ + sizeof(uint16) + sizeof(int16))
+#define NUMERIC_HDRSZ_SHORT    (VARHDRSZ + sizeof(uint16))
+
+/*
+ * If the flag bits are NUMERIC_SHORT or NUMERIC_NAN, we want the short header;
+ * otherwise, we want the long one.  Instead of testing against each value, we
+ * can just look at the high bit, for a slight efficiency gain.
+ */
+#define NUMERIC_HEADER_SIZE(n) \
+       (VARHDRSZ + sizeof(uint16) + \
+               (((NUMERIC_FLAGBITS(n) & 0x8000) == 0) ? sizeof(int16) : 0))
+
+/*
+ * Short format definitions.
+ */
+
+#define NUMERIC_SHORT_SIGN_MASK                        0x2000
+#define NUMERIC_SHORT_DSCALE_MASK              0x1F80
+#define NUMERIC_SHORT_DSCALE_SHIFT             7
+#define NUMERIC_SHORT_DSCALE_MAX               \
+       (NUMERIC_SHORT_DSCALE_MASK >> NUMERIC_SHORT_DSCALE_SHIFT)
+#define NUMERIC_SHORT_WEIGHT_SIGN_MASK 0x0040
+#define NUMERIC_SHORT_WEIGHT_MASK              0x003F
+#define NUMERIC_SHORT_WEIGHT_MAX               NUMERIC_SHORT_WEIGHT_MASK
+#define NUMERIC_SHORT_WEIGHT_MIN               (-(NUMERIC_SHORT_WEIGHT_MASK+1))
+
+/*
+ * Extract sign, display scale, weight.
+ */
+
+#define NUMERIC_DSCALE_MASK            0x3FFF
+
+#define NUMERIC_SIGN(n) \
+       (NUMERIC_IS_SHORT(n) ? \
+               (((n)->choice.n_short.n_header & NUMERIC_SHORT_SIGN_MASK) ? \
+               NUMERIC_NEG : NUMERIC_POS) : NUMERIC_FLAGBITS(n))
+#define NUMERIC_DSCALE(n)      (NUMERIC_IS_SHORT((n)) ? \
+       ((n)->choice.n_short.n_header & NUMERIC_SHORT_DSCALE_MASK) \
+               >> NUMERIC_SHORT_DSCALE_SHIFT \
+       : ((n)->choice.n_long.n_sign_dscale & NUMERIC_DSCALE_MASK))
+#define NUMERIC_WEIGHT(n)      (NUMERIC_IS_SHORT((n)) ? \
+       (((n)->choice.n_short.n_header & NUMERIC_SHORT_WEIGHT_SIGN_MASK ? \
+               ~NUMERIC_SHORT_WEIGHT_MASK : 0) \
+        | ((n)->choice.n_short.n_header & NUMERIC_SHORT_WEIGHT_MASK)) \
+       : ((n)->choice.n_long.n_weight))
 
 /* ----------
  * NumericVar is the format we use for arithmetic.     The digit-array part
@@ -266,9 +350,14 @@ static void dump_var(const char *str, NumericVar *var);
 
 #define init_var(v)            MemSetAligned(v, 0, sizeof(NumericVar))
 
-#define NUMERIC_DIGITS(num) ((NumericDigit *)(num)->n_data)
+#define NUMERIC_DIGITS(num) (NUMERIC_IS_SHORT(num) ? \
+       (num)->choice.n_short.n_data : (num)->choice.n_long.n_data)
 #define NUMERIC_NDIGITS(num) \
-       ((VARSIZE(num) - NUMERIC_HDRSZ) / sizeof(NumericDigit))
+       ((VARSIZE(num) - NUMERIC_HEADER_SIZE(num)) / sizeof(NumericDigit))
+#define NUMERIC_CAN_BE_SHORT(scale,weight) \
+       ((scale) <= NUMERIC_SHORT_DSCALE_MAX && \
+       (weight) <= NUMERIC_SHORT_WEIGHT_MAX && \
+       (weight) >= NUMERIC_SHORT_WEIGHT_MIN)
 
 static void alloc_var(NumericVar *var, int ndigits);
 static void free_var(NumericVar *var);
@@ -652,15 +741,23 @@ numeric           (PG_FUNCTION_ARGS)
        /*
         * If the number is certainly in bounds and due to the target scale no
         * rounding could be necessary, just make a copy of the input and modify
-        * its scale fields.  (Note we assume the existing dscale is honest...)
+        * its scale fields, unless the larger scale forces us to abandon the
+        * short representation.  (Note we assume the existing dscale is honest...)
         */
-       ddigits = (num->n_weight + 1) * DEC_DIGITS;
-       if (ddigits <= maxdigits && scale >= NUMERIC_DSCALE(num))
+       ddigits = (NUMERIC_WEIGHT(num) + 1) * DEC_DIGITS;
+       if (ddigits <= maxdigits && scale >= NUMERIC_DSCALE(num)
+               && (NUMERIC_CAN_BE_SHORT(scale, NUMERIC_WEIGHT(num))
+               || !NUMERIC_IS_SHORT(num)))
        {
                new = (Numeric) palloc(VARSIZE(num));
                memcpy(new, num, VARSIZE(num));
-               new->n_sign_dscale = NUMERIC_SIGN(new) |
-                       ((uint16) scale & NUMERIC_DSCALE_MASK);
+               if (NUMERIC_IS_SHORT(num))
+                       new->choice.n_short.n_header =
+                               (num->choice.n_short.n_header & ~NUMERIC_SHORT_DSCALE_MASK)
+                               | (scale << NUMERIC_SHORT_DSCALE_SHIFT);
+               else
+                       new->choice.n_long.n_sign_dscale = NUMERIC_SIGN(new) |
+                               ((uint16) scale & NUMERIC_DSCALE_MASK);
                PG_RETURN_NUMERIC(new);
        }
 
@@ -766,7 +863,11 @@ numeric_abs(PG_FUNCTION_ARGS)
        res = (Numeric) palloc(VARSIZE(num));
        memcpy(res, num, VARSIZE(num));
 
-       res->n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num);
+       if (NUMERIC_IS_SHORT(num))
+               res->choice.n_short.n_header =
+                       num->choice.n_short.n_header & ~NUMERIC_SHORT_SIGN_MASK;
+       else
+               res->choice.n_long.n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num);
 
        PG_RETURN_NUMERIC(res);
 }
@@ -795,13 +896,18 @@ numeric_uminus(PG_FUNCTION_ARGS)
         * we can identify a ZERO by the fact that there are no digits at all.  Do
         * nothing to a zero.
         */
-       if (VARSIZE(num) != NUMERIC_HDRSZ)
+       if (NUMERIC_NDIGITS(num) != 0)
        {
                /* Else, flip the sign */
-               if (NUMERIC_SIGN(num) == NUMERIC_POS)
-                       res->n_sign_dscale = NUMERIC_NEG | NUMERIC_DSCALE(num);
+               if (NUMERIC_IS_SHORT(num))
+                       res->choice.n_short.n_header =
+                               num->choice.n_short.n_header ^ NUMERIC_SHORT_SIGN_MASK;
+               else if (NUMERIC_SIGN(num) == NUMERIC_POS)
+                       res->choice.n_long.n_sign_dscale =
+                               NUMERIC_NEG | NUMERIC_DSCALE(num);
                else
-                       res->n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num);
+                       res->choice.n_long.n_sign_dscale =
+                               NUMERIC_POS | NUMERIC_DSCALE(num);
        }
 
        PG_RETURN_NUMERIC(res);
@@ -845,7 +951,7 @@ numeric_sign(PG_FUNCTION_ARGS)
         * The packed format is known to be totally zero digit trimmed always. So
         * we can identify a ZERO by the fact that there are no digits at all.
         */
-       if (VARSIZE(num) == NUMERIC_HDRSZ)
+       if (NUMERIC_NDIGITS(num) == 0)
                set_var_from_var(&const_zero, &result);
        else
        {
@@ -1283,9 +1389,9 @@ cmp_numerics(Numeric num1, Numeric num2)
        else
        {
                result = cmp_var_common(NUMERIC_DIGITS(num1), NUMERIC_NDIGITS(num1),
-                                                               num1->n_weight, NUMERIC_SIGN(num1),
+                                                               NUMERIC_WEIGHT(num1), NUMERIC_SIGN(num1),
                                                                NUMERIC_DIGITS(num2), NUMERIC_NDIGITS(num2),
-                                                               num2->n_weight, NUMERIC_SIGN(num2));
+                                                               NUMERIC_WEIGHT(num2), NUMERIC_SIGN(num2));
        }
 
        return result;
@@ -1302,12 +1408,13 @@ hash_numeric(PG_FUNCTION_ARGS)
        int                     end_offset;
        int                     i;
        int                     hash_len;
+       NumericDigit   *digits;
 
        /* If it's NaN, don't try to hash the rest of the fields */
        if (NUMERIC_IS_NAN(key))
                PG_RETURN_UINT32(0);
 
-       weight = key->n_weight;
+       weight = NUMERIC_WEIGHT(key);
        start_offset = 0;
        end_offset = 0;
 
@@ -1317,9 +1424,10 @@ hash_numeric(PG_FUNCTION_ARGS)
         * zeros are suppressed, but we're paranoid. Note that we measure the
         * starting and ending offsets in units of NumericDigits, not bytes.
         */
+       digits = NUMERIC_DIGITS(key);
        for (i = 0; i < NUMERIC_NDIGITS(key); i++)
        {
-               if (NUMERIC_DIGITS(key)[i] != (NumericDigit) 0)
+               if (digits[i] != (NumericDigit) 0)
                        break;
 
                start_offset++;
@@ -1340,7 +1448,7 @@ hash_numeric(PG_FUNCTION_ARGS)
 
        for (i = NUMERIC_NDIGITS(key) - 1; i >= 0; i--)
        {
-               if (NUMERIC_DIGITS(key)[i] != (NumericDigit) 0)
+               if (digits[i] != (NumericDigit) 0)
                        break;
 
                end_offset++;
@@ -2536,7 +2644,7 @@ numeric_avg(PG_FUNCTION_ARGS)
 
        /* SQL92 defines AVG of no values to be NULL */
        /* N is zero iff no digits (cf. numeric_uminus) */
-       if (VARSIZE(N) == NUMERIC_HDRSZ)
+       if (NUMERIC_NDIGITS(N) == 0)
                PG_RETURN_NULL();
 
        PG_RETURN_DATUM(DirectFunctionCall2(numeric_div,
@@ -2974,7 +3082,8 @@ dump_numeric(const char *str, Numeric num)
 
        ndigits = NUMERIC_NDIGITS(num);
 
-       printf("%s: NUMERIC w=%d d=%d ", str, num->n_weight, NUMERIC_DSCALE(num));
+       printf("%s: NUMERIC w=%d d=%d ", str,
+                  NUMERIC_WEIGHT(num), NUMERIC_DSCALE(num));
        switch (NUMERIC_SIGN(num))
        {
                case NUMERIC_POS:
@@ -3265,11 +3374,11 @@ set_var_from_num(Numeric num, NumericVar *dest)
 
        alloc_var(dest, ndigits);
 
-       dest->weight = num->n_weight;
+       dest->weight = NUMERIC_WEIGHT(num);
        dest->sign = NUMERIC_SIGN(num);
        dest->dscale = NUMERIC_DSCALE(num);
 
-       memcpy(dest->digits, num->n_data, ndigits * sizeof(NumericDigit));
+       memcpy(dest->digits, NUMERIC_DIGITS(num), ndigits * sizeof(NumericDigit));
 }
 
 
@@ -3561,11 +3670,11 @@ make_result(NumericVar *var)
 
        if (sign == NUMERIC_NAN)
        {
-               result = (Numeric) palloc(NUMERIC_HDRSZ);
+               result = (Numeric) palloc(NUMERIC_HDRSZ_SHORT);
 
-               SET_VARSIZE(result, NUMERIC_HDRSZ);
-               result->n_weight = 0;
-               result->n_sign_dscale = NUMERIC_NAN;
+               SET_VARSIZE(result, NUMERIC_HDRSZ_SHORT);
+               result->choice.n_header = NUMERIC_NAN;
+               /* the header word is all we need */
 
                dump_numeric("make_result()", result);
                return result;
@@ -3592,16 +3701,33 @@ make_result(NumericVar *var)
        }
 
        /* Build the result */
-       len = NUMERIC_HDRSZ + n * sizeof(NumericDigit);
-       result = (Numeric) palloc(len);
-       SET_VARSIZE(result, len);
-       result->n_weight = weight;
-       result->n_sign_dscale = sign | (var->dscale & NUMERIC_DSCALE_MASK);
+       if (NUMERIC_CAN_BE_SHORT(var->dscale, weight))
+       {
+               len = NUMERIC_HDRSZ_SHORT + n * sizeof(NumericDigit);
+               result = (Numeric) palloc(len);
+               SET_VARSIZE(result, len);
+               result->choice.n_short.n_header = 
+                       (sign == NUMERIC_NEG ? (NUMERIC_SHORT | NUMERIC_SHORT_SIGN_MASK)
+                               : NUMERIC_SHORT)
+                       | (var->dscale << NUMERIC_SHORT_DSCALE_SHIFT)
+                       | (weight < 0 ? NUMERIC_SHORT_WEIGHT_SIGN_MASK : 0)
+                       | (weight & NUMERIC_SHORT_WEIGHT_MASK);
+       }
+       else
+       {
+               len = NUMERIC_HDRSZ + n * sizeof(NumericDigit);
+               result = (Numeric) palloc(len);
+               SET_VARSIZE(result, len);
+               result->choice.n_long.n_sign_dscale =
+                       sign | (var->dscale & NUMERIC_DSCALE_MASK);
+               result->choice.n_long.n_weight = weight;
+       }
 
-       memcpy(result->n_data, digits, n * sizeof(NumericDigit));
+       memcpy(NUMERIC_DIGITS(result), digits, n * sizeof(NumericDigit));
+       Assert(NUMERIC_NDIGITS(result) == n);
 
        /* Check for overflow of int16 fields */
-       if (result->n_weight != weight ||
+       if (NUMERIC_WEIGHT(result) != weight ||
                NUMERIC_DSCALE(result) != var->dscale)
                ereport(ERROR,
                                (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),