hyperLogLogState abbr_card; /* cardinality estimator */
} NumericSortSupport;
+
+/* ----------
+ * Fast sum accumulator.
+ *
+ * NumericSumAccum is used to implement SUM(), and other standard aggregates
+ * that track the sum of input values. It uses 32-bit integers to store the
+ * digits, instead of the normal 16-bit integers (with NBASE=10000). This
+ * way, we can safely accumulate up to NBASE - 1 values without propagating
+ * carry, before risking overflow of any of the digits. 'num_uncarried'
+ * tracks how many values have been accumulated without propagating carry.
+ *
+ * Positive and negative values are accumulated separately, in 'pos_digits'
+ * and 'neg_digits'. This is simpler and faster than deciding whether to add
+ * or subtract from the current value, for each new value (see sub_var() for
+ * the logic we avoid by doing this). Both buffers are of same size, and
+ * have the same weight and scale. In accum_sum_final(), the positive and
+ * negative sums are added together to produce the final result.
+ *
+ * When a new value has a larger ndigits or weight than the accumulator
+ * currently does, the accumulator is enlarged to accommodate the new value.
+ * We normally have one zero digit reserved for carry propagation, and that
+ * is indicated by the 'have_carry_space' flag. When accum_sum_carry() uses
+ * up the reserved digit, it clears the 'have_carry_space' flag. The next
+ * call to accum_sum_add() will enlarge the buffer, to make room for the
+ * extra digit, and set the flag again.
+ *
+ * To initialize a new accumulator, simply reset all fields to zeros.
+ *
+ * The accumulator does not handle NaNs.
+ * ----------
+ */
+typedef struct NumericSumAccum
+{
+ int ndigits;
+ int weight;
+ int dscale;
+ int num_uncarried;
+ bool have_carry_space;
+ int32 *pos_digits;
+ int32 *neg_digits;
+} NumericSumAccum;
+
+
/*
* We define our own macros for packing and unpacking abbreviated-key
* representations for numeric values in order to avoid depending on
static void compute_bucket(Numeric operand, Numeric bound1, Numeric bound2,
NumericVar *count_var, NumericVar *result_var);
+static void accum_sum_add(NumericSumAccum *accum, NumericVar *var1);
+static void accum_sum_rescale(NumericSumAccum *accum, NumericVar *val);
+static void accum_sum_carry(NumericSumAccum *accum);
+static void accum_sum_reset(NumericSumAccum *accum);
+static void accum_sum_final(NumericSumAccum *accum, NumericVar *result);
+static void accum_sum_copy(NumericSumAccum *dst, NumericSumAccum *src);
+static void accum_sum_combine(NumericSumAccum *accum, NumericSumAccum *accum2);
+
/* ----------------------------------------------------------------------
*
bool calcSumX2; /* if true, calculate sumX2 */
MemoryContext agg_context; /* context we're calculating in */
int64 N; /* count of processed numbers */
- NumericVar sumX; /* sum of processed numbers */
- NumericVar sumX2; /* sum of squares of processed numbers */
+ NumericSumAccum sumX; /* sum of processed numbers */
+ NumericSumAccum sumX2; /* sum of squares of processed numbers */
int maxScale; /* maximum scale seen so far */
int64 maxScaleCount; /* number of values seen with maximum scale */
int64 NaNcount; /* count of NaN values (not included in N!) */
/* The rest of this needs to work in the aggregate context */
old_context = MemoryContextSwitchTo(state->agg_context);
- if (state->N++ > 0)
- {
- /* Accumulate sums */
- add_var(&X, &(state->sumX), &(state->sumX));
+ state->N++;
- if (state->calcSumX2)
- add_var(&X2, &(state->sumX2), &(state->sumX2));
- }
- else
- {
- /* First input, so initialize sums */
- set_var_from_var(&X, &(state->sumX));
+ /* Accumulate sums */
+ accum_sum_add(&(state->sumX), &X);
- if (state->calcSumX2)
- set_var_from_var(&X2, &(state->sumX2));
- }
+ if (state->calcSumX2)
+ accum_sum_add(&(state->sumX2), &X2);
MemoryContextSwitchTo(old_context);
}
if (state->N-- > 1)
{
- /* De-accumulate sums */
- sub_var(&(state->sumX), &X, &(state->sumX));
+ /* Negate X, to subtract it from the sum */
+ X.sign = (X.sign == NUMERIC_POS ? NUMERIC_NEG : NUMERIC_POS);
+ accum_sum_add(&(state->sumX), &X);
if (state->calcSumX2)
- sub_var(&(state->sumX2), &X2, &(state->sumX2));
+ {
+ /* Negate X^2. X^2 is always positive */
+ X2.sign = NUMERIC_NEG;
+ accum_sum_add(&(state->sumX2), &X2);
+ }
}
else
{
- /* Sums will be reset by next call to do_numeric_accum */
+ /* Zero the sums */
Assert(state->N == 0);
+
+ accum_sum_reset(&state->sumX);
+ if (state->calcSumX2)
+ accum_sum_reset(&state->sumX2);
}
MemoryContextSwitchTo(old_context);
state1->maxScale = state2->maxScale;
state1->maxScaleCount = state2->maxScaleCount;
- init_var(&state1->sumX);
- set_var_from_var(&state2->sumX, &state1->sumX);
-
- init_var(&state1->sumX2);
- set_var_from_var(&state2->sumX2, &state1->sumX2);
+ accum_sum_copy(&state1->sumX, &state2->sumX);
+ accum_sum_copy(&state1->sumX2, &state2->sumX2);
MemoryContextSwitchTo(old_context);
old_context = MemoryContextSwitchTo(agg_context);
/* Accumulate sums */
- add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
- add_var(&(state1->sumX2), &(state2->sumX2), &(state1->sumX2));
+ accum_sum_combine(&state1->sumX, &state2->sumX);
+ accum_sum_combine(&state1->sumX2, &state2->sumX2);
MemoryContextSwitchTo(old_context);
}
state1->maxScale = state2->maxScale;
state1->maxScaleCount = state2->maxScaleCount;
- init_var(&state1->sumX);
- set_var_from_var(&state2->sumX, &state1->sumX);
+ accum_sum_copy(&state1->sumX, &state2->sumX);
MemoryContextSwitchTo(old_context);
old_context = MemoryContextSwitchTo(agg_context);
/* Accumulate sums */
- add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
+ accum_sum_combine(&state1->sumX, &state2->sumX);
MemoryContextSwitchTo(old_context);
}
Datum temp;
bytea *sumX;
bytea *result;
+ NumericVar tmp_var;
/* Ensure we disallow calling when not in aggregate context */
if (!AggCheckCallContext(fcinfo, NULL))
* splitting the tasks in numeric_send into separate functions to stop
* this? Doing so would also remove the fmgr call overhead.
*/
+ init_var(&tmp_var);
+ accum_sum_final(&state->sumX, &tmp_var);
+
temp = DirectFunctionCall1(numeric_send,
- NumericGetDatum(make_result(&state->sumX)));
+ NumericGetDatum(make_result(&tmp_var)));
sumX = DatumGetByteaP(temp);
+ free_var(&tmp_var);
pq_begintypsend(&buf);
bytea *sstate;
NumericAggState *result;
Datum temp;
+ NumericVar tmp_var;
StringInfoData buf;
if (!AggCheckCallContext(fcinfo, NULL))
PointerGetDatum(&buf),
InvalidOid,
-1);
- set_var_from_num(DatumGetNumeric(temp), &result->sumX);
+ init_var_from_num(DatumGetNumeric(temp), &tmp_var);
+ accum_sum_add(&(result->sumX), &tmp_var);
/* maxScale */
result->maxScale = pq_getmsgint(&buf, 4);
StringInfoData buf;
Datum temp;
bytea *sumX;
+ NumericVar tmp_var;
bytea *sumX2;
bytea *result;
* splitting the tasks in numeric_send into separate functions to stop
* this? Doing so would also remove the fmgr call overhead.
*/
+ init_var(&tmp_var);
+
+ accum_sum_final(&state->sumX, &tmp_var);
temp = DirectFunctionCall1(numeric_send,
- NumericGetDatum(make_result(&state->sumX)));
+ NumericGetDatum(make_result(&tmp_var)));
sumX = DatumGetByteaP(temp);
+ accum_sum_final(&state->sumX2, &tmp_var);
temp = DirectFunctionCall1(numeric_send,
- NumericGetDatum(make_result(&state->sumX2)));
+ NumericGetDatum(make_result(&tmp_var)));
sumX2 = DatumGetByteaP(temp);
+ free_var(&tmp_var);
+
pq_begintypsend(&buf);
/* N */
bytea *sstate;
NumericAggState *result;
Datum temp;
+ NumericVar sumX_var;
+ NumericVar sumX2_var;
StringInfoData buf;
if (!AggCheckCallContext(fcinfo, NULL))
PointerGetDatum(&buf),
InvalidOid,
-1);
- set_var_from_num(DatumGetNumeric(temp), &result->sumX);
+ init_var_from_num(DatumGetNumeric(temp), &sumX_var);
+ accum_sum_add(&(result->sumX), &sumX_var);
/* sumX2 */
temp = DirectFunctionCall3(numeric_recv,
PointerGetDatum(&buf),
InvalidOid,
-1);
- set_var_from_num(DatumGetNumeric(temp), &result->sumX2);
+ init_var_from_num(DatumGetNumeric(temp), &sumX2_var);
+ accum_sum_add(&(result->sumX2), &sumX2_var);
/* maxScale */
result->maxScale = pq_getmsgint(&buf, 4);
state1->sumX = state2->sumX;
state1->sumX2 = state2->sumX2;
#else
- init_var(&(state1->sumX));
- set_var_from_var(&(state2->sumX), &(state1->sumX));
-
- init_var(&state1->sumX2);
- set_var_from_var(&(state2->sumX2), &(state1->sumX2));
+ accum_sum_copy(&state2->sumX, &state1->sumX);
+ accum_sum_copy(&state2->sumX2, &state1->sumX2);
#endif
MemoryContextSwitchTo(old_context);
old_context = MemoryContextSwitchTo(agg_context);
/* Accumulate sums */
- add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
- add_var(&(state1->sumX2), &(state2->sumX2), &(state1->sumX2));
+ accum_sum_combine(&state1->sumX, &state2->sumX);
+ accum_sum_combine(&state1->sumX2, &state2->sumX2);
MemoryContextSwitchTo(old_context);
#endif
*/
{
Datum temp;
-
-#ifdef HAVE_INT128
NumericVar num;
init_var(&num);
+
+#ifdef HAVE_INT128
int128_to_numericvar(state->sumX, &num);
+#else
+ accum_sum_final(&state->sumX, &num);
+#endif
temp = DirectFunctionCall1(numeric_send,
NumericGetDatum(make_result(&num)));
sumX = DatumGetByteaP(temp);
+#ifdef HAVE_INT128
int128_to_numericvar(state->sumX2, &num);
+#else
+ accum_sum_final(&state->sumX2, &num);
+#endif
temp = DirectFunctionCall1(numeric_send,
NumericGetDatum(make_result(&num)));
sumX2 = DatumGetByteaP(temp);
- free_var(&num);
-#else
- temp = DirectFunctionCall1(numeric_send,
- NumericGetDatum(make_result(&state->sumX)));
- sumX = DatumGetByteaP(temp);
- temp = DirectFunctionCall1(numeric_send,
- NumericGetDatum(make_result(&state->sumX2)));
- sumX2 = DatumGetByteaP(temp);
-#endif
+ free_var(&num);
}
pq_begintypsend(&buf);
bytea *sstate;
PolyNumAggState *result;
Datum sumX;
+ NumericVar sumX_var;
Datum sumX2;
+ NumericVar sumX2_var;
StringInfoData buf;
if (!AggCheckCallContext(fcinfo, NULL))
InvalidOid,
-1);
+ init_var_from_num(DatumGetNumeric(sumX), &sumX_var);
#ifdef HAVE_INT128
- {
- NumericVar num;
-
- init_var(&num);
- set_var_from_num(DatumGetNumeric(sumX), &num);
- numericvar_to_int128(&num, &result->sumX);
-
- set_var_from_num(DatumGetNumeric(sumX2), &num);
- numericvar_to_int128(&num, &result->sumX2);
+ numericvar_to_int128(&sumX_var, &result->sumX);
+#else
+ accum_sum_add(&result->sumX, &sumX_var);
+#endif
- free_var(&num);
- }
+ set_var_from_num(DatumGetNumeric(sumX2), &sumX2_var);
+#ifdef HAVE_INT128
+ numericvar_to_int128(&sumX2_var, &result->sumX2);
#else
- set_var_from_num(DatumGetNumeric(sumX), &result->sumX);
- set_var_from_num(DatumGetNumeric(sumX2), &result->sumX2);
+ accum_sum_add(&result->sumX2, &sumX_var);
#endif
pq_getmsgend(&buf);
#ifdef HAVE_INT128
state1->sumX = state2->sumX;
#else
- init_var(&state1->sumX);
- set_var_from_var(&state2->sumX, &state1->sumX);
+ accum_sum_copy(&state1->sumX, &state2->sumX);
#endif
MemoryContextSwitchTo(old_context);
old_context = MemoryContextSwitchTo(agg_context);
/* Accumulate sums */
- add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
+ accum_sum_combine(&state1->sumX, &state2->sumX);
MemoryContextSwitchTo(old_context);
#endif
*/
{
Datum temp;
-#ifdef HAVE_INT128
NumericVar num;
init_var(&num);
+
+#ifdef HAVE_INT128
int128_to_numericvar(state->sumX, &num);
- temp = DirectFunctionCall1(numeric_send,
- NumericGetDatum(make_result(&num)));
- free_var(&num);
- sumX = DatumGetByteaP(temp);
#else
+ accum_sum_final(&state->sumX, &num);
+#endif
temp = DirectFunctionCall1(numeric_send,
- NumericGetDatum(make_result(&state->sumX)));
+ NumericGetDatum(make_result(&num)));
sumX = DatumGetByteaP(temp);
-#endif
+
+ free_var(&num);
}
pq_begintypsend(&buf);
PolyNumAggState *result;
StringInfoData buf;
Datum temp;
+ NumericVar num;
if (!AggCheckCallContext(fcinfo, NULL))
elog(ERROR, "aggregate function called in non-aggregate context");
PointerGetDatum(&buf),
InvalidOid,
-1);
-
+ init_var_from_num(DatumGetNumeric(temp), &num);
#ifdef HAVE_INT128
- {
- NumericVar num;
-
- init_var(&num);
- set_var_from_num(DatumGetNumeric(temp), &num);
- numericvar_to_int128(&num, &result->sumX);
- free_var(&num);
- }
+ numericvar_to_int128(&num, &result->sumX);
#else
- set_var_from_num(DatumGetNumeric(temp), &result->sumX);
+ accum_sum_add(&result->sumX, &num);
#endif
pq_getmsgend(&buf);
NumericAggState *state;
Datum N_datum;
Datum sumX_datum;
+ NumericVar sumX_var;
state = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
PG_RETURN_NUMERIC(make_result(&const_nan));
N_datum = DirectFunctionCall1(int8_numeric, Int64GetDatum(state->N));
- sumX_datum = NumericGetDatum(make_result(&state->sumX));
+
+ init_var(&sumX_var);
+ accum_sum_final(&state->sumX, &sumX_var);
+ sumX_datum = NumericGetDatum(make_result(&sumX_var));
+ free_var(&sumX_var);
PG_RETURN_DATUM(DirectFunctionCall2(numeric_div, sumX_datum, N_datum));
}
numeric_sum(PG_FUNCTION_ARGS)
{
NumericAggState *state;
+ NumericVar sumX_var;
+ Numeric result;
state = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
if (state->NaNcount > 0) /* there was at least one NaN input */
PG_RETURN_NUMERIC(make_result(&const_nan));
- PG_RETURN_NUMERIC(make_result(&(state->sumX)));
+ init_var(&sumX_var);
+ accum_sum_final(&state->sumX, &sumX_var);
+ result = make_result(&sumX_var);
+ free_var(&sumX_var);
+
+ PG_RETURN_NUMERIC(result);
}
/*
init_var(&vsumX2);
int64_to_numericvar(state->N, &vN);
- set_var_from_var(&(state->sumX), &vsumX);
- set_var_from_var(&(state->sumX2), &vsumX2);
+ accum_sum_final(&(state->sumX), &vsumX);
+ accum_sum_final(&(state->sumX2), &vsumX2);
/*
* Sample stddev and variance are undefined when N <= 1; population stddev
NumericAggState numstate;
Numeric res;
- init_var(&numstate.sumX);
- init_var(&numstate.sumX2);
- numstate.NaNcount = 0;
- numstate.agg_context = NULL;
+ /* Initialize an empty agg state */
+ memset(&numstate, 0, sizeof(NumericAggState));
if (state)
{
+ NumericVar tmp_var;
+
numstate.N = state->N;
- int128_to_numericvar(state->sumX, &numstate.sumX);
- int128_to_numericvar(state->sumX2, &numstate.sumX2);
- }
- else
- {
- numstate.N = 0;
+
+ init_var(&tmp_var);
+
+ int128_to_numericvar(state->sumX, &tmp_var);
+ accum_sum_add(&numstate.sumX, &tmp_var);
+
+ int128_to_numericvar(state->sumX2, &tmp_var);
+ accum_sum_add(&numstate.sumX2, &tmp_var);
+
+ free_var(&tmp_var);
}
res = numeric_stddev_internal(&numstate, variance, sample, is_null);
- free_var(&numstate.sumX);
- free_var(&numstate.sumX2);
+ if (numstate.sumX.ndigits > 0)
+ {
+ pfree(numstate.sumX.pos_digits);
+ pfree(numstate.sumX.neg_digits);
+ }
+ if (numstate.sumX2.ndigits > 0)
+ {
+ pfree(numstate.sumX2.pos_digits);
+ pfree(numstate.sumX2.neg_digits);
+ }
return res;
}
var->digits = digits;
var->ndigits = ndigits;
}
+
+
+/* ----------------------------------------------------------------------
+ *
+ * Fast sum accumulator functions
+ *
+ * ----------------------------------------------------------------------
+ */
+
+/*
+ * Reset the accumulator's value to zero. The buffers to hold the digits
+ * are not free'd.
+ */
+static void
+accum_sum_reset(NumericSumAccum *accum)
+{
+ int i;
+
+ accum->dscale = 0;
+ for (i = 0; i < accum->ndigits; i++)
+ {
+ accum->pos_digits[i] = 0;
+ accum->neg_digits[i] = 0;
+ }
+}
+
+/*
+ * Accumulate a new value.
+ */
+static void
+accum_sum_add(NumericSumAccum *accum, NumericVar *val)
+{
+ int32 *accum_digits;
+ int i,
+ val_i;
+ int val_ndigits;
+ NumericDigit *val_digits;
+
+ /*
+ * If we have accumulated too many values since the last carry
+ * propagation, do it now, to avoid overflowing. (We could allow more
+ * than NBASE - 1, if we reserved two extra digits, rather than one, for
+ * carry propagation. But even with NBASE - 1, this needs to be done so
+ * seldom, that the performance difference is negligible.)
+ */
+ if (accum->num_uncarried == NBASE - 1)
+ accum_sum_carry(accum);
+
+ /*
+ * Adjust the weight or scale of the old value, so that it can accommodate
+ * the new value.
+ */
+ accum_sum_rescale(accum, val);
+
+ /* */
+ if (val->sign == NUMERIC_POS)
+ accum_digits = accum->pos_digits;
+ else
+ accum_digits = accum->neg_digits;
+
+ /* copy these values into local vars for speed in loop */
+ val_ndigits = val->ndigits;
+ val_digits = val->digits;
+
+ i = accum->weight - val->weight;
+ for (val_i = 0; val_i < val_ndigits; val_i++)
+ {
+ accum_digits[i] += (int32) val_digits[val_i];
+ i++;
+ }
+
+ accum->num_uncarried++;
+}
+
+/*
+ * Propagate carries.
+ */
+static void
+accum_sum_carry(NumericSumAccum *accum)
+{
+ int i;
+ int ndigits;
+ int32 *dig;
+ int32 carry;
+ int32 newdig = 0;
+
+ /*
+ * If no new values have been added since last carry propagation, nothing
+ * to do.
+ */
+ if (accum->num_uncarried == 0)
+ return;
+
+ /*
+ * We maintain that the weight of the accumulator is always one larger
+ * than needed to hold the current value, before carrying, to make sure
+ * there is enough space for the possible extra digit when carry is
+ * propagated. We cannot expand the buffer here, unless we require
+ * callers of accum_sum_final() to switch to the right memory context.
+ */
+ Assert(accum->pos_digits[0] == 0 && accum->neg_digits[0] == 0);
+
+ ndigits = accum->ndigits;
+
+ /* Propagate carry in the positive sum */
+ dig = accum->pos_digits;
+ carry = 0;
+ for (i = ndigits - 1; i >= 0; i--)
+ {
+ newdig = dig[i] + carry;
+ if (newdig >= NBASE)
+ {
+ carry = newdig / NBASE;
+ newdig -= carry * NBASE;
+ }
+ else
+ carry = 0;
+ dig[i] = newdig;
+ }
+ /* Did we use up the digit reserved for carry propagation? */
+ if (newdig > 0)
+ accum->have_carry_space = false;
+
+ /* And the same for the negative sum */
+ dig = accum->neg_digits;
+ carry = 0;
+ for (i = ndigits - 1; i >= 0; i--)
+ {
+ newdig = dig[i] + carry;
+ if (newdig >= NBASE)
+ {
+ carry = newdig / NBASE;
+ newdig -= carry * NBASE;
+ }
+ else
+ carry = 0;
+ dig[i] = newdig;
+ }
+ if (newdig > 0)
+ accum->have_carry_space = false;
+
+ accum->num_uncarried = 0;
+}
+
+/*
+ * Re-scale accumulator to accommodate new value.
+ *
+ * If the new value has more digits than the current digit buffers in the
+ * accumulator, enlarge the buffers.
+ */
+static void
+accum_sum_rescale(NumericSumAccum *accum, NumericVar *val)
+{
+ int old_weight = accum->weight;
+ int old_ndigits = accum->ndigits;
+ int accum_ndigits;
+ int accum_weight;
+ int accum_rscale;
+ int val_rscale;
+
+ accum_weight = old_weight;
+ accum_ndigits = old_ndigits;
+
+ /*
+ * Does the new value have a larger weight? If so, enlarge the buffers,
+ * and shift the existing value to the new weight, by adding leading
+ * zeros.
+ *
+ * We enforce that the accumulator always has a weight one larger than
+ * needed for the inputs, so that we have space for an extra digit at the
+ * final carry-propagation phase, if necessary.
+ */
+ if (val->weight >= accum_weight)
+ {
+ accum_weight = val->weight + 1;
+ accum_ndigits = accum_ndigits + (accum_weight - old_weight);
+ }
+
+ /*
+ * Even though the new value is small, we might've used up the space
+ * reserved for the carry digit in the last call to accum_sum_carry(). If
+ * so, enlarge to make room for another one.
+ */
+ else if (!accum->have_carry_space)
+ {
+ accum_weight++;
+ accum_ndigits++;
+ }
+
+ /* Is the new value wider on the right side? */
+ accum_rscale = accum_ndigits - accum_weight - 1;
+ val_rscale = val->ndigits - val->weight - 1;
+ if (val_rscale > accum_rscale)
+ accum_ndigits = accum_ndigits + (val_rscale - accum_rscale);
+
+ if (accum_ndigits != old_ndigits ||
+ accum_weight != old_weight)
+ {
+ int32 *new_pos_digits;
+ int32 *new_neg_digits;
+ int weightdiff;
+
+ weightdiff = accum_weight - old_weight;
+
+ new_pos_digits = palloc0(accum_ndigits * sizeof(int32));
+ new_neg_digits = palloc0(accum_ndigits * sizeof(int32));
+
+ if (accum->pos_digits)
+ {
+ memcpy(&new_pos_digits[weightdiff], accum->pos_digits,
+ old_ndigits * sizeof(int32));
+ pfree(accum->pos_digits);
+
+ memcpy(&new_neg_digits[weightdiff], accum->neg_digits,
+ old_ndigits * sizeof(int32));
+ pfree(accum->neg_digits);
+ }
+
+ accum->pos_digits = new_pos_digits;
+ accum->neg_digits = new_neg_digits;
+
+ accum->weight = accum_weight;
+ accum->ndigits = accum_ndigits;
+
+ Assert(accum->pos_digits[0] == 0 && accum->neg_digits[0] == 0);
+ accum->have_carry_space = true;
+ }
+
+ if (val->dscale > accum->dscale)
+ accum->dscale = val->dscale;
+}
+
+/*
+ * Return the current value of the accumulator. This perform final carry
+ * propagation, and adds together the positive and negative sums.
+ *
+ * Unlike all the other routines, the caller is not required to switch to
+ * the memory context that holds the accumulator.
+ */
+static void
+accum_sum_final(NumericSumAccum *accum, NumericVar *result)
+{
+ int i;
+ NumericVar pos_var;
+ NumericVar neg_var;
+
+ if (accum->ndigits == 0)
+ {
+ set_var_from_var(&const_zero, result);
+ return;
+ }
+
+ /* Perform final carry */
+ accum_sum_carry(accum);
+
+ /* Create NumericVars representing the positive and negative sums */
+ init_var(&pos_var);
+ init_var(&neg_var);
+
+ pos_var.ndigits = neg_var.ndigits = accum->ndigits;
+ pos_var.weight = neg_var.weight = accum->weight;
+ pos_var.dscale = neg_var.dscale = accum->dscale;
+ pos_var.sign = NUMERIC_POS;
+ neg_var.sign = NUMERIC_NEG;
+
+ pos_var.buf = pos_var.digits = digitbuf_alloc(accum->ndigits);
+ neg_var.buf = neg_var.digits = digitbuf_alloc(accum->ndigits);
+
+ for (i = 0; i < accum->ndigits; i++)
+ {
+ Assert(accum->pos_digits[i] < NBASE);
+ pos_var.digits[i] = (int16) accum->pos_digits[i];
+
+ Assert(accum->neg_digits[i] < NBASE);
+ neg_var.digits[i] = (int16) accum->neg_digits[i];
+ }
+
+ /* And add them together */
+ add_var(&pos_var, &neg_var, result);
+
+ /* Remove leading/trailing zeroes */
+ strip_var(result);
+}
+
+/*
+ * Copy an accumulator's state.
+ *
+ * 'dst' is assumed to be uninitialized beforehand. No attempt is made at
+ * freeing old values.
+ */
+static void
+accum_sum_copy(NumericSumAccum *dst, NumericSumAccum *src)
+{
+ dst->pos_digits = palloc(src->ndigits * sizeof(int32));
+ dst->neg_digits = palloc(src->ndigits * sizeof(int32));
+
+ memcpy(dst->pos_digits, src->pos_digits, src->ndigits * sizeof(int32));
+ memcpy(dst->neg_digits, src->neg_digits, src->ndigits * sizeof(int32));
+ dst->num_uncarried = src->num_uncarried;
+ dst->ndigits = src->ndigits;
+ dst->weight = src->weight;
+ dst->dscale = src->dscale;
+}
+
+/*
+ * Add the current value of 'accum2' into 'accum'.
+ */
+static void
+accum_sum_combine(NumericSumAccum *accum, NumericSumAccum *accum2)
+{
+ NumericVar tmp_var;
+
+ init_var(&tmp_var);
+
+ accum_sum_final(accum2, &tmp_var);
+ accum_sum_add(accum, &tmp_var);
+
+ free_var(&tmp_var);
+}