From: Tom Lane Date: Thu, 16 Dec 2010 02:14:24 +0000 (-0500) Subject: Fix contrib/seg's GiST picksplit method. X-Git-Tag: REL9_1_ALPHA3~55 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=2a6ebe70fb2f7ec97a08dc07214fe2ca571d2780;p=postgresql Fix contrib/seg's GiST picksplit method. This patch replaces Guttman's generalized split method with a simple sort-by-center-points algorithm. Since the data is only one-dimensional we don't really need the slow and none-too-stable Guttman method. This is in part a bug fix, since seg has the same size_alpha versus size_beta typo that was recently fixed in contrib/cube. It seems prudent to apply this rather aggressive fix only in HEAD, though. Back branches will just get the typo fix. Alexander Korotkov, reviewed by Yeb Havinga --- diff --git a/contrib/seg/seg.c b/contrib/seg/seg.c index 8de5092fc4..afada2a0aa 100644 --- a/contrib/seg/seg.c +++ b/contrib/seg/seg.c @@ -33,6 +33,16 @@ extern void seg_scanner_finish(void); extern int seg_yydebug; */ +/* + * Auxiliary data structure for picksplit method. + */ +typedef struct +{ + float center; + OffsetNumber index; + SEG *data; +} gseg_picksplit_item; + /* ** Input/Output routines */ @@ -292,152 +302,104 @@ gseg_penalty(GISTENTRY *origentry, GISTENTRY *newentry, float *result) return (result); } +/* + * Compare function for gseg_picksplit_item: sort by center. + */ +static int +gseg_picksplit_item_cmp(const void *a, const void *b) +{ + const gseg_picksplit_item *i1 = (const gseg_picksplit_item *) a; + const gseg_picksplit_item *i2 = (const gseg_picksplit_item *) b; + if (i1->center < i2->center) + return -1; + else if (i1->center == i2->center) + return 0; + else + return 1; +} /* -** The GiST PickSplit method for segments -** We use Guttman's poly time split algorithm -*/ + * The GiST PickSplit method for segments + * + * We used to use Guttman's split algorithm here, but since the data is 1-D + * it's easier and more robust to just sort the segments by center-point and + * split at the middle. + */ GIST_SPLITVEC * gseg_picksplit(GistEntryVector *entryvec, GIST_SPLITVEC *v) { - OffsetNumber i, - j; - SEG *datum_alpha, - *datum_beta; + int i; SEG *datum_l, - *datum_r; - SEG *union_d, - *union_dl, - *union_dr; - SEG *inter_d; - bool firsttime; - float size_alpha, - size_beta, - size_union, - size_inter; - float size_waste, - waste; - float size_l, - size_r; - int nbytes; - OffsetNumber seed_1 = 1, - seed_2 = 2; + *datum_r, + *seg; + gseg_picksplit_item *sort_items; OffsetNumber *left, *right; OffsetNumber maxoff; + OffsetNumber firstright; #ifdef GIST_DEBUG fprintf(stderr, "picksplit\n"); #endif - maxoff = entryvec->n - 2; - nbytes = (maxoff + 2) * sizeof(OffsetNumber); - v->spl_left = (OffsetNumber *) palloc(nbytes); - v->spl_right = (OffsetNumber *) palloc(nbytes); + /* Valid items in entryvec->vector[] are indexed 1..maxoff */ + maxoff = entryvec->n - 1; - firsttime = true; - waste = 0.0; - - for (i = FirstOffsetNumber; i < maxoff; i = OffsetNumberNext(i)) + /* + * Prepare the auxiliary array and sort it. + */ + sort_items = (gseg_picksplit_item *) + palloc(maxoff * sizeof(gseg_picksplit_item)); + for (i = 1; i <= maxoff; i++) { - datum_alpha = (SEG *) DatumGetPointer(entryvec->vector[i].key); - for (j = OffsetNumberNext(i); j <= maxoff; j = OffsetNumberNext(j)) - { - datum_beta = (SEG *) DatumGetPointer(entryvec->vector[j].key); - - /* compute the wasted space by unioning these guys */ - /* size_waste = size_union - size_inter; */ - union_d = seg_union(datum_alpha, datum_beta); - rt_seg_size(union_d, &size_union); - inter_d = seg_inter(datum_alpha, datum_beta); - rt_seg_size(inter_d, &size_inter); - size_waste = size_union - size_inter; - - /* - * are these a more promising split that what we've already seen? - */ - if (size_waste > waste || firsttime) - { - waste = size_waste; - seed_1 = i; - seed_2 = j; - firsttime = false; - } - } + seg = (SEG *) DatumGetPointer(entryvec->vector[i].key); + /* center calculation is done this way to avoid possible overflow */ + sort_items[i - 1].center = seg->lower*0.5f + seg->upper*0.5f; + sort_items[i - 1].index = i; + sort_items[i - 1].data = seg; } + qsort(sort_items, maxoff, sizeof(gseg_picksplit_item), + gseg_picksplit_item_cmp); + /* sort items below "firstright" will go into the left side */ + firstright = maxoff / 2; + + v->spl_left = (OffsetNumber *) palloc(maxoff * sizeof(OffsetNumber)); + v->spl_right = (OffsetNumber *) palloc(maxoff * sizeof(OffsetNumber)); left = v->spl_left; v->spl_nleft = 0; right = v->spl_right; v->spl_nright = 0; - datum_alpha = (SEG *) DatumGetPointer(entryvec->vector[seed_1].key); - datum_l = seg_union(datum_alpha, datum_alpha); - rt_seg_size(datum_l, &size_l); - datum_beta = (SEG *) DatumGetPointer(entryvec->vector[seed_2].key); - datum_r = seg_union(datum_beta, datum_beta); - rt_seg_size(datum_r, &size_r); - /* - * Now split up the regions between the two seeds. An important property - * of this split algorithm is that the split vector v has the indices of - * items to be split in order in its left and right vectors. We exploit - * this property by doing a merge in the code that actually splits the - * page. - * - * For efficiency, we also place the new index tuple in this loop. This is - * handled at the very end, when we have placed all the existing tuples - * and i == maxoff + 1. + * Emit segments to the left output page, and compute its bounding box. */ - - maxoff = OffsetNumberNext(maxoff); - for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) + datum_l = (SEG *) palloc(sizeof(SEG)); + memcpy(datum_l, sort_items[0].data, sizeof(SEG)); + *left++ = sort_items[0].index; + v->spl_nleft++; + for (i = 1; i < firstright; i++) { - /* - * If we've already decided where to place this item, just put it on - * the right list. Otherwise, we need to figure out which page needs - * the least enlargement in order to store the item. - */ - - if (i == seed_1) - { - *left++ = i; - v->spl_nleft++; - continue; - } - else if (i == seed_2) - { - *right++ = i; - v->spl_nright++; - continue; - } - - /* okay, which page needs least enlargement? */ - datum_alpha = (SEG *) DatumGetPointer(entryvec->vector[i].key); - union_dl = seg_union(datum_l, datum_alpha); - union_dr = seg_union(datum_r, datum_alpha); - rt_seg_size(union_dl, &size_alpha); - rt_seg_size(union_dr, &size_beta); + datum_l = seg_union(datum_l, sort_items[i].data); + *left++ = sort_items[i].index; + v->spl_nleft++; + } - /* pick which page to add it to */ - if (size_alpha - size_l < size_beta - size_r) - { - datum_l = union_dl; - size_l = size_alpha; - *left++ = i; - v->spl_nleft++; - } - else - { - datum_r = union_dr; - size_r = size_alpha; - *right++ = i; - v->spl_nright++; - } + /* + * Likewise for the right page. + */ + datum_r = (SEG *) palloc(sizeof(SEG)); + memcpy(datum_r, sort_items[firstright].data, sizeof(SEG)); + *right++ = sort_items[firstright].index; + v->spl_nright++; + for (i = firstright + 1; i < maxoff; i++) + { + datum_r = seg_union(datum_r, sort_items[i].data); + *right++ = sort_items[i].index; + v->spl_nright++; } - *left = *right = FirstOffsetNumber; /* sentinel value, see dosplit() */ v->spl_ldatum = PointerGetDatum(datum_l); v->spl_rdatum = PointerGetDatum(datum_r);