]> granicus.if.org Git - postgresql/commitdiff
Clean up and document btree code for ordering keys. Neat stuff,
authorTom Lane <tgl@sss.pgh.pa.us>
Tue, 25 Jul 2000 04:47:59 +0000 (04:47 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Tue, 25 Jul 2000 04:47:59 +0000 (04:47 +0000)
actually, but who could understand it with no comments?  Fix bug
while at it: _bt_orderkeys would try to invoke comparisons on
NULL inputs, given the right sort of redundant quals.

src/backend/access/nbtree/nbtsearch.c
src/backend/access/nbtree/nbtutils.c
src/include/access/nbtree.h

index 49aec3b23d9b05788b9b9302d41df90c94fa11ed..8b0804d28ebbfb33ab539e41bf88cfd95d4669cc 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.61 2000/07/21 06:42:32 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.62 2000/07/25 04:47:58 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -374,7 +374,7 @@ _bt_next(IndexScanDesc scan, ScanDirection dir)
        BTItem          btitem;
        IndexTuple      itup;
        BTScanOpaque so;
-       Size            keysok;
+       bool            continuescan;
 
        rel = scan->relation;
        so = (BTScanOpaque) scan->opaque;
@@ -396,16 +396,14 @@ _bt_next(IndexScanDesc scan, ScanDirection dir)
                btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
                itup = &btitem->bti_itup;
 
-               if (_bt_checkkeys(scan, itup, &keysok))
+               if (_bt_checkkeys(scan, itup, dir, &continuescan))
                {
                        /* tuple passes all scan key conditions, so return it */
-                       Assert(keysok == so->numberOfKeys);
                        return FormRetrieveIndexResult(current, &(itup->t_tid));
                }
 
                /* This tuple doesn't pass, but there might be more that do */
-       } while (keysok >= so->numberOfFirstKeys ||
-                        (keysok == ((Size) -1) && ScanDirectionIsBackward(dir)));
+       } while (continuescan);
 
        /* No more items, so close down the current-item info */
        ItemPointerSetInvalid(current);
@@ -442,11 +440,10 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
        RegProcedure proc;
        int32           result;
        BTScanOpaque so;
-       Size            keysok;
-       bool            strategyCheck;
-       ScanKey         scankeys = 0;
+       bool            continuescan;
+       ScanKey         scankeys = NULL;
        int                     keysCount = 0;
-       int                *nKeyIs = 0;
+       int                *nKeyIs = NULL;
        int                     i,
                                j;
        StrategyNumber strat_total;
@@ -455,71 +452,74 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
        so = (BTScanOpaque) scan->opaque;
 
        /*
-        * Order the keys in the qualification and be sure that the scan
-        * exploits the tree order.
+        * Order the scan keys in our canonical fashion and eliminate any
+        * redundant keys.
         */
-       so->numberOfFirstKeys = 0;      /* may be changed by _bt_orderkeys */
-       so->qual_ok = 1;                        /* may be changed by _bt_orderkeys */
-       scan->scanFromEnd = false;
-       strategyCheck = false;
-       if (so->numberOfKeys > 0)
-       {
-               _bt_orderkeys(rel, so);
+       _bt_orderkeys(rel, so);
 
-               if (so->qual_ok)
-                       strategyCheck = true;
-       }
+       /*
+        * Quit now if _bt_orderkeys() discovered that the scan keys can
+        * never be satisfied (eg, x == 1 AND x > 2).
+        */
+       if (! so->qual_ok)
+               return (RetrieveIndexResult) NULL;
+
+       /*
+        * Examine the scan keys to discover where we need to start the scan.
+        */
+       scan->scanFromEnd = false;
        strat_total = BTEqualStrategyNumber;
