]> granicus.if.org Git - postgresql/commitdiff
Fix contrib/seg's GiST picksplit method.
authorTom Lane <tgl@sss.pgh.pa.us>
Thu, 16 Dec 2010 02:14:24 +0000 (21:14 -0500)
committerTom Lane <tgl@sss.pgh.pa.us>
Thu, 16 Dec 2010 02:24:47 +0000 (21:24 -0500)
This patch replaces Guttman's generalized split method with a simple
sort-by-center-points algorithm.  Since the data is only one-dimensional
we don't really need the slow and none-too-stable Guttman method.

This is in part a bug fix, since seg has the same size_alpha versus
size_beta typo that was recently fixed in contrib/cube.  It seems
prudent to apply this rather aggressive fix only in HEAD, though.
Back branches will just get the typo fix.

Alexander Korotkov, reviewed by Yeb Havinga

contrib/seg/seg.c

index 8de5092fc4f00b62664e2538f4d40aeac73bf000..afada2a0aad0f04f75c74c8309ecfd3f9c97b41c 100644 (file)
@@ -33,6 +33,16 @@ extern void seg_scanner_finish(void);
 extern int      seg_yydebug;
 */
 
+/*
+ * Auxiliary data structure for picksplit method.
+ */
+typedef struct
+{
+       float           center;
+       OffsetNumber index;
+       SEG                *data;
+} gseg_picksplit_item;
+
 /*
 ** Input/Output routines
 */
@@ -292,152 +302,104 @@ gseg_penalty(GISTENTRY *origentry, GISTENTRY *newentry, float *result)
        return (result);
 }
 
