]> granicus.if.org Git - postgresql/blob - src/backend/utils/adt/rangetypes_gist.c
815d2bf8d0a08770e3516898eb27ccb9027d52b9
[postgresql] / src / backend / utils / adt / rangetypes_gist.c
1 /*-------------------------------------------------------------------------
2  *
3  * rangetypes_gist.c
4  *        GiST support for range types.
5  *
6  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/utils/adt/rangetypes_gist.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/gist.h"
18 #include "access/skey.h"
19 #include "utils/builtins.h"
20 #include "utils/rangetypes.h"
21
22
23 /* Operator strategy numbers used in the GiST range opclass */
24 /* Numbers are chosen to match up operator names with existing usages */
25 #define RANGESTRAT_BEFORE                               1
26 #define RANGESTRAT_OVERLEFT                             2
27 #define RANGESTRAT_OVERLAPS                             3
28 #define RANGESTRAT_OVERRIGHT                    4
29 #define RANGESTRAT_AFTER                                5
30 #define RANGESTRAT_ADJACENT                             6
31 #define RANGESTRAT_CONTAINS                             7
32 #define RANGESTRAT_CONTAINED_BY                 8
33 #define RANGESTRAT_CONTAINS_ELEM                16
34 #define RANGESTRAT_ELEM_CONTAINED_BY    17
35 #define RANGESTRAT_EQ                                   18
36 #define RANGESTRAT_NE                                   19
37
38 /*
39  * Auxiliary structure for picksplit method.
40  */
41 typedef struct
42 {
43         int                     index;                  /* original index in entryvec->vector[] */
44         RangeType  *data;                       /* range value to sort */
45         TypeCacheEntry *typcache;       /* range type's info */
46 }       PickSplitSortItem;
47
48 static RangeType *range_super_union(TypeCacheEntry *typcache, RangeType * r1,
49                                   RangeType * r2);
50 static bool range_gist_consistent_int(FmgrInfo *flinfo,
51                                                   StrategyNumber strategy, RangeType * key,
52                                                   RangeType * query);
53 static bool range_gist_consistent_leaf(FmgrInfo *flinfo,
54                                                    StrategyNumber strategy, RangeType * key,
55                                                    RangeType * query);
56 static int      sort_item_cmp(const void *a, const void *b);
57
58
59 /* GiST query consistency check */
60 Datum
61 range_gist_consistent(PG_FUNCTION_ARGS)
62 {
63         GISTENTRY  *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
64         Datum           dquery = PG_GETARG_DATUM(1);
65         StrategyNumber strategy = (StrategyNumber) PG_GETARG_UINT16(2);
66         /* Oid subtype = PG_GETARG_OID(3); */
67         bool       *recheck = (bool *) PG_GETARG_POINTER(4);
68         RangeType  *key = DatumGetRangeType(entry->key);
69         TypeCacheEntry *typcache;
70         RangeType  *query;
71
72         /* All operators served by this function are exact */
73         *recheck = false;
74
75         switch (strategy)
76         {
77                 /*
78                  * For element contains and contained by operators, the other operand
79                  * is a "point" of the subtype.  Construct a singleton range
80                  * containing just that value.  (Since range_contains_elem and
81                  * elem_contained_by_range would do that anyway, it's actually more
82                  * efficient not less so to merge these cases into range containment
83                  * at this step.  But revisit this if we ever change the implementation
84                  * of those functions.)
85                  */
86                 case RANGESTRAT_CONTAINS_ELEM:
87                 case RANGESTRAT_ELEM_CONTAINED_BY:
88                         typcache = range_get_typcache(fcinfo, RangeTypeGetOid(key));
89                         query = make_singleton_range(typcache, dquery);
90                         break;
91
92                 default:
93                         query = DatumGetRangeType(dquery);
94                         break;
95         }
96
97         if (GIST_LEAF(entry))
98                 PG_RETURN_BOOL(range_gist_consistent_leaf(fcinfo->flinfo, strategy,
99                                                                                                   key, query));
100         else
101                 PG_RETURN_BOOL(range_gist_consistent_int(fcinfo->flinfo, strategy,
102                                                                                                  key, query));
103 }
104
105 /* form union range */
106 Datum
107 range_gist_union(PG_FUNCTION_ARGS)
108 {
109         GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
110         GISTENTRY  *ent = entryvec->vector;
111         RangeType  *result_range;
112         TypeCacheEntry *typcache;
113         int                     i;
114
115         result_range = DatumGetRangeType(ent[0].key);
116
117         typcache = range_get_typcache(fcinfo, RangeTypeGetOid(result_range));
118
119         for (i = 1; i < entryvec->n; i++)
120         {
121                 result_range = range_super_union(typcache, result_range,
122                                                                                  DatumGetRangeType(ent[i].key));
123         }
124
125         PG_RETURN_RANGE(result_range);
126 }
127
128 /* compress, decompress are no-ops */
129 Datum
130 range_gist_compress(PG_FUNCTION_ARGS)
131 {
132         GISTENTRY  *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
133
134         PG_RETURN_POINTER(entry);
135 }
136
137 Datum
138 range_gist_decompress(PG_FUNCTION_ARGS)
139 {
140         GISTENTRY  *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
141
142         PG_RETURN_POINTER(entry);
143 }
144
145 /* page split penalty function */
146 Datum
147 range_gist_penalty(PG_FUNCTION_ARGS)
148 {
149         GISTENTRY  *origentry = (GISTENTRY *) PG_GETARG_POINTER(0);
150         GISTENTRY  *newentry = (GISTENTRY *) PG_GETARG_POINTER(1);
151         float      *penalty = (float *) PG_GETARG_POINTER(2);
152         RangeType  *orig = DatumGetRangeType(origentry->key);
153         RangeType  *new = DatumGetRangeType(newentry->key);
154         TypeCacheEntry *typcache;
155         RangeType  *s_union;
156         FmgrInfo   *subtype_diff;
157         RangeBound      lower1,
158                                 lower2;
159         RangeBound      upper1,
160                                 upper2;
161         bool            empty1,
162                                 empty2;
163         float8          lower_diff,
164                                 upper_diff;
165
166         if (RangeTypeGetOid(orig) != RangeTypeGetOid(new))
167                 elog(ERROR, "range types do not match");
168
169         typcache = range_get_typcache(fcinfo, RangeTypeGetOid(orig));
170
171         subtype_diff = &typcache->rng_subdiff_finfo;
172
173         /* we want to compare the size of "orig" to size of "orig union new" */
174         s_union = range_super_union(typcache, orig, new);
175
176         range_deserialize(typcache, orig, &lower1, &upper1, &empty1);
177         range_deserialize(typcache, s_union, &lower2, &upper2, &empty2);
178
179         /* if orig isn't empty, s_union can't be either */
180         Assert(empty1 || !empty2);
181
182         if (empty1 && empty2)
183         {
184                 *penalty = 0;
185                 PG_RETURN_POINTER(penalty);
186         }
187         else if (empty1 && !empty2)
188         {
189                 if (lower2.infinite || upper2.infinite)
190                 {
191                         /* from empty to infinite */
192                         *penalty = get_float4_infinity();
193                         PG_RETURN_POINTER(penalty);
194                 }
195                 else if (OidIsValid(subtype_diff->fn_oid))
196                 {
197                         /* from empty to upper2-lower2 */
198                         *penalty = DatumGetFloat8(FunctionCall2Coll(subtype_diff,
199                                                                                                                 typcache->rng_collation,
200                                                                                                                 upper2.val,
201                                                                                                                 lower2.val));
202                         if (*penalty < 0)
203                                 *penalty = 0;           /* subtype_diff is broken */
204                         PG_RETURN_POINTER(penalty);
205                 }
206                 else
207                 {
208                         /* wild guess */
209                         *penalty = 1.0;
210                         PG_RETURN_POINTER(penalty);
211                 }
212         }
213
214         Assert(lower2.infinite || !lower1.infinite);
215
216         if (lower2.infinite && !lower1.infinite)
217                 lower_diff = get_float8_infinity();
218         else if (lower2.infinite && lower1.infinite)
219                 lower_diff = 0;
220         else if (OidIsValid(subtype_diff->fn_oid))
221         {
222                 lower_diff = DatumGetFloat8(FunctionCall2Coll(subtype_diff,
223                                                                                                           typcache->rng_collation,
224                                                                                                           lower1.val,
225                                                                                                           lower2.val));
226                 if (lower_diff < 0)
227                         lower_diff = 0;         /* subtype_diff is broken */
228         }
229         else
230         {
231                 /* only know whether there is a difference or not */
232                 lower_diff = (float) range_cmp_bounds(typcache, &lower1, &lower2);
233         }
234
235         Assert(upper2.infinite || !upper1.infinite);
236
237         if (upper2.infinite && !upper1.infinite)
238                 upper_diff = get_float8_infinity();
239         else if (upper2.infinite && upper1.infinite)
240                 upper_diff = 0;
241         else if (OidIsValid(subtype_diff->fn_oid))
242         {
243                 upper_diff = DatumGetFloat8(FunctionCall2Coll(subtype_diff,
244                                                                                                           typcache->rng_collation,
245                                                                                                           upper2.val,
246                                                                                                           upper1.val));
247                 if (upper_diff < 0)
248                         upper_diff = 0;         /* subtype_diff is broken */
249         }
250         else
251         {
252                 /* only know whether there is a difference or not */
253                 upper_diff = (float) range_cmp_bounds(typcache, &upper2, &upper1);
254         }
255
256         Assert(lower_diff >= 0 && upper_diff >= 0);
257
258         *penalty = (float) (lower_diff + upper_diff);
259         PG_RETURN_POINTER(penalty);
260 }
261
262 /*
263  * The GiST PickSplit method for ranges
264  *
265  * Algorithm based on sorting.  Incoming array of ranges is sorted using
266  * sort_item_cmp function.  After that first half of ranges goes to the left
267  * output, and the second half of ranges goes to the right output.
268  */
269 Datum
270 range_gist_picksplit(PG_FUNCTION_ARGS)
271 {
272         GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
273         GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1);
274         TypeCacheEntry *typcache;
275         OffsetNumber i;
276         RangeType  *pred_left;
277         RangeType  *pred_right;
278         PickSplitSortItem *sortItems;
279         int                     nbytes;
280         OffsetNumber split_idx;
281         OffsetNumber *left;
282         OffsetNumber *right;
283         OffsetNumber maxoff;
284
285         /* use first item to look up range type's info */
286         pred_left = DatumGetRangeType(entryvec->vector[FirstOffsetNumber].key);
287         typcache = range_get_typcache(fcinfo, RangeTypeGetOid(pred_left));
288
289         /* allocate result and work arrays */
290         maxoff = entryvec->n - 1;
291         nbytes = (maxoff + 1) * sizeof(OffsetNumber);
292         v->spl_left = (OffsetNumber *) palloc(nbytes);
293         v->spl_right = (OffsetNumber *) palloc(nbytes);
294         sortItems = (PickSplitSortItem *) palloc(maxoff * sizeof(PickSplitSortItem));
295
296         /*
297          * Prepare auxiliary array and sort the values.
298          */
299         for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
300         {
301                 sortItems[i - 1].index = i;
302                 sortItems[i - 1].data = DatumGetRangeType(entryvec->vector[i].key);
303                 sortItems[i - 1].typcache = typcache;
304         }
305         qsort(sortItems, maxoff, sizeof(PickSplitSortItem), sort_item_cmp);
306
307         split_idx = maxoff / 2;
308
309         left = v->spl_left;
310         v->spl_nleft = 0;
311         right = v->spl_right;
312         v->spl_nright = 0;
313
314         /*
315          * First half of items goes to the left output.
316          */
317         pred_left = sortItems[0].data;
318         *left++ = sortItems[0].index;
319         v->spl_nleft++;
320         for (i = 1; i < split_idx; i++)
321         {
322                 pred_left = range_super_union(typcache, pred_left, sortItems[i].data);
323                 *left++ = sortItems[i].index;
324                 v->spl_nleft++;
325         }
326
327         /*
328          * Second half of items goes to the right output.
329          */
330         pred_right = sortItems[split_idx].data;
331         *right++ = sortItems[split_idx].index;
332         v->spl_nright++;
333         for (i = split_idx + 1; i < maxoff; i++)
334         {
335                 pred_right = range_super_union(typcache, pred_right, sortItems[i].data);
336                 *right++ = sortItems[i].index;
337                 v->spl_nright++;
338         }
339
340         *left = *right = FirstOffsetNumber; /* sentinel value, see dosplit() */
341
342         v->spl_ldatum = RangeTypeGetDatum(pred_left);
343         v->spl_rdatum = RangeTypeGetDatum(pred_right);
344
345         PG_RETURN_POINTER(v);
346 }
347
348 /* equality comparator for GiST */
349 Datum
350 range_gist_same(PG_FUNCTION_ARGS)
351 {
352         /* Datum r1 = PG_GETARG_DATUM(0); */
353         /* Datum r2 = PG_GETARG_DATUM(1); */
354         bool       *result = (bool *) PG_GETARG_POINTER(2);
355
356         /*
357          * We can safely call range_eq using our fcinfo directly; it won't notice
358          * the third argument.  This allows it to use fn_extra for caching.
359          */
360         *result = DatumGetBool(range_eq(fcinfo));
361
362         PG_RETURN_POINTER(result);
363 }
364
365 /*
366  *----------------------------------------------------------
367  * STATIC FUNCTIONS
368  *----------------------------------------------------------
369  */
370
371 /*
372  * Return the smallest range that contains r1 and r2
373  *
374  * XXX would it be better to redefine range_union as working this way?
375  */
376 static RangeType *
377 range_super_union(TypeCacheEntry *typcache, RangeType * r1, RangeType * r2)
378 {
379         RangeBound      lower1,
380                                 lower2;
381         RangeBound      upper1,
382                                 upper2;
383         bool            empty1,
384                                 empty2;
385         RangeBound *result_lower;
386         RangeBound *result_upper;
387
388         range_deserialize(typcache, r1, &lower1, &upper1, &empty1);
389         range_deserialize(typcache, r2, &lower2, &upper2, &empty2);
390
391         if (empty1)
392                 return r2;
393         if (empty2)
394                 return r1;
395
396         if (range_cmp_bounds(typcache, &lower1, &lower2) <= 0)
397                 result_lower = &lower1;
398         else
399                 result_lower = &lower2;
400
401         if (range_cmp_bounds(typcache, &upper1, &upper2) >= 0)
402                 result_upper = &upper1;
403         else
404                 result_upper = &upper2;
405
406         /* optimization to avoid constructing a new range */
407         if (result_lower == &lower1 && result_upper == &upper1)
408                 return r1;
409         if (result_lower == &lower2 && result_upper == &upper2)
410                 return r2;
411
412         return make_range(typcache, result_lower, result_upper, false);
413 }
414
415 /*
416  * trick function call: call the given function with given FmgrInfo
417  *
418  * To allow the various functions called here to cache lookups of range
419  * datatype information, we use a trick: we pass them the FmgrInfo struct
420  * for the GiST consistent function.  This relies on the knowledge that
421  * none of them consult FmgrInfo for anything but fn_extra, and that they
422  * all use fn_extra the same way, i.e. as a pointer to the typcache entry
423  * for the range data type.  Since the FmgrInfo is long-lived (it's actually
424  * part of the relcache entry for the index, typically) this essentially
425  * eliminates lookup overhead during operations on a GiST range index.
426  */
427 static Datum
428 TrickFunctionCall2(PGFunction proc, FmgrInfo *flinfo, Datum arg1, Datum arg2)
429 {
430         FunctionCallInfoData fcinfo;
431         Datum           result;
432
433         InitFunctionCallInfoData(fcinfo, flinfo, 2, InvalidOid, NULL, NULL);
434
435         fcinfo.arg[0] = arg1;
436         fcinfo.arg[1] = arg2;
437         fcinfo.argnull[0] = false;
438         fcinfo.argnull[1] = false;
439
440         result = (*proc) (&fcinfo);
441
442         if (fcinfo.isnull)
443                 elog(ERROR, "function %p returned NULL", proc);
444
445         return result;
446 }
447
448 /*
449  * GiST consistent test on an index internal page
450  */
451 static bool
452 range_gist_consistent_int(FmgrInfo *flinfo, StrategyNumber strategy,
453                                                   RangeType * key, RangeType * query)
454 {
455         PGFunction      proc;
456         bool            negate = false;
457         bool            retval;
458
459         switch (strategy)
460         {
461                 case RANGESTRAT_BEFORE:
462                         if (RangeIsEmpty(key))
463                                 return false;
464                         proc = range_overright;
465                         negate = true;
466                         break;
467                 case RANGESTRAT_OVERLEFT:
468                         if (RangeIsEmpty(key))
469                                 return false;
470                         proc = range_after;
471                         negate = true;
472                         break;
473                 case RANGESTRAT_OVERLAPS:
474                         proc = range_overlaps;
475                         break;
476                 case RANGESTRAT_OVERRIGHT:
477                         if (RangeIsEmpty(key))
478                                 return false;
479                         proc = range_before;
480                         negate = true;
481                         break;
482                 case RANGESTRAT_AFTER:
483                         if (RangeIsEmpty(key))
484                                 return false;
485                         proc = range_overleft;
486                         negate = true;
487                         break;
488                 case RANGESTRAT_ADJACENT:
489                         if (RangeIsEmpty(key) || RangeIsEmpty(query))
490                                 return false;
491                         if (DatumGetBool(TrickFunctionCall2(range_adjacent, flinfo,
492                                                                                                 RangeTypeGetDatum(key),
493                                                                                                 RangeTypeGetDatum(query))))
494                                 return true;
495                         proc = range_overlaps;
496                         break;
497                 case RANGESTRAT_CONTAINS:
498                 case RANGESTRAT_CONTAINS_ELEM:
499                         proc = range_contains;
500                         break;
501                 case RANGESTRAT_CONTAINED_BY:
502                 case RANGESTRAT_ELEM_CONTAINED_BY:
503                         return true;
504                         break;
505                 case RANGESTRAT_EQ:
506                         proc = range_contains;
507                         break;
508                 case RANGESTRAT_NE:
509                         return true;
510                         break;
511                 default:
512                         elog(ERROR, "unrecognized range strategy: %d", strategy);
513                         proc = NULL;            /* keep compiler quiet */
514                         break;
515         }
516
517         retval = DatumGetBool(TrickFunctionCall2(proc, flinfo,
518                                                                                          RangeTypeGetDatum(key),
519                                                                                          RangeTypeGetDatum(query)));
520         if (negate)
521                 retval = !retval;
522
523         return retval;
524 }
525
526 /*
527  * GiST consistent test on an index leaf page
528  */
529 static bool
530 range_gist_consistent_leaf(FmgrInfo *flinfo, StrategyNumber strategy,
531                                                    RangeType * key, RangeType * query)
532 {
533         PGFunction      proc;
534
535         switch (strategy)
536         {
537                 case RANGESTRAT_BEFORE:
538                         if (RangeIsEmpty(key) || RangeIsEmpty(query))
539                                 return false;
540                         proc = range_before;
541                         break;
542                 case RANGESTRAT_OVERLEFT:
543                         if (RangeIsEmpty(key) || RangeIsEmpty(query))
544                                 return false;
545                         proc = range_overleft;
546                         break;
547                 case RANGESTRAT_OVERLAPS:
548                         proc = range_overlaps;
549                         break;
550                 case RANGESTRAT_OVERRIGHT:
551                         if (RangeIsEmpty(key) || RangeIsEmpty(query))
552                                 return false;
553                         proc = range_overright;
554                         break;
555                 case RANGESTRAT_AFTER:
556                         if (RangeIsEmpty(key) || RangeIsEmpty(query))
557                                 return false;
558                         proc = range_after;
559                         break;
560                 case RANGESTRAT_ADJACENT:
561                         if (RangeIsEmpty(key) || RangeIsEmpty(query))
562                                 return false;
563                         proc = range_adjacent;
564                         break;
565                 case RANGESTRAT_CONTAINS:
566                 case RANGESTRAT_CONTAINS_ELEM:
567                         proc = range_contains;
568                         break;
569                 case RANGESTRAT_CONTAINED_BY:
570                 case RANGESTRAT_ELEM_CONTAINED_BY:
571                         proc = range_contained_by;
572                         break;
573                 case RANGESTRAT_EQ:
574                         proc = range_eq;
575                         break;
576                 case RANGESTRAT_NE:
577                         proc = range_ne;
578                         break;
579                 default:
580                         elog(ERROR, "unrecognized range strategy: %d", strategy);
581                         proc = NULL;            /* keep compiler quiet */
582                         break;
583         }
584
585         return DatumGetBool(TrickFunctionCall2(proc, flinfo,
586                                                                                    RangeTypeGetDatum(key),
587                                                                                    RangeTypeGetDatum(query)));
588 }
589
590 /*
591  * Compare function for PickSplitSortItem. This is actually the
592  * interesting part of the picksplit algorithm.
593  *
594  * We want to separate out empty ranges, bounded ranges, and unbounded
595  * ranges. We assume that "contains" and "overlaps" are the most
596  * important queries, so empty ranges will rarely match and unbounded
597  * ranges frequently will. Bounded ranges should be in the middle.
598  *
599  * Empty ranges we push all the way to the left, then bounded ranges
600  * (sorted on lower bound, then upper), then ranges with no lower
601  * bound, then ranges with no upper bound; and finally, ranges with no
602  * upper or lower bound all the way to the right.
603  */
604 static int
605 sort_item_cmp(const void *a, const void *b)
606 {
607         PickSplitSortItem *i1 = (PickSplitSortItem *) a;
608         PickSplitSortItem *i2 = (PickSplitSortItem *) b;
609         RangeType  *r1 = i1->data;
610         RangeType  *r2 = i2->data;
611         TypeCacheEntry *typcache = i1->typcache;
612         RangeBound      lower1,
613                                 lower2;
614         RangeBound      upper1,
615                                 upper2;
616         bool            empty1,
617                                 empty2;
618         int                     cmp;
619
620         range_deserialize(typcache, r1, &lower1, &upper1, &empty1);
621         range_deserialize(typcache, r2, &lower2, &upper2, &empty2);
622
623         if (empty1 || empty2)
624         {
625                 if (empty1 && empty2)
626                         return 0;
627                 else if (empty1)
628                         return -1;
629                 else if (empty2)
630                         return 1;
631                 else
632                         Assert(false);
633         }
634
635         /*
636          * If both lower or both upper bounds are infinite, we sort by ascending
637          * range size. That means that if both upper bounds are infinite, we sort
638          * by the lower bound _descending_. That creates a slightly odd total
639          * order, but keeps the pages with very unselective predicates grouped
640          * more closely together on the right.
641          */
642         if (lower1.infinite || upper1.infinite ||
643                 lower2.infinite || upper2.infinite)
644         {
645                 if (lower1.infinite && lower2.infinite)
646                         return range_cmp_bounds(typcache, &upper1, &upper2);
647                 else if (lower1.infinite)
648                         return -1;
649                 else if (lower2.infinite)
650                         return 1;
651                 else if (upper1.infinite && upper2.infinite)
652                         return -1 * range_cmp_bounds(typcache, &lower1, &lower2);
653                 else if (upper1.infinite)
654                         return 1;
655                 else if (upper2.infinite)
656                         return -1;
657                 else
658                         Assert(false);
659         }
660
661         if ((cmp = range_cmp_bounds(typcache, &lower1, &lower2)) != 0)
662                 return cmp;
663
664         return range_cmp_bounds(typcache, &upper1, &upper2);
665 }