-       if (strategyCheck)
+       if (so->numberOfKeys > 0)
        {
-               AttrNumber      attno;
-
                nKeyIs = (int *) palloc(so->numberOfKeys * sizeof(int));
                for (i = 0; i < so->numberOfKeys; i++)
                {
-                       attno = so->keyData[i].sk_attno;
-                       if (attno == keysCount)
+                       AttrNumber      attno = so->keyData[i].sk_attno;
+
+                       /* ignore keys for already-determined attrs */
+                       if (attno <= keysCount)
                                continue;
+                       /* if we didn't find a boundary for the preceding attr, quit */
                        if (attno > keysCount + 1)
                                break;
                        strat = _bt_getstrat(rel, attno,
                                                                 so->keyData[i].sk_procedure);
+                       /*
+                        * Can we use this key as a starting boundary for this attr?
+                        *
+                        * We can use multiple keys if they look like, say, = >= =
+                        * but we have to stop after accepting a > or < boundary.
+                        */
                        if (strat == strat_total ||
                                strat == BTEqualStrategyNumber)
                        {
                                nKeyIs[keysCount++] = i;
-                               continue;
                        }
-                       if (ScanDirectionIsBackward(dir) &&
-                               (strat == BTLessStrategyNumber ||
-                                strat == BTLessEqualStrategyNumber))
+                       else if (ScanDirectionIsBackward(dir) &&
+                                        (strat == BTLessStrategyNumber ||
+                                         strat == BTLessEqualStrategyNumber))
                        {
                                nKeyIs[keysCount++] = i;
                                strat_total = strat;
                                if (strat == BTLessStrategyNumber)
                                        break;
-                               continue;
                        }
-                       if (ScanDirectionIsForward(dir) &&
-                               (strat == BTGreaterStrategyNumber ||
-                                strat == BTGreaterEqualStrategyNumber))
+                       else if (ScanDirectionIsForward(dir) &&
+                                        (strat == BTGreaterStrategyNumber ||
+                                         strat == BTGreaterEqualStrategyNumber))
                        {
                                nKeyIs[keysCount++] = i;
                                strat_total = strat;
                                if (strat == BTGreaterStrategyNumber)
                                        break;
-                               continue;
                        }
                }
-               if (!keysCount)
+               if (keysCount == 0)
                        scan->scanFromEnd = true;
        }
        else
                scan->scanFromEnd = true;
 
