From: Heikki Linnakangas Date: Thu, 6 Oct 2011 07:03:46 +0000 (+0300) Subject: Replace the "New Linear" GiST split algorithm for boxes and points with a X-Git-Tag: REL9_2_BETA1~1029 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=7f3bd86843e5aad84585a57d3f6b80db3c609916;p=postgresql Replace the "New Linear" GiST split algorithm for boxes and points with a new double-sorting algorithm. The new algorithm produces better quality trees, making searches faster. Alexander Korotkov --- diff --git a/src/backend/access/gist/gistproc.c b/src/backend/access/gist/gistproc.c index 43c4b1251b..f7eb9412f9 100644 --- a/src/backend/access/gist/gistproc.c +++ b/src/backend/access/gist/gistproc.c @@ -27,6 +27,9 @@ static double size_box(Datum dbox); static bool rtree_internal_consistent(BOX *key, BOX *query, StrategyNumber strategy); +/* Minimum accepted ratio of split */ +#define LIMIT_RATIO 0.3 + /************************************************** * Box ops @@ -49,30 +52,6 @@ rt_box_union(PG_FUNCTION_ARGS) PG_RETURN_BOX_P(n); } -static Datum -rt_box_inter(PG_FUNCTION_ARGS) -{ - BOX *a = PG_GETARG_BOX_P(0); - BOX *b = PG_GETARG_BOX_P(1); - BOX *n; - - n = (BOX *) palloc(sizeof(BOX)); - - n->high.x = Min(a->high.x, b->high.x); - n->high.y = Min(a->high.y, b->high.y); - n->low.x = Max(a->low.x, b->low.x); - n->low.y = Max(a->low.y, b->low.y); - - if (n->high.x < n->low.x || n->high.y < n->low.y) - { - pfree(n); - /* Indicate "no intersection" by returning NULL pointer */ - n = NULL; - } - - PG_RETURN_BOX_P(n); -} - /* * The GiST Consistent method for boxes * @@ -194,86 +173,6 @@ gist_box_penalty(PG_FUNCTION_ARGS) PG_RETURN_POINTER(result); } -static void -chooseLR(GIST_SPLITVEC *v, - OffsetNumber *list1, int nlist1, BOX *union1, - OffsetNumber *list2, int nlist2, BOX *union2) -{ - bool firstToLeft = true; - - if (v->spl_ldatum_exists || v->spl_rdatum_exists) - { - if (v->spl_ldatum_exists && v->spl_rdatum_exists) - { - BOX LRl = *union1, - LRr = *union2; - BOX RLl = *union2, - RLr = *union1; - double sizeLR, - sizeRL; - - adjustBox(&LRl, DatumGetBoxP(v->spl_ldatum)); - adjustBox(&LRr, DatumGetBoxP(v->spl_rdatum)); - adjustBox(&RLl, DatumGetBoxP(v->spl_ldatum)); - adjustBox(&RLr, DatumGetBoxP(v->spl_rdatum)); - - sizeLR = size_box(DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&LRl), BoxPGetDatum(&LRr))); - sizeRL = size_box(DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&RLl), BoxPGetDatum(&RLr))); - - if (sizeLR > sizeRL) - firstToLeft = false; - - } - else - { - float p1, - p2; - GISTENTRY oldUnion, - addon; - - gistentryinit(oldUnion, (v->spl_ldatum_exists) ? v->spl_ldatum : v->spl_rdatum, - NULL, NULL, InvalidOffsetNumber, FALSE); - - gistentryinit(addon, BoxPGetDatum(union1), NULL, NULL, InvalidOffsetNumber, FALSE); - DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&addon), PointerGetDatum(&p1)); - gistentryinit(addon, BoxPGetDatum(union2), NULL, NULL, InvalidOffsetNumber, FALSE); - DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&addon), PointerGetDatum(&p2)); - - if ((v->spl_ldatum_exists && p1 > p2) || (v->spl_rdatum_exists && p1 < p2)) - firstToLeft = false; - } - } - - if (firstToLeft) - { - v->spl_left = list1; - v->spl_right = list2; - v->spl_nleft = nlist1; - v->spl_nright = nlist2; - if (v->spl_ldatum_exists) - adjustBox(union1, DatumGetBoxP(v->spl_ldatum)); - v->spl_ldatum = BoxPGetDatum(union1); - if (v->spl_rdatum_exists) - adjustBox(union2, DatumGetBoxP(v->spl_rdatum)); - v->spl_rdatum = BoxPGetDatum(union2); - } - else - { - v->spl_left = list2; - v->spl_right = list1; - v->spl_nleft = nlist2; - v->spl_nright = nlist1; - if (v->spl_ldatum_exists) - adjustBox(union2, DatumGetBoxP(v->spl_ldatum)); - v->spl_ldatum = BoxPGetDatum(union2); - if (v->spl_rdatum_exists) - adjustBox(union1, DatumGetBoxP(v->spl_rdatum)); - v->spl_rdatum = BoxPGetDatum(union1); - } - - v->spl_ldatum_exists = v->spl_rdatum_exists = false; -} - /* * Trivial split: half of entries will be placed on one page * and another half - to another @@ -338,199 +237,603 @@ fallbackSplit(GistEntryVector *entryvec, GIST_SPLITVEC *v) } /* - * The GiST PickSplit method + * Represents information about an entry that can be placed to either group + * without affecting overlap over selected axis ("common entry"). + */ +typedef struct +{ + /* Index of entry in the initial array */ + int index; + /* Delta between penalties of entry insertion into different groups */ + double delta; +} CommonEntry; + +/* + * Context for g_box_consider_split. Contains information about currently + * selected split and some general information. + */ +typedef struct +{ + int entriesCount; /* total number of entries being split */ + BOX boundingBox; /* minimum bounding box across all entries */ + + /* Information about currently selected split follows */ + + bool first; /* true if no split was selected yet */ + + double leftUpper; /* upper bound of left interval */ + double rightLower; /* lower bound of right interval */ + + float4 ratio; + float4 overlap; + int dim; /* axis of this split */ + double range; /* width of general MBR projection to the + * selected axis */ +} ConsiderSplitContext; + +/* + * Interval represents projection of box to axis. + */ +typedef struct +{ + double lower, + upper; +} SplitInterval; + +/* + * Interval comparison function by lower bound of the interval; + */ +static int +interval_cmp_lower(const void *i1, const void *i2) +{ + double lower1 = ((SplitInterval *) i1)->lower, + lower2 = ((SplitInterval *) i2)->lower; + + if (lower1 < lower2) + return -1; + else if (lower1 > lower2) + return 1; + else + return 0; +} + +/* + * Interval comparison function by upper bound of the interval; + */ +static int +interval_cmp_upper(const void *i1, const void *i2) +{ + double upper1 = ((SplitInterval *) i1)->upper, + upper2 = ((SplitInterval *) i2)->upper; + + if (upper1 < upper2) + return -1; + else if (upper1 > upper2) + return 1; + else + return 0; +} + +/* + * Replace negative value with zero. + */ +static inline float +non_negative(float val) +{ + if (val >= 0.0f) + return val; + else + return 0.0f; +} + +/* + * Consider replacement of currently selected split with the better one. + */ +static void inline +g_box_consider_split(ConsiderSplitContext *context, int dimNum, + double rightLower, int minLeftCount, + double leftUpper, int maxLeftCount) +{ + int leftCount, + rightCount; + float4 ratio, + overlap; + double range; + + /* + * Calculate entries distribution ratio assuming most uniform distribution + * of common entries. + */ + if (minLeftCount >= (context->entriesCount + 1) / 2) + { + leftCount = minLeftCount; + } + else + { + if (maxLeftCount <= context->entriesCount / 2) + leftCount = maxLeftCount; + else + leftCount = context->entriesCount / 2; + } + rightCount = context->entriesCount - leftCount; + + /* + * Ratio of split - quotient between size of lesser group and total + * entries count. + */ + ratio = ((float4) Min(leftCount, rightCount)) / + ((float4) context->entriesCount); + + if (ratio > LIMIT_RATIO) + { + bool selectthis = false; + + /* + * The ratio is acceptable, so compare current split with previously + * selected one. Between splits of one dimension we search for minimal + * overlap (allowing negative values) and minimal ration (between same + * overlaps. We switch dimension if find less overlap (non-negative) + * or less range with same overlap. + */ + if (dimNum == 0) + range = context->boundingBox.high.x - context->boundingBox.low.x; + else + range = context->boundingBox.high.y - context->boundingBox.low.y; + + overlap = (leftUpper - rightLower) / range; + + /* If there is no previous selection, select this */ + if (context->first) + selectthis = true; + else if (context->dim == dimNum) + { + /* + * Within the same dimension, choose the new split if it has a + * smaller overlap, or same overlap but better ratio. + */ + if (overlap < context->overlap || + (overlap == context->overlap && ratio > context->ratio)) + selectthis = true; + } + else + { + /* + * Across dimensions, choose the new split if it has a smaller + * *non-negative* overlap, or same *non-negative* overlap but + * bigger range. This condition differs from the one described in + * the article. On the datasets where leaf MBRs don't overlap + * themselves, non-overlapping splits (i.e. splits which have zero + * *non-negative* overlap) are frequently possible. In this case + * splits tends to be along one dimension, because most distant + * non-overlapping splits (i.e. having lowest negative overlap) + * appears to be in the same dimension as in the previous split. + * Therefore MBRs appear to be very prolonged along another + * dimension, which leads to bad search performance. Using range + * as the second split criteria makes MBRs more quadratic. Using + * *non-negative* overlap instead of overlap as the first split + * criteria gives to range criteria a chance to matter, because + * non-overlapping splits are equivalent in this criteria. + */ + if (non_negative(overlap) < non_negative(context->overlap) || + (range > context->range && + non_negative(overlap) <= non_negative(context->overlap))) + selectthis = true; + } + + if (selectthis) + { + /* save information about selected split */ + context->first = false; + context->ratio = ratio; + context->range = range; + context->overlap = overlap; + context->rightLower = rightLower; + context->leftUpper = leftUpper; + context->dim = dimNum; + } + } +} + +/* + * Return increase of original BOX area by new BOX area insertion. + */ +static double +box_penalty(BOX *original, BOX *new) +{ + double union_width, + union_height; + + union_width = Max(original->high.x, new->high.x) - + Min(original->low.x, new->low.x); + union_height = Max(original->high.y, new->high.y) - + Min(original->low.y, new->low.y); + return union_width * union_height - (original->high.x - original->low.x) * + (original->high.y - original->low.y); +} + +/* + * Compare common entries by their deltas. + */ +static int +common_entry_cmp(const void *i1, const void *i2) +{ + double delta1 = ((CommonEntry *) i1)->delta, + delta2 = ((CommonEntry *) i2)->delta; + + if (delta1 < delta2) + return -1; + else if (delta1 > delta2) + return 1; + else + return 0; +} + +/* + * -------------------------------------------------------------------------- + * Double sorting split algorithm. This is used for both boxes and points. * - * New linear algorithm, see 'New Linear Node Splitting Algorithm for R-tree', - * C.H.Ang and T.C.Tan + * The algorithm finds split of boxes by considering splits along each axis. + * Each entry is first projected as an interval on the X-axis, and different + * ways to split the intervals into two groups are considered, trying to + * minimize the overlap of the groups. Then the same is repeated for the + * Y-axis, and the overall best split is chosen. The quality of a split is + * determined by overlap along that axis and some other criteria (see + * g_box_consider_split). * - * This is used for both boxes and points. + * After that, all the entries are divided into three groups: + * + * 1) Entries which should be placed to the left group + * 2) Entries which should be placed to the right group + * 3) "Common entries" which can be placed to any of groups without affecting + * of overlap along selected axis. + * + * The common entries are distributed by minimizing penalty. + * + * For details see: + * "A new double sorting-based node splitting algorithm for R-tree", A. Korotkov + * http://syrcose.ispras.ru/2011/files/SYRCoSE2011_Proceedings.pdf#page=36 + * -------------------------------------------------------------------------- */ Datum gist_box_picksplit(PG_FUNCTION_ARGS) { GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0); GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1); - OffsetNumber i; - OffsetNumber *listL, - *listR, - *listB, - *listT; - BOX *unionL, - *unionR, - *unionB, - *unionT; - int posL, - posR, - posB, - posT; - BOX pageunion; - BOX *cur; - char direction = ' '; - bool allisequal = true; - OffsetNumber maxoff; - int nbytes; + OffsetNumber i, + maxoff; + ConsiderSplitContext context; + BOX *box, + *leftBox, + *rightBox; + int dim, + commonEntriesCount; + SplitInterval *intervalsLower, + *intervalsUpper; + CommonEntry *commonEntries; + int nentries; + + memset(&context, 0, sizeof(ConsiderSplitContext)); - posL = posR = posB = posT = 0; maxoff = entryvec->n - 1; + nentries = context.entriesCount = maxoff - FirstOffsetNumber + 1; - cur = DatumGetBoxP(entryvec->vector[FirstOffsetNumber].key); - memcpy((void *) &pageunion, (void *) cur, sizeof(BOX)); + /* Allocate arrays for intervals along axes */ + intervalsLower = (SplitInterval *) palloc(nentries * sizeof(SplitInterval)); + intervalsUpper = (SplitInterval *) palloc(nentries * sizeof(SplitInterval)); - /* find MBR */ - for (i = OffsetNumberNext(FirstOffsetNumber); i <= maxoff; i = OffsetNumberNext(i)) + /* + * Calculate the overall minimum bounding box over all the entries. + */ + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) { - cur = DatumGetBoxP(entryvec->vector[i].key); - if (allisequal && ( - pageunion.high.x != cur->high.x || - pageunion.high.y != cur->high.y || - pageunion.low.x != cur->low.x || - pageunion.low.y != cur->low.y - )) - allisequal = false; - - adjustBox(&pageunion, cur); + box = DatumGetBoxP(entryvec->vector[i].key); + if (i == FirstOffsetNumber) + context.boundingBox = *box; + else + adjustBox(&context.boundingBox, box); } - if (allisequal) + /* + * Iterate over axes for optimal split searching. + */ + context.first = true; /* nothing selected yet */ + for (dim = 0; dim < 2; dim++) { + double leftUpper, + rightLower; + int i1, + i2; + + /* Project each entry as an interval on the selected axis. */ + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) + { + box = DatumGetBoxP(entryvec->vector[i].key); + if (dim == 0) + { + intervalsLower[i - FirstOffsetNumber].lower = box->low.x; + intervalsLower[i - FirstOffsetNumber].upper = box->high.x; + } + else + { + intervalsLower[i - FirstOffsetNumber].lower = box->low.y; + intervalsLower[i - FirstOffsetNumber].upper = box->high.y; + } + } + + /* + * Make two arrays of intervals: one sorted by lower bound and another + * sorted by upper bound. + */ + memcpy(intervalsUpper, intervalsLower, + sizeof(SplitInterval) * nentries); + qsort(intervalsLower, nentries, sizeof(SplitInterval), + interval_cmp_lower); + qsort(intervalsUpper, nentries, sizeof(SplitInterval), + interval_cmp_upper); + + /*---- + * The goal is to form a left and right interval, so that every entry + * interval is contained by either left or right interval (or both). + * + * For example, with the intervals (0,1), (1,3), (2,3), (2,4): + * + * 0 1 2 3 4 + * +-+ + * +---+ + * +-+ + * +---+ + * + * The left and right intervals are of the form (0,a) and (b,4). + * We first consider splits where b is the lower bound of an entry. + * We iterate through all entries, and for each b, calculate the + * smallest possible a. Then we consider splits where a is the + * uppper bound of an entry, and for each a, calculate the greatest + * possible b. + * + * In the above example, the first loop would consider splits: + * b=0: (0,1)-(0,4) + * b=1: (0,1)-(1,4) + * b=2: (0,3)-(2,4) + * + * And the second loop: + * a=1: (0,1)-(1,4) + * a=3: (0,3)-(2,4) + * a=4: (0,4)-(2,4) + */ + + /* + * Iterate over lower bound of right group, finding smallest possible + * upper bound of left group. + */ + i1 = 0; + i2 = 0; + rightLower = intervalsLower[i1].lower; + leftUpper = intervalsUpper[i2].lower; + while (true) + { + /* + * Find next lower bound of right group. + */ + while (i1 < nentries && rightLower == intervalsLower[i1].lower) + { + leftUpper = Max(leftUpper, intervalsLower[i1].upper); + i1++; + } + if (i1 >= nentries) + break; + rightLower = intervalsLower[i1].lower; + + /* + * Find count of intervals which anyway should be placed to the + * left group. + */ + while (i2 < nentries && intervalsUpper[i2].upper <= leftUpper) + i2++; + + /* + * Consider found split. + */ + g_box_consider_split(&context, dim, rightLower, i1, leftUpper, i2); + } + /* - * All entries are the same + * Iterate over upper bound of left group finding greates possible + * lower bound of right group. */ + i1 = nentries - 1; + i2 = nentries - 1; + rightLower = intervalsLower[i1].upper; + leftUpper = intervalsUpper[i2].upper; + while (true) + { + /* + * Find next upper bound of left group. + */ + while (i2 >= 0 && leftUpper == intervalsUpper[i2].upper) + { + rightLower = Min(rightLower, intervalsUpper[i2].lower); + i2--; + } + if (i2 < 0) + break; + leftUpper = intervalsUpper[i2].upper; + + /* + * Find count of intervals which anyway should be placed to the + * right group. + */ + while (i1 >= 0 && intervalsLower[i1].lower >= rightLower) + i1--; + + /* + * Consider found split. + */ + g_box_consider_split(&context, dim, + rightLower, i1 + 1, leftUpper, i2 + 1); + } + } + + /* + * If we failed to find any acceptable splits, use trivial split. + */ + if (context.first) + { fallbackSplit(entryvec, v); PG_RETURN_POINTER(v); } - nbytes = (maxoff + 2) * sizeof(OffsetNumber); - listL = (OffsetNumber *) palloc(nbytes); - listR = (OffsetNumber *) palloc(nbytes); - listB = (OffsetNumber *) palloc(nbytes); - listT = (OffsetNumber *) palloc(nbytes); - unionL = (BOX *) palloc(sizeof(BOX)); - unionR = (BOX *) palloc(sizeof(BOX)); - unionB = (BOX *) palloc(sizeof(BOX)); - unionT = (BOX *) palloc(sizeof(BOX)); - -#define ADDLIST( list, unionD, pos, num ) do { \ - if ( pos ) { \ - if ( (unionD)->high.x < cur->high.x ) (unionD)->high.x = cur->high.x; \ - if ( (unionD)->low.x > cur->low.x ) (unionD)->low.x = cur->low.x; \ - if ( (unionD)->high.y < cur->high.y ) (unionD)->high.y = cur->high.y; \ - if ( (unionD)->low.y > cur->low.y ) (unionD)->low.y = cur->low.y; \ - } else { \ - memcpy( (void*)(unionD), (void*) cur, sizeof( BOX ) ); \ - } \ - (list)[pos] = num; \ - (pos)++; \ -} while(0) + /* + * Ok, we have now selected the split across one axis. + * + * While considering the splits, we already determined that there will be + * enough entries in both groups to reach the desired ratio, but we did + * not memorize which entries go to which group. So determine that now. + */ - for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) - { - cur = DatumGetBoxP(entryvec->vector[i].key); - if (cur->low.x - pageunion.low.x < pageunion.high.x - cur->high.x) - ADDLIST(listL, unionL, posL, i); - else - ADDLIST(listR, unionR, posR, i); - if (cur->low.y - pageunion.low.y < pageunion.high.y - cur->high.y) - ADDLIST(listB, unionB, posB, i); - else - ADDLIST(listT, unionT, posT, i); - } + /* Allocate vectors for results */ + v->spl_left = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber)); + v->spl_right = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber)); + v->spl_nleft = 0; + v->spl_nright = 0; + + /* Allocate bounding boxes of left and right groups */ + leftBox = palloc0(sizeof(BOX)); + rightBox = palloc0(sizeof(BOX)); -#define LIMIT_RATIO 0.1 -#define _IS_BADRATIO(x,y) ( (y) == 0 || (float)(x)/(float)(y) < LIMIT_RATIO ) -#define IS_BADRATIO(x,y) ( _IS_BADRATIO((x),(y)) || _IS_BADRATIO((y),(x)) ) - /* bad disposition, try to split by centers of boxes */ - if (IS_BADRATIO(posR, posL) && IS_BADRATIO(posT, posB)) + /* + * Allocate an array for "common entries" - entries which can be placed to + * either group without affecting overlap along selected axis. + */ + commonEntriesCount = 0; + commonEntries = (CommonEntry *) palloc(nentries * sizeof(CommonEntry)); + + /* Helper macros to place an entry in the left or right group */ +#define PLACE_LEFT(box, off) \ + do { \ + if (v->spl_nleft > 0) \ + adjustBox(leftBox, box); \ + else \ + *leftBox = *(box); \ + v->spl_left[v->spl_nleft++] = off; \ + } while(0) + +#define PLACE_RIGHT(box, off) \ + do { \ + if (v->spl_nright > 0) \ + adjustBox(rightBox, box); \ + else \ + *rightBox = *(box); \ + v->spl_right[v->spl_nright++] = off; \ + } while(0) + + /* + * Distribute entries which can be distributed unambiguously, and collect + * common entries. + */ + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) { - double avgCenterX = 0.0, - avgCenterY = 0.0; - double CenterX, - CenterY; + double lower, + upper; - for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) + /* + * Get upper and lower bounds along selected axis. + */ + box = DatumGetBoxP(entryvec->vector[i].key); + if (context.dim == 0) { - cur = DatumGetBoxP(entryvec->vector[i].key); - avgCenterX += ((double) cur->high.x + (double) cur->low.x) / 2.0; - avgCenterY += ((double) cur->high.y + (double) cur->low.y) / 2.0; + lower = box->low.x; + upper = box->high.x; } - - avgCenterX /= maxoff; - avgCenterY /= maxoff; - - posL = posR = posB = posT = 0; - for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) + else { - cur = DatumGetBoxP(entryvec->vector[i].key); - - CenterX = ((double) cur->high.x + (double) cur->low.x) / 2.0; - CenterY = ((double) cur->high.y + (double) cur->low.y) / 2.0; + lower = box->low.y; + upper = box->high.y; + } - if (CenterX < avgCenterX) - ADDLIST(listL, unionL, posL, i); - else if (CenterX == avgCenterX) + if (upper <= context.leftUpper) + { + /* Fits to the left group */ + if (lower >= context.rightLower) { - if (posL > posR) - ADDLIST(listR, unionR, posR, i); - else - ADDLIST(listL, unionL, posL, i); + /* Fits also to the right group, so "common entry" */ + commonEntries[commonEntriesCount++].index = i; } else - ADDLIST(listR, unionR, posR, i); - - if (CenterY < avgCenterY) - ADDLIST(listB, unionB, posB, i); - else if (CenterY == avgCenterY) { - if (posB > posT) - ADDLIST(listT, unionT, posT, i); - else - ADDLIST(listB, unionB, posB, i); + /* Doesn't fit to the right group, so join to the left group */ + PLACE_LEFT(box, i); } - else - ADDLIST(listT, unionT, posT, i); } - - if (IS_BADRATIO(posR, posL) && IS_BADRATIO(posT, posB)) + else { - fallbackSplit(entryvec, v); - PG_RETURN_POINTER(v); + /* + * Each entry should fit on either left or right group. Since this + * entry didn't fit on the left group, it better fit in the right + * group. + */ + Assert(lower >= context.rightLower); + + /* Doesn't fit to the left group, so join to the right group */ + PLACE_RIGHT(box, i); } } - /* which split more optimal? */ - if (Max(posL, posR) < Max(posB, posT)) - direction = 'x'; - else if (Max(posL, posR) > Max(posB, posT)) - direction = 'y'; - else + /* + * Distribute "common entries", if any. + */ + if (commonEntriesCount > 0) { - Datum interLR = DirectFunctionCall2(rt_box_inter, - BoxPGetDatum(unionL), - BoxPGetDatum(unionR)); - Datum interBT = DirectFunctionCall2(rt_box_inter, - BoxPGetDatum(unionB), - BoxPGetDatum(unionT)); - double sizeLR, - sizeBT; - - sizeLR = size_box(interLR); - sizeBT = size_box(interBT); - - if (sizeLR < sizeBT) - direction = 'x'; - else - direction = 'y'; - } + /* + * Calculate minimum number of entries that must be placed in both + * groups, to reach LIMIT_RATIO. + */ + int m = ceil(LIMIT_RATIO * (double) nentries); - if (direction == 'x') - chooseLR(v, - listL, posL, unionL, - listR, posR, unionR); - else - chooseLR(v, - listB, posB, unionB, - listT, posT, unionT); + /* + * Calculate delta between penalties of join "common entries" to + * different groups. + */ + for (i = 0; i < commonEntriesCount; i++) + { + box = DatumGetBoxP(entryvec->vector[commonEntries[i].index].key); + commonEntries[i].delta = Abs(box_penalty(leftBox, box) - + box_penalty(rightBox, box)); + } + + /* + * Sort "common entries" by calculated deltas in order to distribute + * the most ambiguous entries first. + */ + qsort(commonEntries, commonEntriesCount, sizeof(CommonEntry), common_entry_cmp); + + /* + * Distribute "common entries" between groups. + */ + for (i = 0; i < commonEntriesCount; i++) + { + box = DatumGetBoxP(entryvec->vector[commonEntries[i].index].key); + + /* + * Check if we have to place this entry in either group to achieve + * LIMIT_RATIO. + */ + if (v->spl_nleft + (commonEntriesCount - i) <= m) + PLACE_LEFT(box, commonEntries[i].index); + else if (v->spl_nright + (commonEntriesCount - i) <= m) + PLACE_RIGHT(box, commonEntries[i].index); + else + { + /* Otherwise select the group by minimal penalty */ + if (box_penalty(leftBox, box) < box_penalty(rightBox, box)) + PLACE_LEFT(box, commonEntries[i].index); + else + PLACE_RIGHT(box, commonEntries[i].index); + } + } + } + v->spl_ldatum = PointerGetDatum(leftBox); + v->spl_rdatum = PointerGetDatum(rightBox); PG_RETURN_POINTER(v); }