]> granicus.if.org Git - postgresql/blobdiff - src/backend/access/nbtree/nbtsearch.c
Update copyright for 2014
[postgresql] / src / backend / access / nbtree / nbtsearch.c
index 22943dcd020285f0cb1451e7ab3343738dbb7c5e..bc62fbc02e6c189438f1b693b1ca97f5b6068f88 100644 (file)
@@ -4,25 +4,30 @@
  *       Search code for postgres btrees.
  *
  *
- * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.116 2008/01/01 19:45:46 momjian Exp $
+ *       src/backend/access/nbtree/nbtsearch.c
  *
  *-------------------------------------------------------------------------
  */
 
 #include "postgres.h"
 
-#include "access/genam.h"
 #include "access/nbtree.h"
+#include "access/relscan.h"
+#include "miscadmin.h"
 #include "pgstat.h"
+#include "storage/predicate.h"
 #include "utils/lsyscache.h"
+#include "utils/rel.h"
 
 
 static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir,
                         OffsetNumber offnum);
+static void _bt_saveitem(BTScanOpaque so, int itemIndex,
+                        OffsetNumber offnum, IndexTuple itup);
 static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir);
 static Buffer _bt_walk_left(Relation rel, Buffer buf);
 static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
@@ -398,9 +403,10 @@ _bt_compare(Relation rel,
                         * to flip the sign of the comparison result.  (Unless it's a DESC
                         * column, in which case we *don't* flip the sign.)
                         */
-                       result = DatumGetInt32(FunctionCall2(&scankey->sk_func,
-                                                                                                datum,
-                                                                                                scankey->sk_argument));
+                       result = DatumGetInt32(FunctionCall2Coll(&scankey->sk_func,
+                                                                                                        scankey->sk_collation,
+                                                                                                        datum,
+                                                                                                        scankey->sk_argument));
 
                        if (!(scankey->sk_flags & SK_BT_DESC))
                                result = -result;
@@ -425,8 +431,9 @@ _bt_compare(Relation rel,
  *             if backwards scan, the last item) in the tree that satisfies the
  *             qualifications in the scan key.  On success exit, the page containing
  *             the current index tuple is pinned but not locked, and data about
- *             the matching tuple(s) on the page has been loaded into so->currPos,
- *             and scan->xs_ctup.t_self is set to the heap TID of the current tuple.
+ *             the matching tuple(s) on the page has been loaded into so->currPos.
+ *             scan->xs_ctup.t_self is set to the heap TID of the current tuple,
+ *             and if requested, scan->xs_itup points to a copy of the index tuple.
  *
  * If there are no matching items in the index, we return FALSE, with no
  * pins or locks held.
@@ -449,9 +456,11 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
        bool            goback;
        ScanKey         startKeys[INDEX_MAX_KEYS];
        ScanKeyData scankeys[INDEX_MAX_KEYS];
+       ScanKeyData notnullkeys[INDEX_MAX_KEYS];
        int                     keysCount = 0;
        int                     i;
        StrategyNumber strat_total;
+       BTScanPosItem *currItem;
 
        pgstat_count_index_scan(rel);
 
@@ -498,6 +507,14 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
         * one we use --- by definition, they are either redundant or
         * contradictory.
         *
+        * Any regular (not SK_SEARCHNULL) key implies a NOT NULL qualifier.
+        * If the index stores nulls at the end of the index we'll be starting
+        * from, and we have no boundary key for the column (which means the key
+        * we deduced NOT NULL from is an inequality key that constrains the other
+        * end of the index), then we cons up an explicit SK_SEARCHNOTNULL key to
+        * use as a boundary key.  If we didn't do this, we might find ourselves
+        * traversing a lot of null entries at the start of the scan.
+        *
         * In this loop, row-comparison keys are treated the same as keys on their
         * first (leftmost) columns.  We'll add on lower-order columns of the row
         * comparison below, if possible.
@@ -511,6 +528,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
        {
                AttrNumber      curattr;
                ScanKey         chosen;
+               ScanKey         impliesNN;
                ScanKey         cur;
 
                /*
@@ -520,6 +538,8 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
                 */
                curattr = 1;
                chosen = NULL;
+               /* Also remember any scankey that implies a NOT NULL constraint */
+               impliesNN = NULL;
 
                /*
                 * Loop iterates from 0 to numberOfKeys inclusive; we use the last
@@ -532,8 +552,32 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
                        {
                                /*
                                 * Done looking at keys for curattr.  If we didn't find a
-                                * usable boundary key, quit; else save the boundary key
-                                * pointer in startKeys.
+                                * usable boundary key, see if we can deduce a NOT NULL key.
+                                */
+                               if (chosen == NULL && impliesNN != NULL &&
+                                       ((impliesNN->sk_flags & SK_BT_NULLS_FIRST) ?
+                                        ScanDirectionIsForward(dir) :
+                                        ScanDirectionIsBackward(dir)))
+                               {
+                                       /* Yes, so build the key in notnullkeys[keysCount] */
+                                       chosen = &notnullkeys[keysCount];
+                                       ScanKeyEntryInitialize(chosen,
+                                                                                  (SK_SEARCHNOTNULL | SK_ISNULL |
+                                                                                       (impliesNN->sk_flags &
+                                                                                 (SK_BT_DESC | SK_BT_NULLS_FIRST))),
+                                                                                  curattr,
+                                                                ((impliesNN->sk_flags & SK_BT_NULLS_FIRST) ?
+                                                                 BTGreaterStrategyNumber :
+                                                                 BTLessStrategyNumber),
+                                                                                  InvalidOid,
+                                                                                  InvalidOid,
+                                                                                  InvalidOid,
+                                                                                  (Datum) 0);
+                               }
+
+                               /*
+                                * If we still didn't find a usable boundary key, quit; else
+                                * save the boundary key pointer in startKeys.
                                 */
                                if (chosen == NULL)
                                        break;
@@ -566,15 +610,27 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
                                 */
                                curattr = cur->sk_attno;
                                chosen = NULL;
+                               impliesNN = NULL;
                        }
 
-                       /* Can we use this key as a starting boundary for this attr? */
+                       /*
+                        * Can we use this key as a starting boundary for this attr?
+                        *
+                        * If not, does it imply a NOT NULL constraint?  (Because
+                        * SK_SEARCHNULL keys are always assigned BTEqualStrategyNumber,
+                        * *any* inequality key works for that; we need not test.)
+                        */
                        switch (cur->sk_strategy)
                        {
                                case BTLessStrategyNumber:
                                case BTLessEqualStrategyNumber:
-                                       if (chosen == NULL && ScanDirectionIsBackward(dir))
-                                               chosen = cur;
+                                       if (chosen == NULL)
+                                       {
+                                               if (ScanDirectionIsBackward(dir))
+                                                       chosen = cur;
+                                               else
+                                                       impliesNN = cur;
+                                       }
                                        break;
                                case BTEqualStrategyNumber:
                                        /* override any non-equality choice */
@@ -582,8 +638,13 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
                                        break;
                                case BTGreaterEqualStrategyNumber:
                                case BTGreaterStrategyNumber:
-                                       if (chosen == NULL && ScanDirectionIsForward(dir))
-                                               chosen = cur;
+                                       if (chosen == NULL)
+                                       {
+                                               if (ScanDirectionIsForward(dir))
+                                                       chosen = cur;
+                                               else
+                                                       impliesNN = cur;
+                                       }
                                        break;
                        }
                }
@@ -709,6 +770,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
                                                                                           cur->sk_attno,
                                                                                           InvalidStrategy,
                                                                                           cur->sk_subtype,
+                                                                                          cur->sk_collation,
                                                                                           procinfo,
                                                                                           cur->sk_argument);
                        }
