*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtutils.c,v 1.79 2006/10/04 00:29:49 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtutils.c,v 1.80 2006/12/28 23:16:39 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "miscadmin.h"
#include "storage/lwlock.h"
#include "storage/shmem.h"
+#include "utils/lsyscache.h"
+static bool _bt_compare_scankey_args(IndexScanDesc scan, ScanKey op,
+ ScanKey leftarg, ScanKey rightarg,
+ bool *result);
static void _bt_mark_scankey_required(ScanKey skey);
static bool _bt_check_rowcompare(ScanKey skey,
IndexTuple tuple, TupleDesc tupdesc,
}
-/*----------
+/*
* _bt_preprocess_keys() -- Preprocess scan keys
*
* The caller-supplied search-type keys (in scan->keyData[]) are copied to
*
* The primary purpose of this routine is to discover how many scan keys
* must be satisfied to continue the scan. It also attempts to eliminate
- * redundant keys and detect contradictory keys. At present, redundant and
- * contradictory keys can only be detected for same-data-type comparisons,
- * but that's the usual case so it seems worth doing.
+ * redundant keys and detect contradictory keys. (If the index opfamily
+ * provides incomplete sets of cross-type operators, we may fail to detect
+ * redundant or contradictory keys, but we can survive that.)
*
* The output keys must be sorted by index attribute. Presently we expect
* (but verify) that the input keys are already so sorted --- this is done
* If possible, redundant keys are eliminated: we keep only the tightest
* >/>= bound and the tightest </<= bound, and if there's an = key then
* that's the only one returned. (So, we return either a single = key,
- * or one or two boundary-condition keys for each attr.) However, we can
- * only detect redundant keys when the right-hand datatypes are all equal
- * to the index datatype, because we do not know suitable operators for
- * comparing right-hand values of two different datatypes. (In theory
- * we could handle comparison of a RHS of the index datatype with a RHS of
- * another type, but that seems too much pain for too little gain.) So,
- * keys whose operator has a nondefault subtype (ie, its RHS is not of the
- * index datatype) are ignored here, except for noting whether they include
- * an "=" condition or not. The logic about required keys still works if
- * we don't eliminate redundant keys.
+ * or one or two boundary-condition keys for each attr.) However, if we
+ * cannot compare two keys for lack of a suitable cross-type operator,
+ * we cannot eliminate either. If there are two such keys of the same
+ * operator strategy, the second one is just pushed into the output array
+ * without further processing here. We may also emit both >/>= or both
+ * </<= keys if we can't compare them. The logic about required keys still
+ * works if we don't eliminate redundant keys.
*
* As a byproduct of this work, we can detect contradictory quals such
- * as "x = 1 AND x > 2". If we see that, we return so->quals_ok = FALSE,
+ * as "x = 1 AND x > 2". If we see that, we return so->qual_ok = FALSE,
* indicating the scan need not be run at all since no tuples can match.
- * Again though, only keys with RHS datatype equal to the index datatype
- * can be checked for contradictions.
+ * (In this case we do not bother completing the output key array!)
+ * Again, missing cross-type operators might cause us to fail to prove the
+ * quals contradictory when they really are, but the scan will work correctly.
*
- * Row comparison keys are treated the same as comparisons to nondefault
- * datatypes: we just transfer them into the preprocessed array without any
+ * Row comparison keys are currently also treated without any smarts:
+ * we just transfer them into the preprocessed array without any
* editorialization. We can treat them the same as an ordinary inequality
* comparison on the row's first index column, for the purposes of the logic
* about required keys.
* storage is that we are modifying the array based on comparisons of the
* key argument values, which could change on a rescan. Therefore we can't
* overwrite the caller's data structure.
- *----------
*/
void
_bt_preprocess_keys(IndexScanDesc scan)
ScanKey outkeys;
ScanKey cur;
ScanKey xform[BTMaxStrategyNumber];
- bool hasOtherTypeEqual;
- Datum test;
+ bool test_result;
int i,
j;
AttrNumber attno;
/*
* Initialize for processing of keys for attr 1.
*
- * xform[i] points to the currently best scan key of strategy type i+1, if
- * any is found with a default operator subtype; it is NULL if we haven't
- * yet found such a key for this attr. Scan keys of nondefault subtypes
- * are transferred to the output with no processing except for noting if
- * they are of "=" type.
+ * xform[i] points to the currently best scan key of strategy type i+1;
+ * it is NULL if we haven't yet found such a key for this attr.
*/
attno = 1;
memset(xform, 0, sizeof(xform));
- hasOtherTypeEqual = false;
/*
* Loop iterates from 0 to numberOfKeys inclusive; we use the last pass to
elog(ERROR, "btree index keys must be ordered by attribute");
/*
- * If = has been specified, no other key will be used. In case of
- * key > 2 && key == 1 and so on we have to set qual_ok to false
- * before discarding the other keys.
+ * If = has been specified, all other keys can be eliminated as
+ * redundant. In case of key > 2 && key == 1 we can set qual_ok
+ * to false and abandon further processing.
*/
if (xform[BTEqualStrategyNumber - 1])
{
if (!chk || j == (BTEqualStrategyNumber - 1))
continue;
- test = FunctionCall2(&chk->sk_func,
- eq->sk_argument,
- chk->sk_argument);
- if (!DatumGetBool(test))
+ if (_bt_compare_scankey_args(scan, chk, eq, chk,
+ &test_result))
{
- so->qual_ok = false;
- break;
+ if (!test_result)
+ {
+ /* keys proven mutually contradictory */
+ so->qual_ok = false;
+ return;
+ }
+ /* else discard the redundant non-equality key */
+ xform[j] = NULL;
}
+ /* else, cannot determine redundancy, keep both keys */
}
- xform[BTLessStrategyNumber - 1] = NULL;
- xform[BTLessEqualStrategyNumber - 1] = NULL;
- xform[BTGreaterEqualStrategyNumber - 1] = NULL;
- xform[BTGreaterStrategyNumber - 1] = NULL;
/* track number of attrs for which we have "=" keys */
numberOfEqualCols++;
}
- else
- {
- /* track number of attrs for which we have "=" keys */
- if (hasOtherTypeEqual)
- numberOfEqualCols++;
- }
- /* keep only one of <, <= */
+ /* try to keep only one of <, <= */
if (xform[BTLessStrategyNumber - 1]
&& xform[BTLessEqualStrategyNumber - 1])
{
ScanKey lt = xform[BTLessStrategyNumber - 1];
ScanKey le = xform[BTLessEqualStrategyNumber - 1];
- test = FunctionCall2(&le->sk_func,
- lt->sk_argument,
- le->sk_argument);
- if (DatumGetBool(test))
- xform[BTLessEqualStrategyNumber - 1] = NULL;
- else
- xform[BTLessStrategyNumber - 1] = NULL;
+ if (_bt_compare_scankey_args(scan, le, lt, le,
+ &test_result))
+ {
+ if (test_result)
+ xform[BTLessEqualStrategyNumber - 1] = NULL;
+ else
+ xform[BTLessStrategyNumber - 1] = NULL;
+ }
}
- /* keep only one of >, >= */
+ /* try to keep only one of >, >= */
if (xform[BTGreaterStrategyNumber - 1]
&& xform[BTGreaterEqualStrategyNumber - 1])
{
ScanKey gt = xform[BTGreaterStrategyNumber - 1];
ScanKey ge = xform[BTGreaterEqualStrategyNumber - 1];
- test = FunctionCall2(&ge->sk_func,
- gt->sk_argument,
- ge->sk_argument);
- if (DatumGetBool(test))
- xform[BTGreaterEqualStrategyNumber - 1] = NULL;
- else
- xform[BTGreaterStrategyNumber - 1] = NULL;
+ if (_bt_compare_scankey_args(scan, ge, gt, ge,
+ &test_result))
+ {
+ if (test_result)
+ xform[BTGreaterEqualStrategyNumber - 1] = NULL;
+ else
+ xform[BTGreaterStrategyNumber - 1] = NULL;
+ }
}
/*
/* Re-initialize for new attno */
attno = cur->sk_attno;
memset(xform, 0, sizeof(xform));
- hasOtherTypeEqual = false;
}
/* check strategy this key's operator corresponds to */
j = cur->sk_strategy - 1;
- /* if row comparison or wrong RHS data type, punt */
- if ((cur->sk_flags & SK_ROW_HEADER) || cur->sk_subtype != InvalidOid)
+ /* if row comparison, push it directly to the output array */
+ if (cur->sk_flags & SK_ROW_HEADER)
{
ScanKey outkey = &outkeys[new_numberOfKeys++];
memcpy(outkey, cur, sizeof(ScanKeyData));
if (numberOfEqualCols == attno - 1)
_bt_mark_scankey_required(outkey);
- if (j == (BTEqualStrategyNumber - 1))
- hasOtherTypeEqual = true;
+ /*
+ * We don't support RowCompare using equality; such a qual would
+ * mess up the numberOfEqualCols tracking.
+ */
+ Assert(j != (BTEqualStrategyNumber - 1));
continue;
}
/* have we seen one of these before? */
- if (xform[j])
+ if (xform[j] == NULL)
{
- /* yup, keep the more restrictive key */
- test = FunctionCall2(&cur->sk_func,
- cur->sk_argument,
- xform[j]->sk_argument);
- if (DatumGetBool(test))
- xform[j] = cur;
- else if (j == (BTEqualStrategyNumber - 1))
- {
- /* key == a && key == b, but a != b */
- so->qual_ok = false;
- return;
- }
+ /* nope, so remember this scankey */
+ xform[j] = cur;
}
else
{
- /* nope, so remember this scankey */
- xform[j] = cur;
+ /* yup, keep only the more restrictive key */
+ if (_bt_compare_scankey_args(scan, cur, cur, xform[j],
+ &test_result))
+ {
+ if (test_result)
+ xform[j] = cur;
+ else if (j == (BTEqualStrategyNumber - 1))
+ {
+ /* key == a && key == b, but a != b */
+ so->qual_ok = false;
+ return;
+ }
+ /* else old key is more restrictive, keep it */
+ }
+ else
+ {
+ /*
+ * We can't determine which key is more restrictive. Keep
+ * the previous one in xform[j] and push this one directly
+ * to the output array.
+ */
+ ScanKey outkey = &outkeys[new_numberOfKeys++];
+
+ memcpy(outkey, cur, sizeof(ScanKeyData));
+ if (numberOfEqualCols == attno - 1)
+ _bt_mark_scankey_required(outkey);
+ }
}
}
so->numberOfKeys = new_numberOfKeys;
}
+/*
+ * Compare two scankey values using a specified operator. Both values
+ * must be already known non-NULL.
+ *
+ * The test we want to perform is logically "leftarg op rightarg", where
+ * leftarg and rightarg are the sk_argument values in those ScanKeys, and
+ * the comparison operator is the one in the op ScanKey. However, in
+ * cross-data-type situations we may need to look up the correct operator in
+ * the index's opfamily: it is the one having amopstrategy = op->sk_strategy
+ * and amoplefttype/amoprighttype equal to the two argument datatypes.
+ *
+ * If the opfamily doesn't supply a complete set of cross-type operators we
+ * may not be able to make the comparison. If we can make the comparison
+ * we store the operator result in *result and return TRUE. We return FALSE
+ * if the comparison could not be made.
+ *
+ * Note: op always points at the same ScanKey as either leftarg or rightarg.
+ * Since we don't scribble on the scankeys, this aliasing should cause no
+ * trouble.
+ */
+static bool
+_bt_compare_scankey_args(IndexScanDesc scan, ScanKey op,
+ ScanKey leftarg, ScanKey rightarg,
+ bool *result)
+{
+ Relation rel = scan->indexRelation;
+ Oid lefttype,
+ righttype,
+ optype,
+ opcintype,
+ cmp_op;
+
+ /*
+ * The opfamily we need to worry about is identified by the index column.
+ */
+ Assert(leftarg->sk_attno == rightarg->sk_attno);
+
+ opcintype = rel->rd_opcintype[leftarg->sk_attno - 1];
+
+ /*
+ * Determine the actual datatypes of the ScanKey arguments. We have to
+ * support the convention that sk_subtype == InvalidOid means the opclass
+ * input type; this is a hack to simplify life for ScanKeyInit().
+ */
+ lefttype = leftarg->sk_subtype;
+ if (lefttype == InvalidOid)
+ lefttype = opcintype;
+ righttype = rightarg->sk_subtype;
+ if (righttype == InvalidOid)
+ righttype = opcintype;
+ optype = op->sk_subtype;
+ if (optype == InvalidOid)
+ optype = opcintype;
+
+ /*
+ * If leftarg and rightarg match the types expected for the "op" scankey,
+ * we can use its already-looked-up comparison function.
+ */
+ if (lefttype == opcintype && righttype == optype)
+ {
+ *result = DatumGetBool(FunctionCall2(&op->sk_func,
+ leftarg->sk_argument,
+ rightarg->sk_argument));
+ return true;
+ }
+
+ /*
+ * Otherwise, we need to go to the syscache to find the appropriate
+ * operator. (This cannot result in infinite recursion, since no
+ * indexscan initiated by syscache lookup will use cross-data-type
+ * operators.)
+ */
+ cmp_op = get_opfamily_member(rel->rd_opfamily[leftarg->sk_attno - 1],
+ lefttype,
+ righttype,
+ op->sk_strategy);
+ if (OidIsValid(cmp_op))
+ {
+ RegProcedure cmp_proc = get_opcode(cmp_op);
+
+ if (RegProcedureIsValid(cmp_proc))
+ {
+ *result = DatumGetBool(OidFunctionCall2(cmp_proc,
+ leftarg->sk_argument,
+ rightarg->sk_argument));
+ return true;
+ }
+ }
+
+ /* Can't make the comparison */
+ *result = false; /* suppress compiler warnings */
+ return false;
+}
+
/*
* Mark a scankey as "required to continue the scan".
*