+/*
+ * Compare function for gseg_picksplit_item: sort by center.
+ */
+static int
+gseg_picksplit_item_cmp(const void *a, const void *b)
+{
+       const gseg_picksplit_item *i1 = (const gseg_picksplit_item *) a;
+       const gseg_picksplit_item *i2 = (const gseg_picksplit_item *) b;
 
+       if (i1->center < i2->center)
+               return -1;
+       else if (i1->center == i2->center)
+               return 0;
+       else
+               return 1;
+}
 
 /*
-** The GiST PickSplit method for segments
-** We use Guttman's poly time split algorithm
-*/
+ * The GiST PickSplit method for segments
+ *
+ * We used to use Guttman's split algorithm here, but since the data is 1-D
+ * it's easier and more robust to just sort the segments by center-point and
+ * split at the middle.
+ */
 GIST_SPLITVEC *
 gseg_picksplit(GistEntryVector *entryvec,
                           GIST_SPLITVEC *v)
 {
-       OffsetNumber i,
-                               j;
-       SEG                *datum_alpha,
-                          *datum_beta;
+       int                     i;
        SEG                *datum_l,
-                          *datum_r;
-       SEG                *union_d,
-                          *union_dl,
-                          *union_dr;
-       SEG                *inter_d;
-       bool            firsttime;
-       float           size_alpha,
-                               size_beta,
-                               size_union,
-                               size_inter;
-       float           size_waste,
-                               waste;
-       float           size_l,
-                               size_r;
-       int                     nbytes;
-       OffsetNumber seed_1 = 1,
-                               seed_2 = 2;
+                          *datum_r,
+                          *seg;
+       gseg_picksplit_item *sort_items;
        OffsetNumber *left,
                           *right;
        OffsetNumber maxoff;
+       OffsetNumber firstright;
 
 #ifdef GIST_DEBUG
        fprintf(stderr, "picksplit\n");
 #endif
 
-       maxoff = entryvec->n - 2;
-       nbytes = (maxoff + 2) * sizeof(OffsetNumber);
-       v->spl_left = (OffsetNumber *) palloc(nbytes);
-       v->spl_right = (OffsetNumber *) palloc(nbytes);
+       /* Valid items in entryvec->vector[] are indexed 1..maxoff */
+       maxoff = entryvec->n - 1;
 
-       firsttime = true;
-       waste = 0.0;
-
-       for (i = FirstOffsetNumber; i < maxoff; i = OffsetNumberNext(i))
+       /*
+        * Prepare the auxiliary array and sort it.
+        */
+       sort_items = (gseg_picksplit_item *)
+               palloc(maxoff * sizeof(gseg_picksplit_item));
+       for (i = 1; i <= maxoff; i++)
        {
-               datum_alpha = (SEG *) DatumGetPointer(entryvec->vector[i].key);
-               for (j = OffsetNumberNext(i); j <= maxoff; j = OffsetNumberNext(j))
-               {
-                       datum_beta = (SEG *) DatumGetPointer(entryvec->vector[j].key);
-
-                       /* compute the wasted space by unioning these guys */
-                       /* size_waste = size_union - size_inter; */
-                       union_d = seg_union(datum_alpha, datum_beta);
-                       rt_seg_size(union_d, &size_union);
-                       inter_d = seg_inter(datum_alpha, datum_beta);
-                       rt_seg_size(inter_d, &size_inter);
-                       size_waste = size_union - size_inter;
-
-                       /*
-                        * are these a more promising split that what we've already seen?
-                        */
-                       if (size_waste > waste || firsttime)
-                       {
-                               waste = size_waste;
-                               seed_1 = i;
-                               seed_2 = j;
-                               firsttime = false;
-                       }
-               }
+               seg = (SEG *) DatumGetPointer(entryvec->vector[i].key);
+               /* center calculation is done this way to avoid possible overflow */
+               sort_items[i - 1].center = seg->lower*0.5f + seg->upper*0.5f;
+               sort_items[i - 1].index = i;
+               sort_items[i - 1].data = seg;
        }
+       qsort(sort_items, maxoff, sizeof(gseg_picksplit_item),
+                 gseg_picksplit_item_cmp);
 
+       /* sort items below "firstright" will go into the left side */
+       firstright = maxoff / 2;
+
+       v->spl_left = (OffsetNumber *) palloc(maxoff * sizeof(OffsetNumber));
+       v->spl_right = (OffsetNumber *) palloc(maxoff * sizeof(OffsetNumber));
        left = v->spl_left;
        v->spl_nleft = 0;
        right = v->spl_right;
        v->spl_nright = 0;
 
-       datum_alpha = (SEG *) DatumGetPointer(entryvec->vector[seed_1].key);
-       datum_l = seg_union(datum_alpha, datum_alpha);
-       rt_seg_size(datum_l, &size_l);
-       datum_beta = (SEG *) DatumGetPointer(entryvec->vector[seed_2].key);
-       datum_r = seg_union(datum_beta, datum_beta);
-       rt_seg_size(datum_r, &size_r);
-
        /*
-        * Now split up the regions between the two seeds.      An important property
-        * of this split algorithm is that the split vector v has the indices of
-        * items to be split in order in its left and right vectors.  We exploit
-        * this property by doing a merge in the code that actually splits the
-        * page.
-        *
-        * For efficiency, we also place the new index tuple in this loop. This is
-        * handled at the very end, when we have placed all the existing tuples
-        * and i == maxoff + 1.
+        * Emit segments to the left output page, and compute its bounding box.
         */
-
-       maxoff = OffsetNumberNext(maxoff);
-       for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+       datum_l = (SEG *) palloc(sizeof(SEG));
+       memcpy(datum_l, sort_items[0].data, sizeof(SEG));
+       *left++ = sort_items[0].index;
+       v->spl_nleft++;
+       for (i = 1; i < firstright; i++)
        {
-               /*
-                * If we've already decided where to place this item, just put it on
-                * the right list.      Otherwise, we need to figure out which page needs
-                * the least enlargement in order to store the item.
-                */
-
-               if (i == seed_1)
-               {
-                       *left++ = i;
-                       v->spl_nleft++;
-                       continue;
-               }
-               else if (i == seed_2)
-               {
-                       *right++ = i;
-                       v->spl_nright++;
-                       continue;
-               }
-
-               /* okay, which page needs least enlargement? */
-               datum_alpha = (SEG *) DatumGetPointer(entryvec->vector[i].key);
-               union_dl = seg_union(datum_l, datum_alpha);
-               union_dr = seg_union(datum_r, datum_alpha);
-               rt_seg_size(union_dl, &size_alpha);
-               rt_seg_size(union_dr, &size_beta);
+               datum_l = seg_union(datum_l, sort_items[i].data);
+               *left++ = sort_items[i].index;
+               v->spl_nleft++;
+       }
 
-               /* pick which page to add it to */
-               if (size_alpha - size_l < size_beta - size_r)
-               {
-                       datum_l = union_dl;
-                       size_l = size_alpha;
-                       *left++ = i;
-                       v->spl_nleft++;
-               }
-               else
-               {
-                       datum_r = union_dr;
-                       size_r = size_alpha;
-                       *right++ = i;
-                       v->spl_nright++;
-               }
+       /*
+        * Likewise for the right page.
+        */
+       datum_r = (SEG *) palloc(sizeof(SEG));
+       memcpy(datum_r, sort_items[firstright].data, sizeof(SEG));
+       *right++ = sort_items[firstright].index;
+       v->spl_nright++;
+       for (i = firstright + 1; i < maxoff; i++)
+       {
+               datum_r = seg_union(datum_r, sort_items[i].data);
+               *right++ = sort_items[i].index;
+               v->spl_nright++;
        }
-       *left = *right = FirstOffsetNumber; /* sentinel value, see dosplit() */
 
        v->spl_ldatum = PointerGetDatum(datum_l);
        v->spl_rdatum = PointerGetDatum(datum_r);