@@ -729,6 +791,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
                                                                           cur->sk_attno,
                                                                           InvalidStrategy,
                                                                           cur->sk_subtype,
+                                                                          cur->sk_collation,
                                                                           cmp_proc,
                                                                           cur->sk_argument);
                        }
@@ -840,9 +903,16 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
 
        if (!BufferIsValid(buf))
        {
-               /* Only get here if index is completely empty */
+               /*
+                * We only get here if the index is completely empty. Lock relation
+                * because nothing finer to lock exists.
+                */
+               PredicateLockRelation(rel, scan->xs_snapshot);
                return false;
        }
+       else
+               PredicateLockPage(rel, BufferGetBlockNumber(buf),
+                                                 scan->xs_snapshot);
 
        /* initialize moreLeft/moreRight appropriately for scan direction */
        if (ScanDirectionIsForward(dir))
@@ -899,7 +969,10 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
        LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK);
 
        /* OK, itemIndex says what to return */
-       scan->xs_ctup.t_self = so->currPos.items[so->currPos.itemIndex].heapTid;
+       currItem = &so->currPos.items[so->currPos.itemIndex];
+       scan->xs_ctup.t_self = currItem->heapTid;
+       if (scan->xs_want_itup)
+               scan->xs_itup = (IndexTuple) (so->currTuples + currItem->tupleOffset);
 
        return true;
 }
