From 145343534c153d1e6c3cff1fa1855787684d9a38 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Tue, 3 Aug 2010 23:09:29 +0000 Subject: [PATCH] Allow numeric to use a more compact, 2-byte header in many cases. Review by Brendan Jurd and Tom Lane. --- src/backend/utils/adt/numeric.c | 258 ++++++++++++++++++++++++-------- 1 file changed, 192 insertions(+), 66 deletions(-) diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c index 327d5f0ddf..8398bb90e4 100644 --- a/src/backend/utils/adt/numeric.c +++ b/src/backend/utils/adt/numeric.c @@ -14,7 +14,7 @@ * Copyright (c) 1998-2010, PostgreSQL Global Development Group * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/utils/adt/numeric.c,v 1.124 2010/07/30 04:30:23 rhaas Exp $ + * $PostgreSQL: pgsql/src/backend/utils/adt/numeric.c,v 1.125 2010/08/03 23:09:29 rhaas Exp $ * *------------------------------------------------------------------------- */ @@ -35,38 +35,6 @@ #include "utils/int8.h" #include "utils/numeric.h" -/* - * Sign values and macros to deal with packing/unpacking n_sign_dscale - */ -#define NUMERIC_SIGN_MASK 0xC000 -#define NUMERIC_POS 0x0000 -#define NUMERIC_NEG 0x4000 -#define NUMERIC_NAN 0xC000 -#define NUMERIC_DSCALE_MASK 0x3FFF -#define NUMERIC_SIGN(n) ((n)->n_sign_dscale & NUMERIC_SIGN_MASK) -#define NUMERIC_DSCALE(n) ((n)->n_sign_dscale & NUMERIC_DSCALE_MASK) -#define NUMERIC_IS_NAN(n) (NUMERIC_SIGN(n) != NUMERIC_POS && \ - NUMERIC_SIGN(n) != NUMERIC_NEG) -#define NUMERIC_HDRSZ (VARHDRSZ + sizeof(uint16) + sizeof(int16)) - - -/* - * The Numeric data type stored in the database - * - * NOTE: by convention, values in the packed form have been stripped of - * all leading and trailing zero digits (where a "digit" is of base NBASE). - * In particular, if the value is zero, there will be no digits at all! - * The weight is arbitrary in that case, but we normally set it to zero. - */ -struct NumericData -{ - int32 vl_len_; /* varlena header (do not touch directly!) 
*/ - uint16 n_sign_dscale; /* Sign + display scale */ - int16 n_weight; /* Weight of 1st digit */ - char n_data[1]; /* Digits (really array of NumericDigit) */ -}; - - /* ---------- * Uncomment the following to enable compilation of dump_numeric() * and dump_var() and to get a dump of any result produced by make_result(). @@ -120,6 +88,122 @@ typedef signed char NumericDigit; typedef int16 NumericDigit; #endif +/* + * The Numeric type as stored on disk. + * + * If the high bits of the first word of a NumericChoice (n_header, or + * n_short.n_header, or n_long.n_sign_dscale) are NUMERIC_SHORT, then the + * numeric follows the NumericShort format; if they are NUMERIC_POS or + * NUMERIC_NEG, it follows the NumericLong format. If they are NUMERIC_NAN, + * it is a NaN. We currently always store a NaN using just two bytes (i.e. + * only n_header), but previous releases used only the NumericLong format, + * so we might find 4-byte NaNs on disk if a database has been migrated using + * pg_upgrade. In either case, when the high bits indicate a NaN, the + * remaining bits are never examined. Currently, we always initialize these + * to zero, but it might be possible to use them for some other purpose in + * the future. + * + * In the NumericShort format, the remaining 14 bits of the header word + * (n_short.n_header) are allocated as follows: 1 for sign (positive or + * negative), 6 for display scale, and 7 for weight. In practice, most + * commonly-encountered values can be represented this way. + * + * In the NumericLong format, the remaining 14 bits of the header word + * (n_long.n_sign_dscale) represent the display scale; and the weight is + * stored separately in n_weight. + * + * NOTE: by convention, values in the packed form have been stripped of + * all leading and trailing zero digits (where a "digit" is of base NBASE). + * In particular, if the value is zero, there will be no digits at all! + * The weight is arbitrary in that case, but we normally set it to zero. 
+ */ + +struct NumericShort +{ + uint16 n_header; /* Sign + display scale + weight */ + NumericDigit n_data[1]; /* Digits */ +}; + +struct NumericLong +{ + uint16 n_sign_dscale; /* Sign + display scale */ + int16 n_weight; /* Weight of 1st digit */ + NumericDigit n_data[1]; /* Digits */ +}; + +union NumericChoice +{ + uint16 n_header; /* Header word */ + struct NumericLong n_long; /* Long form (4-byte header) */ + struct NumericShort n_short; /* Short form (2-byte header) */ +}; + +struct NumericData +{ + int32 vl_len_; /* varlena header (do not touch directly!) */ + union NumericChoice choice; /* choice of format */ +}; + + +/* + * Interpretation of high bits. + */ + +#define NUMERIC_SIGN_MASK 0xC000 +#define NUMERIC_POS 0x0000 +#define NUMERIC_NEG 0x4000 +#define NUMERIC_SHORT 0x8000 +#define NUMERIC_NAN 0xC000 + +#define NUMERIC_FLAGBITS(n) ((n)->choice.n_header & NUMERIC_SIGN_MASK) +#define NUMERIC_IS_NAN(n) (NUMERIC_FLAGBITS(n) == NUMERIC_NAN) +#define NUMERIC_IS_SHORT(n) (NUMERIC_FLAGBITS(n) == NUMERIC_SHORT) + +#define NUMERIC_HDRSZ (VARHDRSZ + sizeof(uint16) + sizeof(int16)) +#define NUMERIC_HDRSZ_SHORT (VARHDRSZ + sizeof(uint16)) + +/* + * If the flag bits are NUMERIC_SHORT or NUMERIC_NAN, we want the short header; + * otherwise, we want the long one. Instead of testing against each value, we + * can just look at the high bit, for a slight efficiency gain. + */ +#define NUMERIC_HEADER_SIZE(n) \ + (VARHDRSZ + sizeof(uint16) + \ + (((NUMERIC_FLAGBITS(n) & 0x8000) == 0) ? sizeof(int16) : 0)) + +/* + * Short format definitions. 
+ */ + +#define NUMERIC_SHORT_SIGN_MASK 0x2000 +#define NUMERIC_SHORT_DSCALE_MASK 0x1F80 +#define NUMERIC_SHORT_DSCALE_SHIFT 7 +#define NUMERIC_SHORT_DSCALE_MAX \ + (NUMERIC_SHORT_DSCALE_MASK >> NUMERIC_SHORT_DSCALE_SHIFT) +#define NUMERIC_SHORT_WEIGHT_SIGN_MASK 0x0040 +#define NUMERIC_SHORT_WEIGHT_MASK 0x003F +#define NUMERIC_SHORT_WEIGHT_MAX NUMERIC_SHORT_WEIGHT_MASK +#define NUMERIC_SHORT_WEIGHT_MIN (-(NUMERIC_SHORT_WEIGHT_MASK+1)) + +/* + * Extract sign, display scale, weight. + */ + +#define NUMERIC_DSCALE_MASK 0x3FFF + +#define NUMERIC_SIGN(n) \ + (NUMERIC_IS_SHORT(n) ? \ + (((n)->choice.n_short.n_header & NUMERIC_SHORT_SIGN_MASK) ? \ + NUMERIC_NEG : NUMERIC_POS) : NUMERIC_FLAGBITS(n)) +#define NUMERIC_DSCALE(n) (NUMERIC_IS_SHORT((n)) ? \ + ((n)->choice.n_short.n_header & NUMERIC_SHORT_DSCALE_MASK) \ + >> NUMERIC_SHORT_DSCALE_SHIFT \ + : ((n)->choice.n_long.n_sign_dscale & NUMERIC_DSCALE_MASK)) +#define NUMERIC_WEIGHT(n) (NUMERIC_IS_SHORT((n)) ? \ + (((n)->choice.n_short.n_header & NUMERIC_SHORT_WEIGHT_SIGN_MASK ? \ + ~NUMERIC_SHORT_WEIGHT_MASK : 0) \ + | ((n)->choice.n_short.n_header & NUMERIC_SHORT_WEIGHT_MASK)) \ + : ((n)->choice.n_long.n_weight)) /* ---------- * NumericVar is the format we use for arithmetic. The digit-array part @@ -266,9 +350,14 @@ static void dump_var(const char *str, NumericVar *var); #define init_var(v) MemSetAligned(v, 0, sizeof(NumericVar)) -#define NUMERIC_DIGITS(num) ((NumericDigit *)(num)->n_data) +#define NUMERIC_DIGITS(num) (NUMERIC_IS_SHORT(num) ? 
\ + (num)->choice.n_short.n_data : (num)->choice.n_long.n_data) #define NUMERIC_NDIGITS(num) \ - ((VARSIZE(num) - NUMERIC_HDRSZ) / sizeof(NumericDigit)) + ((VARSIZE(num) - NUMERIC_HEADER_SIZE(num)) / sizeof(NumericDigit)) +#define NUMERIC_CAN_BE_SHORT(scale,weight) \ + ((scale) <= NUMERIC_SHORT_DSCALE_MAX && \ + (weight) <= NUMERIC_SHORT_WEIGHT_MAX && \ + (weight) >= NUMERIC_SHORT_WEIGHT_MIN) static void alloc_var(NumericVar *var, int ndigits); static void free_var(NumericVar *var); @@ -652,15 +741,23 @@ numeric (PG_FUNCTION_ARGS) /* * If the number is certainly in bounds and due to the target scale no * rounding could be necessary, just make a copy of the input and modify - * its scale fields. (Note we assume the existing dscale is honest...) + * its scale fields, unless the larger scale forces us to abandon the + * short representation. (Note we assume the existing dscale is honest...) */ - ddigits = (num->n_weight + 1) * DEC_DIGITS; - if (ddigits <= maxdigits && scale >= NUMERIC_DSCALE(num)) + ddigits = (NUMERIC_WEIGHT(num) + 1) * DEC_DIGITS; + if (ddigits <= maxdigits && scale >= NUMERIC_DSCALE(num) + && (NUMERIC_CAN_BE_SHORT(scale, NUMERIC_WEIGHT(num)) + || !NUMERIC_IS_SHORT(num))) { new = (Numeric) palloc(VARSIZE(num)); memcpy(new, num, VARSIZE(num)); - new->n_sign_dscale = NUMERIC_SIGN(new) | - ((uint16) scale & NUMERIC_DSCALE_MASK); + if (NUMERIC_IS_SHORT(num)) + new->choice.n_short.n_header = + (num->choice.n_short.n_header & ~NUMERIC_SHORT_DSCALE_MASK) + | (scale << NUMERIC_SHORT_DSCALE_SHIFT); + else + new->choice.n_long.n_sign_dscale = NUMERIC_SIGN(new) | + ((uint16) scale & NUMERIC_DSCALE_MASK); PG_RETURN_NUMERIC(new); } @@ -766,7 +863,11 @@ numeric_abs(PG_FUNCTION_ARGS) res = (Numeric) palloc(VARSIZE(num)); memcpy(res, num, VARSIZE(num)); - res->n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num); + if (NUMERIC_IS_SHORT(num)) + res->choice.n_short.n_header = + num->choice.n_short.n_header & ~NUMERIC_SHORT_SIGN_MASK; + else + 
res->choice.n_long.n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num); PG_RETURN_NUMERIC(res); } @@ -795,13 +896,18 @@ numeric_uminus(PG_FUNCTION_ARGS) * we can identify a ZERO by the fact that there are no digits at all. Do * nothing to a zero. */ - if (VARSIZE(num) != NUMERIC_HDRSZ) + if (NUMERIC_NDIGITS(num) != 0) { /* Else, flip the sign */ - if (NUMERIC_SIGN(num) == NUMERIC_POS) - res->n_sign_dscale = NUMERIC_NEG | NUMERIC_DSCALE(num); + if (NUMERIC_IS_SHORT(num)) + res->choice.n_short.n_header = + num->choice.n_short.n_header ^ NUMERIC_SHORT_SIGN_MASK; + else if (NUMERIC_SIGN(num) == NUMERIC_POS) + res->choice.n_long.n_sign_dscale = + NUMERIC_NEG | NUMERIC_DSCALE(num); else - res->n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num); + res->choice.n_long.n_sign_dscale = + NUMERIC_POS | NUMERIC_DSCALE(num); } PG_RETURN_NUMERIC(res); @@ -845,7 +951,7 @@ numeric_sign(PG_FUNCTION_ARGS) * The packed format is known to be totally zero digit trimmed always. So * we can identify a ZERO by the fact that there are no digits at all. */ - if (VARSIZE(num) == NUMERIC_HDRSZ) + if (NUMERIC_NDIGITS(num) == 0) set_var_from_var(&const_zero, &result); else { @@ -1283,9 +1389,9 @@ cmp_numerics(Numeric num1, Numeric num2) else { result = cmp_var_common(NUMERIC_DIGITS(num1), NUMERIC_NDIGITS(num1), - num1->n_weight, NUMERIC_SIGN(num1), + NUMERIC_WEIGHT(num1), NUMERIC_SIGN(num1), NUMERIC_DIGITS(num2), NUMERIC_NDIGITS(num2), - num2->n_weight, NUMERIC_SIGN(num2)); + NUMERIC_WEIGHT(num2), NUMERIC_SIGN(num2)); } return result; @@ -1302,12 +1408,13 @@ hash_numeric(PG_FUNCTION_ARGS) int end_offset; int i; int hash_len; + NumericDigit *digits; /* If it's NaN, don't try to hash the rest of the fields */ if (NUMERIC_IS_NAN(key)) PG_RETURN_UINT32(0); - weight = key->n_weight; + weight = NUMERIC_WEIGHT(key); start_offset = 0; end_offset = 0; @@ -1317,9 +1424,10 @@ hash_numeric(PG_FUNCTION_ARGS) * zeros are suppressed, but we're paranoid. 
Note that we measure the * starting and ending offsets in units of NumericDigits, not bytes. */ + digits = NUMERIC_DIGITS(key); for (i = 0; i < NUMERIC_NDIGITS(key); i++) { - if (NUMERIC_DIGITS(key)[i] != (NumericDigit) 0) + if (digits[i] != (NumericDigit) 0) break; start_offset++; @@ -1340,7 +1448,7 @@ hash_numeric(PG_FUNCTION_ARGS) for (i = NUMERIC_NDIGITS(key) - 1; i >= 0; i--) { - if (NUMERIC_DIGITS(key)[i] != (NumericDigit) 0) + if (digits[i] != (NumericDigit) 0) break; end_offset++; @@ -2536,7 +2644,7 @@ numeric_avg(PG_FUNCTION_ARGS) /* SQL92 defines AVG of no values to be NULL */ /* N is zero iff no digits (cf. numeric_uminus) */ - if (VARSIZE(N) == NUMERIC_HDRSZ) + if (NUMERIC_NDIGITS(N) == 0) PG_RETURN_NULL(); PG_RETURN_DATUM(DirectFunctionCall2(numeric_div, @@ -2974,7 +3082,8 @@ dump_numeric(const char *str, Numeric num) ndigits = NUMERIC_NDIGITS(num); - printf("%s: NUMERIC w=%d d=%d ", str, num->n_weight, NUMERIC_DSCALE(num)); + printf("%s: NUMERIC w=%d d=%d ", str, + NUMERIC_WEIGHT(num), NUMERIC_DSCALE(num)); switch (NUMERIC_SIGN(num)) { case NUMERIC_POS: @@ -3265,11 +3374,11 @@ set_var_from_num(Numeric num, NumericVar *dest) alloc_var(dest, ndigits); - dest->weight = num->n_weight; + dest->weight = NUMERIC_WEIGHT(num); dest->sign = NUMERIC_SIGN(num); dest->dscale = NUMERIC_DSCALE(num); - memcpy(dest->digits, num->n_data, ndigits * sizeof(NumericDigit)); + memcpy(dest->digits, NUMERIC_DIGITS(num), ndigits * sizeof(NumericDigit)); } @@ -3561,11 +3670,11 @@ make_result(NumericVar *var) if (sign == NUMERIC_NAN) { - result = (Numeric) palloc(NUMERIC_HDRSZ); + result = (Numeric) palloc(NUMERIC_HDRSZ_SHORT); - SET_VARSIZE(result, NUMERIC_HDRSZ); - result->n_weight = 0; - result->n_sign_dscale = NUMERIC_NAN; + SET_VARSIZE(result, NUMERIC_HDRSZ_SHORT); + result->choice.n_header = NUMERIC_NAN; + /* the header word is all we need */ dump_numeric("make_result()", result); return result; @@ -3592,16 +3701,33 @@ make_result(NumericVar *var) } /* Build the result */ 
- len = NUMERIC_HDRSZ + n * sizeof(NumericDigit); - result = (Numeric) palloc(len); - SET_VARSIZE(result, len); - result->n_weight = weight; - result->n_sign_dscale = sign | (var->dscale & NUMERIC_DSCALE_MASK); + if (NUMERIC_CAN_BE_SHORT(var->dscale, weight)) + { + len = NUMERIC_HDRSZ_SHORT + n * sizeof(NumericDigit); + result = (Numeric) palloc(len); + SET_VARSIZE(result, len); + result->choice.n_short.n_header = + (sign == NUMERIC_NEG ? (NUMERIC_SHORT | NUMERIC_SHORT_SIGN_MASK) + : NUMERIC_SHORT) + | (var->dscale << NUMERIC_SHORT_DSCALE_SHIFT) + | (weight < 0 ? NUMERIC_SHORT_WEIGHT_SIGN_MASK : 0) + | (weight & NUMERIC_SHORT_WEIGHT_MASK); + } + else + { + len = NUMERIC_HDRSZ + n * sizeof(NumericDigit); + result = (Numeric) palloc(len); + SET_VARSIZE(result, len); + result->choice.n_long.n_sign_dscale = + sign | (var->dscale & NUMERIC_DSCALE_MASK); + result->choice.n_long.n_weight = weight; + } - memcpy(result->n_data, digits, n * sizeof(NumericDigit)); + memcpy(NUMERIC_DIGITS(result), digits, n * sizeof(NumericDigit)); + Assert(NUMERIC_NDIGITS(result) == n); /* Check for overflow of int16 fields */ - if (result->n_weight != weight || + if (NUMERIC_WEIGHT(result) != weight || NUMERIC_DSCALE(result) != var->dscale) ereport(ERROR, (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), -- 2.40.0