]> granicus.if.org Git - postgresql/blob - src/backend/access/nbtree/nbtutils.c
Teach btree to handle ScalarArrayOpExpr quals natively.
[postgresql] / src / backend / access / nbtree / nbtutils.c
1 /*-------------------------------------------------------------------------
2  *
3  * nbtutils.c
4  *        Utility code for Postgres btree implementation.
5  *
6  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/access/nbtree/nbtutils.c
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include <time.h>
19
20 #include "access/nbtree.h"
21 #include "access/reloptions.h"
22 #include "access/relscan.h"
23 #include "miscadmin.h"
24 #include "utils/array.h"
25 #include "utils/lsyscache.h"
26 #include "utils/memutils.h"
27 #include "utils/rel.h"
28
29
30 typedef struct BTSortArrayContext
31 {
32         FmgrInfo        flinfo;
33         Oid                     collation;
34         bool            reverse;
35 } BTSortArrayContext;
36
37 static Datum _bt_find_extreme_element(IndexScanDesc scan, ScanKey skey,
38                                                  StrategyNumber strat,
39                                                  Datum *elems, int nelems);
40 static int      _bt_sort_array_elements(IndexScanDesc scan, ScanKey skey,
41                                                 bool reverse,
42                                                 Datum *elems, int nelems);
43 static int _bt_compare_array_elements(const void *a, const void *b, void *arg);
44 static bool _bt_compare_scankey_args(IndexScanDesc scan, ScanKey op,
45                                                  ScanKey leftarg, ScanKey rightarg,
46                                                  bool *result);
47 static bool _bt_fix_scankey_strategy(ScanKey skey, int16 *indoption);
48 static void _bt_mark_scankey_required(ScanKey skey);
49 static bool _bt_check_rowcompare(ScanKey skey,
50                                          IndexTuple tuple, TupleDesc tupdesc,
51                                          ScanDirection dir, bool *continuescan);
52
53
54 /*
55  * _bt_mkscankey
56  *              Build an insertion scan key that contains comparison data from itup
57  *              as well as comparator routines appropriate to the key datatypes.
58  *
59  *              The result is intended for use with _bt_compare().
60  */
61 ScanKey
62 _bt_mkscankey(Relation rel, IndexTuple itup)
63 {
64         ScanKey         skey;
65         TupleDesc       itupdesc;
66         int                     natts;
67         int16      *indoption;
68         int                     i;
69
70         itupdesc = RelationGetDescr(rel);
71         natts = RelationGetNumberOfAttributes(rel);
72         indoption = rel->rd_indoption;
73
74         skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
75
76         for (i = 0; i < natts; i++)
77         {
78                 FmgrInfo   *procinfo;
79                 Datum           arg;
80                 bool            null;
81                 int                     flags;
82
83                 /*
84                  * We can use the cached (default) support procs since no cross-type
85                  * comparison can be needed.
86                  */
87                 procinfo = index_getprocinfo(rel, i + 1, BTORDER_PROC);
88                 arg = index_getattr(itup, i + 1, itupdesc, &null);
89                 flags = (null ? SK_ISNULL : 0) | (indoption[i] << SK_BT_INDOPTION_SHIFT);
90                 ScanKeyEntryInitializeWithInfo(&skey[i],
91                                                                            flags,
92                                                                            (AttrNumber) (i + 1),
93                                                                            InvalidStrategy,
94                                                                            InvalidOid,
95                                                                            rel->rd_indcollation[i],
96                                                                            procinfo,
97                                                                            arg);
98         }
99
100         return skey;
101 }
102
103 /*
104  * _bt_mkscankey_nodata
105  *              Build an insertion scan key that contains 3-way comparator routines
106  *              appropriate to the key datatypes, but no comparison data.  The
107  *              comparison data ultimately used must match the key datatypes.
108  *
109  *              The result cannot be used with _bt_compare(), unless comparison
110  *              data is first stored into the key entries.      Currently this
111  *              routine is only called by nbtsort.c and tuplesort.c, which have
112  *              their own comparison routines.
113  */
114 ScanKey
115 _bt_mkscankey_nodata(Relation rel)
116 {
117         ScanKey         skey;
118         int                     natts;
119         int16      *indoption;
120         int                     i;
121
122         natts = RelationGetNumberOfAttributes(rel);
123         indoption = rel->rd_indoption;
124
125         skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
126
127         for (i = 0; i < natts; i++)
128         {
129                 FmgrInfo   *procinfo;
130                 int                     flags;
131
132                 /*
133                  * We can use the cached (default) support procs since no cross-type
134                  * comparison can be needed.
135                  */
136                 procinfo = index_getprocinfo(rel, i + 1, BTORDER_PROC);
137                 flags = SK_ISNULL | (indoption[i] << SK_BT_INDOPTION_SHIFT);
138                 ScanKeyEntryInitializeWithInfo(&skey[i],
139                                                                            flags,
140                                                                            (AttrNumber) (i + 1),
141                                                                            InvalidStrategy,
142                                                                            InvalidOid,
143                                                                            rel->rd_indcollation[i],
144                                                                            procinfo,
145                                                                            (Datum) 0);
146         }
147
148         return skey;
149 }
150
151 /*
152  * free a scan key made by either _bt_mkscankey or _bt_mkscankey_nodata.
153  */
154 void
155 _bt_freeskey(ScanKey skey)
156 {
157         pfree(skey);
158 }
159
160 /*
161  * free a retracement stack made by _bt_search.
162  */
163 void
164 _bt_freestack(BTStack stack)
165 {
166         BTStack         ostack;
167
168         while (stack != NULL)
169         {
170                 ostack = stack;
171                 stack = stack->bts_parent;
172                 pfree(ostack);
173         }
174 }
175
176
177 /*
178  *      _bt_preprocess_array_keys() -- Preprocess SK_SEARCHARRAY scan keys
179  *
180  * If there are any SK_SEARCHARRAY scan keys, deconstruct the array(s) and
181  * set up BTArrayKeyInfo info for each one that is an equality-type key.
182  * Prepare modified scan keys in so->arrayKeyData, which will hold the current
183  * array elements during each primitive indexscan operation.  For inequality
184  * array keys, it's sufficient to find the extreme element value and replace
185  * the whole array with that scalar value.
186  *
187  * Note: the reason we need so->arrayKeyData, rather than just scribbling
188  * on scan->keyData, is that callers are permitted to call btrescan without
189  * supplying a new set of scankey data.
190  */
191 void
192 _bt_preprocess_array_keys(IndexScanDesc scan)
193 {
194         BTScanOpaque so = (BTScanOpaque) scan->opaque;
195         int                     numberOfKeys = scan->numberOfKeys;
196         int16      *indoption = scan->indexRelation->rd_indoption;
197         int                     numArrayKeys;
198         ScanKey         cur;
199         int                     i;
200         MemoryContext oldContext;
201
202         /* Quick check to see if there are any array keys */
203         numArrayKeys = 0;
204         for (i = 0; i < numberOfKeys; i++)
205         {
206                 cur = &scan->keyData[i];
207                 if (cur->sk_flags & SK_SEARCHARRAY)
208                 {
209                         numArrayKeys++;
210                         Assert(!(cur->sk_flags & (SK_ROW_HEADER | SK_SEARCHNULL | SK_SEARCHNOTNULL)));
211                         /* If any arrays are null as a whole, we can quit right now. */
212                         if (cur->sk_flags & SK_ISNULL)
213                         {
214                                 so->numArrayKeys = -1;
215                                 so->arrayKeyData = NULL;
216                                 return;
217                         }
218                 }
219         }
220
221         /* Quit if nothing to do. */
222         if (numArrayKeys == 0)
223         {
224                 so->numArrayKeys = 0;
225                 so->arrayKeyData = NULL;
226                 return;
227         }
228
229         /*
230          * Make a scan-lifespan context to hold array-associated data, or reset
231          * it if we already have one from a previous rescan cycle.
232          */
233         if (so->arrayContext == NULL)
234                 so->arrayContext = AllocSetContextCreate(CurrentMemoryContext,
235                                                                                                  "BTree Array Context",
236                                                                                                  ALLOCSET_SMALL_MINSIZE,
237                                                                                                  ALLOCSET_SMALL_INITSIZE,
238                                                                                                  ALLOCSET_SMALL_MAXSIZE);
239         else
240                 MemoryContextReset(so->arrayContext);
241
242         oldContext = MemoryContextSwitchTo(so->arrayContext);
243
244         /* Create modifiable copy of scan->keyData in the workspace context */
245         so->arrayKeyData = (ScanKey) palloc(scan->numberOfKeys * sizeof(ScanKeyData));
246         memcpy(so->arrayKeyData,
247                    scan->keyData,
248                    scan->numberOfKeys * sizeof(ScanKeyData));
249
250         /* Allocate space for per-array data in the workspace context */
251         so->arrayKeys = (BTArrayKeyInfo *) palloc0(numArrayKeys * sizeof(BTArrayKeyInfo));
252
253         /* Now process each array key */
254         numArrayKeys = 0;
255         for (i = 0; i < numberOfKeys; i++)
256         {
257                 ArrayType  *arrayval;
258                 int16           elmlen;
259                 bool            elmbyval;
260                 char            elmalign;
261                 int                     num_elems;
262                 Datum      *elem_values;
263                 bool       *elem_nulls;
264                 int                     num_nonnulls;
265                 int                     j;
266
267                 cur = &so->arrayKeyData[i];
268                 if (!(cur->sk_flags & SK_SEARCHARRAY))
269                         continue;
270
271                 /*
272                  * First, deconstruct the array into elements.  Anything allocated
273                  * here (including a possibly detoasted array value) is in the
274                  * workspace context.
275                  */
276                 arrayval = DatumGetArrayTypeP(cur->sk_argument);
277                 /* We could cache this data, but not clear it's worth it */
278                 get_typlenbyvalalign(ARR_ELEMTYPE(arrayval),
279                                                          &elmlen, &elmbyval, &elmalign);
280                 deconstruct_array(arrayval,
281                                                   ARR_ELEMTYPE(arrayval),
282                                                   elmlen, elmbyval, elmalign,
283                                                   &elem_values, &elem_nulls, &num_elems);
284
285                 /*
286                  * Compress out any null elements.  We can ignore them since we assume
287                  * all btree operators are strict.
288                  */
289                 num_nonnulls = 0;
290                 for (j = 0; j < num_elems; j++)
291                 {
292                         if (!elem_nulls[j])
293                                 elem_values[num_nonnulls++] = elem_values[j];
294                 }
295
296                 /* We could pfree(elem_nulls) now, but not worth the cycles */
297
298                 /* If there's no non-nulls, the scan qual is unsatisfiable */
299                 if (num_nonnulls == 0)
300                 {
301                         numArrayKeys = -1;
302                         break;
303                 }
304
305                 /*
306                  * If the comparison operator is not equality, then the array qual
307                  * degenerates to a simple comparison against the smallest or largest
308                  * non-null array element, as appropriate.
309                  */
310                 switch (cur->sk_strategy)
311                 {
312                         case BTLessStrategyNumber:
313                         case BTLessEqualStrategyNumber:
314                                 cur->sk_argument =
315                                         _bt_find_extreme_element(scan, cur,
316                                                                                          BTGreaterStrategyNumber,
317                                                                                          elem_values, num_nonnulls);
318                                 continue;
319                         case BTEqualStrategyNumber:
320                                 /* proceed with rest of loop */
321                                 break;
322                         case BTGreaterEqualStrategyNumber:
323                         case BTGreaterStrategyNumber:
324                                 cur->sk_argument =
325                                         _bt_find_extreme_element(scan, cur,
326                                                                                          BTLessStrategyNumber,
327                                                                                          elem_values, num_nonnulls);
328                                 continue;
329                         default:
330                                 elog(ERROR, "unrecognized StrategyNumber: %d",
331                                          (int) cur->sk_strategy);
332                                 break;
333                 }
334
335                 /*
336                  * Sort the non-null elements and eliminate any duplicates.  We must
337                  * sort in the same ordering used by the index column, so that the
338                  * successive primitive indexscans produce data in index order.
339                  */
340                 num_elems = _bt_sort_array_elements(scan, cur,
341                                                                                         (indoption[cur->sk_attno - 1] & INDOPTION_DESC) != 0,
342                                                                                         elem_values, num_nonnulls);
343
344                 /*
345                  * And set up the BTArrayKeyInfo data.
346                  */
347                 so->arrayKeys[numArrayKeys].scan_key = i;
348                 so->arrayKeys[numArrayKeys].num_elems = num_elems;
349                 so->arrayKeys[numArrayKeys].elem_values = elem_values;
350                 numArrayKeys++;
351         }
352
353         so->numArrayKeys = numArrayKeys;
354
355         MemoryContextSwitchTo(oldContext);
356 }
357
358 /*
359  * _bt_find_extreme_element() -- get least or greatest array element
360  *
361  * scan and skey identify the index column, whose opfamily determines the
362  * comparison semantics.  strat should be BTLessStrategyNumber to get the
363  * least element, or BTGreaterStrategyNumber to get the greatest.
364  */
365 static Datum
366 _bt_find_extreme_element(IndexScanDesc scan, ScanKey skey,
367                                                  StrategyNumber strat,
368                                                  Datum *elems, int nelems)
369 {
370         Relation        rel = scan->indexRelation;
371         Oid                     elemtype,
372                                 cmp_op;
373         RegProcedure cmp_proc;
374         FmgrInfo        flinfo;
375         Datum           result;
376         int                     i;
377
378         /*
379          * Determine the nominal datatype of the array elements.  We have to
380          * support the convention that sk_subtype == InvalidOid means the opclass
381          * input type; this is a hack to simplify life for ScanKeyInit().
382          */
383         elemtype = skey->sk_subtype;
384         if (elemtype == InvalidOid)
385                 elemtype = rel->rd_opcintype[skey->sk_attno - 1];
386
387         /*
388          * Look up the appropriate comparison operator in the opfamily.
389          *
390          * Note: it's possible that this would fail, if the opfamily is incomplete,
391          * but it seems quite unlikely that an opfamily would omit non-cross-type
392          * comparison operators for any datatype that it supports at all.
393          */
394         cmp_op = get_opfamily_member(rel->rd_opfamily[skey->sk_attno - 1],
395                                                                  elemtype,
396                                                                  elemtype,
397                                                                  strat);
398         if (!OidIsValid(cmp_op))
399                 elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
400                          strat, elemtype, elemtype,
401                          rel->rd_opfamily[skey->sk_attno - 1]);
402         cmp_proc = get_opcode(cmp_op);
403         if (!RegProcedureIsValid(cmp_proc))
404                 elog(ERROR, "missing oprcode for operator %u", cmp_op);
405
406         fmgr_info(cmp_proc, &flinfo);
407
408         Assert(nelems > 0);
409         result = elems[0];
410         for (i = 1; i < nelems; i++)
411         {
412                 if (DatumGetBool(FunctionCall2Coll(&flinfo,
413                                                                                    skey->sk_collation,
414                                                                                    elems[i],
415                                                                                    result)))
416                         result = elems[i];
417         }
418
419         return result;
420 }
421
422 /*
423  * _bt_sort_array_elements() -- sort and de-dup array elements
424  *
425  * The array elements are sorted in-place, and the new number of elements
426  * after duplicate removal is returned.
427  *
428  * scan and skey identify the index column, whose opfamily determines the
429  * comparison semantics.  If reverse is true, we sort in descending order.
430  */
431 static int
432 _bt_sort_array_elements(IndexScanDesc scan, ScanKey skey,
433                                                 bool reverse,
434                                                 Datum *elems, int nelems)
435 {
436         Relation        rel = scan->indexRelation;
437         Oid                     elemtype;
438         RegProcedure cmp_proc;
439         BTSortArrayContext cxt;
440         int                     last_non_dup;
441         int                     i;
442
443         if (nelems <= 1)
444                 return nelems;                  /* no work to do */
445
446         /*
447          * Determine the nominal datatype of the array elements.  We have to
448          * support the convention that sk_subtype == InvalidOid means the opclass
449          * input type; this is a hack to simplify life for ScanKeyInit().
450          */
451         elemtype = skey->sk_subtype;
452         if (elemtype == InvalidOid)
453                 elemtype = rel->rd_opcintype[skey->sk_attno - 1];
454
455         /*
456          * Look up the appropriate comparison function in the opfamily.
457          *
458          * Note: it's possible that this would fail, if the opfamily is incomplete,
459          * but it seems quite unlikely that an opfamily would omit non-cross-type
460          * support functions for any datatype that it supports at all.
461          */
462         cmp_proc = get_opfamily_proc(rel->rd_opfamily[skey->sk_attno - 1],
463                                                                  elemtype,
464                                                                  elemtype,
465                                                                  BTORDER_PROC);
466         if (!RegProcedureIsValid(cmp_proc))
467                 elog(ERROR, "missing support function %d(%u,%u) in opfamily %u",
468                          BTORDER_PROC, elemtype, elemtype,
469                          rel->rd_opfamily[skey->sk_attno - 1]);
470
471         /* Sort the array elements */
472         fmgr_info(cmp_proc, &cxt.flinfo);
473         cxt.collation = skey->sk_collation;
474         cxt.reverse = reverse;
475         qsort_arg((void *) elems, nelems, sizeof(Datum),
476                           _bt_compare_array_elements, (void *) &cxt);
477
478         /* Now scan the sorted elements and remove duplicates */
479         last_non_dup = 0;
480         for (i = 1; i < nelems; i++)
481         {
482                 int32           compare;
483
484                 compare = DatumGetInt32(FunctionCall2Coll(&cxt.flinfo,
485                                                                                                   cxt.collation,
486                                                                                                   elems[last_non_dup],
487                                                                                                   elems[i]));
488                 if (compare != 0)
489                         elems[++last_non_dup] = elems[i];
490         }
491
492         return last_non_dup + 1;
493 }
494
495 /*
496  * qsort_arg comparator for sorting array elements
497  */
498 static int
499 _bt_compare_array_elements(const void *a, const void *b, void *arg)
500 {
501         Datum           da = *((const Datum *) a);
502         Datum           db = *((const Datum *) b);
503         BTSortArrayContext *cxt = (BTSortArrayContext *) arg;
504         int32           compare;
505
506         compare = DatumGetInt32(FunctionCall2Coll(&cxt->flinfo,
507                                                                                           cxt->collation,
508                                                                                           da, db));
509         if (cxt->reverse)
510                 compare = -compare;
511         return compare;
512 }
513
514 /*
515  * _bt_start_array_keys() -- Initialize array keys at start of a scan
516  *
517  * Set up the cur_elem counters and fill in the first sk_argument value for
518  * each array scankey.  We can't do this until we know the scan direction.
519  */
520 void
521 _bt_start_array_keys(IndexScanDesc scan, ScanDirection dir)
522 {
523         BTScanOpaque so = (BTScanOpaque) scan->opaque;
524         int                     i;
525
526         for (i = 0; i < so->numArrayKeys; i++)
527         {
528                 BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
529                 ScanKey         skey = &so->arrayKeyData[curArrayKey->scan_key];
530
531                 Assert(curArrayKey->num_elems > 0);
532                 if (ScanDirectionIsBackward(dir))
533                         curArrayKey->cur_elem = curArrayKey->num_elems - 1;
534                 else
535                         curArrayKey->cur_elem = 0;
536                 skey->sk_argument = curArrayKey->elem_values[curArrayKey->cur_elem];
537         }
538 }
539
540 /*
541  * _bt_advance_array_keys() -- Advance to next set of array elements
542  *
543  * Returns TRUE if there is another set of values to consider, FALSE if not.
544  * On TRUE result, the scankeys are initialized with the next set of values.
545  */
546 bool
547 _bt_advance_array_keys(IndexScanDesc scan, ScanDirection dir)
548 {
549         BTScanOpaque so = (BTScanOpaque) scan->opaque;
550         bool            found = false;
551         int                     i;
552
553         /*
554          * We must advance the last array key most quickly, since it will
555          * correspond to the lowest-order index column among the available
556          * qualifications. This is necessary to ensure correct ordering of output
557          * when there are multiple array keys.
558          */
559         for (i = so->numArrayKeys - 1; i >= 0; i--)
560         {
561                 BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
562                 ScanKey         skey = &so->arrayKeyData[curArrayKey->scan_key];
563                 int                     cur_elem = curArrayKey->cur_elem;
564                 int                     num_elems = curArrayKey->num_elems;
565
566                 if (ScanDirectionIsBackward(dir))
567                 {
568                         if (--cur_elem < 0)
569                         {
570                                 cur_elem = num_elems - 1;
571                                 found = false;  /* need to advance next array key */
572                         }
573                         else
574                                 found = true;
575                 }
576                 else
577                 {
578                         if (++cur_elem >= num_elems)
579                         {
580                                 cur_elem = 0;
581                                 found = false;  /* need to advance next array key */
582                         }
583                         else
584                                 found = true;
585                 }
586
587                 curArrayKey->cur_elem = cur_elem;
588                 skey->sk_argument = curArrayKey->elem_values[cur_elem];
589                 if (found)
590                         break;
591         }
592
593         return found;
594 }
595
596
597 /*
598  *      _bt_preprocess_keys() -- Preprocess scan keys
599  *
600  * The given search-type keys (in scan->keyData[] or so->arrayKeyData[])
601  * are copied to so->keyData[] with possible transformation.
602  * scan->numberOfKeys is the number of input keys, so->numberOfKeys gets
603  * the number of output keys (possibly less, never greater).
604  *
605  * The output keys are marked with additional sk_flag bits beyond the
606  * system-standard bits supplied by the caller.  The DESC and NULLS_FIRST
607  * indoption bits for the relevant index attribute are copied into the flags.
608  * Also, for a DESC column, we commute (flip) all the sk_strategy numbers
609  * so that the index sorts in the desired direction.
610  *
611  * One key purpose of this routine is to discover how many scan keys
612  * must be satisfied to continue the scan.      It also attempts to eliminate
613  * redundant keys and detect contradictory keys.  (If the index opfamily
614  * provides incomplete sets of cross-type operators, we may fail to detect
615  * redundant or contradictory keys, but we can survive that.)
616  *
617  * The output keys must be sorted by index attribute.  Presently we expect
618  * (but verify) that the input keys are already so sorted --- this is done
619  * by group_clauses_by_indexkey() in indxpath.c.  Some reordering of the keys
620  * within each attribute may be done as a byproduct of the processing here,
621  * but no other code depends on that.
622  *
623  * The output keys are marked with flags SK_BT_REQFWD and/or SK_BT_REQBKWD
624  * if they must be satisfied in order to continue the scan forward or backward
625  * respectively.  _bt_checkkeys uses these flags.  For example, if the quals
626  * are "x = 1 AND y < 4 AND z < 5", then _bt_checkkeys will reject a tuple
627  * (1,2,7), but we must continue the scan in case there are tuples (1,3,z).
628  * But once we reach tuples like (1,4,z) we can stop scanning because no
629  * later tuples could match.  This is reflected by marking the x and y keys,
630  * but not the z key, with SK_BT_REQFWD.  In general, the keys for leading
631  * attributes with "=" keys are marked both SK_BT_REQFWD and SK_BT_REQBKWD.
632  * For the first attribute without an "=" key, any "<" and "<=" keys are
633  * marked SK_BT_REQFWD while any ">" and ">=" keys are marked SK_BT_REQBKWD.
634  * This can be seen to be correct by considering the above example.  Note
635  * in particular that if there are no keys for a given attribute, the keys for
636  * subsequent attributes can never be required; for instance "WHERE y = 4"
637  * requires a full-index scan.
638  *
639  * If possible, redundant keys are eliminated: we keep only the tightest
640  * >/>= bound and the tightest </<= bound, and if there's an = key then
641  * that's the only one returned.  (So, we return either a single = key,
642  * or one or two boundary-condition keys for each attr.)  However, if we
643  * cannot compare two keys for lack of a suitable cross-type operator,
644  * we cannot eliminate either.  If there are two such keys of the same
645  * operator strategy, the second one is just pushed into the output array
646  * without further processing here.  We may also emit both >/>= or both
647  * </<= keys if we can't compare them.  The logic about required keys still
648  * works if we don't eliminate redundant keys.
649  *
650  * As a byproduct of this work, we can detect contradictory quals such
651  * as "x = 1 AND x > 2".  If we see that, we return so->qual_ok = FALSE,
652  * indicating the scan need not be run at all since no tuples can match.
653  * (In this case we do not bother completing the output key array!)
654  * Again, missing cross-type operators might cause us to fail to prove the
655  * quals contradictory when they really are, but the scan will work correctly.
656  *
657  * Row comparison keys are currently also treated without any smarts:
658  * we just transfer them into the preprocessed array without any
659  * editorialization.  We can treat them the same as an ordinary inequality
660  * comparison on the row's first index column, for the purposes of the logic
661  * about required keys.
662  *
663  * Note: the reason we have to copy the preprocessed scan keys into private
664  * storage is that we are modifying the array based on comparisons of the
665  * key argument values, which could change on a rescan or after moving to
666  * new elements of array keys.  Therefore we can't overwrite the source data.
667  */
668 void
669 _bt_preprocess_keys(IndexScanDesc scan)
670 {
671         BTScanOpaque so = (BTScanOpaque) scan->opaque;
672         int                     numberOfKeys = scan->numberOfKeys;
673         int16      *indoption = scan->indexRelation->rd_indoption;
674         int                     new_numberOfKeys;
675         int                     numberOfEqualCols;
676         ScanKey         inkeys;
677         ScanKey         outkeys;
678         ScanKey         cur;
679         ScanKey         xform[BTMaxStrategyNumber];
680         bool            test_result;
681         int                     i,
682                                 j;
683         AttrNumber      attno;
684
685         /* initialize result variables */
686         so->qual_ok = true;
687         so->numberOfKeys = 0;
688
689         if (numberOfKeys < 1)
690                 return;                                 /* done if qual-less scan */
691
692         /*
693          * Read so->arrayKeyData if array keys are present, else scan->keyData
694          */
695         if (so->arrayKeyData != NULL)
696                 inkeys = so->arrayKeyData;
697         else
698                 inkeys = scan->keyData;
699
700         outkeys = so->keyData;
701         cur = &inkeys[0];
702         /* we check that input keys are correctly ordered */
703         if (cur->sk_attno < 1)
704                 elog(ERROR, "btree index keys must be ordered by attribute");
705
706         /* We can short-circuit most of the work if there's just one key */
707         if (numberOfKeys == 1)
708         {
709                 /* Apply indoption to scankey (might change sk_strategy!) */
710                 if (!_bt_fix_scankey_strategy(cur, indoption))
711                         so->qual_ok = false;
712                 memcpy(outkeys, cur, sizeof(ScanKeyData));
713                 so->numberOfKeys = 1;
714                 /* We can mark the qual as required if it's for first index col */
715                 if (cur->sk_attno == 1)
716                         _bt_mark_scankey_required(outkeys);
717                 return;
718         }
719
720         /*
721          * Otherwise, do the full set of pushups.
722          */
723         new_numberOfKeys = 0;
724         numberOfEqualCols = 0;
725
726         /*
727          * Initialize for processing of keys for attr 1.
728          *
729          * xform[i] points to the currently best scan key of strategy type i+1; it
730          * is NULL if we haven't yet found such a key for this attr.
731          */
732         attno = 1;
733         memset(xform, 0, sizeof(xform));
734
735         /*
736          * Loop iterates from 0 to numberOfKeys inclusive; we use the last pass to
737          * handle after-last-key processing.  Actual exit from the loop is at the
738          * "break" statement below.
739          */
740         for (i = 0;; cur++, i++)
741         {
742                 if (i < numberOfKeys)
743                 {
744                         /* Apply indoption to scankey (might change sk_strategy!) */
745                         if (!_bt_fix_scankey_strategy(cur, indoption))
746                         {
747                                 /* NULL can't be matched, so give up */
748                                 so->qual_ok = false;
749                                 return;
750                         }
751                 }
752
753                 /*
754                  * If we are at the end of the keys for a particular attr, finish up
755                  * processing and emit the cleaned-up keys.
756                  */
757                 if (i == numberOfKeys || cur->sk_attno != attno)
758                 {
759                         int                     priorNumberOfEqualCols = numberOfEqualCols;
760
761                         /* check input keys are correctly ordered */
762                         if (i < numberOfKeys && cur->sk_attno < attno)
763                                 elog(ERROR, "btree index keys must be ordered by attribute");
764
765                         /*
766                          * If = has been specified, all other keys can be eliminated as
767                          * redundant.  If we have a case like key = 1 AND key > 2, we can
768                          * set qual_ok to false and abandon further processing.
769                          *
770                          * We also have to deal with the case of "key IS NULL", which is
771                          * unsatisfiable in combination with any other index condition.
772                          * By the time we get here, that's been classified as an equality
773                          * check, and we've rejected any combination of it with a regular
774                          * equality condition; but not with other types of conditions.
775                          */
776                         if (xform[BTEqualStrategyNumber - 1])
777                         {
778                                 ScanKey         eq = xform[BTEqualStrategyNumber - 1];
779
780                                 for (j = BTMaxStrategyNumber; --j >= 0;)
781                                 {
782                                         ScanKey         chk = xform[j];
783
784                                         if (!chk || j == (BTEqualStrategyNumber - 1))
785                                                 continue;
786
787                                         if (eq->sk_flags & SK_SEARCHNULL)
788                                         {
789                                                 /* IS NULL is contradictory to anything else */
790                                                 so->qual_ok = false;
791                                                 return;
792                                         }
793
794                                         if (_bt_compare_scankey_args(scan, chk, eq, chk,
795                                                                                                  &test_result))
796                                         {
797                                                 if (!test_result)
798                                                 {
799                                                         /* keys proven mutually contradictory */
800                                                         so->qual_ok = false;
801                                                         return;
802                                                 }
803                                                 /* else discard the redundant non-equality key */
804                                                 xform[j] = NULL;
805                                         }
806                                         /* else, cannot determine redundancy, keep both keys */
807                                 }
808                                 /* track number of attrs for which we have "=" keys */
809                                 numberOfEqualCols++;
810                         }
811
812                         /* try to keep only one of <, <= */
813                         if (xform[BTLessStrategyNumber - 1]
814                                 && xform[BTLessEqualStrategyNumber - 1])
815                         {
816                                 ScanKey         lt = xform[BTLessStrategyNumber - 1];
817                                 ScanKey         le = xform[BTLessEqualStrategyNumber - 1];
818
819                                 if (_bt_compare_scankey_args(scan, le, lt, le,
820                                                                                          &test_result))
821                                 {
822                                         if (test_result)
823                                                 xform[BTLessEqualStrategyNumber - 1] = NULL;
824                                         else
825                                                 xform[BTLessStrategyNumber - 1] = NULL;
826                                 }
827                         }
828
829                         /* try to keep only one of >, >= */
830                         if (xform[BTGreaterStrategyNumber - 1]
831                                 && xform[BTGreaterEqualStrategyNumber - 1])
832                         {
833                                 ScanKey         gt = xform[BTGreaterStrategyNumber - 1];
834                                 ScanKey         ge = xform[BTGreaterEqualStrategyNumber - 1];
835
836                                 if (_bt_compare_scankey_args(scan, ge, gt, ge,
837                                                                                          &test_result))
838                                 {
839                                         if (test_result)
840                                                 xform[BTGreaterEqualStrategyNumber - 1] = NULL;
841                                         else
842                                                 xform[BTGreaterStrategyNumber - 1] = NULL;
843                                 }
844                         }
845
846                         /*
847                          * Emit the cleaned-up keys into the outkeys[] array, and then
848                          * mark them if they are required.      They are required (possibly
849                          * only in one direction) if all attrs before this one had "=".
850                          */
851                         for (j = BTMaxStrategyNumber; --j >= 0;)
852                         {
853                                 if (xform[j])
854                                 {
855                                         ScanKey         outkey = &outkeys[new_numberOfKeys++];
856
857                                         memcpy(outkey, xform[j], sizeof(ScanKeyData));
858                                         if (priorNumberOfEqualCols == attno - 1)
859                                                 _bt_mark_scankey_required(outkey);
860                                 }
861                         }
862
863                         /*
864                          * Exit loop here if done.
865                          */
866                         if (i == numberOfKeys)
867                                 break;
868
869                         /* Re-initialize for new attno */
870                         attno = cur->sk_attno;
871                         memset(xform, 0, sizeof(xform));
872                 }
873
874                 /* check strategy this key's operator corresponds to */
875                 j = cur->sk_strategy - 1;
876
877                 /* if row comparison, push it directly to the output array */
878                 if (cur->sk_flags & SK_ROW_HEADER)
879                 {
880                         ScanKey         outkey = &outkeys[new_numberOfKeys++];
881
882                         memcpy(outkey, cur, sizeof(ScanKeyData));
883                         if (numberOfEqualCols == attno - 1)
884                                 _bt_mark_scankey_required(outkey);
885
886                         /*
887                          * We don't support RowCompare using equality; such a qual would
888                          * mess up the numberOfEqualCols tracking.
889                          */
890                         Assert(j != (BTEqualStrategyNumber - 1));
891                         continue;
892                 }
893
894                 /* have we seen one of these before? */
895                 if (xform[j] == NULL)
896                 {
897                         /* nope, so remember this scankey */
898                         xform[j] = cur;
899                 }
900                 else
901                 {
902                         /* yup, keep only the more restrictive key */
903                         if (_bt_compare_scankey_args(scan, cur, cur, xform[j],
904                                                                                  &test_result))
905                         {
906                                 if (test_result)
907                                         xform[j] = cur;
908                                 else if (j == (BTEqualStrategyNumber - 1))
909                                 {
910                                         /* key == a && key == b, but a != b */
911                                         so->qual_ok = false;
912                                         return;
913                                 }
914                                 /* else old key is more restrictive, keep it */
915                         }
916                         else
917                         {
918                                 /*
919                                  * We can't determine which key is more restrictive.  Keep the
920                                  * previous one in xform[j] and push this one directly to the
921                                  * output array.
922                                  */
923                                 ScanKey         outkey = &outkeys[new_numberOfKeys++];
924
925                                 memcpy(outkey, cur, sizeof(ScanKeyData));
926                                 if (numberOfEqualCols == attno - 1)
927                                         _bt_mark_scankey_required(outkey);
928                         }
929                 }
930         }
931
932         so->numberOfKeys = new_numberOfKeys;
933 }
934
935 /*
936  * Compare two scankey values using a specified operator.
937  *
938  * The test we want to perform is logically "leftarg op rightarg", where
939  * leftarg and rightarg are the sk_argument values in those ScanKeys, and
940  * the comparison operator is the one in the op ScanKey.  However, in
941  * cross-data-type situations we may need to look up the correct operator in
942  * the index's opfamily: it is the one having amopstrategy = op->sk_strategy
943  * and amoplefttype/amoprighttype equal to the two argument datatypes.
944  *
945  * If the opfamily doesn't supply a complete set of cross-type operators we
946  * may not be able to make the comparison.      If we can make the comparison
947  * we store the operator result in *result and return TRUE.  We return FALSE
948  * if the comparison could not be made.
949  *
950  * Note: op always points at the same ScanKey as either leftarg or rightarg.
951  * Since we don't scribble on the scankeys, this aliasing should cause no
952  * trouble.
953  *
954  * Note: this routine needs to be insensitive to any DESC option applied
955  * to the index column.  For example, "x < 4" is a tighter constraint than
956  * "x < 5" regardless of which way the index is sorted.
957  */
958 static bool
959 _bt_compare_scankey_args(IndexScanDesc scan, ScanKey op,
960                                                  ScanKey leftarg, ScanKey rightarg,
961                                                  bool *result)
962 {
963         Relation        rel = scan->indexRelation;
964         Oid                     lefttype,
965                                 righttype,
966                                 optype,
967                                 opcintype,
968                                 cmp_op;
969         StrategyNumber strat;
970
971         /*
972          * First, deal with cases where one or both args are NULL.      This should
973          * only happen when the scankeys represent IS NULL/NOT NULL conditions.
974          */
975         if ((leftarg->sk_flags | rightarg->sk_flags) & SK_ISNULL)
976         {
977                 bool            leftnull,
978                                         rightnull;
979
980                 if (leftarg->sk_flags & SK_ISNULL)
981                 {
982                         Assert(leftarg->sk_flags & (SK_SEARCHNULL | SK_SEARCHNOTNULL));
983                         leftnull = true;
984                 }
985                 else
986                         leftnull = false;
987                 if (rightarg->sk_flags & SK_ISNULL)
988                 {
989                         Assert(rightarg->sk_flags & (SK_SEARCHNULL | SK_SEARCHNOTNULL));
990                         rightnull = true;
991                 }
992                 else
993                         rightnull = false;
994
995                 /*
996                  * We treat NULL as either greater than or less than all other values.
997                  * Since true > false, the tests below work correctly for NULLS LAST
998                  * logic.  If the index is NULLS FIRST, we need to flip the strategy.
999                  */
1000                 strat = op->sk_strategy;
1001                 if (op->sk_flags & SK_BT_NULLS_FIRST)
1002                         strat = BTCommuteStrategyNumber(strat);
1003
1004                 switch (strat)
1005                 {
1006                         case BTLessStrategyNumber:
1007                                 *result = (leftnull < rightnull);
1008                                 break;
1009                         case BTLessEqualStrategyNumber:
1010                                 *result = (leftnull <= rightnull);
1011                                 break;
1012                         case BTEqualStrategyNumber:
1013                                 *result = (leftnull == rightnull);
1014                                 break;
1015                         case BTGreaterEqualStrategyNumber:
1016                                 *result = (leftnull >= rightnull);
1017                                 break;
1018                         case BTGreaterStrategyNumber:
1019                                 *result = (leftnull > rightnull);
1020                                 break;
1021                         default:
1022                                 elog(ERROR, "unrecognized StrategyNumber: %d", (int) strat);
1023                                 *result = false;        /* keep compiler quiet */
1024                                 break;
1025                 }
1026                 return true;
1027         }
1028
1029         /*
1030          * The opfamily we need to worry about is identified by the index column.
1031          */
1032         Assert(leftarg->sk_attno == rightarg->sk_attno);
1033
1034         opcintype = rel->rd_opcintype[leftarg->sk_attno - 1];
1035
1036         /*
1037          * Determine the actual datatypes of the ScanKey arguments.  We have to
1038          * support the convention that sk_subtype == InvalidOid means the opclass
1039          * input type; this is a hack to simplify life for ScanKeyInit().
1040          */
1041         lefttype = leftarg->sk_subtype;
1042         if (lefttype == InvalidOid)
1043                 lefttype = opcintype;
1044         righttype = rightarg->sk_subtype;
1045         if (righttype == InvalidOid)
1046                 righttype = opcintype;
1047         optype = op->sk_subtype;
1048         if (optype == InvalidOid)
1049                 optype = opcintype;
1050
1051         /*
1052          * If leftarg and rightarg match the types expected for the "op" scankey,
1053          * we can use its already-looked-up comparison function.
1054          */
1055         if (lefttype == opcintype && righttype == optype)
1056         {
1057                 *result = DatumGetBool(FunctionCall2Coll(&op->sk_func,
1058                                                                                                  op->sk_collation,
1059                                                                                                  leftarg->sk_argument,
1060                                                                                                  rightarg->sk_argument));
1061                 return true;
1062         }
1063
1064         /*
1065          * Otherwise, we need to go to the syscache to find the appropriate
1066          * operator.  (This cannot result in infinite recursion, since no
1067          * indexscan initiated by syscache lookup will use cross-data-type
1068          * operators.)
1069          *
1070          * If the sk_strategy was flipped by _bt_fix_scankey_strategy, we have to
1071          * un-flip it to get the correct opfamily member.
1072          */
1073         strat = op->sk_strategy;
1074         if (op->sk_flags & SK_BT_DESC)
1075                 strat = BTCommuteStrategyNumber(strat);
1076
1077         cmp_op = get_opfamily_member(rel->rd_opfamily[leftarg->sk_attno - 1],
1078                                                                  lefttype,
1079                                                                  righttype,
1080                                                                  strat);
1081         if (OidIsValid(cmp_op))
1082         {
1083                 RegProcedure cmp_proc = get_opcode(cmp_op);
1084
1085                 if (RegProcedureIsValid(cmp_proc))
1086                 {
1087                         *result = DatumGetBool(OidFunctionCall2Coll(cmp_proc,
1088                                                                                                                 op->sk_collation,
1089                                                                                                                 leftarg->sk_argument,
1090                                                                                                          rightarg->sk_argument));
1091                         return true;
1092                 }
1093         }
1094
1095         /* Can't make the comparison */
1096         *result = false;                        /* suppress compiler warnings */
1097         return false;
1098 }
1099
1100 /*
1101  * Adjust a scankey's strategy and flags setting as needed for indoptions.
1102  *
1103  * We copy the appropriate indoption value into the scankey sk_flags
1104  * (shifting to avoid clobbering system-defined flag bits).  Also, if
1105  * the DESC option is set, commute (flip) the operator strategy number.
1106  *
1107  * A secondary purpose is to check for IS NULL/NOT NULL scankeys and set up
1108  * the strategy field correctly for them.
1109  *
1110  * Lastly, for ordinary scankeys (not IS NULL/NOT NULL), we check for a
1111  * NULL comparison value.  Since all btree operators are assumed strict,
1112  * a NULL means that the qual cannot be satisfied.      We return TRUE if the
1113  * comparison value isn't NULL, or FALSE if the scan should be abandoned.
1114  *
1115  * This function is applied to the *input* scankey structure; therefore
1116  * on a rescan we will be looking at already-processed scankeys.  Hence
1117  * we have to be careful not to re-commute the strategy if we already did it.
1118  * It's a bit ugly to modify the caller's copy of the scankey but in practice
1119  * there shouldn't be any problem, since the index's indoptions are certainly
1120  * not going to change while the scankey survives.
1121  */
1122 static bool
1123 _bt_fix_scankey_strategy(ScanKey skey, int16 *indoption)
1124 {
1125         int                     addflags;
1126
1127         addflags = indoption[skey->sk_attno - 1] << SK_BT_INDOPTION_SHIFT;
1128
1129         /*
1130          * We treat all btree operators as strict (even if they're not so marked
1131          * in pg_proc). This means that it is impossible for an operator condition
1132          * with a NULL comparison constant to succeed, and we can reject it right
1133          * away.
1134          *
1135          * However, we now also support "x IS NULL" clauses as search conditions,
1136          * so in that case keep going. The planner has not filled in any
1137          * particular strategy in this case, so set it to BTEqualStrategyNumber
1138          * --- we can treat IS NULL as an equality operator for purposes of search
1139          * strategy.
1140          *
1141          * Likewise, "x IS NOT NULL" is supported.      We treat that as either "less
1142          * than NULL" in a NULLS LAST index, or "greater than NULL" in a NULLS
1143          * FIRST index.
1144          *
1145          * Note: someday we might have to fill in sk_collation from the index
1146          * column's collation.  At the moment this is a non-issue because we'll
1147          * never actually call the comparison operator on a NULL.
1148          */
1149         if (skey->sk_flags & SK_ISNULL)
1150         {
1151                 /* SK_ISNULL shouldn't be set in a row header scankey */
1152                 Assert(!(skey->sk_flags & SK_ROW_HEADER));
1153
1154                 /* Set indoption flags in scankey (might be done already) */
1155                 skey->sk_flags |= addflags;
1156
1157                 /* Set correct strategy for IS NULL or NOT NULL search */
1158                 if (skey->sk_flags & SK_SEARCHNULL)
1159                 {
1160                         skey->sk_strategy = BTEqualStrategyNumber;
1161                         skey->sk_subtype = InvalidOid;
1162                         skey->sk_collation = InvalidOid;
1163                 }
1164                 else if (skey->sk_flags & SK_SEARCHNOTNULL)
1165                 {
1166                         if (skey->sk_flags & SK_BT_NULLS_FIRST)
1167                                 skey->sk_strategy = BTGreaterStrategyNumber;
1168                         else
1169                                 skey->sk_strategy = BTLessStrategyNumber;
1170                         skey->sk_subtype = InvalidOid;
1171                         skey->sk_collation = InvalidOid;
1172                 }
1173                 else
1174                 {
1175                         /* regular qual, so it cannot be satisfied */
1176                         return false;
1177                 }
1178
1179                 /* Needn't do the rest */
1180                 return true;
1181         }
1182
1183         /* Adjust strategy for DESC, if we didn't already */
1184         if ((addflags & SK_BT_DESC) && !(skey->sk_flags & SK_BT_DESC))
1185                 skey->sk_strategy = BTCommuteStrategyNumber(skey->sk_strategy);
1186         skey->sk_flags |= addflags;
1187
1188         /* If it's a row header, fix row member flags and strategies similarly */
1189         if (skey->sk_flags & SK_ROW_HEADER)
1190         {
1191                 ScanKey         subkey = (ScanKey) DatumGetPointer(skey->sk_argument);
1192
1193                 for (;;)
1194                 {
1195                         Assert(subkey->sk_flags & SK_ROW_MEMBER);
1196                         addflags = indoption[subkey->sk_attno - 1] << SK_BT_INDOPTION_SHIFT;
1197                         if ((addflags & SK_BT_DESC) && !(subkey->sk_flags & SK_BT_DESC))
1198                                 subkey->sk_strategy = BTCommuteStrategyNumber(subkey->sk_strategy);
1199                         subkey->sk_flags |= addflags;
1200                         if (subkey->sk_flags & SK_ROW_END)
1201                                 break;
1202                         subkey++;
1203                 }
1204         }
1205
1206         return true;
1207 }
1208
1209 /*
1210  * Mark a scankey as "required to continue the scan".
1211  *
1212  * Depending on the operator type, the key may be required for both scan
1213  * directions or just one.      Also, if the key is a row comparison header,
1214  * we have to mark the appropriate subsidiary ScanKeys as required.  In
1215  * such cases, the first subsidiary key is required, but subsequent ones
1216  * are required only as long as they correspond to successive index columns
1217  * and match the leading column as to sort direction.
1218  * Otherwise the row comparison ordering is different from the index ordering
1219  * and so we can't stop the scan on the basis of those lower-order columns.
1220  *
1221  * Note: when we set required-key flag bits in a subsidiary scankey, we are
1222  * scribbling on a data structure belonging to the index AM's caller, not on
1223  * our private copy.  This should be OK because the marking will not change
1224  * from scan to scan within a query, and so we'd just re-mark the same way
1225  * anyway on a rescan.  Something to keep an eye on though.
1226  */
1227 static void
1228 _bt_mark_scankey_required(ScanKey skey)
1229 {
1230         int                     addflags;
1231
1232         switch (skey->sk_strategy)
1233         {
1234                 case BTLessStrategyNumber:
1235                 case BTLessEqualStrategyNumber:
1236                         addflags = SK_BT_REQFWD;
1237                         break;
1238                 case BTEqualStrategyNumber:
1239                         addflags = SK_BT_REQFWD | SK_BT_REQBKWD;
1240                         break;
1241                 case BTGreaterEqualStrategyNumber:
1242                 case BTGreaterStrategyNumber:
1243                         addflags = SK_BT_REQBKWD;
1244                         break;
1245                 default:
1246                         elog(ERROR, "unrecognized StrategyNumber: %d",
1247                                  (int) skey->sk_strategy);
1248                         addflags = 0;           /* keep compiler quiet */
1249                         break;
1250         }
1251
1252         skey->sk_flags |= addflags;
1253
1254         if (skey->sk_flags & SK_ROW_HEADER)
1255         {
1256                 ScanKey         subkey = (ScanKey) DatumGetPointer(skey->sk_argument);
1257                 AttrNumber      attno = skey->sk_attno;
1258
1259                 /* First subkey should be same as the header says */
1260                 Assert(subkey->sk_attno == attno);
1261
1262                 for (;;)
1263                 {
1264                         Assert(subkey->sk_flags & SK_ROW_MEMBER);
1265                         if (subkey->sk_attno != attno)
1266                                 break;                  /* non-adjacent key, so not required */
1267                         if (subkey->sk_strategy != skey->sk_strategy)
1268                                 break;                  /* wrong direction, so not required */
1269                         subkey->sk_flags |= addflags;
1270                         if (subkey->sk_flags & SK_ROW_END)
1271                                 break;
1272                         subkey++;
1273                         attno++;
1274                 }
1275         }
1276 }
1277
1278 /*
1279  * Test whether an indextuple satisfies all the scankey conditions.
1280  *
1281  * If so, return the address of the index tuple on the index page.
1282  * If not, return NULL.
1283  *
1284  * If the tuple fails to pass the qual, we also determine whether there's
1285  * any need to continue the scan beyond this tuple, and set *continuescan
1286  * accordingly.  See comments for _bt_preprocess_keys(), above, about how
1287  * this is done.
1288  *
1289  * scan: index scan descriptor (containing a search-type scankey)
1290  * page: buffer page containing index tuple
1291  * offnum: offset number of index tuple (must be a valid item!)
1292  * dir: direction we are scanning in
1293  * continuescan: output parameter (will be set correctly in all cases)
1294  *
1295  * Caller must hold pin and lock on the index page.
1296  */
1297 IndexTuple
1298 _bt_checkkeys(IndexScanDesc scan,
1299                           Page page, OffsetNumber offnum,
1300                           ScanDirection dir, bool *continuescan)
1301 {
1302         ItemId          iid = PageGetItemId(page, offnum);
1303         bool            tuple_alive;
1304         IndexTuple      tuple;
1305         TupleDesc       tupdesc;
1306         BTScanOpaque so;
1307         int                     keysz;
1308         int                     ikey;
1309         ScanKey         key;
1310
1311         *continuescan = true;           /* default assumption */
1312
1313         /*
1314          * If the scan specifies not to return killed tuples, then we treat a
1315          * killed tuple as not passing the qual.  Most of the time, it's a win to
1316          * not bother examining the tuple's index keys, but just return
1317          * immediately with continuescan = true to proceed to the next tuple.
1318          * However, if this is the last tuple on the page, we should check the
1319          * index keys to prevent uselessly advancing to the next page.
1320          */
1321         if (scan->ignore_killed_tuples && ItemIdIsDead(iid))
1322         {
1323                 /* return immediately if there are more tuples on the page */
1324                 if (ScanDirectionIsForward(dir))
1325                 {
1326                         if (offnum < PageGetMaxOffsetNumber(page))
1327                                 return NULL;
1328                 }
1329                 else
1330                 {
1331                         BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1332
1333                         if (offnum > P_FIRSTDATAKEY(opaque))
1334                                 return NULL;
1335                 }
1336
1337                 /*
1338                  * OK, we want to check the keys so we can set continuescan correctly,
1339                  * but we'll return NULL even if the tuple passes the key tests.
1340                  */
1341                 tuple_alive = false;
1342         }
1343         else
1344                 tuple_alive = true;
1345
1346         tuple = (IndexTuple) PageGetItem(page, iid);
1347
1348         tupdesc = RelationGetDescr(scan->indexRelation);
1349         so = (BTScanOpaque) scan->opaque;
1350         keysz = so->numberOfKeys;
1351
1352         for (key = so->keyData, ikey = 0; ikey < keysz; key++, ikey++)
1353         {
1354                 Datum           datum;
1355                 bool            isNull;
1356                 Datum           test;
1357
1358                 /* row-comparison keys need special processing */
1359                 if (key->sk_flags & SK_ROW_HEADER)
1360                 {
1361                         if (_bt_check_rowcompare(key, tuple, tupdesc, dir, continuescan))
1362                                 continue;
1363                         return NULL;
1364                 }
1365
1366                 datum = index_getattr(tuple,
1367                                                           key->sk_attno,
1368                                                           tupdesc,
1369                                                           &isNull);
1370
1371                 if (key->sk_flags & SK_ISNULL)
1372                 {
1373                         /* Handle IS NULL/NOT NULL tests */
1374                         if (key->sk_flags & SK_SEARCHNULL)
1375                         {
1376                                 if (isNull)
1377                                         continue;       /* tuple satisfies this qual */
1378                         }
1379                         else
1380                         {
1381                                 Assert(key->sk_flags & SK_SEARCHNOTNULL);
1382                                 if (!isNull)
1383                                         continue;       /* tuple satisfies this qual */
1384                         }
1385
1386                         /*
1387                          * Tuple fails this qual.  If it's a required qual for the current
1388                          * scan direction, then we can conclude no further tuples will
1389                          * pass, either.
1390                          */
1391                         if ((key->sk_flags & SK_BT_REQFWD) &&
1392                                 ScanDirectionIsForward(dir))
1393                                 *continuescan = false;
1394                         else if ((key->sk_flags & SK_BT_REQBKWD) &&
1395                                          ScanDirectionIsBackward(dir))
1396                                 *continuescan = false;
1397
1398                         /*
1399                          * In any case, this indextuple doesn't match the qual.
1400                          */
1401                         return NULL;
1402                 }
1403
1404                 if (isNull)
1405                 {
1406                         if (key->sk_flags & SK_BT_NULLS_FIRST)
1407                         {
1408                                 /*
1409                                  * Since NULLs are sorted before non-NULLs, we know we have
1410                                  * reached the lower limit of the range of values for this
1411                                  * index attr.  On a backward scan, we can stop if this qual
1412                                  * is one of the "must match" subset.  On a forward scan,
1413                                  * however, we should keep going.
1414                                  */
1415                                 if ((key->sk_flags & SK_BT_REQBKWD) &&
1416                                         ScanDirectionIsBackward(dir))
1417                                         *continuescan = false;
1418                         }
1419                         else
1420                         {
1421                                 /*
1422                                  * Since NULLs are sorted after non-NULLs, we know we have
1423                                  * reached the upper limit of the range of values for this
1424                                  * index attr.  On a forward scan, we can stop if this qual is
1425                                  * one of the "must match" subset.      On a backward scan,
1426                                  * however, we should keep going.
1427                                  */
1428                                 if ((key->sk_flags & SK_BT_REQFWD) &&
1429                                         ScanDirectionIsForward(dir))
1430                                         *continuescan = false;
1431                         }
1432
1433                         /*
1434                          * In any case, this indextuple doesn't match the qual.
1435                          */
1436                         return NULL;
1437                 }
1438
1439                 test = FunctionCall2Coll(&key->sk_func, key->sk_collation,
1440                                                                  datum, key->sk_argument);
1441
1442                 if (!DatumGetBool(test))
1443                 {
1444                         /*
1445                          * Tuple fails this qual.  If it's a required qual for the current
1446                          * scan direction, then we can conclude no further tuples will
1447                          * pass, either.
1448                          *
1449                          * Note: because we stop the scan as soon as any required equality
1450                          * qual fails, it is critical that equality quals be used for the
1451                          * initial positioning in _bt_first() when they are available. See
1452                          * comments in _bt_first().
1453                          */
1454                         if ((key->sk_flags & SK_BT_REQFWD) &&
1455                                 ScanDirectionIsForward(dir))
1456                                 *continuescan = false;
1457                         else if ((key->sk_flags & SK_BT_REQBKWD) &&
1458                                          ScanDirectionIsBackward(dir))
1459                                 *continuescan = false;
1460
1461                         /*
1462                          * In any case, this indextuple doesn't match the qual.
1463                          */
1464                         return NULL;
1465                 }
1466         }
1467
1468         /* Check for failure due to it being a killed tuple. */
1469         if (!tuple_alive)
1470                 return NULL;
1471
1472         /* If we get here, the tuple passes all index quals. */
1473         return tuple;
1474 }
1475
1476 /*
1477  * Test whether an indextuple satisfies a row-comparison scan condition.
1478  *
1479  * Return true if so, false if not.  If not, also clear *continuescan if
1480  * it's not possible for any future tuples in the current scan direction
1481  * to pass the qual.
1482  *
1483  * This is a subroutine for _bt_checkkeys, which see for more info.
1484  */
1485 static bool
1486 _bt_check_rowcompare(ScanKey skey, IndexTuple tuple, TupleDesc tupdesc,
1487                                          ScanDirection dir, bool *continuescan)
1488 {
1489         ScanKey         subkey = (ScanKey) DatumGetPointer(skey->sk_argument);
1490         int32           cmpresult = 0;
1491         bool            result;
1492
1493         /* First subkey should be same as the header says */
1494         Assert(subkey->sk_attno == skey->sk_attno);
1495
1496         /* Loop over columns of the row condition */
1497         for (;;)
1498         {
1499                 Datum           datum;
1500                 bool            isNull;
1501
1502                 Assert(subkey->sk_flags & SK_ROW_MEMBER);
1503
1504                 datum = index_getattr(tuple,
1505                                                           subkey->sk_attno,
1506                                                           tupdesc,
1507                                                           &isNull);
1508
1509                 if (isNull)
1510                 {
1511                         if (subkey->sk_flags & SK_BT_NULLS_FIRST)
1512                         {
1513                                 /*
1514                                  * Since NULLs are sorted before non-NULLs, we know we have
1515                                  * reached the lower limit of the range of values for this
1516                                  * index attr. On a backward scan, we can stop if this qual is
1517                                  * one of the "must match" subset.      On a forward scan,
1518                                  * however, we should keep going.
1519                                  */
1520                                 if ((subkey->sk_flags & SK_BT_REQBKWD) &&
1521                                         ScanDirectionIsBackward(dir))
1522                                         *continuescan = false;
1523                         }
1524                         else
1525                         {
1526                                 /*
1527                                  * Since NULLs are sorted after non-NULLs, we know we have
1528                                  * reached the upper limit of the range of values for this
1529                                  * index attr. On a forward scan, we can stop if this qual is
1530                                  * one of the "must match" subset.      On a backward scan,
1531                                  * however, we should keep going.
1532                                  */
1533                                 if ((subkey->sk_flags & SK_BT_REQFWD) &&
1534                                         ScanDirectionIsForward(dir))
1535                                         *continuescan = false;
1536                         }
1537
1538                         /*
1539                          * In any case, this indextuple doesn't match the qual.
1540                          */
1541                         return false;
1542                 }
1543
1544                 if (subkey->sk_flags & SK_ISNULL)
1545                 {
1546                         /*
1547                          * Unlike the simple-scankey case, this isn't a disallowed case.
1548                          * But it can never match.      If all the earlier row comparison
1549                          * columns are required for the scan direction, we can stop the
1550                          * scan, because there can't be another tuple that will succeed.
1551                          */
1552                         if (subkey != (ScanKey) DatumGetPointer(skey->sk_argument))
1553                                 subkey--;
1554                         if ((subkey->sk_flags & SK_BT_REQFWD) &&
1555                                 ScanDirectionIsForward(dir))
1556                                 *continuescan = false;
1557                         else if ((subkey->sk_flags & SK_BT_REQBKWD) &&
1558                                          ScanDirectionIsBackward(dir))
1559                                 *continuescan = false;
1560                         return false;
1561                 }
1562
1563                 /* Perform the test --- three-way comparison not bool operator */
1564                 cmpresult = DatumGetInt32(FunctionCall2Coll(&subkey->sk_func,
1565                                                                                                         subkey->sk_collation,
1566                                                                                                         datum,
1567                                                                                                         subkey->sk_argument));
1568
1569                 if (subkey->sk_flags & SK_BT_DESC)
1570                         cmpresult = -cmpresult;
1571
1572                 /* Done comparing if unequal, else advance to next column */
1573                 if (cmpresult != 0)
1574                         break;
1575
1576                 if (subkey->sk_flags & SK_ROW_END)
1577                         break;
1578                 subkey++;
1579         }
1580
1581         /*
1582          * At this point cmpresult indicates the overall result of the row
1583          * comparison, and subkey points to the deciding column (or the last
1584          * column if the result is "=").
1585          */
1586         switch (subkey->sk_strategy)
1587         {
1588                         /* EQ and NE cases aren't allowed here */
1589                 case BTLessStrategyNumber:
1590                         result = (cmpresult < 0);
1591                         break;
1592                 case BTLessEqualStrategyNumber:
1593                         result = (cmpresult <= 0);
1594                         break;
1595                 case BTGreaterEqualStrategyNumber:
1596                         result = (cmpresult >= 0);
1597                         break;
1598                 case BTGreaterStrategyNumber:
1599                         result = (cmpresult > 0);
1600                         break;
1601                 default:
1602                         elog(ERROR, "unrecognized RowCompareType: %d",
1603                                  (int) subkey->sk_strategy);
1604                         result = 0;                     /* keep compiler quiet */
1605                         break;
1606         }
1607
1608         if (!result)
1609         {
1610                 /*
1611                  * Tuple fails this qual.  If it's a required qual for the current
1612                  * scan direction, then we can conclude no further tuples will pass,
1613                  * either.      Note we have to look at the deciding column, not
1614                  * necessarily the first or last column of the row condition.
1615                  */
1616                 if ((subkey->sk_flags & SK_BT_REQFWD) &&
1617                         ScanDirectionIsForward(dir))
1618                         *continuescan = false;
1619                 else if ((subkey->sk_flags & SK_BT_REQBKWD) &&
1620                                  ScanDirectionIsBackward(dir))
1621                         *continuescan = false;
1622         }
1623
1624         return result;
1625 }
1626
1627 /*
1628  * _bt_killitems - set LP_DEAD state for items an indexscan caller has
1629  * told us were killed
1630  *
1631  * scan->so contains information about the current page and killed tuples
1632  * thereon (generally, this should only be called if so->numKilled > 0).
1633  *
1634  * The caller must have pin on so->currPos.buf, but may or may not have
1635  * read-lock, as indicated by haveLock.  Note that we assume read-lock
1636  * is sufficient for setting LP_DEAD status (which is only a hint).
1637  *
1638  * We match items by heap TID before assuming they are the right ones to
1639  * delete.      We cope with cases where items have moved right due to insertions.
1640  * If an item has moved off the current page due to a split, we'll fail to
1641  * find it and do nothing (this is not an error case --- we assume the item
1642  * will eventually get marked in a future indexscan).  Note that because we
1643  * hold pin on the target page continuously from initially reading the items
1644  * until applying this function, VACUUM cannot have deleted any items from
1645  * the page, and so there is no need to search left from the recorded offset.
1646  * (This observation also guarantees that the item is still the right one
1647  * to delete, which might otherwise be questionable since heap TIDs can get
1648  * recycled.)
1649  */
1650 void
1651 _bt_killitems(IndexScanDesc scan, bool haveLock)
1652 {
1653         BTScanOpaque so = (BTScanOpaque) scan->opaque;
1654         Page            page;
1655         BTPageOpaque opaque;
1656         OffsetNumber minoff;
1657         OffsetNumber maxoff;
1658         int                     i;
1659         bool            killedsomething = false;
1660
1661         Assert(BufferIsValid(so->currPos.buf));
1662
1663         if (!haveLock)
1664                 LockBuffer(so->currPos.buf, BT_READ);
1665
1666         page = BufferGetPage(so->currPos.buf);
1667         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1668         minoff = P_FIRSTDATAKEY(opaque);
1669         maxoff = PageGetMaxOffsetNumber(page);
1670
1671         for (i = 0; i < so->numKilled; i++)
1672         {
1673                 int                     itemIndex = so->killedItems[i];
1674                 BTScanPosItem *kitem = &so->currPos.items[itemIndex];
1675                 OffsetNumber offnum = kitem->indexOffset;
1676
1677                 Assert(itemIndex >= so->currPos.firstItem &&
1678                            itemIndex <= so->currPos.lastItem);
1679                 if (offnum < minoff)
1680                         continue;                       /* pure paranoia */
1681                 while (offnum <= maxoff)
1682                 {
1683                         ItemId          iid = PageGetItemId(page, offnum);
1684                         IndexTuple      ituple = (IndexTuple) PageGetItem(page, iid);
1685
1686                         if (ItemPointerEquals(&ituple->t_tid, &kitem->heapTid))
1687                         {
1688                                 /* found the item */
1689                                 ItemIdMarkDead(iid);
1690                                 killedsomething = true;
1691                                 break;                  /* out of inner search loop */
1692                         }
1693                         offnum = OffsetNumberNext(offnum);
1694                 }
1695         }
1696
1697         /*
1698          * Since this can be redone later if needed, it's treated the same as a
1699          * commit-hint-bit status update for heap tuples: we mark the buffer dirty
1700          * but don't make a WAL log entry.
1701          *
1702          * Whenever we mark anything LP_DEAD, we also set the page's
1703          * BTP_HAS_GARBAGE flag, which is likewise just a hint.
1704          */
1705         if (killedsomething)
1706         {
1707                 opaque->btpo_flags |= BTP_HAS_GARBAGE;
1708                 SetBufferCommitInfoNeedsSave(so->currPos.buf);
1709         }
1710
1711         if (!haveLock)
1712                 LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK);
1713
1714         /*
1715          * Always reset the scan state, so we don't look for same items on other
1716          * pages.
1717          */
1718         so->numKilled = 0;
1719 }
1720
1721
1722 /*
1723  * The following routines manage a shared-memory area in which we track
1724  * assignment of "vacuum cycle IDs" to currently-active btree vacuuming
1725  * operations.  There is a single counter which increments each time we
1726  * start a vacuum to assign it a cycle ID.      Since multiple vacuums could
1727  * be active concurrently, we have to track the cycle ID for each active
1728  * vacuum; this requires at most MaxBackends entries (usually far fewer).
1729  * We assume at most one vacuum can be active for a given index.
1730  *
1731  * Access to the shared memory area is controlled by BtreeVacuumLock.
1732  * In principle we could use a separate lmgr locktag for each index,
1733  * but a single LWLock is much cheaper, and given the short time that
1734  * the lock is ever held, the concurrency hit should be minimal.
1735  */
1736
1737 typedef struct BTOneVacInfo
1738 {
1739         LockRelId       relid;                  /* global identifier of an index */
1740         BTCycleId       cycleid;                /* cycle ID for its active VACUUM */
1741 } BTOneVacInfo;
1742
1743 typedef struct BTVacInfo
1744 {
1745         BTCycleId       cycle_ctr;              /* cycle ID most recently assigned */
1746         int                     num_vacuums;    /* number of currently active VACUUMs */
1747         int                     max_vacuums;    /* allocated length of vacuums[] array */
1748         BTOneVacInfo vacuums[1];        /* VARIABLE LENGTH ARRAY */
1749 } BTVacInfo;
1750
1751 static BTVacInfo *btvacinfo;
1752
1753
1754 /*
1755  * _bt_vacuum_cycleid --- get the active vacuum cycle ID for an index,
1756  *              or zero if there is no active VACUUM
1757  *
1758  * Note: for correct interlocking, the caller must already hold pin and
1759  * exclusive lock on each buffer it will store the cycle ID into.  This
1760  * ensures that even if a VACUUM starts immediately afterwards, it cannot
1761  * process those pages until the page split is complete.
1762  */
1763 BTCycleId
1764 _bt_vacuum_cycleid(Relation rel)
1765 {
1766         BTCycleId       result = 0;
1767         int                     i;
1768
1769         /* Share lock is enough since this is a read-only operation */
1770         LWLockAcquire(BtreeVacuumLock, LW_SHARED);
1771
1772         for (i = 0; i < btvacinfo->num_vacuums; i++)
1773         {
1774                 BTOneVacInfo *vac = &btvacinfo->vacuums[i];
1775
1776                 if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
1777                         vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
1778                 {
1779                         result = vac->cycleid;
1780                         break;
1781                 }
1782         }
1783
1784         LWLockRelease(BtreeVacuumLock);
1785         return result;
1786 }
1787
1788 /*
1789  * _bt_start_vacuum --- assign a cycle ID to a just-starting VACUUM operation
1790  *
1791  * Note: the caller must guarantee that it will eventually call
1792  * _bt_end_vacuum, else we'll permanently leak an array slot.  To ensure
1793  * that this happens even in elog(FATAL) scenarios, the appropriate coding
1794  * is not just a PG_TRY, but
1795  *              PG_ENSURE_ERROR_CLEANUP(_bt_end_vacuum_callback, PointerGetDatum(rel))
1796  */
1797 BTCycleId
1798 _bt_start_vacuum(Relation rel)
1799 {
1800         BTCycleId       result;
1801         int                     i;
1802         BTOneVacInfo *vac;
1803
1804         LWLockAcquire(BtreeVacuumLock, LW_EXCLUSIVE);
1805
1806         /*
1807          * Assign the next cycle ID, being careful to avoid zero as well as the
1808          * reserved high values.
1809          */
1810         result = ++(btvacinfo->cycle_ctr);
1811         if (result == 0 || result > MAX_BT_CYCLE_ID)
1812                 result = btvacinfo->cycle_ctr = 1;
1813
1814         /* Let's just make sure there's no entry already for this index */
1815         for (i = 0; i < btvacinfo->num_vacuums; i++)
1816         {
1817                 vac = &btvacinfo->vacuums[i];
1818                 if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
1819                         vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
1820                 {
1821                         /*
1822                          * Unlike most places in the backend, we have to explicitly
1823                          * release our LWLock before throwing an error.  This is because
1824                          * we expect _bt_end_vacuum() to be called before transaction
1825                          * abort cleanup can run to release LWLocks.
1826                          */
1827                         LWLockRelease(BtreeVacuumLock);
1828                         elog(ERROR, "multiple active vacuums for index \"%s\"",
1829                                  RelationGetRelationName(rel));
1830                 }
1831         }
1832
1833         /* OK, add an entry */
1834         if (btvacinfo->num_vacuums >= btvacinfo->max_vacuums)
1835         {
1836                 LWLockRelease(BtreeVacuumLock);
1837                 elog(ERROR, "out of btvacinfo slots");
1838         }
1839         vac = &btvacinfo->vacuums[btvacinfo->num_vacuums];
1840         vac->relid = rel->rd_lockInfo.lockRelId;
1841         vac->cycleid = result;
1842         btvacinfo->num_vacuums++;
1843
1844         LWLockRelease(BtreeVacuumLock);
1845         return result;
1846 }
1847
1848 /*
1849  * _bt_end_vacuum --- mark a btree VACUUM operation as done
1850  *
1851  * Note: this is deliberately coded not to complain if no entry is found;
1852  * this allows the caller to put PG_TRY around the start_vacuum operation.
1853  */
1854 void
1855 _bt_end_vacuum(Relation rel)
1856 {
1857         int                     i;
1858
1859         LWLockAcquire(BtreeVacuumLock, LW_EXCLUSIVE);
1860
1861         /* Find the array entry */
1862         for (i = 0; i < btvacinfo->num_vacuums; i++)
1863         {
1864                 BTOneVacInfo *vac = &btvacinfo->vacuums[i];
1865
1866                 if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
1867                         vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
1868                 {
1869                         /* Remove it by shifting down the last entry */
1870                         *vac = btvacinfo->vacuums[btvacinfo->num_vacuums - 1];
1871                         btvacinfo->num_vacuums--;
1872                         break;
1873                 }
1874         }
1875
1876         LWLockRelease(BtreeVacuumLock);
1877 }
1878
1879 /*
1880  * _bt_end_vacuum wrapped as an on_shmem_exit callback function
1881  */
1882 void
1883 _bt_end_vacuum_callback(int code, Datum arg)
1884 {
1885         _bt_end_vacuum((Relation) DatumGetPointer(arg));
1886 }
1887
1888 /*
1889  * BTreeShmemSize --- report amount of shared memory space needed
1890  */
1891 Size
1892 BTreeShmemSize(void)
1893 {
1894         Size            size;
1895
1896         size = offsetof(BTVacInfo, vacuums[0]);
1897         size = add_size(size, mul_size(MaxBackends, sizeof(BTOneVacInfo)));
1898         return size;
1899 }
1900
1901 /*
1902  * BTreeShmemInit --- initialize this module's shared memory
1903  */
1904 void
1905 BTreeShmemInit(void)
1906 {
1907         bool            found;
1908
1909         btvacinfo = (BTVacInfo *) ShmemInitStruct("BTree Vacuum State",
1910                                                                                           BTreeShmemSize(),
1911                                                                                           &found);
1912
1913         if (!IsUnderPostmaster)
1914         {
1915                 /* Initialize shared memory area */
1916                 Assert(!found);
1917
1918                 /*
1919                  * It doesn't really matter what the cycle counter starts at, but
1920                  * having it always start the same doesn't seem good.  Seed with
1921                  * low-order bits of time() instead.
1922                  */
1923                 btvacinfo->cycle_ctr = (BTCycleId) time(NULL);
1924
1925                 btvacinfo->num_vacuums = 0;
1926                 btvacinfo->max_vacuums = MaxBackends;
1927         }
1928         else
1929                 Assert(found);
1930 }
1931
1932 Datum
1933 btoptions(PG_FUNCTION_ARGS)
1934 {
1935         Datum           reloptions = PG_GETARG_DATUM(0);
1936         bool            validate = PG_GETARG_BOOL(1);
1937         bytea      *result;
1938
1939         result = default_reloptions(reloptions, validate, RELOPT_KIND_BTREE);
1940         if (result)
1941                 PG_RETURN_BYTEA_P(result);
1942         PG_RETURN_NULL();
1943 }