@@ -912,7 +985,8 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
  *             previously returned.
  *
  *             On successful exit, scan->xs_ctup.t_self is set to the TID of the
- *             next heap tuple, and so->currPos is updated as needed.
+ *             next heap tuple, and if requested, scan->xs_itup points to a copy of
+ *             the index tuple.  so->currPos is updated as needed.
  *
  *             On failure exit (no more tuples), we release pin and set
  *             so->currPos.buf to InvalidBuffer.
@@ -921,6 +995,7 @@ bool
 _bt_next(IndexScanDesc scan, ScanDirection dir)
 {
        BTScanOpaque so = (BTScanOpaque) scan->opaque;
+       BTScanPosItem *currItem;
 
        /*
         * Advance to next tuple on current page; or if there's no more, try to
@@ -954,7 +1029,10 @@ _bt_next(IndexScanDesc scan, ScanDirection dir)
        }
 
        /* OK, itemIndex says what to return */
-       scan->xs_ctup.t_self = so->currPos.items[so->currPos.itemIndex].heapTid;
+       currItem = &so->currPos.items[so->currPos.itemIndex];
+       scan->xs_ctup.t_self = currItem->heapTid;
+       if (scan->xs_want_itup)
+               scan->xs_itup = (IndexTuple) (so->currTuples + currItem->tupleOffset);
 
        return true;
 }
@@ -983,6 +1061,7 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
        OffsetNumber minoff;
        OffsetNumber maxoff;
        int                     itemIndex;
+       IndexTuple      itup;
        bool            continuescan;
 
        /* we must have the buffer pinned and locked */
@@ -1000,6 +1079,9 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
         */
        so->currPos.nextPage = opaque->btpo_next;
 
