]> granicus.if.org Git - postgresql/blob - src/backend/utils/adt/rangetypes_gist.c
Improve implementation of range-contains-element tests.
[postgresql] / src / backend / utils / adt / rangetypes_gist.c
1 /*-------------------------------------------------------------------------
2  *
3  * rangetypes_gist.c
4  *        GiST support for range types.
5  *
6  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/utils/adt/rangetypes_gist.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/gist.h"
18 #include "access/skey.h"
19 #include "utils/builtins.h"
20 #include "utils/rangetypes.h"
21
22
/*
 * Strategy numbers understood by the GiST range opclass.
 *
 * Numbers are chosen to match up operator names with existing usages.
 * Each one selects the test performed by range_gist_consistent_leaf() and
 * range_gist_consistent_int() below.
 */
#define RANGESTRAT_BEFORE			1	/* leaf test: range_before */
#define RANGESTRAT_OVERLEFT			2	/* leaf test: range_overleft */
#define RANGESTRAT_OVERLAPS			3	/* leaf test: range_overlaps */
#define RANGESTRAT_OVERRIGHT		4	/* leaf test: range_overright */
#define RANGESTRAT_AFTER			5	/* leaf test: range_after */
#define RANGESTRAT_ADJACENT			6	/* leaf test: range_adjacent */
#define RANGESTRAT_CONTAINS			7	/* leaf test: range_contains */
#define RANGESTRAT_CONTAINED_BY		8	/* leaf test: range_contained_by */
#define RANGESTRAT_CONTAINS_ELEM	16	/* leaf test: range_contains_elem */
#define RANGESTRAT_EQ				18	/* leaf test: range_eq */
#define RANGESTRAT_NE				19	/* leaf test: range_ne */
36
37 /*
38  * Auxiliary structure for picksplit method.
39  */
40 typedef struct
41 {
42         int                     index;                  /* original index in entryvec->vector[] */
43         RangeType  *data;                       /* range value to sort */
44         TypeCacheEntry *typcache;       /* range type's info */
45 }       PickSplitSortItem;
46
47 static RangeType *range_super_union(TypeCacheEntry *typcache, RangeType * r1,
48                                   RangeType * r2);
49 static bool range_gist_consistent_int(FmgrInfo *flinfo,
50                                                   StrategyNumber strategy, RangeType *key,
51                                                   Datum query);
52 static bool range_gist_consistent_leaf(FmgrInfo *flinfo,
53                                                    StrategyNumber strategy, RangeType *key,
54                                                    Datum query);
55 static int      sort_item_cmp(const void *a, const void *b);
56
57
58 /* GiST query consistency check */
59 Datum
60 range_gist_consistent(PG_FUNCTION_ARGS)
61 {
62         GISTENTRY  *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
63         Datum           query = PG_GETARG_DATUM(1);
64         StrategyNumber strategy = (StrategyNumber) PG_GETARG_UINT16(2);
65         /* Oid subtype = PG_GETARG_OID(3); */
66         bool       *recheck = (bool *) PG_GETARG_POINTER(4);
67         RangeType  *key = DatumGetRangeType(entry->key);
68
69         /* All operators served by this function are exact */
70         *recheck = false;
71
72         if (GIST_LEAF(entry))
73                 PG_RETURN_BOOL(range_gist_consistent_leaf(fcinfo->flinfo, strategy,
74                                                                                                   key, query));
75         else
76                 PG_RETURN_BOOL(range_gist_consistent_int(fcinfo->flinfo, strategy,
77                                                                                                  key, query));
78 }
79
80 /* form union range */
81 Datum
82 range_gist_union(PG_FUNCTION_ARGS)
83 {
84         GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
85         GISTENTRY  *ent = entryvec->vector;
86         RangeType  *result_range;
87         TypeCacheEntry *typcache;
88         int                     i;
89
90         result_range = DatumGetRangeType(ent[0].key);
91
92         typcache = range_get_typcache(fcinfo, RangeTypeGetOid(result_range));
93
94         for (i = 1; i < entryvec->n; i++)
95         {
96                 result_range = range_super_union(typcache, result_range,
97                                                                                  DatumGetRangeType(ent[i].key));
98         }
99
100         PG_RETURN_RANGE(result_range);
101 }
102
103 /* compress, decompress are no-ops */
104 Datum
105 range_gist_compress(PG_FUNCTION_ARGS)
106 {
107         GISTENTRY  *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
108
109         PG_RETURN_POINTER(entry);
110 }
111
112 Datum
113 range_gist_decompress(PG_FUNCTION_ARGS)
114 {
115         GISTENTRY  *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
116
117         PG_RETURN_POINTER(entry);
118 }
119
120 /* page split penalty function */
121 Datum
122 range_gist_penalty(PG_FUNCTION_ARGS)
123 {
124         GISTENTRY  *origentry = (GISTENTRY *) PG_GETARG_POINTER(0);
125         GISTENTRY  *newentry = (GISTENTRY *) PG_GETARG_POINTER(1);
126         float      *penalty = (float *) PG_GETARG_POINTER(2);
127         RangeType  *orig = DatumGetRangeType(origentry->key);
128         RangeType  *new = DatumGetRangeType(newentry->key);
129         TypeCacheEntry *typcache;
130         RangeType  *s_union;
131         FmgrInfo   *subtype_diff;
132         RangeBound      lower1,
133                                 lower2;
134         RangeBound      upper1,
135                                 upper2;
136         bool            empty1,
137                                 empty2;
138         float8          lower_diff,
139                                 upper_diff;
140
141         if (RangeTypeGetOid(orig) != RangeTypeGetOid(new))
142                 elog(ERROR, "range types do not match");
143
144         typcache = range_get_typcache(fcinfo, RangeTypeGetOid(orig));
145
146         subtype_diff = &typcache->rng_subdiff_finfo;
147
148         /*
149          * We want to compare the size of "orig" to size of "orig union new".
150          * The penalty will be the sum of the reduction in the lower bound plus
151          * the increase in the upper bound.
152          */
153         s_union = range_super_union(typcache, orig, new);
154
155         range_deserialize(typcache, orig, &lower1, &upper1, &empty1);
156         range_deserialize(typcache, s_union, &lower2, &upper2, &empty2);
157
158         /* handle cases where orig is empty */
159         if (empty1 && empty2)
160         {
161                 *penalty = 0;
162                 PG_RETURN_POINTER(penalty);
163         }
164         else if (empty1)
165         {
166                 if (lower2.infinite || upper2.infinite)
167                 {
168                         /* from empty to infinite */
169                         *penalty = get_float4_infinity();
170                         PG_RETURN_POINTER(penalty);
171                 }
172                 else if (OidIsValid(subtype_diff->fn_oid))
173                 {
174                         /* from empty to upper2-lower2 */
175                         *penalty = DatumGetFloat8(FunctionCall2Coll(subtype_diff,
176                                                                                                                 typcache->rng_collation,
177                                                                                                                 upper2.val,
178                                                                                                                 lower2.val));
179                         /* upper2 must be >= lower2 */
180                         if (*penalty < 0)
181                                 *penalty = 0;           /* subtype_diff is broken */
182                         PG_RETURN_POINTER(penalty);
183                 }
184                 else
185                 {
186                         /* wild guess */
187                         *penalty = 1.0;
188                         PG_RETURN_POINTER(penalty);
189                 }
190         }
191
192         /* if orig isn't empty, s_union can't be either */
193         Assert(!empty2);
194
195         /* similarly, if orig's lower bound is infinite, s_union's must be too */
196         Assert(lower2.infinite || !lower1.infinite);
197
198         if (lower2.infinite && lower1.infinite)
199                 lower_diff = 0;
200         else if (lower2.infinite)
201                 lower_diff = get_float8_infinity();
202         else if (OidIsValid(subtype_diff->fn_oid))
203         {
204                 lower_diff = DatumGetFloat8(FunctionCall2Coll(subtype_diff,
205                                                                                                           typcache->rng_collation,
206                                                                                                           lower1.val,
207                                                                                                           lower2.val));
208                 /* orig's lower bound must be >= s_union's */
209                 if (lower_diff < 0)
210                         lower_diff = 0;         /* subtype_diff is broken */
211         }
212         else
213         {
214                 /* only know whether there is a difference or not */
215                 lower_diff = range_cmp_bounds(typcache, &lower1, &lower2) > 0 ? 1 : 0;
216         }
217
218         /* similarly, if orig's upper bound is infinite, s_union's must be too */
219         Assert(upper2.infinite || !upper1.infinite);
220
221         if (upper2.infinite && upper1.infinite)
222                 upper_diff = 0;
223         else if (upper2.infinite)
224                 upper_diff = get_float8_infinity();
225         else if (OidIsValid(subtype_diff->fn_oid))
226         {
227                 upper_diff = DatumGetFloat8(FunctionCall2Coll(subtype_diff,
228                                                                                                           typcache->rng_collation,
229                                                                                                           upper2.val,
230                                                                                                           upper1.val));
231                 /* orig's upper bound must be <= s_union's */
232                 if (upper_diff < 0)
233                         upper_diff = 0;         /* subtype_diff is broken */
234         }
235         else
236         {
237                 /* only know whether there is a difference or not */
238                 upper_diff = range_cmp_bounds(typcache, &upper2, &upper1) > 0 ? 1 : 0;
239         }
240
241         Assert(lower_diff >= 0 && upper_diff >= 0);
242
243         *penalty = (float) (lower_diff + upper_diff);
244         PG_RETURN_POINTER(penalty);
245 }
246
247 /*
248  * The GiST PickSplit method for ranges
249  *
250  * Algorithm based on sorting.  Incoming array of ranges is sorted using
251  * sort_item_cmp function.  After that first half of ranges goes to the left
252  * output, and the second half of ranges goes to the right output.
253  */
254 Datum
255 range_gist_picksplit(PG_FUNCTION_ARGS)
256 {
257         GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
258         GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1);
259         TypeCacheEntry *typcache;
260         OffsetNumber i;
261         RangeType  *pred_left;
262         RangeType  *pred_right;
263         PickSplitSortItem *sortItems;
264         int                     nbytes;
265         OffsetNumber split_idx;
266         OffsetNumber *left;
267         OffsetNumber *right;
268         OffsetNumber maxoff;
269
270         /* use first item to look up range type's info */
271         pred_left = DatumGetRangeType(entryvec->vector[FirstOffsetNumber].key);
272         typcache = range_get_typcache(fcinfo, RangeTypeGetOid(pred_left));
273
274         /* allocate result and work arrays */
275         maxoff = entryvec->n - 1;
276         nbytes = (maxoff + 1) * sizeof(OffsetNumber);
277         v->spl_left = (OffsetNumber *) palloc(nbytes);
278         v->spl_right = (OffsetNumber *) palloc(nbytes);
279         sortItems = (PickSplitSortItem *) palloc(maxoff * sizeof(PickSplitSortItem));
280
281         /*
282          * Prepare auxiliary array and sort the values.
283          */
284         for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
285         {
286                 sortItems[i - 1].index = i;
287                 sortItems[i - 1].data = DatumGetRangeType(entryvec->vector[i].key);
288                 sortItems[i - 1].typcache = typcache;
289         }
290         qsort(sortItems, maxoff, sizeof(PickSplitSortItem), sort_item_cmp);
291
292         split_idx = maxoff / 2;
293
294         left = v->spl_left;
295         v->spl_nleft = 0;
296         right = v->spl_right;
297         v->spl_nright = 0;
298
299         /*
300          * First half of items goes to the left output.
301          */
302         pred_left = sortItems[0].data;
303         *left++ = sortItems[0].index;
304         v->spl_nleft++;
305         for (i = 1; i < split_idx; i++)
306         {
307                 pred_left = range_super_union(typcache, pred_left, sortItems[i].data);
308                 *left++ = sortItems[i].index;
309                 v->spl_nleft++;
310         }
311
312         /*
313          * Second half of items goes to the right output.
314          */
315         pred_right = sortItems[split_idx].data;
316         *right++ = sortItems[split_idx].index;
317         v->spl_nright++;
318         for (i = split_idx + 1; i < maxoff; i++)
319         {
320                 pred_right = range_super_union(typcache, pred_right, sortItems[i].data);
321                 *right++ = sortItems[i].index;
322                 v->spl_nright++;
323         }
324
325         *left = *right = FirstOffsetNumber; /* sentinel value, see dosplit() */
326
327         v->spl_ldatum = RangeTypeGetDatum(pred_left);
328         v->spl_rdatum = RangeTypeGetDatum(pred_right);
329
330         PG_RETURN_POINTER(v);
331 }
332
333 /* equality comparator for GiST */
334 Datum
335 range_gist_same(PG_FUNCTION_ARGS)
336 {
337         /* Datum r1 = PG_GETARG_DATUM(0); */
338         /* Datum r2 = PG_GETARG_DATUM(1); */
339         bool       *result = (bool *) PG_GETARG_POINTER(2);
340
341         /*
342          * We can safely call range_eq using our fcinfo directly; it won't notice
343          * the third argument.  This allows it to use fn_extra for caching.
344          */
345         *result = DatumGetBool(range_eq(fcinfo));
346
347         PG_RETURN_POINTER(result);
348 }
349
350 /*
351  *----------------------------------------------------------
352  * STATIC FUNCTIONS
353  *----------------------------------------------------------
354  */
355
356 /*
357  * Return the smallest range that contains r1 and r2
358  *
359  * XXX would it be better to redefine range_union as working this way?
360  */
361 static RangeType *
362 range_super_union(TypeCacheEntry *typcache, RangeType * r1, RangeType * r2)
363 {
364         RangeBound      lower1,
365                                 lower2;
366         RangeBound      upper1,
367                                 upper2;
368         bool            empty1,
369                                 empty2;
370         RangeBound *result_lower;
371         RangeBound *result_upper;
372
373         range_deserialize(typcache, r1, &lower1, &upper1, &empty1);
374         range_deserialize(typcache, r2, &lower2, &upper2, &empty2);
375
376         if (empty1)
377                 return r2;
378         if (empty2)
379                 return r1;
380
381         if (range_cmp_bounds(typcache, &lower1, &lower2) <= 0)
382                 result_lower = &lower1;
383         else
384                 result_lower = &lower2;
385
386         if (range_cmp_bounds(typcache, &upper1, &upper2) >= 0)
387                 result_upper = &upper1;
388         else
389                 result_upper = &upper2;
390
391         /* optimization to avoid constructing a new range */
392         if (result_lower == &lower1 && result_upper == &upper1)
393                 return r1;
394         if (result_lower == &lower2 && result_upper == &upper2)
395                 return r2;
396
397         return make_range(typcache, result_lower, result_upper, false);
398 }
399
400 /*
401  * trick function call: call the given function with given FmgrInfo
402  *
403  * To allow the various functions called here to cache lookups of range
404  * datatype information, we use a trick: we pass them the FmgrInfo struct
405  * for the GiST consistent function.  This relies on the knowledge that
406  * none of them consult FmgrInfo for anything but fn_extra, and that they
407  * all use fn_extra the same way, i.e. as a pointer to the typcache entry
408  * for the range data type.  Since the FmgrInfo is long-lived (it's actually
409  * part of the relcache entry for the index, typically) this essentially
410  * eliminates lookup overhead during operations on a GiST range index.
411  */
412 static Datum
413 TrickFunctionCall2(PGFunction proc, FmgrInfo *flinfo, Datum arg1, Datum arg2)
414 {
415         FunctionCallInfoData fcinfo;
416         Datum           result;
417
418         InitFunctionCallInfoData(fcinfo, flinfo, 2, InvalidOid, NULL, NULL);
419
420         fcinfo.arg[0] = arg1;
421         fcinfo.arg[1] = arg2;
422         fcinfo.argnull[0] = false;
423         fcinfo.argnull[1] = false;
424
425         result = (*proc) (&fcinfo);
426
427         if (fcinfo.isnull)
428                 elog(ERROR, "function %p returned NULL", proc);
429
430         return result;
431 }
432
433 /*
434  * GiST consistent test on an index internal page
435  */
436 static bool
437 range_gist_consistent_int(FmgrInfo *flinfo, StrategyNumber strategy,
438                                                   RangeType *key, Datum query)
439 {
440         PGFunction      proc;
441         bool            negate = false;
442         bool            retval;
443
444         switch (strategy)
445         {
446                 case RANGESTRAT_BEFORE:
447                         if (RangeIsEmpty(key))
448                                 return false;
449                         proc = range_overright;
450                         negate = true;
451                         break;
452                 case RANGESTRAT_OVERLEFT:
453                         if (RangeIsEmpty(key))
454                                 return false;
455                         proc = range_after;
456                         negate = true;
457                         break;
458                 case RANGESTRAT_OVERLAPS:
459                         proc = range_overlaps;
460                         break;
461                 case RANGESTRAT_OVERRIGHT:
462                         if (RangeIsEmpty(key))
463                                 return false;
464                         proc = range_before;
465                         negate = true;
466                         break;
467                 case RANGESTRAT_AFTER:
468                         if (RangeIsEmpty(key))
469                                 return false;
470                         proc = range_overleft;
471                         negate = true;
472                         break;
473                 case RANGESTRAT_ADJACENT:
474                         if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeType(query)))
475                                 return false;
476                         if (DatumGetBool(TrickFunctionCall2(range_adjacent, flinfo,
477                                                                                                 RangeTypeGetDatum(key),
478                                                                                                 query)))
479                                 return true;
480                         proc = range_overlaps;
481                         break;
482                 case RANGESTRAT_CONTAINS:
483                         proc = range_contains;
484                         break;
485                 case RANGESTRAT_CONTAINED_BY:
486                         return true;
487                         break;
488                 case RANGESTRAT_CONTAINS_ELEM:
489                         proc = range_contains_elem;
490                         break;
491                 case RANGESTRAT_EQ:
492                         proc = range_contains;
493                         break;
494                 case RANGESTRAT_NE:
495                         return true;
496                         break;
497                 default:
498                         elog(ERROR, "unrecognized range strategy: %d", strategy);
499                         proc = NULL;            /* keep compiler quiet */
500                         break;
501         }
502
503         retval = DatumGetBool(TrickFunctionCall2(proc, flinfo,
504                                                                                          RangeTypeGetDatum(key),
505                                                                                          query));
506         if (negate)
507                 retval = !retval;
508
509         return retval;
510 }
511
512 /*
513  * GiST consistent test on an index leaf page
514  */
515 static bool
516 range_gist_consistent_leaf(FmgrInfo *flinfo, StrategyNumber strategy,
517                                                    RangeType *key, Datum query)
518 {
519         PGFunction      proc;
520
521         switch (strategy)
522         {
523                 case RANGESTRAT_BEFORE:
524                         proc = range_before;
525                         break;
526                 case RANGESTRAT_OVERLEFT:
527                         proc = range_overleft;
528                         break;
529                 case RANGESTRAT_OVERLAPS:
530                         proc = range_overlaps;
531                         break;
532                 case RANGESTRAT_OVERRIGHT:
533                         proc = range_overright;
534                         break;
535                 case RANGESTRAT_AFTER:
536                         proc = range_after;
537                         break;
538                 case RANGESTRAT_ADJACENT:
539                         proc = range_adjacent;
540                         break;
541                 case RANGESTRAT_CONTAINS:
542                         proc = range_contains;
543                         break;
544                 case RANGESTRAT_CONTAINED_BY:
545                         proc = range_contained_by;
546                         break;
547                 case RANGESTRAT_CONTAINS_ELEM:
548                         proc = range_contains_elem;
549                         break;
550                 case RANGESTRAT_EQ:
551                         proc = range_eq;
552                         break;
553                 case RANGESTRAT_NE:
554                         proc = range_ne;
555                         break;
556                 default:
557                         elog(ERROR, "unrecognized range strategy: %d", strategy);
558                         proc = NULL;            /* keep compiler quiet */
559                         break;
560         }
561
562         return DatumGetBool(TrickFunctionCall2(proc, flinfo,
563                                                                                    RangeTypeGetDatum(key),
564                                                                                    query));
565 }
566
567 /*
568  * Compare function for PickSplitSortItem. This is actually the
569  * interesting part of the picksplit algorithm.
570  *
571  * We want to separate out empty ranges, bounded ranges, and unbounded
572  * ranges. We assume that "contains" and "overlaps" are the most
573  * important queries, so empty ranges will rarely match and unbounded
574  * ranges frequently will. Bounded ranges should be in the middle.
575  *
576  * Empty ranges we push all the way to the left, then bounded ranges
577  * (sorted on lower bound, then upper), then ranges with no lower
578  * bound, then ranges with no upper bound; and finally, ranges with no
579  * upper or lower bound all the way to the right.
580  */
581 static int
582 sort_item_cmp(const void *a, const void *b)
583 {
584         PickSplitSortItem *i1 = (PickSplitSortItem *) a;
585         PickSplitSortItem *i2 = (PickSplitSortItem *) b;
586         RangeType  *r1 = i1->data;
587         RangeType  *r2 = i2->data;
588         TypeCacheEntry *typcache = i1->typcache;
589         RangeBound      lower1,
590                                 lower2;
591         RangeBound      upper1,
592                                 upper2;
593         bool            empty1,
594                                 empty2;
595         int                     cmp;
596
597         range_deserialize(typcache, r1, &lower1, &upper1, &empty1);
598         range_deserialize(typcache, r2, &lower2, &upper2, &empty2);
599
600         if (empty1 || empty2)
601         {
602                 if (empty1 && empty2)
603                         return 0;
604                 else if (empty1)
605                         return -1;
606                 else if (empty2)
607                         return 1;
608                 else
609                         Assert(false);
610         }
611
612         /*
613          * If both lower or both upper bounds are infinite, we sort by ascending
614          * range size. That means that if both upper bounds are infinite, we sort
615          * by the lower bound _descending_. That creates a slightly odd total
616          * order, but keeps the pages with very unselective predicates grouped
617          * more closely together on the right.
618          */
619         if (lower1.infinite || upper1.infinite ||
620                 lower2.infinite || upper2.infinite)
621         {
622                 if (lower1.infinite && lower2.infinite)
623                         return range_cmp_bounds(typcache, &upper1, &upper2);
624                 else if (lower1.infinite)
625                         return -1;
626                 else if (lower2.infinite)
627                         return 1;
628                 else if (upper1.infinite && upper2.infinite)
629                         return -(range_cmp_bounds(typcache, &lower1, &lower2));
630                 else if (upper1.infinite)
631                         return 1;
632                 else if (upper2.infinite)
633                         return -1;
634                 else
635                         Assert(false);
636         }
637
638         if ((cmp = range_cmp_bounds(typcache, &lower1, &lower2)) != 0)
639                 return cmp;
640
641         return range_cmp_bounds(typcache, &upper1, &upper2);
642 }