* Search code for postgres btrees.
*
*
- * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.116 2008/01/01 19:45:46 momjian Exp $
+ * src/backend/access/nbtree/nbtsearch.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
-#include "access/genam.h"
#include "access/nbtree.h"
+#include "access/relscan.h"
+#include "miscadmin.h"
#include "pgstat.h"
+#include "storage/predicate.h"
#include "utils/lsyscache.h"
+#include "utils/rel.h"
static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir,
OffsetNumber offnum);
+static void _bt_saveitem(BTScanOpaque so, int itemIndex,
+ OffsetNumber offnum, IndexTuple itup);
static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir);
static Buffer _bt_walk_left(Relation rel, Buffer buf);
static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
* to flip the sign of the comparison result. (Unless it's a DESC
* column, in which case we *don't* flip the sign.)
*/
- result = DatumGetInt32(FunctionCall2(&scankey->sk_func,
- datum,
- scankey->sk_argument));
+ result = DatumGetInt32(FunctionCall2Coll(&scankey->sk_func,
+ scankey->sk_collation,
+ datum,
+ scankey->sk_argument));
if (!(scankey->sk_flags & SK_BT_DESC))
result = -result;
* if backwards scan, the last item) in the tree that satisfies the
* qualifications in the scan key. On success exit, the page containing
* the current index tuple is pinned but not locked, and data about
- * the matching tuple(s) on the page has been loaded into so->currPos,
- * and scan->xs_ctup.t_self is set to the heap TID of the current tuple.
+ * the matching tuple(s) on the page has been loaded into so->currPos.
+ * scan->xs_ctup.t_self is set to the heap TID of the current tuple,
+ * and if requested, scan->xs_itup points to a copy of the index tuple.
*
* If there are no matching items in the index, we return FALSE, with no
* pins or locks held.
bool goback;
ScanKey startKeys[INDEX_MAX_KEYS];
ScanKeyData scankeys[INDEX_MAX_KEYS];
+ ScanKeyData notnullkeys[INDEX_MAX_KEYS];
int keysCount = 0;
int i;
StrategyNumber strat_total;
+ BTScanPosItem *currItem;
pgstat_count_index_scan(rel);
* one we use --- by definition, they are either redundant or
* contradictory.
*
+ * Any regular (not SK_SEARCHNULL) key implies a NOT NULL qualifier.
+ * If the index stores nulls at the end of the index we'll be starting
+ * from, and we have no boundary key for the column (which means the key
+ * we deduced NOT NULL from is an inequality key that constrains the other
+ * end of the index), then we cons up an explicit SK_SEARCHNOTNULL key to
+ * use as a boundary key. If we didn't do this, we might find ourselves
+ * traversing a lot of null entries at the start of the scan.
+ *
* In this loop, row-comparison keys are treated the same as keys on their
* first (leftmost) columns. We'll add on lower-order columns of the row
* comparison below, if possible.
{
AttrNumber curattr;
ScanKey chosen;
+ ScanKey impliesNN;
ScanKey cur;
/*
*/
curattr = 1;
chosen = NULL;
+ /* Also remember any scankey that implies a NOT NULL constraint */
+ impliesNN = NULL;
/*
* Loop iterates from 0 to numberOfKeys inclusive; we use the last
{
/*
* Done looking at keys for curattr. If we didn't find a
- * usable boundary key, quit; else save the boundary key
- * pointer in startKeys.
+ * usable boundary key, see if we can deduce a NOT NULL key.
+ */
+ if (chosen == NULL && impliesNN != NULL &&
+ ((impliesNN->sk_flags & SK_BT_NULLS_FIRST) ?
+ ScanDirectionIsForward(dir) :
+ ScanDirectionIsBackward(dir)))
+ {
+ /* Yes, so build the key in notnullkeys[keysCount] */
+ chosen = ¬nullkeys[keysCount];
+ ScanKeyEntryInitialize(chosen,
+ (SK_SEARCHNOTNULL | SK_ISNULL |
+ (impliesNN->sk_flags &
+ (SK_BT_DESC | SK_BT_NULLS_FIRST))),
+ curattr,
+ ((impliesNN->sk_flags & SK_BT_NULLS_FIRST) ?
+ BTGreaterStrategyNumber :
+ BTLessStrategyNumber),
+ InvalidOid,
+ InvalidOid,
+ InvalidOid,
+ (Datum) 0);
+ }
+
+ /*
+ * If we still didn't find a usable boundary key, quit; else
+ * save the boundary key pointer in startKeys.
*/
if (chosen == NULL)
break;
*/
curattr = cur->sk_attno;
chosen = NULL;
+ impliesNN = NULL;
}
- /* Can we use this key as a starting boundary for this attr? */
+ /*
+ * Can we use this key as a starting boundary for this attr?
+ *
+ * If not, does it imply a NOT NULL constraint? (Because
+ * SK_SEARCHNULL keys are always assigned BTEqualStrategyNumber,
+ * *any* inequality key works for that; we need not test.)
+ */
switch (cur->sk_strategy)
{
case BTLessStrategyNumber:
case BTLessEqualStrategyNumber:
- if (chosen == NULL && ScanDirectionIsBackward(dir))
- chosen = cur;
+ if (chosen == NULL)
+ {
+ if (ScanDirectionIsBackward(dir))
+ chosen = cur;
+ else
+ impliesNN = cur;
+ }
break;
case BTEqualStrategyNumber:
/* override any non-equality choice */
break;
case BTGreaterEqualStrategyNumber:
case BTGreaterStrategyNumber:
- if (chosen == NULL && ScanDirectionIsForward(dir))
- chosen = cur;
+ if (chosen == NULL)
+ {
+ if (ScanDirectionIsForward(dir))
+ chosen = cur;
+ else
+ impliesNN = cur;
+ }
break;
}
}
cur->sk_attno,
InvalidStrategy,
cur->sk_subtype,
+ cur->sk_collation,
procinfo,
cur->sk_argument);
}
cur->sk_attno,
InvalidStrategy,
cur->sk_subtype,
+ cur->sk_collation,
cmp_proc,
cur->sk_argument);
}
if (!BufferIsValid(buf))
{
- /* Only get here if index is completely empty */
+ /*
+ * We only get here if the index is completely empty. Lock relation
+ * because nothing finer to lock exists.
+ */
+ PredicateLockRelation(rel, scan->xs_snapshot);
return false;
}
+ else
+ PredicateLockPage(rel, BufferGetBlockNumber(buf),
+ scan->xs_snapshot);
/* initialize moreLeft/moreRight appropriately for scan direction */
if (ScanDirectionIsForward(dir))
LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK);
/* OK, itemIndex says what to return */
- scan->xs_ctup.t_self = so->currPos.items[so->currPos.itemIndex].heapTid;
+ currItem = &so->currPos.items[so->currPos.itemIndex];
+ scan->xs_ctup.t_self = currItem->heapTid;
+ if (scan->xs_want_itup)
+ scan->xs_itup = (IndexTuple) (so->currTuples + currItem->tupleOffset);
return true;
}
* previously returned.
*
* On successful exit, scan->xs_ctup.t_self is set to the TID of the
- * next heap tuple, and so->currPos is updated as needed.
+ * next heap tuple, and if requested, scan->xs_itup points to a copy of
+ * the index tuple. so->currPos is updated as needed.
*
* On failure exit (no more tuples), we release pin and set
* so->currPos.buf to InvalidBuffer.
_bt_next(IndexScanDesc scan, ScanDirection dir)
{
BTScanOpaque so = (BTScanOpaque) scan->opaque;
+ BTScanPosItem *currItem;
/*
* Advance to next tuple on current page; or if there's no more, try to
}
/* OK, itemIndex says what to return */
- scan->xs_ctup.t_self = so->currPos.items[so->currPos.itemIndex].heapTid;
+ currItem = &so->currPos.items[so->currPos.itemIndex];
+ scan->xs_ctup.t_self = currItem->heapTid;
+ if (scan->xs_want_itup)
+ scan->xs_itup = (IndexTuple) (so->currTuples + currItem->tupleOffset);
return true;
}
OffsetNumber minoff;
OffsetNumber maxoff;
int itemIndex;
+ IndexTuple itup;
bool continuescan;
/* we must have the buffer pinned and locked */
*/
so->currPos.nextPage = opaque->btpo_next;
+ /* initialize tuple workspace to empty */
+ so->currPos.nextTupleOffset = 0;
+
if (ScanDirectionIsForward(dir))
{
/* load items[] in ascending order */
while (offnum <= maxoff)
{
- if (_bt_checkkeys(scan, page, offnum, dir, &continuescan))
+ itup = _bt_checkkeys(scan, page, offnum, dir, &continuescan);
+ if (itup != NULL)
{
/* tuple passes all scan key conditions, so remember it */
- /* _bt_checkkeys put the heap ptr into scan->xs_ctup.t_self */
- so->currPos.items[itemIndex].heapTid = scan->xs_ctup.t_self;
- so->currPos.items[itemIndex].indexOffset = offnum;
+ _bt_saveitem(so, itemIndex, offnum, itup);
itemIndex++;
}
if (!continuescan)
while (offnum >= minoff)
{
- if (_bt_checkkeys(scan, page, offnum, dir, &continuescan))
+ itup = _bt_checkkeys(scan, page, offnum, dir, &continuescan);
+ if (itup != NULL)
{
/* tuple passes all scan key conditions, so remember it */
- /* _bt_checkkeys put the heap ptr into scan->xs_ctup.t_self */
itemIndex--;
- so->currPos.items[itemIndex].heapTid = scan->xs_ctup.t_self;
- so->currPos.items[itemIndex].indexOffset = offnum;
+ _bt_saveitem(so, itemIndex, offnum, itup);
}
if (!continuescan)
{
return (so->currPos.firstItem <= so->currPos.lastItem);
}
+/* Save an index item into so->currPos.items[itemIndex] */
+static void
+_bt_saveitem(BTScanOpaque so, int itemIndex,
+ OffsetNumber offnum, IndexTuple itup)
+{
+ BTScanPosItem *currItem = &so->currPos.items[itemIndex];
+
+ currItem->heapTid = itup->t_tid;
+ currItem->indexOffset = offnum;
+ if (so->currTuples)
+ {
+ Size itupsz = IndexTupleSize(itup);
+
+ currItem->tupleOffset = so->currPos.nextTupleOffset;
+ memcpy(so->currTuples + so->currPos.nextTupleOffset, itup, itupsz);
+ so->currPos.nextTupleOffset += MAXALIGN(itupsz);
+ }
+}
+
/*
* _bt_steppage() -- Step to next page containing valid data for scan
*
memcpy(&so->markPos, &so->currPos,
offsetof(BTScanPosData, items[1]) +
so->currPos.lastItem * sizeof(BTScanPosItem));
+ if (so->markTuples)
+ memcpy(so->markTuples, so->currTuples,
+ so->currPos.nextTupleOffset);
so->markPos.itemIndex = so->markItemIndex;
so->markItemIndex = -1;
}
for (;;)
{
- /* if we're at end of scan, release the buffer and return */
+ /* release the previous buffer */
+ _bt_relbuf(rel, so->currPos.buf);
+ so->currPos.buf = InvalidBuffer;
+ /* if we're at end of scan, give up */
if (blkno == P_NONE || !so->currPos.moreRight)
- {
- _bt_relbuf(rel, so->currPos.buf);
- so->currPos.buf = InvalidBuffer;
return false;
- }
+ /* check for interrupts while we're not holding any buffer lock */
+ CHECK_FOR_INTERRUPTS();
/* step right one page */
- so->currPos.buf = _bt_relandgetbuf(rel, so->currPos.buf,
- blkno, BT_READ);
+ so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ);
/* check for deleted page */
page = BufferGetPage(so->currPos.buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (!P_IGNORE(opaque))
{
+ PredicateLockPage(rel, blkno, scan->xs_snapshot);
/* see if there are any matches on this page */
/* note that this will clear moreRight if we can stop */
if (_bt_readpage(scan, dir, P_FIRSTDATAKEY(opaque)))
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (!P_IGNORE(opaque))
{
+ PredicateLockPage(rel, BufferGetBlockNumber(so->currPos.buf), scan->xs_snapshot);
/* see if there are any matches on this page */
/* note that this will clear moreLeft if we can stop */
if (_bt_readpage(scan, dir, PageGetMaxOffsetNumber(page)))
obknum = BufferGetBlockNumber(buf);
/* step left */
blkno = lblkno = opaque->btpo_prev;
- buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ);
+ _bt_relbuf(rel, buf);
+ /* check for interrupts while we're not holding any buffer lock */
+ CHECK_FOR_INTERRUPTS();
+ buf = _bt_getbuf(rel, blkno, BT_READ);
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
buf = _bt_gettrueroot(rel);
if (!BufferIsValid(buf))
- {
- /* empty index... */
return InvalidBuffer;
- }
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
Page page;
BTPageOpaque opaque;
OffsetNumber start;
+ BTScanPosItem *currItem;
/*
* Scan down to the leftmost or rightmost leaf page. This is a simplified
if (!BufferIsValid(buf))
{
- /* empty index... */
+ /*
+ * Empty index. Lock the whole relation, as nothing finer to lock
+ * exists.
+ */
+ PredicateLockRelation(rel, scan->xs_snapshot);
so->currPos.buf = InvalidBuffer;
return false;
}
+ PredicateLockPage(rel, BufferGetBlockNumber(buf), scan->xs_snapshot);
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
Assert(P_ISLEAF(opaque));
LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK);
/* OK, itemIndex says what to return */
- scan->xs_ctup.t_self = so->currPos.items[so->currPos.itemIndex].heapTid;
+ currItem = &so->currPos.items[so->currPos.itemIndex];
+ scan->xs_ctup.t_self = currItem->heapTid;
+ if (scan->xs_want_itup)
+ scan->xs_itup = (IndexTuple) (so->currTuples + currItem->tupleOffset);
return true;
}