-       if (so->qual_ok == 0)
-               return (RetrieveIndexResult) NULL;
-
        /* if we just need to walk down one edge of the tree, do that */
        if (scan->scanFromEnd)
        {
@@ -529,16 +529,14 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
        }
 
        /*
-        * Okay, we want something more complicated.  What we'll do is use the
-        * first item in the scan key passed in (which has been correctly
-        * ordered to take advantage of index ordering) to position ourselves
-        * at the right place in the scan.
+        * We want to start the scan somewhere within the index.  Set up a
+        * scankey we can use to search for the correct starting point.
         */
        scankeys = (ScanKey) palloc(keysCount * sizeof(ScanKeyData));
        for (i = 0; i < keysCount; i++)
        {
                j = nKeyIs[i];
-               /* _bt_orderkeys disallows it, but it's place to add some code latter */
+               /* _bt_orderkeys disallows it, but it's place to add some code later */
                if (so->keyData[j].sk_flags & SK_ISNULL)
                {
                        pfree(nKeyIs);
@@ -578,6 +576,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
        blkno = BufferGetBlockNumber(buf);
        page = BufferGetPage(buf);
 
+       /* position to the precise item on the page */
        offnum = _bt_binsrch(rel, buf, keysCount, scankeys);
 
        ItemPointerSet(current, blkno, offnum);
@@ -595,7 +594,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
         * call _bt_endpoint() to set up a scan starting at that index endpoint,
         * as appropriate for the desired scan type.
         *
-        * it's yet other place to add some code latter for is(not)null ...
+        * it's yet other place to add some code later for is(not)null ...
         *----------
         */
 
@@ -737,13 +736,12 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
        itup = &btitem->bti_itup;
 
        /* is the first item actually acceptable? */
-       if (_bt_checkkeys(scan, itup, &keysok))
+       if (_bt_checkkeys(scan, itup, dir, &continuescan))
        {
                /* yes, return it */
                res = FormRetrieveIndexResult(current, &(itup->t_tid));
        }
-       else if (keysok >= so->numberOfFirstKeys ||
-                        (keysok == ((Size) -1) && ScanDirectionIsBackward(dir)))
+       else if (continuescan)
        {
                /* no, but there might be another one that is */
                res = _bt_next(scan, dir);
@@ -906,7 +904,7 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
        IndexTuple      itup;
        BTScanOpaque so;
        RetrieveIndexResult res;
-       Size            keysok;
+       bool            continuescan;
 
        rel = scan->relation;
        current = &(scan->currentItemData);
@@ -1012,13 +1010,12 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
        itup = &(btitem->bti_itup);
 
        /* see if we picked a winner */
-       if (_bt_checkkeys(scan, itup, &keysok))
+       if (_bt_checkkeys(scan, itup, dir, &continuescan))
        {
                /* yes, return it */
                res = FormRetrieveIndexResult(current, &(itup->t_tid));
        }
-       else if (keysok >= so->numberOfFirstKeys ||
-                        (keysok == ((Size) -1) && ScanDirectionIsBackward(dir)))
+       else if (continuescan)
        {
                /* no, but there might be another one that is */
                res = _bt_next(scan, dir);
index eb77bfd8daecf472a8ef4d21cf7b68aab75fe3f0..7bf527dd44fa1e0587c4c9e5d69f9c73d02538dc 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtutils.c,v 1.39 2000/07/21 19:21:00 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtutils.c,v 1.40 2000/07/25 04:47:59 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -145,88 +145,148 @@ _bt_formitem(IndexTuple itup)
        return btitem;
 }
 
-/*
+/*----------
  *     _bt_orderkeys() -- Put keys in a sensible order for conjunctive quals.
  *
- *             The order of the keys in the qual match the ordering imposed by
- *             the index.      This routine only needs to be called if there is
- *             more than one qual clause using this index.
+ * After this routine runs, the scan keys are ordered by index attribute
+ * (all quals for attr 1, then all for attr 2, etc) and within each attr
+ * the keys are ordered by constraint type: ">", ">=", "=", "<=", "<".
+ * Furthermore, redundant keys are eliminated: we keep only the tightest
+ * >/>= bound and the tightest </<= bound, and if there's an = key then
+ * that's the only one returned.  (So, we return either a single = key,
+ * or one or two boundary-condition keys for each attr.)
+ *
+ * As a byproduct of this work, we can detect contradictory quals such
+ * as "x = 1 AND x > 2".  If we see that, we return so->quals_ok = FALSE,
+ * indicating the scan need not be run at all since no tuples can match.
+ *
+ * Another byproduct is to determine how many quals must be satisfied to
+ * continue the scan.  _bt_checkkeys uses this.  For example, if the quals
+ * are "x = 1 AND y < 4 AND z < 5", then _bt_checkkeys will reject a tuple
+ * (1,2,7), but we must continue the scan in case there are tuples (1,3,z).
+ * But once we reach tuples like (1,4,z) we can stop scanning because no
+ * later tuples could match.  This is reflected by setting 
+ * so->numberOfRequiredKeys to the number of leading keys that must be
+ * matched to continue the scan.  numberOfRequiredKeys is equal to the
+ * number of leading "=" keys plus the key(s) for the first non "="
+ * attribute, which can be seen to be correct by considering the above
+ * example.
+ *
+ * The initial ordering of the keys is expected to be by attribute already
+ * (see group_clauses_by_indexkey() in indxpath.c).  The task here is to
+ * standardize the appearance of multiple keys for the same attribute.
+ *
+ * XXX this routine is one of many places that fail to handle SK_COMMUTE
+ * scankeys properly.  Currently, the planner is careful never to generate
+ * any indexquals that would require SK_COMMUTE to be set.  Someday we ought
+ * to try to fix this, though it's not real critical as long as indexable
+ * operators all have commutators...
+ *
+ * Note: this routine invokes comparison operators via OidFunctionCallN,
+ * ie, without caching function lookups.  No point in trying to be smarter,
+ * since these comparisons are executed only when the user expresses a
+ * hokey qualification, and happen only once per scan anyway.
+ *----------
  */
 void
 _bt_orderkeys(Relation relation, BTScanOpaque so)
 {
-       ScanKey         xform;
-       ScanKeyData *cur;
+       ScanKeyData     xform[BTMaxStrategyNumber];
+       bool            init[BTMaxStrategyNumber];
+       uint16          numberOfKeys = so->numberOfKeys;
+       ScanKey         key;
+       ScanKey         cur;
        StrategyMap map;
-       int                     nbytes;
        Datum           test;
        int                     i,
                                j;
-       int                     init[BTMaxStrategyNumber + 1];
-       ScanKey         key;
-       uint16          numberOfKeys = so->numberOfKeys;
-       uint16          new_numberOfKeys = 0;
-       AttrNumber      attno = 1;
-       bool            equalStrategyEnd,
-                               underEqualStrategy;
+       AttrNumber      attno;
+       uint16          new_numberOfKeys;
+       bool            allEqualSoFar;
+
+       so->qual_ok = true;
+       so->numberOfRequiredKeys = 0;
 
        if (numberOfKeys < 1)
-               return;
+               return;                                 /* done if qual-less scan */
 
        key = so->keyData;
-
        cur = &key[0];
+       /* check input keys are correctly ordered */
        if (cur->sk_attno != 1)
                elog(ERROR, "_bt_orderkeys: key(s) for attribute 1 missed");
 
+       /* We can short-circuit most of the work if there's just one key */
        if (numberOfKeys == 1)
        {
 
                /*
                 * We don't use indices for 'A is null' and 'A is not null'
-                * currently and 'A < = > <> NULL' is non-sense' - so qual is not
-                * Ok.          - vadim 03/21/97
+                * currently and 'A < = > <> NULL' will always fail - so qual is
+                * not Ok if comparison value is NULL.          - vadim 03/21/97
                 */
                if (cur->sk_flags & SK_ISNULL)
-                       so->qual_ok = 0;
-               so->numberOfFirstKeys = 1;
+                       so->qual_ok = false;
+               so->numberOfRequiredKeys = 1;
                return;
        }
 
-       /* get space for the modified array of keys */
-       nbytes = BTMaxStrategyNumber * sizeof(ScanKeyData);
-       xform = (ScanKey) palloc(nbytes);
-
-       MemSet(xform, 0, nbytes);
+       /*
+        * Otherwise, do the full set of pushups.
+        */
+       new_numberOfKeys = 0;
+       allEqualSoFar = true;
+
+       /*
+        * Initialize for processing of keys for attr 1.
+        *
+        * xform[i] holds a copy of the current scan key of strategy type i+1,
+        * if any; init[i] is TRUE if we have found such a key for this attr.
+        */
+       attno = 1;
        map = IndexStrategyGetStrategyMap(RelationGetIndexStrategy(relation),
                                                                          BTMaxStrategyNumber,
                                                                          attno);
-       for (j = 0; j <= BTMaxStrategyNumber; j++)
-               init[j] = 0;
-
-       equalStrategyEnd = false;
-       underEqualStrategy = true;
-       /* check each key passed in */
-       for (i = 0;;)
+       MemSet(xform, 0, sizeof(xform)); /* not really necessary */
+       MemSet(init, 0, sizeof(init));
+
+       /*
+        * Loop iterates from 0 to numberOfKeys inclusive; we use the last
+        * pass to handle after-last-key processing.  Actual exit from the
+        * loop is at the "break" statement below.
+        */
+       for (i = 0; ; cur++, i++)
        {
                if (i < numberOfKeys)
-                       cur = &key[i];
-
-               if (cur->sk_flags & SK_ISNULL)  /* see comments above */
-                       so->qual_ok = 0;
+               {
+                       /* See comments above: any NULL implies cannot match qual */
+                       if (cur->sk_flags & SK_ISNULL)
+                       {
+                               so->qual_ok = false;
+                               /* Quit processing so we don't try to invoke comparison
+                                * routines on NULLs.
+                                */
+                               return;
+                       }
+               }
 
+               /*
+                * If we are at the end of the keys for a particular attr,
+                * finish up processing and emit the cleaned-up keys.
+                */
                if (i == numberOfKeys || cur->sk_attno != attno)
                {
-                       if (cur->sk_attno != attno + 1 && i < numberOfKeys)
+                       bool            priorAllEqualSoFar = allEqualSoFar;
+
+                       /* check input keys are correctly ordered */
+                       if (i < numberOfKeys && cur->sk_attno != attno + 1)
                                elog(ERROR, "_bt_orderkeys: key(s) for attribute %d missed",
                                         attno + 1);
 
-                       underEqualStrategy = (!equalStrategyEnd);
-
                        /*
                         * If = has been specified, no other key will be used. In case
-                        * of key < 2 && key == 1 and so on we have to set qual_ok to
-                        * 0
+                        * of key > 2 && key == 1 and so on we have to set qual_ok to
+                        * false before discarding the other keys.
                         */
                        if (init[BTEqualStrategyNumber - 1])
                        {
@@ -236,187 +296,222 @@ _bt_orderkeys(Relation relation, BTScanOpaque so)
                                eq = &xform[BTEqualStrategyNumber - 1];
                                for (j = BTMaxStrategyNumber; --j >= 0;)
                                {
-                                       if (j == (BTEqualStrategyNumber - 1) || init[j] == 0)
+                                       if (! init[j] ||
+                                               j == (BTEqualStrategyNumber - 1))
                                                continue;
                                        chk = &xform[j];
                                        test = OidFunctionCall2(chk->sk_procedure,
-                                                                                       eq->sk_argument, chk->sk_argument);
+                                                                                       eq->sk_argument,
+                                                                                       chk->sk_argument);
                                        if (!DatumGetBool(test))
-                                               so->qual_ok = 0;
+                                               so->qual_ok = false;
                                }
-                               init[BTLessStrategyNumber - 1] = 0;
-                               init[BTLessEqualStrategyNumber - 1] = 0;
-                               init[BTGreaterEqualStrategyNumber - 1] = 0;
-                               init[BTGreaterStrategyNumber - 1] = 0;
+                               init[BTLessStrategyNumber - 1] = false;
+                               init[BTLessEqualStrategyNumber - 1] = false;
+                               init[BTGreaterEqualStrategyNumber - 1] = false;
+                               init[BTGreaterStrategyNumber - 1] = false;
                        }
                        else
-                               equalStrategyEnd = true;
+                       {
+                               /*
+                                * No "=" for this key, so we're done with required keys
+                                */
+                               allEqualSoFar = false;
+                       }
 
-                       /* only one of <, <= */
+                       /* keep only one of <, <= */
                        if (init[BTLessStrategyNumber - 1]
                                && init[BTLessEqualStrategyNumber - 1])
                        {
-                               ScanKeyData *lt,
-                                                  *le;
+                               ScanKeyData *lt = &xform[BTLessStrategyNumber - 1];
+                               ScanKeyData *le = &xform[BTLessEqualStrategyNumber - 1];
 
-                               lt = &xform[BTLessStrategyNumber - 1];
-                               le = &xform[BTLessEqualStrategyNumber - 1];
-
-                               /*
-                                * DO NOT use the cached function stuff here -- this is
-                                * key ordering, happens only when the user expresses a
-                                * hokey qualification, and gets executed only once,
-                                * anyway.      The transform maps are hard-coded, and can't
-                                * be initialized in the correct way.
-                                */
                                test = OidFunctionCall2(le->sk_procedure,
-                                                                               lt->sk_argument, le->sk_argument);
+                                                                               lt->sk_argument,
+                                                                               le->sk_argument);
                                if (DatumGetBool(test))
-                                       init[BTLessEqualStrategyNumber - 1] = 0;
+                                       init[BTLessEqualStrategyNumber - 1] = false;
                                else
-                                       init[BTLessStrategyNumber - 1] = 0;
+                                       init[BTLessStrategyNumber - 1] = false;
                        }
 
-                       /* only one of >, >= */
+                       /* keep only one of >, >= */
                        if (init[BTGreaterStrategyNumber - 1]
                                && init[BTGreaterEqualStrategyNumber - 1])
                        {
-                               ScanKeyData *gt,
-                                                  *ge;
-
-                               gt = &xform[BTGreaterStrategyNumber - 1];
-                               ge = &xform[BTGreaterEqualStrategyNumber - 1];
+                               ScanKeyData *gt = &xform[BTGreaterStrategyNumber - 1];
+                               ScanKeyData *ge = &xform[BTGreaterEqualStrategyNumber - 1];
 
-                               /* see note above on function cache */
                                test = OidFunctionCall2(ge->sk_procedure,
-                                                                               gt->sk_argument, ge->sk_argument);
+                                                                               gt->sk_argument,
+                                                                               ge->sk_argument);
                                if (DatumGetBool(test))
-                                       init[BTGreaterEqualStrategyNumber - 1] = 0;
+                                       init[BTGreaterEqualStrategyNumber - 1] = false;
                                else
-                                       init[BTGreaterStrategyNumber - 1] = 0;
+                                       init[BTGreaterStrategyNumber - 1] = false;
                        }
 
-                       /* okay, reorder and count */
+                       /*
+                        * Emit the cleaned-up keys back into the key[] array in the
+                        * correct order.  Note we are overwriting our input here!
+                        * It's OK because (a) xform[] is a physical copy of the keys
+                        * we want, (b) we cannot emit more keys than we input, so
+                        * we won't overwrite as-yet-unprocessed keys.
+                        */
                        for (j = BTMaxStrategyNumber; --j >= 0;)
+                       {
                                if (init[j])
-                                       key[new_numberOfKeys++] = xform[j];
+                                       memcpy(&key[new_numberOfKeys++], &xform[j],
+                                                  sizeof(ScanKeyData));
+                       }
 
-                       if (underEqualStrategy)
-                               so->numberOfFirstKeys = new_numberOfKeys;
+                       /*
+                        * If all attrs before this one had "=", include these keys
+                        * into the required-keys count.
+                        */
+                       if (priorAllEqualSoFar)
+                               so->numberOfRequiredKeys = new_numberOfKeys;
 
+                       /*
+                        * Exit loop here if done.
+                        */
                        if (i == numberOfKeys)
                                break;
 
-                       /* initialization for new attno */
+                       /* Re-initialize for new attno */
                        attno = cur->sk_attno;
-                       MemSet(xform, 0, nbytes);
                        map = IndexStrategyGetStrategyMap(RelationGetIndexStrategy(relation),
                                                                                          BTMaxStrategyNumber,
                                                                                          attno);
-                       /* haven't looked at any strategies yet */
-                       for (j = 0; j <= BTMaxStrategyNumber; j++)
-                               init[j] = 0;
+                       MemSet(xform, 0, sizeof(xform)); /* not really necessary */
+                       MemSet(init, 0, sizeof(init));
                }
 
+               /*
+                * OK, figure out which strategy this key corresponds to
+                */
                for (j = BTMaxStrategyNumber; --j >= 0;)
                {
                        if (cur->sk_procedure == map->entry[j].sk_procedure)
                                break;
                }
+               if (j < 0)
+                       elog(ERROR, "_bt_orderkeys: unable to identify operator %u",
+                                cur->sk_procedure);
 
                /* have we seen one of these before? */
                if (init[j])
                {
-                       /* yup, use the appropriate value */
+                       /* yup, keep the more restrictive value */
                        test = FunctionCall2(&cur->sk_func,
-                                                                cur->sk_argument, xform[j].sk_argument);
+                                                                cur->sk_argument,
+                                                                xform[j].sk_argument);
                        if (DatumGetBool(test))
                                xform[j].sk_argument = cur->sk_argument;
                        else if (j == (BTEqualStrategyNumber - 1))
-                               so->qual_ok = 0;/* key == a && key == b, but a != b */
+                               so->qual_ok = false; /* key == a && key == b, but a != b */
                }
                else
                {
-                       /* nope, use this value */
-                       memmove(&xform[j], cur, sizeof(*cur));
-                       init[j] = 1;
+                       /* nope, so remember this scankey */
+                       memcpy(&xform[j], cur, sizeof(ScanKeyData));
+                       init[j] = true;
                }
-
-               i++;
        }
 
        so->numberOfKeys = new_numberOfKeys;
