#define RANGESTRAT_CONTAINS_ELEM 16
#define RANGESTRAT_EQ 18
-/* Copy a RangeType datum (hardwires typbyval and typlen for ranges...) */
-#define rangeCopy(r) \
- ((RangeType *) DatumGetPointer(datumCopy(PointerGetDatum(r), \
- false, -1)))
+/*
+ * Range class properties used to segregate different classes of ranges in
+ * GiST. Each unique combination of properties is a class. CLS_EMPTY cannot
+ * be combined with anything else.
+ *
+ * A class value is used directly as an array index (see count_in_classes
+ * and classes_groups in range_gist_picksplit, both sized CLS_COUNT), so
+ * valid class values run from 0 to CLS_COUNT - 1.
+ */
+#define CLS_NORMAL 0 /* Ordinary finite range (no bits set) */
+#define CLS_LOWER_INF 1 /* Lower bound is infinity */
+#define CLS_UPPER_INF 2 /* Upper bound is infinity */
+#define CLS_CONTAIN_EMPTY 4 /* Contains underlying empty ranges */
+#define CLS_EMPTY 8 /* Special class for empty ranges */
+
+#define CLS_COUNT 9 /* # of classes; includes all combinations of
+ * properties. CLS_EMPTY doesn't combine with
+ * anything else, so it's only 2^3 + 1. */
+
+/*
+ * Minimum accepted ratio of split for items of the same class. If the items
+ * are of different classes, we will separate along those lines regardless of
+ * the ratio.
+ */
+#define LIMIT_RATIO 0.3
+
+/*
+ * Constants for fixed penalty values used by range_gist_penalty.
+ * INFINITE_BOUND_PENALTY and CONTAIN_EMPTY_PENALTY are multiplied by small
+ * integer factors there to rank the cases; DEFAULT_SUBTYPE_DIFF_PENALTY
+ * stands in for the size of a bound extension when the range type has no
+ * subtype_diff function.
+ */
+#define INFINITE_BOUND_PENALTY 2.0
+#define CONTAIN_EMPTY_PENALTY 1.0
+#define DEFAULT_SUBTYPE_DIFF_PENALTY 1.0
/*
- * Auxiliary structure for picksplit method.
+ * Per-item data for range_gist_single_sorting_split: one element per entry
+ * being split, carrying the single bound that the sort is keyed on.
*/
typedef struct
{
- int index; /* original index in entryvec->vector[] */
- RangeType *data; /* range value to sort */
- TypeCacheEntry *typcache; /* range type's info */
-} PickSplitSortItem;
+ int index; /* offset of the entry in entryvec->vector[] */
+ RangeBound bound; /* upper or lower bound, per use_upper_bound */
+} SingleBoundSortItem;
+
+/*
+ * Place on left or right side of split? range_gist_picksplit zero-fills its
+ * classes_groups array with memset, which relies on SPLIT_LEFT being 0.
+ */
+typedef enum
+{
+ SPLIT_LEFT = 0, /* makes initialization to SPLIT_LEFT easier */
+ SPLIT_RIGHT /* assigned explicitly to classes that go right */
+} SplitLR;
+
+/*
+ * Context for range_gist_consider_split.
+ *
+ * range_gist_double_sorting_split zero-fills this, sets typcache,
+ * has_subtype_diff, entries_count and first, and passes it to every
+ * range_gist_consider_split call. Afterwards it reads first, left_upper,
+ * right_lower and common_left to carry out the selected split.
+ */
+typedef struct
+{
+ TypeCacheEntry *typcache; /* typcache for range type */
+ bool has_subtype_diff; /* does it have subtype_diff? */
+ int entries_count; /* total number of entries being split */
+
+ /* Information about currently selected split follows */
+
+ bool first; /* true if no split was selected yet */
+
+ RangeBound *left_upper; /* upper bound of left interval */
+ RangeBound *right_lower; /* lower bound of right interval */
+
+ float4 ratio; /* split ratio */
+ float4 overlap; /* overlap between left and right predicate */
+ int common_left; /* # common entries destined for left side */
+ int common_right; /* # common entries destined for right side */
+} ConsiderSplitContext;
+
+/*
+ * Bounds extracted from a non-empty range, for use in
+ * range_gist_double_sorting_split. That function keeps two arrays of these,
+ * one sorted with interval_cmp_lower and one with interval_cmp_upper.
+ */
+typedef struct
+{
+ RangeBound lower; /* lower bound as filled in by range_deserialize */
+ RangeBound upper; /* upper bound as filled in by range_deserialize */
+} NonEmptyRange;
+
+/*
+ * Represents information about an entry that can be placed in either group
+ * without affecting overlap over selected axis ("common entry").
+ * Arrays of these are sorted with common_entry_cmp before distribution.
+ */
+typedef struct
+{
+ /* Index of entry in the initial array (offset into entryvec->vector[]) */
+ int index;
+ /*
+ * Delta between closeness of range to each of the two groups:
+ * (lower - right_lower) - (left_upper - upper) via subtype_diff, or 0
+ * when the range type has no subtype_diff function.
+ */
+ double delta;
+} CommonEntry;
+
+/*
+ * Helper macros to place an entry in the left or right group during split.
+ * Each one appends offset "off" to v->spl_left or v->spl_right and folds
+ * "range" into that side's running union: the first range placed on a side
+ * is used as-is, later ones are merged in with range_super_union.
+ */
+/* Note direct access to variables v, typcache, left_range, right_range */
+#define PLACE_LEFT(range, off) \
+ do { \
+ if (v->spl_nleft > 0) \
+ left_range = range_super_union(typcache, left_range, range); \
+ else \
+ left_range = (range); \
+ v->spl_left[v->spl_nleft++] = (off); \
+ } while(0)
+
+#define PLACE_RIGHT(range, off) \
+ do { \
+ if (v->spl_nright > 0) \
+ right_range = range_super_union(typcache, right_range, range); \
+ else \
+ right_range = (range); \
+ v->spl_right[v->spl_nright++] = (off); \
+ } while(0)
+
+/*
+ * Copy a RangeType datum and return the copy as a RangeType pointer.
+ * Hardwires typbyval and typlen for ranges: the datumCopy call passes
+ * false and -1 for them.
+ */
+#define rangeCopy(r) \
+ ((RangeType *) DatumGetPointer(datumCopy(PointerGetDatum(r), \
+ false, -1)))
static RangeType *range_super_union(TypeCacheEntry *typcache, RangeType * r1,
RangeType * r2);
static bool range_gist_consistent_leaf(FmgrInfo *flinfo,
StrategyNumber strategy, RangeType *key,
Datum query);
-static int sort_item_cmp(const void *a, const void *b);
+static void range_gist_fallback_split(TypeCacheEntry *typcache,
+ GistEntryVector *entryvec,
+ GIST_SPLITVEC *v);
+static void range_gist_class_split(TypeCacheEntry *typcache,
+ GistEntryVector *entryvec,
+ GIST_SPLITVEC *v,
+ SplitLR *classes_groups);
+static void range_gist_single_sorting_split(TypeCacheEntry *typcache,
+ GistEntryVector *entryvec,
+ GIST_SPLITVEC *v,
+ bool use_upper_bound);
+static void range_gist_double_sorting_split(TypeCacheEntry *typcache,
+ GistEntryVector *entryvec,
+ GIST_SPLITVEC *v);
+static void range_gist_consider_split(ConsiderSplitContext *context,
+ RangeBound *right_lower, int min_left_count,
+ RangeBound *left_upper, int max_left_count);
+static int get_gist_range_class(RangeType *range);
+static int single_bound_cmp(const void *a, const void *b, void *arg);
+static int interval_cmp_lower(const void *a, const void *b, void *arg);
+static int interval_cmp_upper(const void *a, const void *b, void *arg);
+static int common_entry_cmp(const void *i1, const void *i2);
+static float8 call_subtype_diff(TypeCacheEntry *typcache,
+ Datum val1, Datum val2);
/* GiST query consistency check */
PG_RETURN_POINTER(entry);
}
-/* page split penalty function */
+/*
+ * GiST page split penalty function.
+ *
+ * The penalty function has the following goals (in order from most to least
+ * important):
+ * - Keep normal ranges separate
+ * - Avoid broadening the class of the original predicate
+ * - Avoid broadening (as determined by subtype_diff) the original predicate
+ * - Favor adding ranges to narrower original predicates
+ */
Datum
range_gist_penalty(PG_FUNCTION_ARGS)
{
RangeType *orig = DatumGetRangeType(origentry->key);
RangeType *new = DatumGetRangeType(newentry->key);
TypeCacheEntry *typcache;
- RangeType *s_union;
- FmgrInfo *subtype_diff;
- RangeBound lower1,
- lower2;
- RangeBound upper1,
- upper2;
- bool empty1,
- empty2;
- float8 lower_diff,
- upper_diff;
+ bool has_subtype_diff;
+ RangeBound orig_lower,
+ new_lower,
+ orig_upper,
+ new_upper;
+ bool orig_empty,
+ new_empty;
if (RangeTypeGetOid(orig) != RangeTypeGetOid(new))
elog(ERROR, "range types do not match");
typcache = range_get_typcache(fcinfo, RangeTypeGetOid(orig));
- subtype_diff = &typcache->rng_subdiff_finfo;
+ has_subtype_diff = OidIsValid(typcache->rng_subdiff_finfo.fn_oid);
- /*
- * If new is or contains empty, and orig doesn't, apply infinite penalty.
- * We really don't want to pollute an empty-free subtree with empties.
- */
- if (RangeIsOrContainsEmpty(new) && !RangeIsOrContainsEmpty(orig))
- {
- *penalty = get_float4_infinity();
- PG_RETURN_POINTER(penalty);
- }
+ range_deserialize(typcache, orig, &orig_lower, &orig_upper, &orig_empty);
+ range_deserialize(typcache, new, &new_lower, &new_upper, &new_empty);
/*
- * We want to compare the size of "orig" to size of "orig union new".
- * The penalty will be the sum of the reduction in the lower bound plus
- * the increase in the upper bound.
+ * Distinct branches for handling distinct classes of ranges. Note
+ * that penalty values only need to be commensurate within the same
+ * class of new range.
*/
- s_union = range_super_union(typcache, orig, new);
-
- range_deserialize(typcache, orig, &lower1, &upper1, &empty1);
- range_deserialize(typcache, s_union, &lower2, &upper2, &empty2);
-
- /* handle cases where orig is empty */
- if (empty1 && empty2)
+ if (new_empty)
{
- *penalty = 0;
- PG_RETURN_POINTER(penalty);
+ /* Handle insertion of empty range */
+ if (orig_empty)
+ {
+ /*
+ * The best case is to insert it to empty original
+ * range. Insertion here means no broadening of original range.
+ * Also original range is the most narrow.
+ */
+ *penalty = 0.0;
+ }
+ else if (RangeIsOrContainsEmpty(orig))
+ {
+ /*
+ * The second case is to insert empty range into range which
+ * contains at least one underlying empty range. There is still
+ * no broadening of original range, but original range is not as
+ * narrow as possible.
+ */
+ *penalty = CONTAIN_EMPTY_PENALTY;
+ }
+ else if (orig_lower.infinite && orig_upper.infinite)
+ {
+ /*
+ * Original range requires broadening. (-inf, +inf) is the
+ * farthest from a normal range in this case.
+ */
+ *penalty = 2 * CONTAIN_EMPTY_PENALTY;
+ }
+ else if (orig_lower.infinite || orig_upper.infinite)
+ {
+ /*
+ * (-inf, x) or (x, +inf) original ranges are closer to normal
+ * ranges, so it's worse to mix them with empty ranges.
+ */
+ *penalty = 3 * CONTAIN_EMPTY_PENALTY;
+ }
+ else
+ {
+ /*
+ * The least preferred case is broadening of normal range.
+ */
+ *penalty = 4 * CONTAIN_EMPTY_PENALTY;
+ }
}
- else if (empty1)
+ else if (new_lower.infinite && new_upper.infinite)
{
- /* infinite penalty for pushing non-empty into all-empty subtree */
- *penalty = get_float4_infinity();
- PG_RETURN_POINTER(penalty);
- }
-
- /* if orig isn't empty, s_union can't be either */
- Assert(!empty2);
-
- /* similarly, if orig's lower bound is infinite, s_union's must be too */
- Assert(lower2.infinite || !lower1.infinite);
+ /* Handle insertion of (-inf, +inf) range */
+ if (orig_lower.infinite && orig_upper.infinite)
+ {
+ /*
+ * Best case is inserting to (-inf, +inf) original range.
+ */
+ *penalty = 0.0;
+ }
+ else if (orig_lower.infinite || orig_upper.infinite)
+ {
+ /*
+ * When original range is (-inf, x) or (x, +inf) it requires
+ * broadening of original range (extension of one bound to
+ * infinity).
+ */
+ *penalty = INFINITE_BOUND_PENALTY;
+ }
+ else
+ {
+ /*
+ * Insertion to normal original range is least preferred.
+ */
+ *penalty = 2 * INFINITE_BOUND_PENALTY;
+ }
- if (lower2.infinite && lower1.infinite)
- lower_diff = 0;
- else if (lower2.infinite)
- lower_diff = get_float8_infinity();
- else if (OidIsValid(subtype_diff->fn_oid))
- {
- lower_diff = DatumGetFloat8(FunctionCall2Coll(subtype_diff,
- typcache->rng_collation,
- lower1.val,
- lower2.val));
- /* orig's lower bound must be >= s_union's */
- if (lower_diff < 0)
- lower_diff = 0; /* subtype_diff is broken */
+ if (RangeIsOrContainsEmpty(orig))
+ {
+ /*
+ * Original range is narrower when it doesn't contain empty ranges.
+ * Add additional penalty otherwise.
+ */
+ *penalty += CONTAIN_EMPTY_PENALTY;
+ }
}
- else
+ else if (new_lower.infinite)
{
- /* only know whether there is a difference or not */
- lower_diff = range_cmp_bounds(typcache, &lower1, &lower2) > 0 ? 1 : 0;
+ /* Handle insertion of (-inf, x) range */
+ if (!orig_empty && orig_lower.infinite)
+ {
+ if (orig_upper.infinite)
+ {
+ /*
+ * (-inf, +inf) range won't be extended by insertion of
+ * (-inf, x) range. It's a less desirable case than insertion
+ * to (-inf, y) original range without extension, because in
+ * that case original range is narrower. But we can't express
+ * that in single float value.
+ */
+ *penalty = 0.0;
+ }
+ else
+ {
+ if (range_cmp_bounds(typcache, &new_upper, &orig_upper) > 0)
+ {
+ /*
+ * Get extension of original range using subtype_diff.
+ * Use constant if subtype_diff unavailable.
+ */
+ if (has_subtype_diff)
+ *penalty = call_subtype_diff(typcache,
+ new_upper.val,
+ orig_upper.val);
+ else
+ *penalty = DEFAULT_SUBTYPE_DIFF_PENALTY;
+ }
+ else
+ {
+ /* No extension of original range */
+ *penalty = 0.0;
+ }
+ }
+ }
+ else
+ {
+ /*
+ * If lower bound of original range is not -inf, then extension
+ * of it is infinity.
+ */
+ *penalty = get_float4_infinity();
+ }
}
-
- /* similarly, if orig's upper bound is infinite, s_union's must be too */
- Assert(upper2.infinite || !upper1.infinite);
-
- if (upper2.infinite && upper1.infinite)
- upper_diff = 0;
- else if (upper2.infinite)
- upper_diff = get_float8_infinity();
- else if (OidIsValid(subtype_diff->fn_oid))
+ else if (new_upper.infinite)
{
- upper_diff = DatumGetFloat8(FunctionCall2Coll(subtype_diff,
- typcache->rng_collation,
- upper2.val,
- upper1.val));
- /* orig's upper bound must be <= s_union's */
- if (upper_diff < 0)
- upper_diff = 0; /* subtype_diff is broken */
+ /* Handle insertion of (x, +inf) range */
+ if (!orig_empty && orig_upper.infinite)
+ {
+ if (orig_lower.infinite)
+ {
+ /*
+ * (-inf, +inf) range won't be extended by insertion of
+ * (x, +inf) range. It's a less desirable case than insertion
+ * to (y, +inf) original range without extension, because in
+ * that case original range is narrower. But we can't express
+ * that in single float value.
+ */
+ *penalty = 0.0;
+ }
+ else
+ {
+ if (range_cmp_bounds(typcache, &new_lower, &orig_lower) < 0)
+ {
+ /*
+ * Get extension of original range using subtype_diff.
+ * Use constant if subtype_diff unavailable.
+ */
+ if (has_subtype_diff)
+ *penalty = call_subtype_diff(typcache,
+ orig_lower.val,
+ new_lower.val);
+ else
+ *penalty = DEFAULT_SUBTYPE_DIFF_PENALTY;
+ }
+ else
+ {
+ /* No extension of original range */
+ *penalty = 0.0;
+ }
+ }
+ }
+ else
+ {
+ /*
+ * If upper bound of original range is not +inf, then extension
+ * of it is infinity.
+ */
+ *penalty = get_float4_infinity();
+ }
}
else
{
- /* only know whether there is a difference or not */
- upper_diff = range_cmp_bounds(typcache, &upper2, &upper1) > 0 ? 1 : 0;
+ /* Handle insertion of normal (non-empty, non-infinite) range */
+ if (orig_empty || orig_lower.infinite || orig_upper.infinite)
+ {
+ /*
+ * Avoid mixing normal ranges with infinite and empty ranges.
+ */
+ *penalty = get_float4_infinity();
+ }
+ else
+ {
+ /*
+ * Calculate extension of original range by calling subtype_diff.
+ * Use constant if subtype_diff unavailable.
+ */
+ float8 diff = 0.0;
+
+ if (range_cmp_bounds(typcache, &new_lower, &orig_lower) < 0)
+ {
+ if (has_subtype_diff)
+ diff += call_subtype_diff(typcache,
+ orig_lower.val,
+ new_lower.val);
+ else
+ diff += DEFAULT_SUBTYPE_DIFF_PENALTY;
+ }
+ if (range_cmp_bounds(typcache, &new_upper, &orig_upper) > 0)
+ {
+ if (has_subtype_diff)
+ diff += call_subtype_diff(typcache,
+ new_upper.val,
+ orig_upper.val);
+ else
+ diff += DEFAULT_SUBTYPE_DIFF_PENALTY;
+ }
+ *penalty = diff;
+ }
}
- Assert(lower_diff >= 0 && upper_diff >= 0);
-
- *penalty = (float) (lower_diff + upper_diff);
PG_RETURN_POINTER(penalty);
}
/*
* The GiST PickSplit method for ranges
*
- * Algorithm based on sorting. Incoming array of ranges is sorted using
- * sort_item_cmp function. After that first half of ranges goes to the left
- * output, and the second half of ranges goes to the right output.
+ * Primarily, we try to segregate ranges of different classes. If splitting
+ * ranges of the same class, use the appropriate split method for that class.
*/
Datum
range_gist_picksplit(PG_FUNCTION_ARGS)
TypeCacheEntry *typcache;
OffsetNumber i;
RangeType *pred_left;
- RangeType *pred_right;
- PickSplitSortItem *sortItems;
int nbytes;
- OffsetNumber split_idx;
- OffsetNumber *left;
- OffsetNumber *right;
OffsetNumber maxoff;
+ int count_in_classes[CLS_COUNT];
+ int j;
+ int non_empty_classes_count = 0;
+ int biggest_class = -1;
+ int biggest_class_count = 0;
+ int total_count;
/* use first item to look up range type's info */
pred_left = DatumGetRangeType(entryvec->vector[FirstOffsetNumber].key);
typcache = range_get_typcache(fcinfo, RangeTypeGetOid(pred_left));
- /* allocate result and work arrays */
maxoff = entryvec->n - 1;
nbytes = (maxoff + 1) * sizeof(OffsetNumber);
v->spl_left = (OffsetNumber *) palloc(nbytes);
v->spl_right = (OffsetNumber *) palloc(nbytes);
- sortItems = (PickSplitSortItem *) palloc(maxoff * sizeof(PickSplitSortItem));
/*
- * Prepare auxiliary array and sort the values.
+ * Get count distribution of range classes.
*/
+ memset(count_in_classes, 0, sizeof(count_in_classes));
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
- sortItems[i - 1].index = i;
- sortItems[i - 1].data = DatumGetRangeType(entryvec->vector[i].key);
- sortItems[i - 1].typcache = typcache;
- }
- qsort(sortItems, maxoff, sizeof(PickSplitSortItem), sort_item_cmp);
-
- split_idx = maxoff / 2;
+ RangeType *range = DatumGetRangeType(entryvec->vector[i].key);
- left = v->spl_left;
- v->spl_nleft = 0;
- right = v->spl_right;
- v->spl_nright = 0;
+ count_in_classes[get_gist_range_class(range)]++;
+ }
/*
- * First half of items goes to the left output.
+ * Count non-empty classes and find biggest class.
*/
- pred_left = sortItems[0].data;
- *left++ = sortItems[0].index;
- v->spl_nleft++;
- for (i = 1; i < split_idx; i++)
+ total_count = maxoff;
+ for (j = 0; j < CLS_COUNT; j++)
{
- pred_left = range_super_union(typcache, pred_left, sortItems[i].data);
- *left++ = sortItems[i].index;
- v->spl_nleft++;
+ if (count_in_classes[j] > 0)
+ {
+ if (count_in_classes[j] > biggest_class_count)
+ {
+ biggest_class_count = count_in_classes[j];
+ biggest_class = j;
+ }
+ non_empty_classes_count++;
+ }
}
- /*
- * Second half of items goes to the right output.
- */
- pred_right = sortItems[split_idx].data;
- *right++ = sortItems[split_idx].index;
- v->spl_nright++;
- for (i = split_idx + 1; i < maxoff; i++)
+ Assert(non_empty_classes_count > 0);
+
+ if (non_empty_classes_count == 1)
{
- pred_right = range_super_union(typcache, pred_right, sortItems[i].data);
- *right++ = sortItems[i].index;
- v->spl_nright++;
+ /* One non-empty class, so split inside class */
+ if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_NORMAL)
+ {
+ /* double sorting split for normal ranges */
+ range_gist_double_sorting_split(typcache, entryvec, v);
+ }
+ else if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_LOWER_INF)
+ {
+ /* upper bound sorting split for (-inf, x) ranges */
+ range_gist_single_sorting_split(typcache, entryvec, v, true);
+ }
+ else if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_UPPER_INF)
+ {
+ /* lower bound sorting split for (x, +inf) ranges */
+ range_gist_single_sorting_split(typcache, entryvec, v, false);
+ }
+ else
+ {
+ /* trivial split for all (-inf, +inf) or all empty ranges */
+ range_gist_fallback_split(typcache, entryvec, v);
+ }
}
+ else
+ {
+ /*
+ * Class based split.
+ *
+ * To which side of the split should each class go? Initialize them
+ * all to go to the left side.
+ */
+ SplitLR classes_groups[CLS_COUNT];
- *left = *right = FirstOffsetNumber; /* sentinel value, see dosplit() */
+ memset(classes_groups, 0, sizeof(classes_groups));
- v->spl_ldatum = RangeTypeGetDatum(pred_left);
- v->spl_rdatum = RangeTypeGetDatum(pred_right);
+ if (count_in_classes[CLS_NORMAL] > 0)
+ {
+ /* separate normal ranges if any */
+ classes_groups[CLS_NORMAL] = SPLIT_RIGHT;
+ }
+ else
+ {
+ /*----------
+ * Try to split classes in one of two ways:
+ * 1) containing infinities - not containing infinities
+ * 2) containing empty - not containing empty
+ *
+ * Select the way which balances the ranges between left and right
+ * the best. If split in these ways is not possible, there are at
+ * most 3 classes, so just separate biggest class.
+ *----------
+ */
+ int infCount, nonInfCount;
+ int emptyCount, nonEmptyCount;
+
+ nonInfCount =
+ count_in_classes[CLS_NORMAL] +
+ count_in_classes[CLS_CONTAIN_EMPTY] +
+ count_in_classes[CLS_EMPTY];
+ infCount = total_count - nonInfCount;
+
+ nonEmptyCount =
+ count_in_classes[CLS_NORMAL] +
+ count_in_classes[CLS_LOWER_INF] +
+ count_in_classes[CLS_UPPER_INF] +
+ count_in_classes[CLS_LOWER_INF | CLS_UPPER_INF];
+ emptyCount = total_count - nonEmptyCount;
+
+ if (infCount > 0 && nonInfCount > 0 &&
+ (Abs(infCount - nonInfCount) <=
+ Abs(emptyCount - nonEmptyCount)))
+ {
+ classes_groups[CLS_NORMAL] = SPLIT_RIGHT;
+ classes_groups[CLS_CONTAIN_EMPTY] = SPLIT_RIGHT;
+ classes_groups[CLS_EMPTY] = SPLIT_RIGHT;
+ }
+ else if (emptyCount > 0 && nonEmptyCount > 0)
+ {
+ classes_groups[CLS_NORMAL] = SPLIT_RIGHT;
+ classes_groups[CLS_LOWER_INF] = SPLIT_RIGHT;
+ classes_groups[CLS_UPPER_INF] = SPLIT_RIGHT;
+ classes_groups[CLS_LOWER_INF | CLS_UPPER_INF] = SPLIT_RIGHT;
+ }
+ else
+ {
+ /*
+ * Either total_count == emptyCount or total_count == infCount.
+ */
+ classes_groups[biggest_class] = SPLIT_RIGHT;
+ }
+ }
+
+ range_gist_class_split(typcache, entryvec, v, classes_groups);
+ }
PG_RETURN_POINTER(v);
}
}
/*
- * Compare function for PickSplitSortItem. This is actually the
- * interesting part of the picksplit algorithm.
+ * Trivial split: the first half of the entries (in entryvec order) is placed
+ * on the left page and the remaining entries on the right page.
+ *
+ * typcache - type cache entry of the range type, used by PLACE_LEFT and
+ * PLACE_RIGHT when merging ranges
+ * entryvec - entries being split; offsets FirstOffsetNumber .. n - 1 are used
+ * v - split vector to fill; spl_left and spl_right must already be
+ * allocated by the caller
+ *
+ * With N entries the left page receives N / 2 of them, so both pages get at
+ * least one entry whenever N >= 2. On return spl_ldatum and spl_rdatum hold
+ * the union range of each side.
+ */
+static void
+range_gist_fallback_split(TypeCacheEntry *typcache,
+ GistEntryVector *entryvec,
+ GIST_SPLITVEC *v)
+{
+ RangeType *left_range = NULL;
+ RangeType *right_range = NULL;
+ OffsetNumber i, maxoff, split_idx;
+
+ maxoff = entryvec->n - 1;
+
+ /*
+ * First offset that goes to the right page. Counting the entries
+ * (maxoff - FirstOffsetNumber + 1) before halving guarantees that the
+ * left page is not left empty when only two entries are being split.
+ */
+ split_idx = (maxoff - FirstOffsetNumber + 1) / 2 + FirstOffsetNumber;
+
+ v->spl_nleft = 0;
+ v->spl_nright = 0;
+ for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+ {
+ RangeType *range = DatumGetRangeType(entryvec->vector[i].key);
+
+ if (i < split_idx)
+ PLACE_LEFT(range, i);
+ else
+ PLACE_RIGHT(range, i);
+ }
+
+ v->spl_ldatum = RangeTypeGetDatum(left_range);
+ v->spl_rdatum = RangeTypeGetDatum(right_range);
+}
+
+/*
+ * Split based on classes of ranges.
*
- * We want to separate out empty ranges, bounded ranges, and unbounded
- * ranges. We assume that "contains" and "overlaps" are the most
- * important queries, so empty ranges will rarely match and unbounded
- * ranges frequently will. Bounded ranges should be in the middle.
+ * See get_gist_range_class for class definitions.
+ * classes_groups is an array of length CLS_COUNT indicating the side of the
+ * split to which each class should go.
+ *
+ * Every entry of entryvec is classified and appended to v->spl_left or
+ * v->spl_right accordingly; v->spl_ldatum and v->spl_rdatum receive the
+ * union range of each side. The arrays v->spl_left and v->spl_right are
+ * written but not allocated here, so the caller must provide them.
+ */
+static void
+range_gist_class_split(TypeCacheEntry *typcache,
+ GistEntryVector *entryvec,
+ GIST_SPLITVEC *v,
+ SplitLR *classes_groups)
+{
+ RangeType *left_range = NULL; /* running union of the left side */
+ RangeType *right_range = NULL; /* running union of the right side */
+ OffsetNumber i, maxoff;
+
+ maxoff = entryvec->n - 1;
+
+ v->spl_nleft = 0;
+ v->spl_nright = 0;
+ for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+ {
+ RangeType *range = DatumGetRangeType(entryvec->vector[i].key);
+ int class;
+
+ /* Get class of range */
+ class = get_gist_range_class(range);
+
+ /* Place range to appropriate page */
+ if (classes_groups[class] == SPLIT_LEFT)
+ PLACE_LEFT(range, i);
+ else
+ {
+ Assert(classes_groups[class] == SPLIT_RIGHT);
+ PLACE_RIGHT(range, i);
+ }
+ }
+
+ v->spl_ldatum = RangeTypeGetDatum(left_range);
+ v->spl_rdatum = RangeTypeGetDatum(right_range);
+}
+
+/*
+ * Sorting based split. First half of entries according to the sort will be
+ * placed to one page, and second half of entries will be placed to other
+ * page. use_upper_bound parameter indicates whether to use upper or lower
+ * bound for sorting.
+ *
+ * Entries are expected to be non-empty ranges (asserted below);
+ * range_gist_picksplit calls this only when all entries belong to one class
+ * with exactly one infinite bound. The arrays v->spl_left and v->spl_right
+ * are written but not allocated here, so the caller must provide them.
+ */
+static void
+range_gist_single_sorting_split(TypeCacheEntry *typcache,
+ GistEntryVector *entryvec,
+ GIST_SPLITVEC *v,
+ bool use_upper_bound)
+{
+ SingleBoundSortItem *sortItems;
+ RangeType *left_range = NULL;
+ RangeType *right_range = NULL;
+ OffsetNumber i, maxoff, split_idx;
+
+ maxoff = entryvec->n - 1;
+
+ sortItems = (SingleBoundSortItem *)
+ palloc(maxoff * sizeof(SingleBoundSortItem));
+
+ /*
+ * Prepare auxiliary array and sort the values.
+ */
+ for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+ {
+ RangeType *range = DatumGetRangeType(entryvec->vector[i].key);
+ RangeBound bound2; /* receives the bound we do not sort on */
+ bool empty;
+
+ sortItems[i - 1].index = i; /* remember original offset */
+ /* Put appropriate bound into array */
+ if (use_upper_bound)
+ range_deserialize(typcache, range, &bound2,
+ &sortItems[i - 1].bound, &empty);
+ else
+ range_deserialize(typcache, range, &sortItems[i - 1].bound,
+ &bound2, &empty);
+ Assert(!empty);
+ }
+
+ qsort_arg(sortItems, maxoff, sizeof(SingleBoundSortItem),
+ single_bound_cmp, typcache);
+
+ split_idx = maxoff / 2; /* number of sorted items that go left */
+
+ v->spl_nleft = 0;
+ v->spl_nright = 0;
+
+ for (i = 0; i < maxoff; i++)
+ {
+ int idx = sortItems[i].index;
+ RangeType *range = DatumGetRangeType(entryvec->vector[idx].key);
+
+ if (i < split_idx)
+ PLACE_LEFT(range, idx);
+ else
+ PLACE_RIGHT(range, idx);
+ }
+
+ v->spl_ldatum = RangeTypeGetDatum(left_range);
+ v->spl_rdatum = RangeTypeGetDatum(right_range);
+}
+
+/*
+ * Double sorting split algorithm.
+ *
+ * The algorithm considers dividing ranges into two groups. The first (left)
+ * group contains general left bound. The second (right) group contains
+ * general right bound. The challenge is to find upper bound of left group
+ * and lower bound of right group so that overlap of groups is minimal and
+ * ratio of distribution is acceptable. Algorithm finds for each lower bound of
+ * right group minimal upper bound of left group, and for each upper bound of
+ * left group maximal lower bound of right group. For each found pair
+ * range_gist_consider_split considers replacement of currently selected
+ * split with the new one.
+ *
+ * After that, all the entries are divided into three groups:
+ * 1) Entries which should be placed to the left group
+ * 2) Entries which should be placed to the right group
+ * 3) "Common entries" which can be placed to either group without affecting
+ * amount of overlap.
*
- * Empty ranges we push all the way to the left, then bounded ranges
- * (sorted on lower bound, then upper), then ranges with no lower
- * bound, then ranges with no upper bound; and finally, ranges with no
- * upper or lower bound all the way to the right.
+ * The common ranges are distributed by difference of distance from lower
+ * bound of common range to lower bound of right group and distance from upper
+ * bound of common range to upper bound of left group.
+ *
+ * For details see:
+ * "A new double sorting-based node splitting algorithm for R-tree",
+ * A. Korotkov
+ * http://syrcose.ispras.ru/2011/files/SYRCoSE2011_Proceedings.pdf#page=36
+ *
+ * Every entry is expected to be a non-empty range (asserted while the bound
+ * arrays are filled). The result is stored in v; if no split was selected
+ * by range_gist_consider_split, the work is delegated to
+ * range_gist_fallback_split.
*/
-static int
-sort_item_cmp(const void *a, const void *b)
+static void
+range_gist_double_sorting_split(TypeCacheEntry *typcache,
+ GistEntryVector *entryvec,
+ GIST_SPLITVEC *v)
{
- PickSplitSortItem *i1 = (PickSplitSortItem *) a;
- PickSplitSortItem *i2 = (PickSplitSortItem *) b;
- RangeType *r1 = i1->data;
- RangeType *r2 = i2->data;
- TypeCacheEntry *typcache = i1->typcache;
- RangeBound lower1,
- lower2;
- RangeBound upper1,
- upper2;
- bool empty1,
- empty2;
- int cmp;
+ ConsiderSplitContext context;
+ OffsetNumber i, maxoff;
+ RangeType *range,
+ *left_range = NULL,
+ *right_range = NULL;
+ int common_entries_count;
+ NonEmptyRange *by_lower,
+ *by_upper;
+ CommonEntry *common_entries;
+ int nentries, i1, i2;
+ RangeBound *right_lower, *left_upper;
+
+ memset(&context, 0, sizeof(ConsiderSplitContext));
+ context.typcache = typcache;
+ context.has_subtype_diff = OidIsValid(typcache->rng_subdiff_finfo.fn_oid);
- range_deserialize(typcache, r1, &lower1, &upper1, &empty1);
- range_deserialize(typcache, r2, &lower2, &upper2, &empty2);
+ maxoff = entryvec->n - 1;
+ nentries = context.entries_count = maxoff - FirstOffsetNumber + 1;
+ context.first = true;
+
+ /* Allocate arrays for sorted range bounds */
+ by_lower = (NonEmptyRange *) palloc(nentries * sizeof(NonEmptyRange));
+ by_upper = (NonEmptyRange *) palloc(nentries * sizeof(NonEmptyRange));
+
+ /*
+ * Fill arrays of bounds. NOTE(review): the loop-local "range" below
+ * shadows the function-scope variable of the same name.
+ */
+ for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+ {
+ RangeType *range = DatumGetRangeType(entryvec->vector[i].key);
+ bool empty;
+
+ range_deserialize(typcache, range,
+ &by_lower[i - FirstOffsetNumber].lower,
+ &by_lower[i - FirstOffsetNumber].upper,
+ &empty);
+ Assert(!empty);
+ }
+
+ /*
+ * Make two arrays of range bounds: one sorted by lower bound and another
+ * sorted by upper bound.
+ */
+ memcpy(by_upper, by_lower, nentries * sizeof(NonEmptyRange));
+ qsort_arg(by_lower, nentries, sizeof(NonEmptyRange),
+ interval_cmp_lower, typcache);
+ qsort_arg(by_upper, nentries, sizeof(NonEmptyRange),
+ interval_cmp_upper, typcache);
+
+ /*----------
+ * The goal is to form a left and right range, so that every entry
+ * range is contained by either left or right interval (or both).
+ *
+ * For example, with the ranges (0,1), (1,3), (2,3), (2,4):
+ *
+ * 0 1 2 3 4
+ * +-+
+ * +---+
+ * +-+
+ * +---+
+ *
+ * The left and right ranges are of the form (0,a) and (b,4).
+ * We first consider splits where b is the lower bound of an entry.
+ * We iterate through all entries, and for each b, calculate the
+ * smallest possible a. Then we consider splits where a is the
+ * upper bound of an entry, and for each a, calculate the greatest
+ * possible b.
+ *
+ * In the above example, the first loop would consider splits:
+ * b=0: (0,1)-(0,4)
+ * b=1: (0,1)-(1,4)
+ * b=2: (0,3)-(2,4)
+ *
+ * And the second loop:
+ * a=1: (0,1)-(1,4)
+ * a=3: (0,3)-(2,4)
+ * a=4: (0,4)-(2,4)
+ *----------
+ */
+
+ /*
+ * Iterate over lower bound of right group, finding smallest possible
+ * upper bound of left group.
+ */
+ i1 = 0;
+ i2 = 0;
+ right_lower = &by_lower[i1].lower;
+ left_upper = &by_upper[i2].lower;
+ while (true)
+ {
+ /*
+ * Find next lower bound of right group. While skipping entries that
+ * share the current lower bound, raise left_upper to the largest
+ * upper bound seen among them.
+ */
+ while (i1 < nentries &&
+ range_cmp_bounds(typcache, right_lower,
+ &by_lower[i1].lower) == 0)
+ {
+ if (range_cmp_bounds(typcache, &by_lower[i1].upper,
+ left_upper) > 0)
+ left_upper = &by_lower[i1].upper;
+ i1++;
+ }
+ if (i1 >= nentries)
+ break;
+ right_lower = &by_lower[i1].lower;
- if (empty1 || empty2)
+ /*
+ * Find count of ranges which anyway should be placed to the
+ * left group.
+ */
+ while (i2 < nentries &&
+ range_cmp_bounds(typcache, &by_upper[i2].upper,
+ left_upper) <= 0)
+ i2++;
+
+ /*
+ * Consider found split to see if it's better than what we had.
+ */
+ range_gist_consider_split(&context, right_lower, i1, left_upper, i2);
+ }
+
+ /*
+ * Iterate over upper bound of left group finding greatest possible
+ * lower bound of right group.
+ */
+ i1 = nentries - 1;
+ i2 = nentries - 1;
+ right_lower = &by_lower[i1].upper;
+ left_upper = &by_upper[i2].upper;
+ while (true)
+ {
+ /*
+ * Find next upper bound of left group. While skipping entries that
+ * share the current upper bound, lower right_lower to the smallest
+ * lower bound seen among them.
+ */
+ while (i2 >= 0 &&
+ range_cmp_bounds(typcache, left_upper,
+ &by_upper[i2].upper) == 0)
+ {
+ if (range_cmp_bounds(typcache, &by_upper[i2].lower,
+ right_lower) < 0)
+ right_lower = &by_upper[i2].lower;
+ i2--;
+ }
+ if (i2 < 0)
+ break;
+ left_upper = &by_upper[i2].upper;
+
+ /*
+ * Find count of intervals which anyway should be placed to the
+ * right group.
+ */
+ while (i1 >= 0 &&
+ range_cmp_bounds(typcache, &by_lower[i1].lower,
+ right_lower) >= 0)
+ i1--;
+
+ /*
+ * Consider found split to see if it's better than what we had.
+ */
+ range_gist_consider_split(&context, right_lower, i1 + 1,
+ left_upper, i2 + 1);
+ }
+
+ /*
+ * If we failed to find any acceptable splits, use trivial split.
+ */
+ if (context.first)
+ {
+ range_gist_fallback_split(typcache, entryvec, v);
+ return;
+ }
+
+ /*
+ * Ok, we have now selected bounds of the groups. Now we have to distribute
+ * entries themselves. At first we distribute entries which can be placed
+ * unambiguously and collect "common entries" to array.
+ */
+
+ /*
+ * Allocate vectors for results. NOTE(review): range_gist_picksplit
+ * already allocated v->spl_left and v->spl_right before calling this
+ * function, so these allocations replace those arrays.
+ */
+ v->spl_left = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
+ v->spl_right = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
+ v->spl_nleft = 0;
+ v->spl_nright = 0;
+
+ /*
+ * Allocate an array for "common entries" - entries which can be placed to
+ * either group without affecting overlap along selected axis.
+ */
+ common_entries_count = 0;
+ common_entries = (CommonEntry *) palloc(nentries * sizeof(CommonEntry));
+
+ /*
+ * Distribute entries which can be distributed unambiguously, and collect
+ * common entries.
+ */
+ for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
- if (empty1 && empty2)
- return 0;
- else if (empty1)
- return -1;
- else if (empty2)
- return 1;
+ RangeBound lower,
+ upper;
+ bool empty;
+
+ /*
+ * Get upper and lower bounds along selected axis.
+ */
+ range = DatumGetRangeType(entryvec->vector[i].key);
+
+ range_deserialize(typcache, range, &lower, &upper, &empty);
+
+ if (range_cmp_bounds(typcache, &upper, context.left_upper) <= 0)
+ {
+ /* Fits in the left group */
+ if (range_cmp_bounds(typcache, &lower, context.right_lower) >= 0)
+ {
+ /* Fits also in the right group, so "common entry" */
+ common_entries[common_entries_count].index = i;
+ if (context.has_subtype_diff)
+ {
+ /*
+ * delta = (lower - context.right_lower) -
+ * (context.left_upper - upper)
+ */
+ common_entries[common_entries_count].delta =
+ call_subtype_diff(typcache,
+ lower.val,
+ context.right_lower->val) -
+ call_subtype_diff(typcache,
+ context.left_upper->val,
+ upper.val);
+ }
+ else
+ {
+ /* Without subtype_diff, take all deltas as zero */
+ common_entries[common_entries_count].delta = 0;
+ }
+ common_entries_count++;
+ }
+ else
+ {
+ /* Doesn't fit to the right group, so join to the left group */
+ PLACE_LEFT(range, i);
+ }
+ }
else
- Assert(false);
+ {
+ /*
+ * Each entry should fit on either left or right group. Since this
+ * entry didn't fit in the left group, it better fit in the right
+ * group.
+ */
+ Assert(range_cmp_bounds(typcache, &lower,
+ context.right_lower) >= 0);
+ PLACE_RIGHT(range, i);
+ }
+ }
+
+ /*
+ * Distribute "common entries", if any.
+ */
+ if (common_entries_count > 0)
+ {
+ /*
+ * Sort "common entries" by calculated deltas in order to distribute
+ * the most ambiguous entries first.
+ */
+ qsort(common_entries, common_entries_count, sizeof(CommonEntry),
+ common_entry_cmp);
+
+ /*
+ * Distribute "common entries" between groups according to sorting.
+ */
+ for (i = 0; i < common_entries_count; i++)
+ {
+ int idx = common_entries[i].index;
+
+ range = DatumGetRangeType(entryvec->vector[idx].key);
+
+ /*
+ * The first context.common_left entries in sorted order go to
+ * the left group and the rest to the right group; this is the
+ * placement needed to achieve LIMIT_RATIO.
+ */
+ if (i < context.common_left)
+ PLACE_LEFT(range, idx);
+ else
+ PLACE_RIGHT(range, idx);
+ }
}
+ v->spl_ldatum = PointerGetDatum(left_range);
+ v->spl_rdatum = PointerGetDatum(right_range);
+}
+
+/*
+ * Consider replacement of currently selected split with a better one
+ * during range_gist_double_sorting_split.
+ *
+ * right_lower and left_upper are the candidate lower bound of the right
+ * group and upper bound of the left group. min_left_count and
+ * max_left_count delimit how many entries the left group may receive for
+ * this candidate; their difference is the number of "common entries" that
+ * could go to either side.
+ *
+ * If the candidate is accepted, its ratio, overlap, bounds and the number
+ * of common entries to send to each side are saved into *context, and
+ * context->first is cleared. Only the bound pointers are stored, not
+ * copies, so they must remain valid for as long as the context is read.
+ */
+static void
+range_gist_consider_split(ConsiderSplitContext *context,
+ RangeBound *right_lower, int min_left_count,
+ RangeBound *left_upper, int max_left_count)
+{
+ int left_count,
+ right_count;
+ float4 ratio,
+ overlap;
+
+ /*
+ * Calculate entries distribution ratio assuming most uniform distribution
+ * of common entries. That is, pick left_count as close to half of the
+ * entries as min_left_count and max_left_count allow.
+ */
+ if (min_left_count >= (context->entries_count + 1) / 2)
+ left_count = min_left_count;
+ else if (max_left_count <= context->entries_count / 2)
+ left_count = max_left_count;
+ else
+ left_count = context->entries_count / 2;
+ right_count = context->entries_count - left_count;
+
/*
- * If both lower or both upper bounds are infinite, we sort by ascending
- * range size. That means that if both upper bounds are infinite, we sort
- * by the lower bound _descending_. That creates a slightly odd total
- * order, but keeps the pages with very unselective predicates grouped
- * more closely together on the right.
+ * Ratio of split: quotient between size of smaller group and total
+ * entries count. This is necessarily 0.5 or less; unless it is strictly
+ * greater than LIMIT_RATIO we will never accept the new split.
*/
- if (lower1.infinite || upper1.infinite ||
- lower2.infinite || upper2.infinite)
+ ratio = ((float4) Min(left_count, right_count)) /
+ ((float4) context->entries_count);
+
+ if (ratio > LIMIT_RATIO)
{
- if (lower1.infinite && lower2.infinite)
- return range_cmp_bounds(typcache, &upper1, &upper2);
- else if (lower1.infinite)
- return -1;
- else if (lower2.infinite)
- return 1;
- else if (upper1.infinite && upper2.infinite)
- return -(range_cmp_bounds(typcache, &lower1, &lower2));
- else if (upper1.infinite)
- return 1;
- else if (upper2.infinite)
- return -1;
+ bool selectthis = false;
+
+ /*
+ * The ratio is acceptable, so compare current split with previously
+ * selected one. We search for minimal overlap first and, secondarily,
+ * for the largest (most balanced) ratio. If subtype_diff is
+ * available, it's used for overlap measure. Without subtype_diff we
+ * use number of "common entries" as an overlap measure.
+ *
+ * Note: call_subtype_diff replaces negative results with zero, so the
+ * overlap measured through it is never negative; all candidates whose
+ * groups do not overlap compare as equal here and are then ranked by
+ * ratio only.
+ */
+ if (context->has_subtype_diff)
+ overlap = call_subtype_diff(context->typcache,
+ left_upper->val,
+ right_lower->val);
+ else
+ overlap = max_left_count - min_left_count;
+
+ /* If there is no previous selection, select this split */
+ if (context->first)
+ selectthis = true;
else
- Assert(false);
+ {
+ /*
+ * Choose the new split if it has a smaller overlap, or same
+ * overlap but better (larger) ratio.
+ */
+ if (overlap < context->overlap ||
+ (overlap == context->overlap && ratio > context->ratio))
+ selectthis = true;
+ }
+
+ if (selectthis)
+ {
+ /*
+ * Save information about selected split. common_left and
+ * common_right are how many of the common entries go to the left
+ * and to the right group, given the left_count chosen above.
+ */
+ context->first = false;
+ context->ratio = ratio;
+ context->overlap = overlap;
+ context->right_lower = right_lower;
+ context->left_upper = left_upper;
+ context->common_left = max_left_count - left_count;
+ context->common_right = left_count - min_left_count;
+ }
+ }
+}
+
+/*
+ * Find class number for range.
+ *
+ * The class number is a valid combination of the properties of the
+ * range. An empty range (RANGE_EMPTY flag set) maps to CLS_EMPTY alone.
+ * Otherwise the result is the bitwise OR of CLS_LOWER_INF, CLS_UPPER_INF
+ * and CLS_CONTAIN_EMPTY for each of the RANGE_LB_INF, RANGE_UB_INF and
+ * RANGE_CONTAIN_EMPTY flags that is set, or 0 (CLS_NORMAL) if none is.
+ *
+ * Note: the highest possible number is 8, because CLS_EMPTY
+ * can't be combined with anything else; the result is therefore always
+ * less than CLS_COUNT.
+ */
+static int
+get_gist_range_class(RangeType *range)
+{
+ int classNumber;
+ char flags;
+
+ flags = range_get_flags(range);
+ if (flags & RANGE_EMPTY)
+ {
+ classNumber = CLS_EMPTY;
}
+ else
+ {
+ classNumber = 0;
+ if (flags & RANGE_LB_INF)
+ classNumber |= CLS_LOWER_INF;
+ if (flags & RANGE_UB_INF)
+ classNumber |= CLS_UPPER_INF;
+ if (flags & RANGE_CONTAIN_EMPTY)
+ classNumber |= CLS_CONTAIN_EMPTY;
+ }
+ return classNumber;
+}
+
+/*
+ * Sort comparator for range_gist_single_sorting_split.
+ *
+ * a and b point to SingleBoundSortItem entries; arg is the TypeCacheEntry
+ * of the range type. Items are ordered by their bound, as decided by
+ * range_cmp_bounds.
+ */
+static int
+single_bound_cmp(const void *a, const void *b, void *arg)
+{
+    RangeBound *lhs = &((SingleBoundSortItem *) a)->bound;
+    RangeBound *rhs = &((SingleBoundSortItem *) b)->bound;
+
+    return range_cmp_bounds((TypeCacheEntry *) arg, lhs, rhs);
+}
+
+/*
+ * Sort comparator ordering NonEmptyRange entries by their lower bound.
+ *
+ * a and b point to NonEmptyRange entries; arg is the TypeCacheEntry of the
+ * range type, handed through to range_cmp_bounds.
+ */
+static int
+interval_cmp_lower(const void *a, const void *b, void *arg)
+{
+    TypeCacheEntry *cache = (TypeCacheEntry *) arg;
+    NonEmptyRange *lhs = (NonEmptyRange *) a;
+    NonEmptyRange *rhs = (NonEmptyRange *) b;
+    int         result;
+
+    result = range_cmp_bounds(cache, &lhs->lower, &rhs->lower);
+    return result;
+}
+
+/*
+ * Sort comparator ordering NonEmptyRange entries by their upper bound.
+ *
+ * a and b point to NonEmptyRange entries; arg is the TypeCacheEntry of the
+ * range type, handed through to range_cmp_bounds.
+ */
+static int
+interval_cmp_upper(const void *a, const void *b, void *arg)
+{
+    TypeCacheEntry *cache = (TypeCacheEntry *) arg;
+    NonEmptyRange *lhs = (NonEmptyRange *) a;
+    NonEmptyRange *rhs = (NonEmptyRange *) b;
+    int         result;
+
+    result = range_cmp_bounds(cache, &lhs->upper, &rhs->upper);
+    return result;
+}
- if ((cmp = range_cmp_bounds(typcache, &lower1, &lower2)) != 0)
- return cmp;
+/*
+ * qsort comparator ordering CommonEntry items by ascending delta.
+ *
+ * Returns -1, 0 or 1. Deltas that are neither less nor greater than each
+ * other compare as equal.
+ */
+static int
+common_entry_cmp(const void *i1, const void *i2)
+{
+    double      lhs = ((CommonEntry *) i1)->delta;
+    double      rhs = ((CommonEntry *) i2)->delta;
+
+    if (lhs > rhs)
+        return 1;
+    if (lhs < rhs)
+        return -1;
+    return 0;
+}
- return range_cmp_bounds(typcache, &upper1, &upper2);
+/*
+ * Convenience function to invoke type-specific subtype_diff function.
+ * Caller must have already checked that there is one for the range type.
+ *
+ * Calls typcache->rng_subdiff_finfo on (val1, val2) using the range type's
+ * collation and returns the float8 result. The result is never negative:
+ * any value that is not >= 0 (a negative number, or NaN) is replaced by
+ * zero.
+ */
+static float8
+call_subtype_diff(TypeCacheEntry *typcache, Datum val1, Datum val2)
+{
+ float8 value;
+
+ value = DatumGetFloat8(FunctionCall2Coll(&typcache->rng_subdiff_finfo,
+ typcache->rng_collation,
+ val1, val2));
+ /*
+ * Cope with buggy subtype_diff function by returning zero whenever the
+ * result is not a non-negative number.
+ */
+ if (value >= 0.0)
+ return value;
+ return 0.0;
}