+       /* initialize tuple workspace to empty */
+       so->currPos.nextTupleOffset = 0;
+
        if (ScanDirectionIsForward(dir))
        {
                /* load items[] in ascending order */
@@ -1009,12 +1091,11 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 
                while (offnum <= maxoff)
                {
-                       if (_bt_checkkeys(scan, page, offnum, dir, &continuescan))
+                       itup = _bt_checkkeys(scan, page, offnum, dir, &continuescan);
+                       if (itup != NULL)
                        {
                                /* tuple passes all scan key conditions, so remember it */
-                               /* _bt_checkkeys put the heap ptr into scan->xs_ctup.t_self */
-                               so->currPos.items[itemIndex].heapTid = scan->xs_ctup.t_self;
-                               so->currPos.items[itemIndex].indexOffset = offnum;
+                               _bt_saveitem(so, itemIndex, offnum, itup);
                                itemIndex++;
                        }
                        if (!continuescan)
@@ -1041,13 +1122,12 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 
                while (offnum >= minoff)
                {
-                       if (_bt_checkkeys(scan, page, offnum, dir, &continuescan))
+                       itup = _bt_checkkeys(scan, page, offnum, dir, &continuescan);
+                       if (itup != NULL)
                        {
                                /* tuple passes all scan key conditions, so remember it */
-                               /* _bt_checkkeys put the heap ptr into scan->xs_ctup.t_self */
                                itemIndex--;
-                               so->currPos.items[itemIndex].heapTid = scan->xs_ctup.t_self;
-                               so->currPos.items[itemIndex].indexOffset = offnum;
+                               _bt_saveitem(so, itemIndex, offnum, itup);
                        }
                        if (!continuescan)
                        {
@@ -1068,6 +1148,25 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
        return (so->currPos.firstItem <= so->currPos.lastItem);
 }
 
+/* Save an index item into so->currPos.items[itemIndex] */
+static void
+_bt_saveitem(BTScanOpaque so, int itemIndex,
+                        OffsetNumber offnum, IndexTuple itup)
+{
+       BTScanPosItem *currItem = &so->currPos.items[itemIndex];
+
+       currItem->heapTid = itup->t_tid;
+       currItem->indexOffset = offnum;
+       if (so->currTuples)
+       {
+               Size            itupsz = IndexTupleSize(itup);
+
+               currItem->tupleOffset = so->currPos.nextTupleOffset;
+               memcpy(so->currTuples + so->currPos.nextTupleOffset, itup, itupsz);
+               so->currPos.nextTupleOffset += MAXALIGN(itupsz);
+       }
+}
+
 /*
  *     _bt_steppage() -- Step to next page containing valid data for scan
  *
@@ -1106,6 +1205,9 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
                memcpy(&so->markPos, &so->currPos,
                           offsetof(BTScanPosData, items[1]) +
                           so->currPos.lastItem * sizeof(BTScanPosItem));
+               if (so->markTuples)
+                       memcpy(so->markTuples, so->currTuples,
+                                  so->currPos.nextTupleOffset);
                so->markPos.itemIndex = so->markItemIndex;
                so->markItemIndex = -1;
        }
@@ -1123,21 +1225,22 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
 
                for (;;)
                {
-                       /* if we're at end of scan, release the buffer and return */
+                       /* release the previous buffer */
+                       _bt_relbuf(rel, so->currPos.buf);
+                       so->currPos.buf = InvalidBuffer;
+                       /* if we're at end of scan, give up */
                        if (blkno == P_NONE || !so->currPos.moreRight)
-                       {
-                               _bt_relbuf(rel, so->currPos.buf);
-                               so->currPos.buf = InvalidBuffer;
                                return false;
-                       }
+                       /* check for interrupts while we're not holding any buffer lock */
+                       CHECK_FOR_INTERRUPTS();
                        /* step right one page */
-                       so->currPos.buf = _bt_relandgetbuf(rel, so->currPos.buf,
-                                                                                          blkno, BT_READ);
+                       so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ);
                        /* check for deleted page */
                        page = BufferGetPage(so->currPos.buf);
                        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
                        if (!P_IGNORE(opaque))
                        {
+                               PredicateLockPage(rel, blkno, scan->xs_snapshot);
                                /* see if there are any matches on this page */
                                /* note that this will clear moreRight if we can stop */
                                if (_bt_readpage(scan, dir, P_FIRSTDATAKEY(opaque)))
@@ -1185,6 +1288,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
                        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
                        if (!P_IGNORE(opaque))
                        {
+                               PredicateLockPage(rel, BufferGetBlockNumber(so->currPos.buf), scan->xs_snapshot);
                                /* see if there are any matches on this page */
                                /* note that this will clear moreLeft if we can stop */
                                if (_bt_readpage(scan, dir, PageGetMaxOffsetNumber(page)))
@@ -1236,7 +1340,10 @@ _bt_walk_left(Relation rel, Buffer buf)
                obknum = BufferGetBlockNumber(buf);
                /* step left */
                blkno = lblkno = opaque->btpo_prev;
-               buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ);
+               _bt_relbuf(rel, buf);
+               /* check for interrupts while we're not holding any buffer lock */
+               CHECK_FOR_INTERRUPTS();
+               buf = _bt_getbuf(rel, blkno, BT_READ);
                page = BufferGetPage(buf);
                opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 
@@ -1343,10 +1450,7 @@ _bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
                buf = _bt_gettrueroot(rel);
 
        if (!BufferIsValid(buf))
-       {
-               /* empty index... */
                return InvalidBuffer;
-       }
 
        page = BufferGetPage(buf);
        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
@@ -1413,6 +1517,7 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
        Page            page;
        BTPageOpaque opaque;
        OffsetNumber start;
+       BTScanPosItem *currItem;
 
        /*
         * Scan down to the leftmost or rightmost leaf page.  This is a simplified
@@ -1423,11 +1528,16 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
 
        if (!BufferIsValid(buf))
        {
-               /* empty index... */
+               /*
+                * Empty index. Lock the whole relation, as nothing finer to lock
+                * exists.
+                */
+               PredicateLockRelation(rel, scan->xs_snapshot);
                so->currPos.buf = InvalidBuffer;
                return false;
        }
 
+       PredicateLockPage(rel, BufferGetBlockNumber(buf), scan->xs_snapshot);
        page = BufferGetPage(buf);
        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
        Assert(P_ISLEAF(opaque));
@@ -1485,7 +1595,10 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
        LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK);
 
        /* OK, itemIndex says what to return */
-       scan->xs_ctup.t_self = so->currPos.items[so->currPos.itemIndex].heapTid;
+       currItem = &so->currPos.items[so->currPos.itemIndex];
+       scan->xs_ctup.t_self = currItem->heapTid;
+       if (scan->xs_want_itup)
+               scan->xs_itup = (IndexTuple) (so->currTuples + currItem->tupleOffset);
 
        return true;
 }