-
-       pfree(xform);
 }
 
 /*
- * Test whether an indextuple satisfies all the scankey conditions
- *
- * If not ("false" return), the number of conditions satisfied is
- * returned in *keysok.  Given proper ordering of the scankey conditions,
- * we can use this to determine whether it's worth continuing the scan.
- * See _bt_orderkeys().
+ * Test whether an indextuple satisfies all the scankey conditions.
  *
- * HACK: *keysok == (Size) -1 means we stopped evaluating because we found
- * a NULL value in the index tuple.  It's not quite clear to me why this
- * case has to be treated specially --- tgl 7/00.
+ * If the tuple fails to pass the qual, we also determine whether there's
+ * any need to continue the scan beyond this tuple, and set *continuescan
+ * accordingly.  See comments for _bt_orderkeys(), above, about how this is
+ * done.
  */
 bool
-_bt_checkkeys(IndexScanDesc scan, IndexTuple tuple, Size *keysok)
+_bt_checkkeys(IndexScanDesc scan, IndexTuple tuple,
+                         ScanDirection dir, bool *continuescan)
 {
        BTScanOpaque so = (BTScanOpaque) scan->opaque;
        Size            keysz = so->numberOfKeys;
        TupleDesc       tupdesc;
        ScanKey         key;
-       Datum           datum;
-       bool            isNull;
-       Datum           test;
+       Size            keysok;
+
+       *continuescan = true;
 
-       *keysok = 0;
+       /* If no keys, always scan the whole index */
        if (keysz == 0)
                return true;
 
-       key = so->keyData;
        tupdesc = RelationGetDescr(scan->relation);
+       key = so->keyData;
+       keysok = 0;
 
        IncrIndexProcessed();
 
        while (keysz > 0)
        {
+               Datum           datum;
+               bool            isNull;
+               Datum           test;
+
                datum = index_getattr(tuple,
-                                                         key[0].sk_attno,
+                                                         key->sk_attno,
                                                          tupdesc,
                                                          &isNull);
 
                /* btree doesn't support 'A is null' clauses, yet */
-               if (key[0].sk_flags & SK_ISNULL)
-                       return false;
-               if (isNull)
+               if (key->sk_flags & SK_ISNULL)
                {
-                       if (*keysok < so->numberOfFirstKeys)
-                               *keysok = -1;
+                       /* we shouldn't get here, really; see _bt_orderkeys() */
+                       *continuescan = false;
                        return false;
                }
 
-               if (key[0].sk_flags & SK_COMMUTE)
+               if (isNull)
                {
-                       test = FunctionCall2(&key[0].sk_func,
-                                                                key[0].sk_argument, datum);
+                       /*
+                        * Since NULLs are sorted after non-NULLs, we know we have
+                        * reached the upper limit of the range of values for this
+                        * index attr.  On a forward scan, we can stop if this qual
+                        * is one of the "must match" subset.  On a backward scan,
+                        * however, we should keep going.
+                        */
+                       if (keysok < so->numberOfRequiredKeys &&
+                               ScanDirectionIsForward(dir))
+                               *continuescan = false;
+                       /*
+                        * In any case, this indextuple doesn't match the qual.
+                        */
+                       return false;
                }
+
+               if (key->sk_flags & SK_COMMUTE)
+                       test = FunctionCall2(&key->sk_func,
+                                                                key->sk_argument, datum);
                else
-               {
-                       test = FunctionCall2(&key[0].sk_func,
-                                                                datum, key[0].sk_argument);
-               }
+                       test = FunctionCall2(&key->sk_func,
+                                                                datum, key->sk_argument);
 
-               if (DatumGetBool(test) == !!(key[0].sk_flags & SK_NEGATE))
+               if (DatumGetBool(test) == !!(key->sk_flags & SK_NEGATE))
+               {
+                       /*
+                        * Tuple fails this qual.  If it's a required qual, then
+                        * we can conclude no further tuples will pass, either.
+                        */
+                       if (keysok < so->numberOfRequiredKeys)
+                               *continuescan = false;
                        return false;
+               }
 
-               (*keysok)++;
+               keysok++;
                key++;
                keysz--;
        }
 
+       /* If we get here, the tuple passes all quals. */
        return true;
 }
