]> granicus.if.org Git - postgresql/blob - src/backend/access/nbtree/nbtutils.c
Adjust INCLUDE index truncation comments and code.
[postgresql] / src / backend / access / nbtree / nbtutils.c
1 /*-------------------------------------------------------------------------
2  *
3  * nbtutils.c
4  *        Utility code for Postgres btree implementation.
5  *
6  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/access/nbtree/nbtutils.c
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include <time.h>
19
20 #include "access/nbtree.h"
21 #include "access/reloptions.h"
22 #include "access/relscan.h"
23 #include "miscadmin.h"
24 #include "utils/array.h"
25 #include "utils/lsyscache.h"
26 #include "utils/memutils.h"
27 #include "utils/rel.h"
28
29
30 typedef struct BTSortArrayContext
31 {
32         FmgrInfo        flinfo;
33         Oid                     collation;
34         bool            reverse;
35 } BTSortArrayContext;
36
37 static Datum _bt_find_extreme_element(IndexScanDesc scan, ScanKey skey,
38                                                  StrategyNumber strat,
39                                                  Datum *elems, int nelems);
40 static int _bt_sort_array_elements(IndexScanDesc scan, ScanKey skey,
41                                                 bool reverse,
42                                                 Datum *elems, int nelems);
43 static int      _bt_compare_array_elements(const void *a, const void *b, void *arg);
44 static bool _bt_compare_scankey_args(IndexScanDesc scan, ScanKey op,
45                                                  ScanKey leftarg, ScanKey rightarg,
46                                                  bool *result);
47 static bool _bt_fix_scankey_strategy(ScanKey skey, int16 *indoption);
48 static void _bt_mark_scankey_required(ScanKey skey);
49 static bool _bt_check_rowcompare(ScanKey skey,
50                                          IndexTuple tuple, TupleDesc tupdesc,
51                                          ScanDirection dir, bool *continuescan);
52
53
54 /*
55  * _bt_mkscankey
56  *              Build an insertion scan key that contains comparison data from itup
57  *              as well as comparator routines appropriate to the key datatypes.
58  *
59  *              The result is intended for use with _bt_compare().
60  */
61 ScanKey
62 _bt_mkscankey(Relation rel, IndexTuple itup)
63 {
64         ScanKey         skey;
65         TupleDesc       itupdesc;
66         int                     indnatts PG_USED_FOR_ASSERTS_ONLY;
67         int                     indnkeyatts;
68         int16      *indoption;
69         int                     i;
70
71         itupdesc = RelationGetDescr(rel);
72         indnatts = IndexRelationGetNumberOfAttributes(rel);
73         indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
74         indoption = rel->rd_indoption;
75
76         Assert(indnkeyatts > 0);
77         Assert(indnkeyatts <= indnatts);
78         Assert(BTreeTupleGetNAtts(itup, rel) == indnatts ||
79                    BTreeTupleGetNAtts(itup, rel) == indnkeyatts);
80
81         /*
82          * We'll execute search using scan key constructed on key columns. Non-key
83          * (INCLUDE index) columns are always omitted from scan keys.
84          */
85         skey = (ScanKey) palloc(indnkeyatts * sizeof(ScanKeyData));
86
87         for (i = 0; i < indnkeyatts; i++)
88         {
89                 FmgrInfo   *procinfo;
90                 Datum           arg;
91                 bool            null;
92                 int                     flags;
93
94                 /*
95                  * We can use the cached (default) support procs since no cross-type
96                  * comparison can be needed.
97                  */
98                 procinfo = index_getprocinfo(rel, i + 1, BTORDER_PROC);
99                 arg = index_getattr(itup, i + 1, itupdesc, &null);
100                 flags = (null ? SK_ISNULL : 0) | (indoption[i] << SK_BT_INDOPTION_SHIFT);
101                 ScanKeyEntryInitializeWithInfo(&skey[i],
102                                                                            flags,
103                                                                            (AttrNumber) (i + 1),
104                                                                            InvalidStrategy,
105                                                                            InvalidOid,
106                                                                            rel->rd_indcollation[i],
107                                                                            procinfo,
108                                                                            arg);
109         }
110
111         return skey;
112 }
113
114 /*
115  * _bt_mkscankey_nodata
116  *              Build an insertion scan key that contains 3-way comparator routines
117  *              appropriate to the key datatypes, but no comparison data.  The
118  *              comparison data ultimately used must match the key datatypes.
119  *
120  *              The result cannot be used with _bt_compare(), unless comparison
121  *              data is first stored into the key entries.  Currently this
122  *              routine is only called by nbtsort.c and tuplesort.c, which have
123  *              their own comparison routines.
124  */
125 ScanKey
126 _bt_mkscankey_nodata(Relation rel)
127 {
128         ScanKey         skey;
129         int                     indnkeyatts;
130         int16      *indoption;
131         int                     i;
132
133         indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
134         indoption = rel->rd_indoption;
135
136         skey = (ScanKey) palloc(indnkeyatts * sizeof(ScanKeyData));
137
138         for (i = 0; i < indnkeyatts; i++)
139         {
140                 FmgrInfo   *procinfo;
141                 int                     flags;
142
143                 /*
144                  * We can use the cached (default) support procs since no cross-type
145                  * comparison can be needed.
146                  */
147                 procinfo = index_getprocinfo(rel, i + 1, BTORDER_PROC);
148                 flags = SK_ISNULL | (indoption[i] << SK_BT_INDOPTION_SHIFT);
149                 ScanKeyEntryInitializeWithInfo(&skey[i],
150                                                                            flags,
151                                                                            (AttrNumber) (i + 1),
152                                                                            InvalidStrategy,
153                                                                            InvalidOid,
154                                                                            rel->rd_indcollation[i],
155                                                                            procinfo,
156                                                                            (Datum) 0);
157         }
158
159         return skey;
160 }
161
162 /*
163  * free a scan key made by either _bt_mkscankey or _bt_mkscankey_nodata.
164  */
165 void
166 _bt_freeskey(ScanKey skey)
167 {
168         pfree(skey);
169 }
170
171 /*
172  * free a retracement stack made by _bt_search.
173  */
174 void
175 _bt_freestack(BTStack stack)
176 {
177         BTStack         ostack;
178
179         while (stack != NULL)
180         {
181                 ostack = stack;
182                 stack = stack->bts_parent;
183                 pfree(ostack);
184         }
185 }
186
187
188 /*
189  *      _bt_preprocess_array_keys() -- Preprocess SK_SEARCHARRAY scan keys
190  *
191  * If there are any SK_SEARCHARRAY scan keys, deconstruct the array(s) and
192  * set up BTArrayKeyInfo info for each one that is an equality-type key.
193  * Prepare modified scan keys in so->arrayKeyData, which will hold the current
194  * array elements during each primitive indexscan operation.  For inequality
195  * array keys, it's sufficient to find the extreme element value and replace
196  * the whole array with that scalar value.
197  *
198  * Note: the reason we need so->arrayKeyData, rather than just scribbling
199  * on scan->keyData, is that callers are permitted to call btrescan without
200  * supplying a new set of scankey data.
201  */
202 void
203 _bt_preprocess_array_keys(IndexScanDesc scan)
204 {
205         BTScanOpaque so = (BTScanOpaque) scan->opaque;
206         int                     numberOfKeys = scan->numberOfKeys;
207         int16      *indoption = scan->indexRelation->rd_indoption;
208         int                     numArrayKeys;
209         ScanKey         cur;
210         int                     i;
211         MemoryContext oldContext;
212
213         /* Quick check to see if there are any array keys */
214         numArrayKeys = 0;
215         for (i = 0; i < numberOfKeys; i++)
216         {
217                 cur = &scan->keyData[i];
218                 if (cur->sk_flags & SK_SEARCHARRAY)
219                 {
220                         numArrayKeys++;
221                         Assert(!(cur->sk_flags & (SK_ROW_HEADER | SK_SEARCHNULL | SK_SEARCHNOTNULL)));
222                         /* If any arrays are null as a whole, we can quit right now. */
223                         if (cur->sk_flags & SK_ISNULL)
224                         {
225                                 so->numArrayKeys = -1;
226                                 so->arrayKeyData = NULL;
227                                 return;
228                         }
229                 }
230         }
231
232         /* Quit if nothing to do. */
233         if (numArrayKeys == 0)
234         {
235                 so->numArrayKeys = 0;
236                 so->arrayKeyData = NULL;
237                 return;
238         }
239
240         /*
241          * Make a scan-lifespan context to hold array-associated data, or reset it
242          * if we already have one from a previous rescan cycle.
243          */
244         if (so->arrayContext == NULL)
245                 so->arrayContext = AllocSetContextCreate(CurrentMemoryContext,
246                                                                                                  "BTree array context",
247                                                                                                  ALLOCSET_SMALL_SIZES);
248         else
249                 MemoryContextReset(so->arrayContext);
250
251         oldContext = MemoryContextSwitchTo(so->arrayContext);
252
253         /* Create modifiable copy of scan->keyData in the workspace context */
254         so->arrayKeyData = (ScanKey) palloc(scan->numberOfKeys * sizeof(ScanKeyData));
255         memcpy(so->arrayKeyData,
256                    scan->keyData,
257                    scan->numberOfKeys * sizeof(ScanKeyData));
258
259         /* Allocate space for per-array data in the workspace context */
260         so->arrayKeys = (BTArrayKeyInfo *) palloc0(numArrayKeys * sizeof(BTArrayKeyInfo));
261
262         /* Now process each array key */
263         numArrayKeys = 0;
264         for (i = 0; i < numberOfKeys; i++)
265         {
266                 ArrayType  *arrayval;
267                 int16           elmlen;
268                 bool            elmbyval;
269                 char            elmalign;
270                 int                     num_elems;
271                 Datum      *elem_values;
272                 bool       *elem_nulls;
273                 int                     num_nonnulls;
274                 int                     j;
275
276                 cur = &so->arrayKeyData[i];
277                 if (!(cur->sk_flags & SK_SEARCHARRAY))
278                         continue;
279
280                 /*
281                  * First, deconstruct the array into elements.  Anything allocated
282                  * here (including a possibly detoasted array value) is in the
283                  * workspace context.
284                  */
285                 arrayval = DatumGetArrayTypeP(cur->sk_argument);
286                 /* We could cache this data, but not clear it's worth it */
287                 get_typlenbyvalalign(ARR_ELEMTYPE(arrayval),
288                                                          &elmlen, &elmbyval, &elmalign);
289                 deconstruct_array(arrayval,
290                                                   ARR_ELEMTYPE(arrayval),
291                                                   elmlen, elmbyval, elmalign,
292                                                   &elem_values, &elem_nulls, &num_elems);
293
294                 /*
295                  * Compress out any null elements.  We can ignore them since we assume
296                  * all btree operators are strict.
297                  */
298                 num_nonnulls = 0;
299                 for (j = 0; j < num_elems; j++)
300                 {
301                         if (!elem_nulls[j])
302                                 elem_values[num_nonnulls++] = elem_values[j];
303                 }
304
305                 /* We could pfree(elem_nulls) now, but not worth the cycles */
306
307                 /* If there's no non-nulls, the scan qual is unsatisfiable */
308                 if (num_nonnulls == 0)
309                 {
310                         numArrayKeys = -1;
311                         break;
312                 }
313
314                 /*
315                  * If the comparison operator is not equality, then the array qual
316                  * degenerates to a simple comparison against the smallest or largest
317                  * non-null array element, as appropriate.
318                  */
319                 switch (cur->sk_strategy)
320                 {
321                         case BTLessStrategyNumber:
322                         case BTLessEqualStrategyNumber:
323                                 cur->sk_argument =
324                                         _bt_find_extreme_element(scan, cur,
325                                                                                          BTGreaterStrategyNumber,
326                                                                                          elem_values, num_nonnulls);
327                                 continue;
328                         case BTEqualStrategyNumber:
329                                 /* proceed with rest of loop */
330                                 break;
331                         case BTGreaterEqualStrategyNumber:
332                         case BTGreaterStrategyNumber:
333                                 cur->sk_argument =
334                                         _bt_find_extreme_element(scan, cur,
335                                                                                          BTLessStrategyNumber,
336                                                                                          elem_values, num_nonnulls);
337                                 continue;
338                         default:
339                                 elog(ERROR, "unrecognized StrategyNumber: %d",
340                                          (int) cur->sk_strategy);
341                                 break;
342                 }
343
344                 /*
345                  * Sort the non-null elements and eliminate any duplicates.  We must
346                  * sort in the same ordering used by the index column, so that the
347                  * successive primitive indexscans produce data in index order.
348                  */
349                 num_elems = _bt_sort_array_elements(scan, cur,
350                                                                                         (indoption[cur->sk_attno - 1] & INDOPTION_DESC) != 0,
351                                                                                         elem_values, num_nonnulls);
352
353                 /*
354                  * And set up the BTArrayKeyInfo data.
355                  */
356                 so->arrayKeys[numArrayKeys].scan_key = i;
357                 so->arrayKeys[numArrayKeys].num_elems = num_elems;
358                 so->arrayKeys[numArrayKeys].elem_values = elem_values;
359                 numArrayKeys++;
360         }
361
362         so->numArrayKeys = numArrayKeys;
363
364         MemoryContextSwitchTo(oldContext);
365 }
366
367 /*
368  * _bt_find_extreme_element() -- get least or greatest array element
369  *
370  * scan and skey identify the index column, whose opfamily determines the
371  * comparison semantics.  strat should be BTLessStrategyNumber to get the
372  * least element, or BTGreaterStrategyNumber to get the greatest.
373  */
374 static Datum
375 _bt_find_extreme_element(IndexScanDesc scan, ScanKey skey,
376                                                  StrategyNumber strat,
377                                                  Datum *elems, int nelems)
378 {
379         Relation        rel = scan->indexRelation;
380         Oid                     elemtype,
381                                 cmp_op;
382         RegProcedure cmp_proc;
383         FmgrInfo        flinfo;
384         Datum           result;
385         int                     i;
386
387         /*
388          * Determine the nominal datatype of the array elements.  We have to
389          * support the convention that sk_subtype == InvalidOid means the opclass
390          * input type; this is a hack to simplify life for ScanKeyInit().
391          */
392         elemtype = skey->sk_subtype;
393         if (elemtype == InvalidOid)
394                 elemtype = rel->rd_opcintype[skey->sk_attno - 1];
395
396         /*
397          * Look up the appropriate comparison operator in the opfamily.
398          *
399          * Note: it's possible that this would fail, if the opfamily is
400          * incomplete, but it seems quite unlikely that an opfamily would omit
401          * non-cross-type comparison operators for any datatype that it supports
402          * at all.
403          */
404         cmp_op = get_opfamily_member(rel->rd_opfamily[skey->sk_attno - 1],
405                                                                  elemtype,
406                                                                  elemtype,
407                                                                  strat);
408         if (!OidIsValid(cmp_op))
409                 elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
410                          strat, elemtype, elemtype,
411                          rel->rd_opfamily[skey->sk_attno - 1]);
412         cmp_proc = get_opcode(cmp_op);
413         if (!RegProcedureIsValid(cmp_proc))
414                 elog(ERROR, "missing oprcode for operator %u", cmp_op);
415
416         fmgr_info(cmp_proc, &flinfo);
417
418         Assert(nelems > 0);
419         result = elems[0];
420         for (i = 1; i < nelems; i++)
421         {
422                 if (DatumGetBool(FunctionCall2Coll(&flinfo,
423                                                                                    skey->sk_collation,
424                                                                                    elems[i],
425                                                                                    result)))
426                         result = elems[i];
427         }
428
429         return result;
430 }
431
432 /*
433  * _bt_sort_array_elements() -- sort and de-dup array elements
434  *
435  * The array elements are sorted in-place, and the new number of elements
436  * after duplicate removal is returned.
437  *
438  * scan and skey identify the index column, whose opfamily determines the
439  * comparison semantics.  If reverse is true, we sort in descending order.
440  */
441 static int
442 _bt_sort_array_elements(IndexScanDesc scan, ScanKey skey,
443                                                 bool reverse,
444                                                 Datum *elems, int nelems)
445 {
446         Relation        rel = scan->indexRelation;
447         Oid                     elemtype;
448         RegProcedure cmp_proc;
449         BTSortArrayContext cxt;
450         int                     last_non_dup;
451         int                     i;
452
453         if (nelems <= 1)
454                 return nelems;                  /* no work to do */
455
456         /*
457          * Determine the nominal datatype of the array elements.  We have to
458          * support the convention that sk_subtype == InvalidOid means the opclass
459          * input type; this is a hack to simplify life for ScanKeyInit().
460          */
461         elemtype = skey->sk_subtype;
462         if (elemtype == InvalidOid)
463                 elemtype = rel->rd_opcintype[skey->sk_attno - 1];
464
465         /*
466          * Look up the appropriate comparison function in the opfamily.
467          *
468          * Note: it's possible that this would fail, if the opfamily is
469          * incomplete, but it seems quite unlikely that an opfamily would omit
470          * non-cross-type support functions for any datatype that it supports at
471          * all.
472          */
473         cmp_proc = get_opfamily_proc(rel->rd_opfamily[skey->sk_attno - 1],
474                                                                  elemtype,
475                                                                  elemtype,
476                                                                  BTORDER_PROC);
477         if (!RegProcedureIsValid(cmp_proc))
478                 elog(ERROR, "missing support function %d(%u,%u) in opfamily %u",
479                          BTORDER_PROC, elemtype, elemtype,
480                          rel->rd_opfamily[skey->sk_attno - 1]);
481
482         /* Sort the array elements */
483         fmgr_info(cmp_proc, &cxt.flinfo);
484         cxt.collation = skey->sk_collation;
485         cxt.reverse = reverse;
486         qsort_arg((void *) elems, nelems, sizeof(Datum),
487                           _bt_compare_array_elements, (void *) &cxt);
488
489         /* Now scan the sorted elements and remove duplicates */
490         last_non_dup = 0;
491         for (i = 1; i < nelems; i++)
492         {
493                 int32           compare;
494
495                 compare = DatumGetInt32(FunctionCall2Coll(&cxt.flinfo,
496                                                                                                   cxt.collation,
497                                                                                                   elems[last_non_dup],
498                                                                                                   elems[i]));
499                 if (compare != 0)
500                         elems[++last_non_dup] = elems[i];
501         }
502
503         return last_non_dup + 1;
504 }
505
506 /*
507  * qsort_arg comparator for sorting array elements
508  */
509 static int
510 _bt_compare_array_elements(const void *a, const void *b, void *arg)
511 {
512         Datum           da = *((const Datum *) a);
513         Datum           db = *((const Datum *) b);
514         BTSortArrayContext *cxt = (BTSortArrayContext *) arg;
515         int32           compare;
516
517         compare = DatumGetInt32(FunctionCall2Coll(&cxt->flinfo,
518                                                                                           cxt->collation,
519                                                                                           da, db));
520         if (cxt->reverse)
521                 compare = -compare;
522         return compare;
523 }
524
525 /*
526  * _bt_start_array_keys() -- Initialize array keys at start of a scan
527  *
528  * Set up the cur_elem counters and fill in the first sk_argument value for
529  * each array scankey.  We can't do this until we know the scan direction.
530  */
531 void
532 _bt_start_array_keys(IndexScanDesc scan, ScanDirection dir)
533 {
534         BTScanOpaque so = (BTScanOpaque) scan->opaque;
535         int                     i;
536
537         for (i = 0; i < so->numArrayKeys; i++)
538         {
539                 BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
540                 ScanKey         skey = &so->arrayKeyData[curArrayKey->scan_key];
541
542                 Assert(curArrayKey->num_elems > 0);
543                 if (ScanDirectionIsBackward(dir))
544                         curArrayKey->cur_elem = curArrayKey->num_elems - 1;
545                 else
546                         curArrayKey->cur_elem = 0;
547                 skey->sk_argument = curArrayKey->elem_values[curArrayKey->cur_elem];
548         }
549 }
550
551 /*
552  * _bt_advance_array_keys() -- Advance to next set of array elements
553  *
554  * Returns true if there is another set of values to consider, false if not.
555  * On true result, the scankeys are initialized with the next set of values.
556  */
557 bool
558 _bt_advance_array_keys(IndexScanDesc scan, ScanDirection dir)
559 {
560         BTScanOpaque so = (BTScanOpaque) scan->opaque;
561         bool            found = false;
562         int                     i;
563
564         /*
565          * We must advance the last array key most quickly, since it will
566          * correspond to the lowest-order index column among the available
567          * qualifications. This is necessary to ensure correct ordering of output
568          * when there are multiple array keys.
569          */
570         for (i = so->numArrayKeys - 1; i >= 0; i--)
571         {
572                 BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
573                 ScanKey         skey = &so->arrayKeyData[curArrayKey->scan_key];
574                 int                     cur_elem = curArrayKey->cur_elem;
575                 int                     num_elems = curArrayKey->num_elems;
576
577                 if (ScanDirectionIsBackward(dir))
578                 {
579                         if (--cur_elem < 0)
580                         {
581                                 cur_elem = num_elems - 1;
582                                 found = false;  /* need to advance next array key */
583                         }
584                         else
585                                 found = true;
586                 }
587                 else
588                 {
589                         if (++cur_elem >= num_elems)
590                         {
591                                 cur_elem = 0;
592                                 found = false;  /* need to advance next array key */
593                         }
594                         else
595                                 found = true;
596                 }
597
598                 curArrayKey->cur_elem = cur_elem;
599                 skey->sk_argument = curArrayKey->elem_values[cur_elem];
600                 if (found)
601                         break;
602         }
603
604         /* advance parallel scan */
605         if (scan->parallel_scan != NULL)
606                 _bt_parallel_advance_array_keys(scan);
607
608         return found;
609 }
610
611 /*
612  * _bt_mark_array_keys() -- Handle array keys during btmarkpos
613  *
614  * Save the current state of the array keys as the "mark" position.
615  */
616 void
617 _bt_mark_array_keys(IndexScanDesc scan)
618 {
619         BTScanOpaque so = (BTScanOpaque) scan->opaque;
620         int                     i;
621
622         for (i = 0; i < so->numArrayKeys; i++)
623         {
624                 BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
625
626                 curArrayKey->mark_elem = curArrayKey->cur_elem;
627         }
628 }
629
630 /*
631  * _bt_restore_array_keys() -- Handle array keys during btrestrpos
632  *
633  * Restore the array keys to where they were when the mark was set.
634  */
635 void
636 _bt_restore_array_keys(IndexScanDesc scan)
637 {
638         BTScanOpaque so = (BTScanOpaque) scan->opaque;
639         bool            changed = false;
640         int                     i;
641
642         /* Restore each array key to its position when the mark was set */
643         for (i = 0; i < so->numArrayKeys; i++)
644         {
645                 BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
646                 ScanKey         skey = &so->arrayKeyData[curArrayKey->scan_key];
647                 int                     mark_elem = curArrayKey->mark_elem;
648
649                 if (curArrayKey->cur_elem != mark_elem)
650                 {
651                         curArrayKey->cur_elem = mark_elem;
652                         skey->sk_argument = curArrayKey->elem_values[mark_elem];
653                         changed = true;
654                 }
655         }
656
657         /*
658          * If we changed any keys, we must redo _bt_preprocess_keys.  That might
659          * sound like overkill, but in cases with multiple keys per index column
660          * it seems necessary to do the full set of pushups.
661          */
662         if (changed)
663         {
664                 _bt_preprocess_keys(scan);
665                 /* The mark should have been set on a consistent set of keys... */
666                 Assert(so->qual_ok);
667         }
668 }
669
670
671 /*
672  *      _bt_preprocess_keys() -- Preprocess scan keys
673  *
674  * The given search-type keys (in scan->keyData[] or so->arrayKeyData[])
675  * are copied to so->keyData[] with possible transformation.
676  * scan->numberOfKeys is the number of input keys, so->numberOfKeys gets
677  * the number of output keys (possibly less, never greater).
678  *
679  * The output keys are marked with additional sk_flag bits beyond the
680  * system-standard bits supplied by the caller.  The DESC and NULLS_FIRST
681  * indoption bits for the relevant index attribute are copied into the flags.
682  * Also, for a DESC column, we commute (flip) all the sk_strategy numbers
683  * so that the index sorts in the desired direction.
684  *
685  * One key purpose of this routine is to discover which scan keys must be
686  * satisfied to continue the scan.  It also attempts to eliminate redundant
687  * keys and detect contradictory keys.  (If the index opfamily provides
688  * incomplete sets of cross-type operators, we may fail to detect redundant
689  * or contradictory keys, but we can survive that.)
690  *
691  * The output keys must be sorted by index attribute.  Presently we expect
692  * (but verify) that the input keys are already so sorted --- this is done
693  * by match_clauses_to_index() in indxpath.c.  Some reordering of the keys
694  * within each attribute may be done as a byproduct of the processing here,
695  * but no other code depends on that.
696  *
697  * The output keys are marked with flags SK_BT_REQFWD and/or SK_BT_REQBKWD
698  * if they must be satisfied in order to continue the scan forward or backward
699  * respectively.  _bt_checkkeys uses these flags.  For example, if the quals
700  * are "x = 1 AND y < 4 AND z < 5", then _bt_checkkeys will reject a tuple
701  * (1,2,7), but we must continue the scan in case there are tuples (1,3,z).
702  * But once we reach tuples like (1,4,z) we can stop scanning because no
703  * later tuples could match.  This is reflected by marking the x and y keys,
704  * but not the z key, with SK_BT_REQFWD.  In general, the keys for leading
705  * attributes with "=" keys are marked both SK_BT_REQFWD and SK_BT_REQBKWD.
706  * For the first attribute without an "=" key, any "<" and "<=" keys are
707  * marked SK_BT_REQFWD while any ">" and ">=" keys are marked SK_BT_REQBKWD.
708  * This can be seen to be correct by considering the above example.  Note
709  * in particular that if there are no keys for a given attribute, the keys for
710  * subsequent attributes can never be required; for instance "WHERE y = 4"
711  * requires a full-index scan.
712  *
713  * If possible, redundant keys are eliminated: we keep only the tightest
714  * >/>= bound and the tightest </<= bound, and if there's an = key then
715  * that's the only one returned.  (So, we return either a single = key,
716  * or one or two boundary-condition keys for each attr.)  However, if we
717  * cannot compare two keys for lack of a suitable cross-type operator,
718  * we cannot eliminate either.  If there are two such keys of the same
719  * operator strategy, the second one is just pushed into the output array
720  * without further processing here.  We may also emit both >/>= or both
721  * </<= keys if we can't compare them.  The logic about required keys still
722  * works if we don't eliminate redundant keys.
723  *
724  * Note that one reason we need direction-sensitive required-key flags is
725  * precisely that we may not be able to eliminate redundant keys.  Suppose
726  * we have "x > 4::int AND x > 10::bigint", and we are unable to determine
727  * which key is more restrictive for lack of a suitable cross-type operator.
728  * _bt_first will arbitrarily pick one of the keys to do the initial
729  * positioning with.  If it picks x > 4, then the x > 10 condition will fail
730  * until we reach index entries > 10; but we can't stop the scan just because
731  * x > 10 is failing.  On the other hand, if we are scanning backwards, then
732  * failure of either key is indeed enough to stop the scan.  (In general, when
733  * inequality keys are present, the initial-positioning code only promises to
734  * position before the first possible match, not exactly at the first match,
735  * for a forward scan; or after the last match for a backward scan.)
736  *
737  * As a byproduct of this work, we can detect contradictory quals such
738  * as "x = 1 AND x > 2".  If we see that, we return so->qual_ok = false,
739  * indicating the scan need not be run at all since no tuples can match.
740  * (In this case we do not bother completing the output key array!)
741  * Again, missing cross-type operators might cause us to fail to prove the
742  * quals contradictory when they really are, but the scan will work correctly.
743  *
744  * Row comparison keys are currently also treated without any smarts:
745  * we just transfer them into the preprocessed array without any
746  * editorialization.  We can treat them the same as an ordinary inequality
747  * comparison on the row's first index column, for the purposes of the logic
748  * about required keys.
749  *
750  * Note: the reason we have to copy the preprocessed scan keys into private
751  * storage is that we are modifying the array based on comparisons of the
752  * key argument values, which could change on a rescan or after moving to
753  * new elements of array keys.  Therefore we can't overwrite the source data.
754  */
755 void
756 _bt_preprocess_keys(IndexScanDesc scan)
757 {
758         BTScanOpaque so = (BTScanOpaque) scan->opaque;
759         int                     numberOfKeys = scan->numberOfKeys;
760         int16      *indoption = scan->indexRelation->rd_indoption;
761         int                     new_numberOfKeys;
762         int                     numberOfEqualCols;
763         ScanKey         inkeys;
764         ScanKey         outkeys;
765         ScanKey         cur;
766         ScanKey         xform[BTMaxStrategyNumber];
767         bool            test_result;
768         int                     i,
769                                 j;
770         AttrNumber      attno;
771
772         /* initialize result variables */
773         so->qual_ok = true;
774         so->numberOfKeys = 0;
775
776         if (numberOfKeys < 1)
777                 return;                                 /* done if qual-less scan */
778
779         /*
780          * Read so->arrayKeyData if array keys are present, else scan->keyData
781          */
782         if (so->arrayKeyData != NULL)
783                 inkeys = so->arrayKeyData;
784         else
785                 inkeys = scan->keyData;
786
787         outkeys = so->keyData;
788         cur = &inkeys[0];
789         /* we check that input keys are correctly ordered */
790         if (cur->sk_attno < 1)
791                 elog(ERROR, "btree index keys must be ordered by attribute");
792
793         /* We can short-circuit most of the work if there's just one key */
794         if (numberOfKeys == 1)
795         {
796                 /* Apply indoption to scankey (might change sk_strategy!) */
797                 if (!_bt_fix_scankey_strategy(cur, indoption))
798                         so->qual_ok = false;
799                 memcpy(outkeys, cur, sizeof(ScanKeyData));
800                 so->numberOfKeys = 1;
801                 /* We can mark the qual as required if it's for first index col */
802                 if (cur->sk_attno == 1)
803                         _bt_mark_scankey_required(outkeys);
804                 return;
805         }
806
807         /*
808          * Otherwise, do the full set of pushups.
809          */
810         new_numberOfKeys = 0;
811         numberOfEqualCols = 0;
812
813         /*
814          * Initialize for processing of keys for attr 1.
815          *
816          * xform[i] points to the currently best scan key of strategy type i+1; it
817          * is NULL if we haven't yet found such a key for this attr.
818          */
819         attno = 1;
820         memset(xform, 0, sizeof(xform));
821
822         /*
823          * Loop iterates from 0 to numberOfKeys inclusive; we use the last pass to
824          * handle after-last-key processing.  Actual exit from the loop is at the
825          * "break" statement below.
826          */
827         for (i = 0;; cur++, i++)
828         {
829                 if (i < numberOfKeys)
830                 {
831                         /* Apply indoption to scankey (might change sk_strategy!) */
832                         if (!_bt_fix_scankey_strategy(cur, indoption))
833                         {
834                                 /* NULL can't be matched, so give up */
835                                 so->qual_ok = false;
836                                 return;
837                         }
838                 }
839
840                 /*
841                  * If we are at the end of the keys for a particular attr, finish up
842                  * processing and emit the cleaned-up keys.
843                  */
844                 if (i == numberOfKeys || cur->sk_attno != attno)
845                 {
846                         int                     priorNumberOfEqualCols = numberOfEqualCols;
847
848                         /* check input keys are correctly ordered */
849                         if (i < numberOfKeys && cur->sk_attno < attno)
850                                 elog(ERROR, "btree index keys must be ordered by attribute");
851
852                         /*
853                          * If = has been specified, all other keys can be eliminated as
854                          * redundant.  If we have a case like key = 1 AND key > 2, we can
855                          * set qual_ok to false and abandon further processing.
856                          *
857                          * We also have to deal with the case of "key IS NULL", which is
858                          * unsatisfiable in combination with any other index condition. By
859                          * the time we get here, that's been classified as an equality
860                          * check, and we've rejected any combination of it with a regular
861                          * equality condition; but not with other types of conditions.
862                          */
863                         if (xform[BTEqualStrategyNumber - 1])
864                         {
865                                 ScanKey         eq = xform[BTEqualStrategyNumber - 1];
866
867                                 for (j = BTMaxStrategyNumber; --j >= 0;)
868                                 {
869                                         ScanKey         chk = xform[j];
870
871                                         if (!chk || j == (BTEqualStrategyNumber - 1))
872                                                 continue;
873
874                                         if (eq->sk_flags & SK_SEARCHNULL)
875                                         {
876                                                 /* IS NULL is contradictory to anything else */
877                                                 so->qual_ok = false;
878                                                 return;
879                                         }
880
881                                         if (_bt_compare_scankey_args(scan, chk, eq, chk,
882                                                                                                  &test_result))
883                                         {
884                                                 if (!test_result)
885                                                 {
886                                                         /* keys proven mutually contradictory */
887                                                         so->qual_ok = false;
888                                                         return;
889                                                 }
890                                                 /* else discard the redundant non-equality key */
891                                                 xform[j] = NULL;
892                                         }
893                                         /* else, cannot determine redundancy, keep both keys */
894                                 }
895                                 /* track number of attrs for which we have "=" keys */
896                                 numberOfEqualCols++;
897                         }
898
899                         /* try to keep only one of <, <= */
900                         if (xform[BTLessStrategyNumber - 1]
901                                 && xform[BTLessEqualStrategyNumber - 1])
902                         {
903                                 ScanKey         lt = xform[BTLessStrategyNumber - 1];
904                                 ScanKey         le = xform[BTLessEqualStrategyNumber - 1];
905
906                                 if (_bt_compare_scankey_args(scan, le, lt, le,
907                                                                                          &test_result))
908                                 {
909                                         if (test_result)
910                                                 xform[BTLessEqualStrategyNumber - 1] = NULL;
911                                         else
912                                                 xform[BTLessStrategyNumber - 1] = NULL;
913                                 }
914                         }
915
916                         /* try to keep only one of >, >= */
917                         if (xform[BTGreaterStrategyNumber - 1]
918                                 && xform[BTGreaterEqualStrategyNumber - 1])
919                         {
920                                 ScanKey         gt = xform[BTGreaterStrategyNumber - 1];
921                                 ScanKey         ge = xform[BTGreaterEqualStrategyNumber - 1];
922
923                                 if (_bt_compare_scankey_args(scan, ge, gt, ge,
924                                                                                          &test_result))
925                                 {
926                                         if (test_result)
927                                                 xform[BTGreaterEqualStrategyNumber - 1] = NULL;
928                                         else
929                                                 xform[BTGreaterStrategyNumber - 1] = NULL;
930                                 }
931                         }
932
933                         /*
934                          * Emit the cleaned-up keys into the outkeys[] array, and then
935                          * mark them if they are required.  They are required (possibly
936                          * only in one direction) if all attrs before this one had "=".
937                          */
938                         for (j = BTMaxStrategyNumber; --j >= 0;)
939                         {
940                                 if (xform[j])
941                                 {
942                                         ScanKey         outkey = &outkeys[new_numberOfKeys++];
943
944                                         memcpy(outkey, xform[j], sizeof(ScanKeyData));
945                                         if (priorNumberOfEqualCols == attno - 1)
946                                                 _bt_mark_scankey_required(outkey);
947                                 }
948                         }
949
950                         /*
951                          * Exit loop here if done.
952                          */
953                         if (i == numberOfKeys)
954                                 break;
955
956                         /* Re-initialize for new attno */
957                         attno = cur->sk_attno;
958                         memset(xform, 0, sizeof(xform));
959                 }
960
961                 /* check strategy this key's operator corresponds to */
962                 j = cur->sk_strategy - 1;
963
964                 /* if row comparison, push it directly to the output array */
965                 if (cur->sk_flags & SK_ROW_HEADER)
966                 {
967                         ScanKey         outkey = &outkeys[new_numberOfKeys++];
968
969                         memcpy(outkey, cur, sizeof(ScanKeyData));
970                         if (numberOfEqualCols == attno - 1)
971                                 _bt_mark_scankey_required(outkey);
972
973                         /*
974                          * We don't support RowCompare using equality; such a qual would
975                          * mess up the numberOfEqualCols tracking.
976                          */
977                         Assert(j != (BTEqualStrategyNumber - 1));
978                         continue;
979                 }
980
981                 /* have we seen one of these before? */
982                 if (xform[j] == NULL)
983                 {
984                         /* nope, so remember this scankey */
985                         xform[j] = cur;
986                 }
987                 else
988                 {
989                         /* yup, keep only the more restrictive key */
990                         if (_bt_compare_scankey_args(scan, cur, cur, xform[j],
991                                                                                  &test_result))
992                         {
993                                 if (test_result)
994                                         xform[j] = cur;
995                                 else if (j == (BTEqualStrategyNumber - 1))
996                                 {
997                                         /* key == a && key == b, but a != b */
998                                         so->qual_ok = false;
999                                         return;
1000                                 }
1001                                 /* else old key is more restrictive, keep it */
1002                         }
1003                         else
1004                         {
1005                                 /*
1006                                  * We can't determine which key is more restrictive.  Keep the
1007                                  * previous one in xform[j] and push this one directly to the
1008                                  * output array.
1009                                  */
1010                                 ScanKey         outkey = &outkeys[new_numberOfKeys++];
1011
1012                                 memcpy(outkey, cur, sizeof(ScanKeyData));
1013                                 if (numberOfEqualCols == attno - 1)
1014                                         _bt_mark_scankey_required(outkey);
1015                         }
1016                 }
1017         }
1018
1019         so->numberOfKeys = new_numberOfKeys;
1020 }
1021
1022 /*
1023  * Compare two scankey values using a specified operator.
1024  *
1025  * The test we want to perform is logically "leftarg op rightarg", where
1026  * leftarg and rightarg are the sk_argument values in those ScanKeys, and
1027  * the comparison operator is the one in the op ScanKey.  However, in
1028  * cross-data-type situations we may need to look up the correct operator in
1029  * the index's opfamily: it is the one having amopstrategy = op->sk_strategy
1030  * and amoplefttype/amoprighttype equal to the two argument datatypes.
1031  *
1032  * If the opfamily doesn't supply a complete set of cross-type operators we
1033  * may not be able to make the comparison.  If we can make the comparison
1034  * we store the operator result in *result and return true.  We return false
1035  * if the comparison could not be made.
1036  *
1037  * Note: op always points at the same ScanKey as either leftarg or rightarg.
1038  * Since we don't scribble on the scankeys, this aliasing should cause no
1039  * trouble.
1040  *
1041  * Note: this routine needs to be insensitive to any DESC option applied
1042  * to the index column.  For example, "x < 4" is a tighter constraint than
1043  * "x < 5" regardless of which way the index is sorted.
1044  */
1045 static bool
1046 _bt_compare_scankey_args(IndexScanDesc scan, ScanKey op,
1047                                                  ScanKey leftarg, ScanKey rightarg,
1048                                                  bool *result)
1049 {
1050         Relation        rel = scan->indexRelation;
1051         Oid                     lefttype,
1052                                 righttype,
1053                                 optype,
1054                                 opcintype,
1055                                 cmp_op;
1056         StrategyNumber strat;
1057
1058         /*
1059          * First, deal with cases where one or both args are NULL.  This should
1060          * only happen when the scankeys represent IS NULL/NOT NULL conditions.
1061          */
1062         if ((leftarg->sk_flags | rightarg->sk_flags) & SK_ISNULL)
1063         {
1064                 bool            leftnull,
1065                                         rightnull;
1066
1067                 if (leftarg->sk_flags & SK_ISNULL)
1068                 {
1069                         Assert(leftarg->sk_flags & (SK_SEARCHNULL | SK_SEARCHNOTNULL));
1070                         leftnull = true;
1071                 }
1072                 else
1073                         leftnull = false;
1074                 if (rightarg->sk_flags & SK_ISNULL)
1075                 {
1076                         Assert(rightarg->sk_flags & (SK_SEARCHNULL | SK_SEARCHNOTNULL));
1077                         rightnull = true;
1078                 }
1079                 else
1080                         rightnull = false;
1081
1082                 /*
1083                  * We treat NULL as either greater than or less than all other values.
1084                  * Since true > false, the tests below work correctly for NULLS LAST
1085                  * logic.  If the index is NULLS FIRST, we need to flip the strategy.
1086                  */
1087                 strat = op->sk_strategy;
1088                 if (op->sk_flags & SK_BT_NULLS_FIRST)
1089                         strat = BTCommuteStrategyNumber(strat);
1090
1091                 switch (strat)
1092                 {
1093                         case BTLessStrategyNumber:
1094                                 *result = (leftnull < rightnull);
1095                                 break;
1096                         case BTLessEqualStrategyNumber:
1097                                 *result = (leftnull <= rightnull);
1098                                 break;
1099                         case BTEqualStrategyNumber:
1100                                 *result = (leftnull == rightnull);
1101                                 break;
1102                         case BTGreaterEqualStrategyNumber:
1103                                 *result = (leftnull >= rightnull);
1104                                 break;
1105                         case BTGreaterStrategyNumber:
1106                                 *result = (leftnull > rightnull);
1107                                 break;
1108                         default:
1109                                 elog(ERROR, "unrecognized StrategyNumber: %d", (int) strat);
1110                                 *result = false;        /* keep compiler quiet */
1111                                 break;
1112                 }
1113                 return true;
1114         }
1115
1116         /*
1117          * The opfamily we need to worry about is identified by the index column.
1118          */
1119         Assert(leftarg->sk_attno == rightarg->sk_attno);
1120
1121         opcintype = rel->rd_opcintype[leftarg->sk_attno - 1];
1122
1123         /*
1124          * Determine the actual datatypes of the ScanKey arguments.  We have to
1125          * support the convention that sk_subtype == InvalidOid means the opclass
1126          * input type; this is a hack to simplify life for ScanKeyInit().
1127          */
1128         lefttype = leftarg->sk_subtype;
1129         if (lefttype == InvalidOid)
1130                 lefttype = opcintype;
1131         righttype = rightarg->sk_subtype;
1132         if (righttype == InvalidOid)
1133                 righttype = opcintype;
1134         optype = op->sk_subtype;
1135         if (optype == InvalidOid)
1136                 optype = opcintype;
1137
1138         /*
1139          * If leftarg and rightarg match the types expected for the "op" scankey,
1140          * we can use its already-looked-up comparison function.
1141          */
1142         if (lefttype == opcintype && righttype == optype)
1143         {
1144                 *result = DatumGetBool(FunctionCall2Coll(&op->sk_func,
1145                                                                                                  op->sk_collation,
1146                                                                                                  leftarg->sk_argument,
1147                                                                                                  rightarg->sk_argument));
1148                 return true;
1149         }
1150
1151         /*
1152          * Otherwise, we need to go to the syscache to find the appropriate
1153          * operator.  (This cannot result in infinite recursion, since no
1154          * indexscan initiated by syscache lookup will use cross-data-type
1155          * operators.)
1156          *
1157          * If the sk_strategy was flipped by _bt_fix_scankey_strategy, we have to
1158          * un-flip it to get the correct opfamily member.
1159          */
1160         strat = op->sk_strategy;
1161         if (op->sk_flags & SK_BT_DESC)
1162                 strat = BTCommuteStrategyNumber(strat);
1163
1164         cmp_op = get_opfamily_member(rel->rd_opfamily[leftarg->sk_attno - 1],
1165                                                                  lefttype,
1166                                                                  righttype,
1167                                                                  strat);
1168         if (OidIsValid(cmp_op))
1169         {
1170                 RegProcedure cmp_proc = get_opcode(cmp_op);
1171
1172                 if (RegProcedureIsValid(cmp_proc))
1173                 {
1174                         *result = DatumGetBool(OidFunctionCall2Coll(cmp_proc,
1175                                                                                                                 op->sk_collation,
1176                                                                                                                 leftarg->sk_argument,
1177                                                                                                                 rightarg->sk_argument));
1178                         return true;
1179                 }
1180         }
1181
1182         /* Can't make the comparison */
1183         *result = false;                        /* suppress compiler warnings */
1184         return false;
1185 }
1186
1187 /*
1188  * Adjust a scankey's strategy and flags setting as needed for indoptions.
1189  *
1190  * We copy the appropriate indoption value into the scankey sk_flags
1191  * (shifting to avoid clobbering system-defined flag bits).  Also, if
1192  * the DESC option is set, commute (flip) the operator strategy number.
1193  *
1194  * A secondary purpose is to check for IS NULL/NOT NULL scankeys and set up
1195  * the strategy field correctly for them.
1196  *
1197  * Lastly, for ordinary scankeys (not IS NULL/NOT NULL), we check for a
1198  * NULL comparison value.  Since all btree operators are assumed strict,
1199  * a NULL means that the qual cannot be satisfied.  We return true if the
1200  * comparison value isn't NULL, or false if the scan should be abandoned.
1201  *
1202  * This function is applied to the *input* scankey structure; therefore
1203  * on a rescan we will be looking at already-processed scankeys.  Hence
1204  * we have to be careful not to re-commute the strategy if we already did it.
1205  * It's a bit ugly to modify the caller's copy of the scankey but in practice
1206  * there shouldn't be any problem, since the index's indoptions are certainly
1207  * not going to change while the scankey survives.
1208  */
1209 static bool
1210 _bt_fix_scankey_strategy(ScanKey skey, int16 *indoption)
1211 {
1212         int                     addflags;
1213
1214         addflags = indoption[skey->sk_attno - 1] << SK_BT_INDOPTION_SHIFT;
1215
1216         /*
1217          * We treat all btree operators as strict (even if they're not so marked
1218          * in pg_proc). This means that it is impossible for an operator condition
1219          * with a NULL comparison constant to succeed, and we can reject it right
1220          * away.
1221          *
1222          * However, we now also support "x IS NULL" clauses as search conditions,
1223          * so in that case keep going. The planner has not filled in any
1224          * particular strategy in this case, so set it to BTEqualStrategyNumber
1225          * --- we can treat IS NULL as an equality operator for purposes of search
1226          * strategy.
1227          *
1228          * Likewise, "x IS NOT NULL" is supported.  We treat that as either "less
1229          * than NULL" in a NULLS LAST index, or "greater than NULL" in a NULLS
1230          * FIRST index.
1231          *
1232          * Note: someday we might have to fill in sk_collation from the index
1233          * column's collation.  At the moment this is a non-issue because we'll
1234          * never actually call the comparison operator on a NULL.
1235          */
1236         if (skey->sk_flags & SK_ISNULL)
1237         {
1238                 /* SK_ISNULL shouldn't be set in a row header scankey */
1239                 Assert(!(skey->sk_flags & SK_ROW_HEADER));
1240
1241                 /* Set indoption flags in scankey (might be done already) */
1242                 skey->sk_flags |= addflags;
1243
1244                 /* Set correct strategy for IS NULL or NOT NULL search */
1245                 if (skey->sk_flags & SK_SEARCHNULL)
1246                 {
1247                         skey->sk_strategy = BTEqualStrategyNumber;
1248                         skey->sk_subtype = InvalidOid;
1249                         skey->sk_collation = InvalidOid;
1250                 }
1251                 else if (skey->sk_flags & SK_SEARCHNOTNULL)
1252                 {
1253                         if (skey->sk_flags & SK_BT_NULLS_FIRST)
1254                                 skey->sk_strategy = BTGreaterStrategyNumber;
1255                         else
1256                                 skey->sk_strategy = BTLessStrategyNumber;
1257                         skey->sk_subtype = InvalidOid;
1258                         skey->sk_collation = InvalidOid;
1259                 }
1260                 else
1261                 {
1262                         /* regular qual, so it cannot be satisfied */
1263                         return false;
1264                 }
1265
1266                 /* Needn't do the rest */
1267                 return true;
1268         }
1269
1270         /* Adjust strategy for DESC, if we didn't already */
1271         if ((addflags & SK_BT_DESC) && !(skey->sk_flags & SK_BT_DESC))
1272                 skey->sk_strategy = BTCommuteStrategyNumber(skey->sk_strategy);
1273         skey->sk_flags |= addflags;
1274
1275         /* If it's a row header, fix row member flags and strategies similarly */
1276         if (skey->sk_flags & SK_ROW_HEADER)
1277         {
1278                 ScanKey         subkey = (ScanKey) DatumGetPointer(skey->sk_argument);
1279
1280                 for (;;)
1281                 {
1282                         Assert(subkey->sk_flags & SK_ROW_MEMBER);
1283                         addflags = indoption[subkey->sk_attno - 1] << SK_BT_INDOPTION_SHIFT;
1284                         if ((addflags & SK_BT_DESC) && !(subkey->sk_flags & SK_BT_DESC))
1285                                 subkey->sk_strategy = BTCommuteStrategyNumber(subkey->sk_strategy);
1286                         subkey->sk_flags |= addflags;
1287                         if (subkey->sk_flags & SK_ROW_END)
1288                                 break;
1289                         subkey++;
1290                 }
1291         }
1292
1293         return true;
1294 }
1295
1296 /*
1297  * Mark a scankey as "required to continue the scan".
1298  *
1299  * Depending on the operator type, the key may be required for both scan
1300  * directions or just one.  Also, if the key is a row comparison header,
1301  * we have to mark its first subsidiary ScanKey as required.  (Subsequent
1302  * subsidiary ScanKeys are normally for lower-order columns, and thus
1303  * cannot be required, since they're after the first non-equality scankey.)
1304  *
1305  * Note: when we set required-key flag bits in a subsidiary scankey, we are
1306  * scribbling on a data structure belonging to the index AM's caller, not on
1307  * our private copy.  This should be OK because the marking will not change
1308  * from scan to scan within a query, and so we'd just re-mark the same way
1309  * anyway on a rescan.  Something to keep an eye on though.
1310  */
1311 static void
1312 _bt_mark_scankey_required(ScanKey skey)
1313 {
1314         int                     addflags;
1315
1316         switch (skey->sk_strategy)
1317         {
1318                 case BTLessStrategyNumber:
1319                 case BTLessEqualStrategyNumber:
1320                         addflags = SK_BT_REQFWD;
1321                         break;
1322                 case BTEqualStrategyNumber:
1323                         addflags = SK_BT_REQFWD | SK_BT_REQBKWD;
1324                         break;
1325                 case BTGreaterEqualStrategyNumber:
1326                 case BTGreaterStrategyNumber:
1327                         addflags = SK_BT_REQBKWD;
1328                         break;
1329                 default:
1330                         elog(ERROR, "unrecognized StrategyNumber: %d",
1331                                  (int) skey->sk_strategy);
1332                         addflags = 0;           /* keep compiler quiet */
1333                         break;
1334         }
1335
1336         skey->sk_flags |= addflags;
1337
1338         if (skey->sk_flags & SK_ROW_HEADER)
1339         {
1340                 ScanKey         subkey = (ScanKey) DatumGetPointer(skey->sk_argument);
1341
1342                 /* First subkey should be same column/operator as the header */
1343                 Assert(subkey->sk_flags & SK_ROW_MEMBER);
1344                 Assert(subkey->sk_attno == skey->sk_attno);
1345                 Assert(subkey->sk_strategy == skey->sk_strategy);
1346                 subkey->sk_flags |= addflags;
1347         }
1348 }
1349
1350 /*
1351  * Test whether an indextuple satisfies all the scankey conditions.
1352  *
1353  * If so, return the address of the index tuple on the index page.
1354  * If not, return NULL.
1355  *
1356  * If the tuple fails to pass the qual, we also determine whether there's
1357  * any need to continue the scan beyond this tuple, and set *continuescan
1358  * accordingly.  See comments for _bt_preprocess_keys(), above, about how
1359  * this is done.
1360  *
1361  * scan: index scan descriptor (containing a search-type scankey)
1362  * page: buffer page containing index tuple
1363  * offnum: offset number of index tuple (must be a valid item!)
1364  * dir: direction we are scanning in
1365  * continuescan: output parameter (will be set correctly in all cases)
1366  *
1367  * Caller must hold pin and lock on the index page.
1368  */
1369 IndexTuple
1370 _bt_checkkeys(IndexScanDesc scan,
1371                           Page page, OffsetNumber offnum,
1372                           ScanDirection dir, bool *continuescan)
1373 {
1374         ItemId          iid = PageGetItemId(page, offnum);
1375         bool            tuple_alive;
1376         IndexTuple      tuple;
1377         TupleDesc       tupdesc;
1378         BTScanOpaque so;
1379         int                     keysz;
1380         int                     ikey;
1381         ScanKey         key;
1382
1383         *continuescan = true;           /* default assumption */
1384
1385         /*
1386          * If the scan specifies not to return killed tuples, then we treat a
1387          * killed tuple as not passing the qual.  Most of the time, it's a win to
1388          * not bother examining the tuple's index keys, but just return
1389          * immediately with continuescan = true to proceed to the next tuple.
1390          * However, if this is the last tuple on the page, we should check the
1391          * index keys to prevent uselessly advancing to the next page.
1392          */
1393         if (scan->ignore_killed_tuples && ItemIdIsDead(iid))
1394         {
1395                 /* return immediately if there are more tuples on the page */
1396                 if (ScanDirectionIsForward(dir))
1397                 {
1398                         if (offnum < PageGetMaxOffsetNumber(page))
1399                                 return NULL;
1400                 }
1401                 else
1402                 {
1403                         BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1404
1405                         if (offnum > P_FIRSTDATAKEY(opaque))
1406                                 return NULL;
1407                 }
1408
1409                 /*
1410                  * OK, we want to check the keys so we can set continuescan correctly,
1411                  * but we'll return NULL even if the tuple passes the key tests.
1412                  */
1413                 tuple_alive = false;
1414         }
1415         else
1416                 tuple_alive = true;
1417
1418         tuple = (IndexTuple) PageGetItem(page, iid);
1419
1420         tupdesc = RelationGetDescr(scan->indexRelation);
1421         so = (BTScanOpaque) scan->opaque;
1422         keysz = so->numberOfKeys;
1423
1424         for (key = so->keyData, ikey = 0; ikey < keysz; key++, ikey++)
1425         {
1426                 Datum           datum;
1427                 bool            isNull;
1428                 Datum           test;
1429
1430                 Assert(key->sk_attno <= BTreeTupleGetNAtts(tuple, scan->indexRelation));
1431                 /* row-comparison keys need special processing */
1432                 if (key->sk_flags & SK_ROW_HEADER)
1433                 {
1434                         if (_bt_check_rowcompare(key, tuple, tupdesc, dir, continuescan))
1435                                 continue;
1436                         return NULL;
1437                 }
1438
1439                 datum = index_getattr(tuple,
1440                                                           key->sk_attno,
1441                                                           tupdesc,
1442                                                           &isNull);
1443
1444                 if (key->sk_flags & SK_ISNULL)
1445                 {
1446                         /* Handle IS NULL/NOT NULL tests */
1447                         if (key->sk_flags & SK_SEARCHNULL)
1448                         {
1449                                 if (isNull)
1450                                         continue;       /* tuple satisfies this qual */
1451                         }
1452                         else
1453                         {
1454                                 Assert(key->sk_flags & SK_SEARCHNOTNULL);
1455                                 if (!isNull)
1456                                         continue;       /* tuple satisfies this qual */
1457                         }
1458
1459                         /*
1460                          * Tuple fails this qual.  If it's a required qual for the current
1461                          * scan direction, then we can conclude no further tuples will
1462                          * pass, either.
1463                          */
1464                         if ((key->sk_flags & SK_BT_REQFWD) &&
1465                                 ScanDirectionIsForward(dir))
1466                                 *continuescan = false;
1467                         else if ((key->sk_flags & SK_BT_REQBKWD) &&
1468                                          ScanDirectionIsBackward(dir))
1469                                 *continuescan = false;
1470
1471                         /*
1472                          * In any case, this indextuple doesn't match the qual.
1473                          */
1474                         return NULL;
1475                 }
1476
1477                 if (isNull)
1478                 {
1479                         if (key->sk_flags & SK_BT_NULLS_FIRST)
1480                         {
1481                                 /*
1482                                  * Since NULLs are sorted before non-NULLs, we know we have
1483                                  * reached the lower limit of the range of values for this
1484                                  * index attr.  On a backward scan, we can stop if this qual
1485                                  * is one of the "must match" subset.  We can stop regardless
1486                                  * of whether the qual is > or <, so long as it's required,
1487                                  * because it's not possible for any future tuples to pass. On
1488                                  * a forward scan, however, we must keep going, because we may
1489                                  * have initially positioned to the start of the index.
1490                                  */
1491                                 if ((key->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) &&
1492                                         ScanDirectionIsBackward(dir))
1493                                         *continuescan = false;
1494                         }
1495                         else
1496                         {
1497                                 /*
1498                                  * Since NULLs are sorted after non-NULLs, we know we have
1499                                  * reached the upper limit of the range of values for this
1500                                  * index attr.  On a forward scan, we can stop if this qual is
1501                                  * one of the "must match" subset.  We can stop regardless of
1502                                  * whether the qual is > or <, so long as it's required,
1503                                  * because it's not possible for any future tuples to pass. On
1504                                  * a backward scan, however, we must keep going, because we
1505                                  * may have initially positioned to the end of the index.
1506                                  */
1507                                 if ((key->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) &&
1508                                         ScanDirectionIsForward(dir))
1509                                         *continuescan = false;
1510                         }
1511
1512                         /*
1513                          * In any case, this indextuple doesn't match the qual.
1514                          */
1515                         return NULL;
1516                 }
1517
1518                 test = FunctionCall2Coll(&key->sk_func, key->sk_collation,
1519                                                                  datum, key->sk_argument);
1520
1521                 if (!DatumGetBool(test))
1522                 {
1523                         /*
1524                          * Tuple fails this qual.  If it's a required qual for the current
1525                          * scan direction, then we can conclude no further tuples will
1526                          * pass, either.
1527                          *
1528                          * Note: because we stop the scan as soon as any required equality
1529                          * qual fails, it is critical that equality quals be used for the
1530                          * initial positioning in _bt_first() when they are available. See
1531                          * comments in _bt_first().
1532                          */
1533                         if ((key->sk_flags & SK_BT_REQFWD) &&
1534                                 ScanDirectionIsForward(dir))
1535                                 *continuescan = false;
1536                         else if ((key->sk_flags & SK_BT_REQBKWD) &&
1537                                          ScanDirectionIsBackward(dir))
1538                                 *continuescan = false;
1539
1540                         /*
1541                          * In any case, this indextuple doesn't match the qual.
1542                          */
1543                         return NULL;
1544                 }
1545         }
1546
1547         /* Check for failure due to it being a killed tuple. */
1548         if (!tuple_alive)
1549                 return NULL;
1550
1551         /* If we get here, the tuple passes all index quals. */
1552         return tuple;
1553 }
1554
1555 /*
1556  * Test whether an indextuple satisfies a row-comparison scan condition.
1557  *
1558  * Return true if so, false if not.  If not, also clear *continuescan if
1559  * it's not possible for any future tuples in the current scan direction
1560  * to pass the qual.
1561  *
1562  * This is a subroutine for _bt_checkkeys, which see for more info.
1563  */
1564 static bool
1565 _bt_check_rowcompare(ScanKey skey, IndexTuple tuple, TupleDesc tupdesc,
1566                                          ScanDirection dir, bool *continuescan)
1567 {
1568         ScanKey         subkey = (ScanKey) DatumGetPointer(skey->sk_argument);
1569         int32           cmpresult = 0;
1570         bool            result;
1571
1572         /* First subkey should be same as the header says */
1573         Assert(subkey->sk_attno == skey->sk_attno);
1574
1575         /* Loop over columns of the row condition */
1576         for (;;)
1577         {
1578                 Datum           datum;
1579                 bool            isNull;
1580
1581                 Assert(subkey->sk_flags & SK_ROW_MEMBER);
1582
1583                 datum = index_getattr(tuple,
1584                                                           subkey->sk_attno,
1585                                                           tupdesc,
1586                                                           &isNull);
1587
1588                 if (isNull)
1589                 {
1590                         if (subkey->sk_flags & SK_BT_NULLS_FIRST)
1591                         {
1592                                 /*
1593                                  * Since NULLs are sorted before non-NULLs, we know we have
1594                                  * reached the lower limit of the range of values for this
1595                                  * index attr.  On a backward scan, we can stop if this qual
1596                                  * is one of the "must match" subset.  We can stop regardless
1597                                  * of whether the qual is > or <, so long as it's required,
1598                                  * because it's not possible for any future tuples to pass. On
1599                                  * a forward scan, however, we must keep going, because we may
1600                                  * have initially positioned to the start of the index.
1601                                  */
1602                                 if ((subkey->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) &&
1603                                         ScanDirectionIsBackward(dir))
1604                                         *continuescan = false;
1605                         }
1606                         else
1607                         {
1608                                 /*
1609                                  * Since NULLs are sorted after non-NULLs, we know we have
1610                                  * reached the upper limit of the range of values for this
1611                                  * index attr.  On a forward scan, we can stop if this qual is
1612                                  * one of the "must match" subset.  We can stop regardless of
1613                                  * whether the qual is > or <, so long as it's required,
1614                                  * because it's not possible for any future tuples to pass. On
1615                                  * a backward scan, however, we must keep going, because we
1616                                  * may have initially positioned to the end of the index.
1617                                  */
1618                                 if ((subkey->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) &&
1619                                         ScanDirectionIsForward(dir))
1620                                         *continuescan = false;
1621                         }
1622
1623                         /*
1624                          * In any case, this indextuple doesn't match the qual.
1625                          */
1626                         return false;
1627                 }
1628
1629                 if (subkey->sk_flags & SK_ISNULL)
1630                 {
1631                         /*
1632                          * Unlike the simple-scankey case, this isn't a disallowed case.
1633                          * But it can never match.  If all the earlier row comparison
1634                          * columns are required for the scan direction, we can stop the
1635                          * scan, because there can't be another tuple that will succeed.
1636                          */
1637                         if (subkey != (ScanKey) DatumGetPointer(skey->sk_argument))
1638                                 subkey--;
1639                         if ((subkey->sk_flags & SK_BT_REQFWD) &&
1640                                 ScanDirectionIsForward(dir))
1641                                 *continuescan = false;
1642                         else if ((subkey->sk_flags & SK_BT_REQBKWD) &&
1643                                          ScanDirectionIsBackward(dir))
1644                                 *continuescan = false;
1645                         return false;
1646                 }
1647
1648                 /* Perform the test --- three-way comparison not bool operator */
1649                 cmpresult = DatumGetInt32(FunctionCall2Coll(&subkey->sk_func,
1650                                                                                                         subkey->sk_collation,
1651                                                                                                         datum,
1652                                                                                                         subkey->sk_argument));
1653
1654                 if (subkey->sk_flags & SK_BT_DESC)
1655                         cmpresult = -cmpresult;
1656
1657                 /* Done comparing if unequal, else advance to next column */
1658                 if (cmpresult != 0)
1659                         break;
1660
1661                 if (subkey->sk_flags & SK_ROW_END)
1662                         break;
1663                 subkey++;
1664         }
1665
1666         /*
1667          * At this point cmpresult indicates the overall result of the row
1668          * comparison, and subkey points to the deciding column (or the last
1669          * column if the result is "=").
1670          */
1671         switch (subkey->sk_strategy)
1672         {
1673                         /* EQ and NE cases aren't allowed here */
1674                 case BTLessStrategyNumber:
1675                         result = (cmpresult < 0);
1676                         break;
1677                 case BTLessEqualStrategyNumber:
1678                         result = (cmpresult <= 0);
1679                         break;
1680                 case BTGreaterEqualStrategyNumber:
1681                         result = (cmpresult >= 0);
1682                         break;
1683                 case BTGreaterStrategyNumber:
1684                         result = (cmpresult > 0);
1685                         break;
1686                 default:
1687                         elog(ERROR, "unrecognized RowCompareType: %d",
1688                                  (int) subkey->sk_strategy);
1689                         result = 0;                     /* keep compiler quiet */
1690                         break;
1691         }
1692
1693         if (!result)
1694         {
1695                 /*
1696                  * Tuple fails this qual.  If it's a required qual for the current
1697                  * scan direction, then we can conclude no further tuples will pass,
1698                  * either.  Note we have to look at the deciding column, not
1699                  * necessarily the first or last column of the row condition.
1700                  */
1701                 if ((subkey->sk_flags & SK_BT_REQFWD) &&
1702                         ScanDirectionIsForward(dir))
1703                         *continuescan = false;
1704                 else if ((subkey->sk_flags & SK_BT_REQBKWD) &&
1705                                  ScanDirectionIsBackward(dir))
1706                         *continuescan = false;
1707         }
1708
1709         return result;
1710 }
1711
1712 /*
1713  * _bt_killitems - set LP_DEAD state for items an indexscan caller has
1714  * told us were killed
1715  *
1716  * scan->opaque, referenced locally through so, contains information about the
1717  * current page and killed tuples thereon (generally, this should only be
1718  * called if so->numKilled > 0).
1719  *
1720  * The caller does not have a lock on the page and may or may not have the
1721  * page pinned in a buffer.  Note that read-lock is sufficient for setting
1722  * LP_DEAD status (which is only a hint).
1723  *
1724  * We match items by heap TID before assuming they are the right ones to
1725  * delete.  We cope with cases where items have moved right due to insertions.
1726  * If an item has moved off the current page due to a split, we'll fail to
1727  * find it and do nothing (this is not an error case --- we assume the item
1728  * will eventually get marked in a future indexscan).
1729  *
1730  * Note that if we hold a pin on the target page continuously from initially
1731  * reading the items until applying this function, VACUUM cannot have deleted
1732  * any items from the page, and so there is no need to search left from the
1733  * recorded offset.  (This observation also guarantees that the item is still
1734  * the right one to delete, which might otherwise be questionable since heap
1735  * TIDs can get recycled.)      This holds true even if the page has been modified
1736  * by inserts and page splits, so there is no need to consult the LSN.
1737  *
1738  * If the pin was released after reading the page, then we re-read it.  If it
1739  * has been modified since we read it (as determined by the LSN), we dare not
1740  * flag any entries because it is possible that the old entry was vacuumed
1741  * away and the TID was re-used by a completely different heap tuple.
1742  */
1743 void
1744 _bt_killitems(IndexScanDesc scan)
1745 {
1746         BTScanOpaque so = (BTScanOpaque) scan->opaque;
1747         Page            page;
1748         BTPageOpaque opaque;
1749         OffsetNumber minoff;
1750         OffsetNumber maxoff;
1751         int                     i;
1752         int                     numKilled = so->numKilled;
1753         bool            killedsomething = false;
1754
1755         Assert(BTScanPosIsValid(so->currPos));
1756
1757         /*
1758          * Always reset the scan state, so we don't look for same items on other
1759          * pages.
1760          */
1761         so->numKilled = 0;
1762
1763         if (BTScanPosIsPinned(so->currPos))
1764         {
1765                 /*
1766                  * We have held the pin on this page since we read the index tuples,
1767                  * so all we need to do is lock it.  The pin will have prevented
1768                  * re-use of any TID on the page, so there is no need to check the
1769                  * LSN.
1770                  */
1771                 LockBuffer(so->currPos.buf, BT_READ);
1772
1773                 page = BufferGetPage(so->currPos.buf);
1774         }
1775         else
1776         {
1777                 Buffer          buf;
1778
1779                 /* Attempt to re-read the buffer, getting pin and lock. */
1780                 buf = _bt_getbuf(scan->indexRelation, so->currPos.currPage, BT_READ);
1781
1782                 /* It might not exist anymore; in which case we can't hint it. */
1783                 if (!BufferIsValid(buf))
1784                         return;
1785
1786                 page = BufferGetPage(buf);
1787                 if (BufferGetLSNAtomic(buf) == so->currPos.lsn)
1788                         so->currPos.buf = buf;
1789                 else
1790                 {
1791                         /* Modified while not pinned means hinting is not safe. */
1792                         _bt_relbuf(scan->indexRelation, buf);
1793                         return;
1794                 }
1795         }
1796
1797         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1798         minoff = P_FIRSTDATAKEY(opaque);
1799         maxoff = PageGetMaxOffsetNumber(page);
1800
1801         for (i = 0; i < numKilled; i++)
1802         {
1803                 int                     itemIndex = so->killedItems[i];
1804                 BTScanPosItem *kitem = &so->currPos.items[itemIndex];
1805                 OffsetNumber offnum = kitem->indexOffset;
1806
1807                 Assert(itemIndex >= so->currPos.firstItem &&
1808                            itemIndex <= so->currPos.lastItem);
1809                 if (offnum < minoff)
1810                         continue;                       /* pure paranoia */
1811                 while (offnum <= maxoff)
1812                 {
1813                         ItemId          iid = PageGetItemId(page, offnum);
1814                         IndexTuple      ituple = (IndexTuple) PageGetItem(page, iid);
1815
1816                         if (ItemPointerEquals(&ituple->t_tid, &kitem->heapTid))
1817                         {
1818                                 /* found the item */
1819                                 ItemIdMarkDead(iid);
1820                                 killedsomething = true;
1821                                 break;                  /* out of inner search loop */
1822                         }
1823                         offnum = OffsetNumberNext(offnum);
1824                 }
1825         }
1826
1827         /*
1828          * Since this can be redone later if needed, mark as dirty hint.
1829          *
1830          * Whenever we mark anything LP_DEAD, we also set the page's
1831          * BTP_HAS_GARBAGE flag, which is likewise just a hint.
1832          */
1833         if (killedsomething)
1834         {
1835                 opaque->btpo_flags |= BTP_HAS_GARBAGE;
1836                 MarkBufferDirtyHint(so->currPos.buf, true);
1837         }
1838
1839         LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK);
1840 }
1841
1842
1843 /*
1844  * The following routines manage a shared-memory area in which we track
1845  * assignment of "vacuum cycle IDs" to currently-active btree vacuuming
1846  * operations.  There is a single counter which increments each time we
1847  * start a vacuum to assign it a cycle ID.  Since multiple vacuums could
1848  * be active concurrently, we have to track the cycle ID for each active
1849  * vacuum; this requires at most MaxBackends entries (usually far fewer).
1850  * We assume at most one vacuum can be active for a given index.
1851  *
1852  * Access to the shared memory area is controlled by BtreeVacuumLock.
1853  * In principle we could use a separate lmgr locktag for each index,
1854  * but a single LWLock is much cheaper, and given the short time that
1855  * the lock is ever held, the concurrency hit should be minimal.
1856  */
1857
1858 typedef struct BTOneVacInfo
1859 {
1860         LockRelId       relid;                  /* global identifier of an index */
1861         BTCycleId       cycleid;                /* cycle ID for its active VACUUM */
1862 } BTOneVacInfo;
1863
1864 typedef struct BTVacInfo
1865 {
1866         BTCycleId       cycle_ctr;              /* cycle ID most recently assigned */
1867         int                     num_vacuums;    /* number of currently active VACUUMs */
1868         int                     max_vacuums;    /* allocated length of vacuums[] array */
1869         BTOneVacInfo vacuums[FLEXIBLE_ARRAY_MEMBER];
1870 } BTVacInfo;
1871
1872 static BTVacInfo *btvacinfo;
1873
1874
1875 /*
1876  * _bt_vacuum_cycleid --- get the active vacuum cycle ID for an index,
1877  *              or zero if there is no active VACUUM
1878  *
1879  * Note: for correct interlocking, the caller must already hold pin and
1880  * exclusive lock on each buffer it will store the cycle ID into.  This
1881  * ensures that even if a VACUUM starts immediately afterwards, it cannot
1882  * process those pages until the page split is complete.
1883  */
1884 BTCycleId
1885 _bt_vacuum_cycleid(Relation rel)
1886 {
1887         BTCycleId       result = 0;
1888         int                     i;
1889
1890         /* Share lock is enough since this is a read-only operation */
1891         LWLockAcquire(BtreeVacuumLock, LW_SHARED);
1892
1893         for (i = 0; i < btvacinfo->num_vacuums; i++)
1894         {
1895                 BTOneVacInfo *vac = &btvacinfo->vacuums[i];
1896
1897                 if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
1898                         vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
1899                 {
1900                         result = vac->cycleid;
1901                         break;
1902                 }
1903         }
1904
1905         LWLockRelease(BtreeVacuumLock);
1906         return result;
1907 }
1908
1909 /*
1910  * _bt_start_vacuum --- assign a cycle ID to a just-starting VACUUM operation
1911  *
1912  * Note: the caller must guarantee that it will eventually call
1913  * _bt_end_vacuum, else we'll permanently leak an array slot.  To ensure
1914  * that this happens even in elog(FATAL) scenarios, the appropriate coding
1915  * is not just a PG_TRY, but
1916  *              PG_ENSURE_ERROR_CLEANUP(_bt_end_vacuum_callback, PointerGetDatum(rel))
1917  */
1918 BTCycleId
1919 _bt_start_vacuum(Relation rel)
1920 {
1921         BTCycleId       result;
1922         int                     i;
1923         BTOneVacInfo *vac;
1924
1925         LWLockAcquire(BtreeVacuumLock, LW_EXCLUSIVE);
1926
1927         /*
1928          * Assign the next cycle ID, being careful to avoid zero as well as the
1929          * reserved high values.
1930          */
1931         result = ++(btvacinfo->cycle_ctr);
1932         if (result == 0 || result > MAX_BT_CYCLE_ID)
1933                 result = btvacinfo->cycle_ctr = 1;
1934
1935         /* Let's just make sure there's no entry already for this index */
1936         for (i = 0; i < btvacinfo->num_vacuums; i++)
1937         {
1938                 vac = &btvacinfo->vacuums[i];
1939                 if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
1940                         vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
1941                 {
1942                         /*
1943                          * Unlike most places in the backend, we have to explicitly
1944                          * release our LWLock before throwing an error.  This is because
1945                          * we expect _bt_end_vacuum() to be called before transaction
1946                          * abort cleanup can run to release LWLocks.
1947                          */
1948                         LWLockRelease(BtreeVacuumLock);
1949                         elog(ERROR, "multiple active vacuums for index \"%s\"",
1950                                  RelationGetRelationName(rel));
1951                 }
1952         }
1953
1954         /* OK, add an entry */
1955         if (btvacinfo->num_vacuums >= btvacinfo->max_vacuums)
1956         {
1957                 LWLockRelease(BtreeVacuumLock);
1958                 elog(ERROR, "out of btvacinfo slots");
1959         }
1960         vac = &btvacinfo->vacuums[btvacinfo->num_vacuums];
1961         vac->relid = rel->rd_lockInfo.lockRelId;
1962         vac->cycleid = result;
1963         btvacinfo->num_vacuums++;
1964
1965         LWLockRelease(BtreeVacuumLock);
1966         return result;
1967 }
1968
1969 /*
1970  * _bt_end_vacuum --- mark a btree VACUUM operation as done
1971  *
1972  * Note: this is deliberately coded not to complain if no entry is found;
1973  * this allows the caller to put PG_TRY around the start_vacuum operation.
1974  */
1975 void
1976 _bt_end_vacuum(Relation rel)
1977 {
1978         int                     i;
1979
1980         LWLockAcquire(BtreeVacuumLock, LW_EXCLUSIVE);
1981
1982         /* Find the array entry */
1983         for (i = 0; i < btvacinfo->num_vacuums; i++)
1984         {
1985                 BTOneVacInfo *vac = &btvacinfo->vacuums[i];
1986
1987                 if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
1988                         vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
1989                 {
1990                         /* Remove it by shifting down the last entry */
1991                         *vac = btvacinfo->vacuums[btvacinfo->num_vacuums - 1];
1992                         btvacinfo->num_vacuums--;
1993                         break;
1994                 }
1995         }
1996
1997         LWLockRelease(BtreeVacuumLock);
1998 }
1999
2000 /*
2001  * _bt_end_vacuum wrapped as an on_shmem_exit callback function
2002  */
2003 void
2004 _bt_end_vacuum_callback(int code, Datum arg)
2005 {
2006         _bt_end_vacuum((Relation) DatumGetPointer(arg));
2007 }
2008
2009 /*
2010  * BTreeShmemSize --- report amount of shared memory space needed
2011  */
2012 Size
2013 BTreeShmemSize(void)
2014 {
2015         Size            size;
2016
2017         size = offsetof(BTVacInfo, vacuums);
2018         size = add_size(size, mul_size(MaxBackends, sizeof(BTOneVacInfo)));
2019         return size;
2020 }
2021
2022 /*
2023  * BTreeShmemInit --- initialize this module's shared memory
2024  */
2025 void
2026 BTreeShmemInit(void)
2027 {
2028         bool            found;
2029
2030         btvacinfo = (BTVacInfo *) ShmemInitStruct("BTree Vacuum State",
2031                                                                                           BTreeShmemSize(),
2032                                                                                           &found);
2033
2034         if (!IsUnderPostmaster)
2035         {
2036                 /* Initialize shared memory area */
2037                 Assert(!found);
2038
2039                 /*
2040                  * It doesn't really matter what the cycle counter starts at, but
2041                  * having it always start the same doesn't seem good.  Seed with
2042                  * low-order bits of time() instead.
2043                  */
2044                 btvacinfo->cycle_ctr = (BTCycleId) time(NULL);
2045
2046                 btvacinfo->num_vacuums = 0;
2047                 btvacinfo->max_vacuums = MaxBackends;
2048         }
2049         else
2050                 Assert(found);
2051 }
2052
2053 bytea *
2054 btoptions(Datum reloptions, bool validate)
2055 {
2056         return default_reloptions(reloptions, validate, RELOPT_KIND_BTREE);
2057 }
2058
2059 /*
2060  *      btproperty() -- Check boolean properties of indexes.
2061  *
2062  * This is optional, but handling AMPROP_RETURNABLE here saves opening the rel
2063  * to call btcanreturn.
2064  */
2065 bool
2066 btproperty(Oid index_oid, int attno,
2067                    IndexAMProperty prop, const char *propname,
2068                    bool *res, bool *isnull)
2069 {
2070         switch (prop)
2071         {
2072                 case AMPROP_RETURNABLE:
2073                         /* answer only for columns, not AM or whole index */
2074                         if (attno == 0)
2075                                 return false;
2076                         /* otherwise, btree can always return data */
2077                         *res = true;
2078                         return true;
2079
2080                 default:
2081                         return false;           /* punt to generic code */
2082         }
2083 }
2084
2085 /*
2086  *      _bt_nonkey_truncate() -- create tuple without non-key suffix attributes.
2087  *
2088  * Returns truncated index tuple allocated in caller's memory context, with key
2089  * attributes copied from caller's itup argument.  Currently, suffix truncation
2090  * is only performed to create pivot tuples in INCLUDE indexes, but some day it
2091  * could be generalized to remove suffix attributes after the first
2092  * distinguishing key attribute.
2093  *
2094  * Truncated tuple is guaranteed to be no larger than the original, which is
2095  * important for staying under the 1/3 of a page restriction on tuple size.
2096  *
2097  * Note that returned tuple's t_tid offset will hold the number of attributes
2098  * present, so the original item pointer offset is not represented.  Caller
2099  * should only change truncated tuple's downlink.
2100  */
2101 IndexTuple
2102 _bt_nonkey_truncate(Relation rel, IndexTuple itup)
2103 {
2104         int                             nkeyattrs = IndexRelationGetNumberOfKeyAttributes(rel);
2105         IndexTuple              truncated;
2106
2107         /*
2108          * We should only ever truncate leaf index tuples, which must have both key
2109          * and non-key attributes.  It's never okay to truncate a second time.
2110          */
2111         Assert(BTreeTupleGetNAtts(itup, rel) ==
2112                    IndexRelationGetNumberOfAttributes(rel));
2113
2114         truncated = index_truncate_tuple(RelationGetDescr(rel), itup, nkeyattrs);
2115         BTreeTupleSetNAtts(truncated, nkeyattrs);
2116
2117         return truncated;
2118 }
2119
2120 /*
2121  *  _bt_check_natts() -- Verify tuple has expected number of attributes.
2122  *
2123  * Returns value indicating if the expected number of attributes were found
2124  * for a particular offset on page.  This can be used as a general purpose
2125  * sanity check.
2126  *
2127  * Testing a tuple directly with BTreeTupleGetNAtts() should generally be
2128  * preferred to calling here.  That's usually more convenient, and is always
2129  * more explicit.  Call here instead when offnum's tuple may be a negative
2130  * infinity tuple that uses the pre-v11 on-disk representation, or when a low
2131  * context check is appropriate.
2132  */
2133 bool
2134 _bt_check_natts(Relation rel, Page page, OffsetNumber offnum)
2135 {
2136         int16                   natts = IndexRelationGetNumberOfAttributes(rel);
2137         int16                   nkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
2138         BTPageOpaque    opaque = (BTPageOpaque) PageGetSpecialPointer(page);
2139         IndexTuple              itup;
2140
2141         /*
2142          * We cannot reliably test a deleted or half-deleted page, since they have
2143          * dummy high keys
2144          */
2145         if (P_IGNORE(opaque))
2146                 return true;
2147
2148         Assert(offnum >= FirstOffsetNumber &&
2149                    offnum <= PageGetMaxOffsetNumber(page));
2150         /*
2151          * Mask allocated for number of keys in index tuple must be able to fit
2152          * maximum possible number of index attributes
2153          */
2154         StaticAssertStmt(BT_N_KEYS_OFFSET_MASK >= INDEX_MAX_KEYS,
2155                                          "BT_N_KEYS_OFFSET_MASK can't fit INDEX_MAX_KEYS");
2156
2157         itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
2158
2159         if (P_ISLEAF(opaque))
2160         {
2161                 if (offnum >= P_FIRSTDATAKEY(opaque))
2162                 {
2163                         /*
2164                          * Leaf tuples that are not the page high key (non-pivot tuples)
2165                          * should never be truncated
2166                          */
2167                         return BTreeTupleGetNAtts(itup, rel) == natts;
2168                 }
2169                 else
2170                 {
2171                         /*
2172                          * Rightmost page doesn't contain a page high key, so tuple was
2173                          * checked above as ordinary leaf tuple
2174                          */
2175                         Assert(!P_RIGHTMOST(opaque));
2176
2177                         /* Page high key tuple contains only key attributes */
2178                         return BTreeTupleGetNAtts(itup, rel) == nkeyatts;
2179                 }
2180         }
2181         else  /* !P_ISLEAF(opaque) */
2182         {
2183                 if (offnum == P_FIRSTDATAKEY(opaque))
2184                 {
2185                         /*
2186                          * The first tuple on any internal page (possibly the first after
2187                          * its high key) is its negative infinity tuple.  Negative infinity
2188                          * tuples are always truncated to zero attributes.  They are a
2189                          * particular kind of pivot tuple.
2190                          *
2191                          * The number of attributes won't be explicitly represented if the
2192                          * negative infinity tuple was generated during a page split that
2193                          * occurred with a version of Postgres before v11.  There must be a
2194                          * problem when there is an explicit representation that is
2195                          * non-zero, or when there is no explicit representation and the
2196                          * tuple is evidently not a pre-pg_upgrade tuple.
2197                          *
2198                          * Prior to v11, downlinks always had P_HIKEY as their offset.  Use
2199                          * that to decide if the tuple is a pre-v11 tuple.
2200                          */
2201                         return BTreeTupleGetNAtts(itup, rel) == 0 ||
2202                                         ((itup->t_info & INDEX_ALT_TID_MASK) == 0 &&
2203                                          ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
2204                 }
2205                 else
2206                 {
2207                         /*
2208                          * Tuple contains only key attributes despite on is it page high
2209                          * key or not
2210                          */
2211                         return BTreeTupleGetNAtts(itup, rel) == nkeyatts;
2212                 }
2213
2214         }
2215 }