index 3f8eebc3b3614d3e58bafa201110c7f11a33e4a6..d84cfa5ded94b8496856d8a1f253f2df741ed4ae 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: nbtree.h,v 1.39 2000/07/21 06:42:35 tgl Exp $
+ * $Id: nbtree.h,v 1.40 2000/07/25 04:47:57 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -47,13 +47,12 @@ typedef struct BTPageOpaqueData
 typedef BTPageOpaqueData *BTPageOpaque;
 
 /*
- *     ScanOpaqueData is used to remember which buffers we're currently
- *     examining in the scan.  We keep these buffers locked and pinned
- *     and recorded in the opaque entry of the scan in order to avoid
- *     doing a ReadBuffer() for every tuple in the index.      This avoids
- *     semop() calls, which are expensive.
+ *     BTScanOpaqueData is used to remember which buffers we're currently
+ *     examining in the scan.  We keep these buffers pinned (but not locked,
+ *     see nbtree.c) and recorded in the opaque entry of the scan to avoid
+ *     doing a ReadBuffer() for every tuple in the index.
  *
- *     And it's used to remember actual scankey info (we need in it
+ *     And it's used to remember actual scankey info (we need it
  *     if some scankeys evaled at runtime).
  *
  *     curHeapIptr & mrkHeapIptr are heap iptr-s from current/marked
@@ -69,11 +68,12 @@ typedef struct BTScanOpaqueData
        Buffer          btso_mrkbuf;
        ItemPointerData curHeapIptr;
        ItemPointerData mrkHeapIptr;
-       uint16          qual_ok;                /* 0 for quals like key == 1 && key > 2 */
-       uint16          numberOfKeys;   /* number of keys */
-       uint16          numberOfFirstKeys;              /* number of keys for 1st
-                                                                                * attribute */
-       ScanKey         keyData;                /* key descriptor */
+       /* these fields are set by _bt_orderkeys(), which see for more info: */
+       bool            qual_ok;                /* false if qual can never be satisfied */
+       uint16          numberOfKeys;   /* number of scan keys */
+       uint16          numberOfRequiredKeys; /* number of keys that must be matched
+                                                                          * to continue the scan */
+       ScanKey         keyData;                /* array of scan keys */
 } BTScanOpaqueData;
 
 typedef BTScanOpaqueData *BTScanOpaque;
@@ -276,7 +276,8 @@ extern ScanKey _bt_mkscankey_nodata(Relation rel);
 extern void _bt_freeskey(ScanKey skey);
 extern void _bt_freestack(BTStack stack);
 extern void _bt_orderkeys(Relation relation, BTScanOpaque so);
-extern bool _bt_checkkeys(IndexScanDesc scan, IndexTuple tuple, Size *keysok);
+extern bool _bt_checkkeys(IndexScanDesc scan, IndexTuple tuple,
+                                                 ScanDirection dir, bool *continuescan);
 extern BTItem _bt_formitem(IndexTuple itup);
 
 /*