X-Git-Url: https://granicus.if.org/sourcecode?a=blobdiff_plain;f=src%2Fbackend%2Faccess%2Fnbtree%2Fnbtutils.c;h=7e409d616fe401fa3843d2931b6e4eca9ab97099;hb=ab0dfc961b6a821f23d9c40c723d11380ce195a6;hp=b8b9ff780ea27afdd4bdab82b8a5590c3bdb4a92;hpb=f2f5b05655afa80377757a2c335c01b28de24429;p=postgresql diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c index b8b9ff780e..7e409d616f 100644 --- a/src/backend/access/nbtree/nbtutils.c +++ b/src/backend/access/nbtree/nbtutils.c @@ -3,28 +3,56 @@ * nbtutils.c * Utility code for Postgres btree implementation. * - * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtutils.c,v 1.72 2006/03/05 15:58:21 momjian Exp $ + * src/backend/access/nbtree/nbtutils.c * *------------------------------------------------------------------------- */ #include "postgres.h" -#include "access/genam.h" -#include "access/nbtree.h" -#include "catalog/catalog.h" -#include "executor/execdebug.h" - +#include +#include "access/nbtree.h" +#include "access/reloptions.h" +#include "access/relscan.h" +#include "commands/progress.h" +#include "miscadmin.h" +#include "utils/array.h" +#include "utils/datum.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/rel.h" + + +typedef struct BTSortArrayContext +{ + FmgrInfo flinfo; + Oid collation; + bool reverse; +} BTSortArrayContext; + +static Datum _bt_find_extreme_element(IndexScanDesc scan, ScanKey skey, + StrategyNumber strat, + Datum *elems, int nelems); +static int _bt_sort_array_elements(IndexScanDesc scan, ScanKey skey, + bool reverse, + Datum *elems, int nelems); +static int _bt_compare_array_elements(const void *a, const void *b, void *arg); +static bool _bt_compare_scankey_args(IndexScanDesc scan, ScanKey op, + ScanKey leftarg, ScanKey rightarg, + bool *result); +static bool _bt_fix_scankey_strategy(ScanKey skey, int16 *indoption); static void _bt_mark_scankey_required(ScanKey skey); static bool _bt_check_rowcompare(ScanKey skey, - IndexTuple tuple, TupleDesc tupdesc, - ScanDirection dir, bool *continuescan); + IndexTuple tuple, int tupnatts, TupleDesc tupdesc, + ScanDirection dir, bool *continuescan); +static int _bt_keep_natts(Relation rel, IndexTuple lastleft, + IndexTuple firstright, BTScanInsert itup_key); /* @@ -32,131 +60,621 @@ static bool _bt_check_rowcompare(ScanKey skey, * Build an insertion scan key that contains comparison data from itup * as well as comparator routines appropriate to the key datatypes. * - * The result is intended for use with _bt_compare(). + * When itup is a non-pivot tuple, the returned insertion scan key is + * suitable for finding a place for it to go on the leaf level. Pivot + * tuples can be used to re-find leaf page with matching high key, but + * then caller needs to set scan key's pivotsearch field to true. This + * allows caller to search for a leaf page with a matching high key, + * which is usually to the left of the first leaf page a non-pivot match + * might appear on. + * + * The result is intended for use with _bt_compare() and _bt_truncate(). + * Callers that don't need to fill out the insertion scankey arguments + * (e.g. they use an ad-hoc comparison routine, or only need a scankey + * for _bt_truncate()) can pass a NULL index tuple. The scankey will + * be initialized as if an "all truncated" pivot tuple was passed + * instead. + * + * Note that we may occasionally have to share lock the metapage to + * determine whether or not the keys in the index are expected to be + * unique (i.e. if this is a "heapkeyspace" index). We assume a + * heapkeyspace index when caller passes a NULL tuple, allowing index + * build callers to avoid accessing the non-existent metapage. */ -ScanKey +BTScanInsert _bt_mkscankey(Relation rel, IndexTuple itup) { + BTScanInsert key; ScanKey skey; TupleDesc itupdesc; - int natts; + int indnkeyatts; + int16 *indoption; + int tupnatts; int i; itupdesc = RelationGetDescr(rel); - natts = RelationGetNumberOfAttributes(rel); + indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel); + indoption = rel->rd_indoption; + tupnatts = itup ? BTreeTupleGetNAtts(itup, rel) : 0; - skey = (ScanKey) palloc(natts * sizeof(ScanKeyData)); + Assert(tupnatts <= IndexRelationGetNumberOfAttributes(rel)); - for (i = 0; i < natts; i++) + /* + * We'll execute search using scan key constructed on key columns. + * Truncated attributes and non-key attributes are omitted from the final + * scan key. + */ + key = palloc(offsetof(BTScanInsertData, scankeys) + + sizeof(ScanKeyData) * indnkeyatts); + key->heapkeyspace = itup == NULL || _bt_heapkeyspace(rel); + key->nextkey = false; + key->pivotsearch = false; + key->keysz = Min(indnkeyatts, tupnatts); + key->scantid = key->heapkeyspace && itup ? + BTreeTupleGetHeapTID(itup) : NULL; + skey = key->scankeys; + for (i = 0; i < indnkeyatts; i++) { FmgrInfo *procinfo; Datum arg; bool null; + int flags; /* * We can use the cached (default) support procs since no cross-type * comparison can be needed. */ procinfo = index_getprocinfo(rel, i + 1, BTORDER_PROC); - arg = index_getattr(itup, i + 1, itupdesc, &null); + + /* + * Key arguments built from truncated attributes (or when caller + * provides no tuple) are defensively represented as NULL values. They + * should never be used. + */ + if (i < tupnatts) + arg = index_getattr(itup, i + 1, itupdesc, &null); + else + { + arg = (Datum) 0; + null = true; + } + flags = (null ? SK_ISNULL : 0) | (indoption[i] << SK_BT_INDOPTION_SHIFT); ScanKeyEntryInitializeWithInfo(&skey[i], - null ? SK_ISNULL : 0, + flags, (AttrNumber) (i + 1), InvalidStrategy, InvalidOid, + rel->rd_indcollation[i], procinfo, arg); } - return skey; + return key; +} + +/* + * free a retracement stack made by _bt_search. + */ +void +_bt_freestack(BTStack stack) +{ + BTStack ostack; + + while (stack != NULL) + { + ostack = stack; + stack = stack->bts_parent; + pfree(ostack); + } } + /* - * _bt_mkscankey_nodata - * Build an insertion scan key that contains 3-way comparator routines - * appropriate to the key datatypes, but no comparison data. The - * comparison data ultimately used must match the key datatypes. + * _bt_preprocess_array_keys() -- Preprocess SK_SEARCHARRAY scan keys + * + * If there are any SK_SEARCHARRAY scan keys, deconstruct the array(s) and + * set up BTArrayKeyInfo info for each one that is an equality-type key. + * Prepare modified scan keys in so->arrayKeyData, which will hold the current + * array elements during each primitive indexscan operation. For inequality + * array keys, it's sufficient to find the extreme element value and replace + * the whole array with that scalar value. * - * The result cannot be used with _bt_compare(), unless comparison - * data is first stored into the key entries. Currently this - * routine is only called by nbtsort.c and tuplesort.c, which have - * their own comparison routines. + * Note: the reason we need so->arrayKeyData, rather than just scribbling + * on scan->keyData, is that callers are permitted to call btrescan without + * supplying a new set of scankey data. */ -ScanKey -_bt_mkscankey_nodata(Relation rel) +void +_bt_preprocess_array_keys(IndexScanDesc scan) { - ScanKey skey; - int natts; + BTScanOpaque so = (BTScanOpaque) scan->opaque; + int numberOfKeys = scan->numberOfKeys; + int16 *indoption = scan->indexRelation->rd_indoption; + int numArrayKeys; + ScanKey cur; int i; + MemoryContext oldContext; + + /* Quick check to see if there are any array keys */ + numArrayKeys = 0; + for (i = 0; i < numberOfKeys; i++) + { + cur = &scan->keyData[i]; + if (cur->sk_flags & SK_SEARCHARRAY) + { + numArrayKeys++; + Assert(!(cur->sk_flags & (SK_ROW_HEADER | SK_SEARCHNULL | SK_SEARCHNOTNULL))); + /* If any arrays are null as a whole, we can quit right now. */ + if (cur->sk_flags & SK_ISNULL) + { + so->numArrayKeys = -1; + so->arrayKeyData = NULL; + return; + } + } + } + + /* Quit if nothing to do. */ + if (numArrayKeys == 0) + { + so->numArrayKeys = 0; + so->arrayKeyData = NULL; + return; + } + + /* + * Make a scan-lifespan context to hold array-associated data, or reset it + * if we already have one from a previous rescan cycle. + */ + if (so->arrayContext == NULL) + so->arrayContext = AllocSetContextCreate(CurrentMemoryContext, + "BTree array context", + ALLOCSET_SMALL_SIZES); + else + MemoryContextReset(so->arrayContext); - natts = RelationGetNumberOfAttributes(rel); + oldContext = MemoryContextSwitchTo(so->arrayContext); - skey = (ScanKey) palloc(natts * sizeof(ScanKeyData)); + /* Create modifiable copy of scan->keyData in the workspace context */ + so->arrayKeyData = (ScanKey) palloc(scan->numberOfKeys * sizeof(ScanKeyData)); + memcpy(so->arrayKeyData, + scan->keyData, + scan->numberOfKeys * sizeof(ScanKeyData)); - for (i = 0; i < natts; i++) + /* Allocate space for per-array data in the workspace context */ + so->arrayKeys = (BTArrayKeyInfo *) palloc0(numArrayKeys * sizeof(BTArrayKeyInfo)); + + /* Now process each array key */ + numArrayKeys = 0; + for (i = 0; i < numberOfKeys; i++) { - FmgrInfo *procinfo; + ArrayType *arrayval; + int16 elmlen; + bool elmbyval; + char elmalign; + int num_elems; + Datum *elem_values; + bool *elem_nulls; + int num_nonnulls; + int j; + + cur = &so->arrayKeyData[i]; + if (!(cur->sk_flags & SK_SEARCHARRAY)) + continue; /* - * We can use the cached (default) support procs since no cross-type - * comparison can be needed. + * First, deconstruct the array into elements. Anything allocated + * here (including a possibly detoasted array value) is in the + * workspace context. */ - procinfo = index_getprocinfo(rel, i + 1, BTORDER_PROC); - ScanKeyEntryInitializeWithInfo(&skey[i], - SK_ISNULL, - (AttrNumber) (i + 1), - InvalidStrategy, - InvalidOid, - procinfo, - (Datum) 0); + arrayval = DatumGetArrayTypeP(cur->sk_argument); + /* We could cache this data, but not clear it's worth it */ + get_typlenbyvalalign(ARR_ELEMTYPE(arrayval), + &elmlen, &elmbyval, &elmalign); + deconstruct_array(arrayval, + ARR_ELEMTYPE(arrayval), + elmlen, elmbyval, elmalign, + &elem_values, &elem_nulls, &num_elems); + + /* + * Compress out any null elements. We can ignore them since we assume + * all btree operators are strict. + */ + num_nonnulls = 0; + for (j = 0; j < num_elems; j++) + { + if (!elem_nulls[j]) + elem_values[num_nonnulls++] = elem_values[j]; + } + + /* We could pfree(elem_nulls) now, but not worth the cycles */ + + /* If there's no non-nulls, the scan qual is unsatisfiable */ + if (num_nonnulls == 0) + { + numArrayKeys = -1; + break; + } + + /* + * If the comparison operator is not equality, then the array qual + * degenerates to a simple comparison against the smallest or largest + * non-null array element, as appropriate. + */ + switch (cur->sk_strategy) + { + case BTLessStrategyNumber: + case BTLessEqualStrategyNumber: + cur->sk_argument = + _bt_find_extreme_element(scan, cur, + BTGreaterStrategyNumber, + elem_values, num_nonnulls); + continue; + case BTEqualStrategyNumber: + /* proceed with rest of loop */ + break; + case BTGreaterEqualStrategyNumber: + case BTGreaterStrategyNumber: + cur->sk_argument = + _bt_find_extreme_element(scan, cur, + BTLessStrategyNumber, + elem_values, num_nonnulls); + continue; + default: + elog(ERROR, "unrecognized StrategyNumber: %d", + (int) cur->sk_strategy); + break; + } + + /* + * Sort the non-null elements and eliminate any duplicates. We must + * sort in the same ordering used by the index column, so that the + * successive primitive indexscans produce data in index order. + */ + num_elems = _bt_sort_array_elements(scan, cur, + (indoption[cur->sk_attno - 1] & INDOPTION_DESC) != 0, + elem_values, num_nonnulls); + + /* + * And set up the BTArrayKeyInfo data. + */ + so->arrayKeys[numArrayKeys].scan_key = i; + so->arrayKeys[numArrayKeys].num_elems = num_elems; + so->arrayKeys[numArrayKeys].elem_values = elem_values; + numArrayKeys++; + } + + so->numArrayKeys = numArrayKeys; + + MemoryContextSwitchTo(oldContext); +} + +/* + * _bt_find_extreme_element() -- get least or greatest array element + * + * scan and skey identify the index column, whose opfamily determines the + * comparison semantics. strat should be BTLessStrategyNumber to get the + * least element, or BTGreaterStrategyNumber to get the greatest. + */ +static Datum +_bt_find_extreme_element(IndexScanDesc scan, ScanKey skey, + StrategyNumber strat, + Datum *elems, int nelems) +{ + Relation rel = scan->indexRelation; + Oid elemtype, + cmp_op; + RegProcedure cmp_proc; + FmgrInfo flinfo; + Datum result; + int i; + + /* + * Determine the nominal datatype of the array elements. We have to + * support the convention that sk_subtype == InvalidOid means the opclass + * input type; this is a hack to simplify life for ScanKeyInit(). + */ + elemtype = skey->sk_subtype; + if (elemtype == InvalidOid) + elemtype = rel->rd_opcintype[skey->sk_attno - 1]; + + /* + * Look up the appropriate comparison operator in the opfamily. + * + * Note: it's possible that this would fail, if the opfamily is + * incomplete, but it seems quite unlikely that an opfamily would omit + * non-cross-type comparison operators for any datatype that it supports + * at all. + */ + cmp_op = get_opfamily_member(rel->rd_opfamily[skey->sk_attno - 1], + elemtype, + elemtype, + strat); + if (!OidIsValid(cmp_op)) + elog(ERROR, "missing operator %d(%u,%u) in opfamily %u", + strat, elemtype, elemtype, + rel->rd_opfamily[skey->sk_attno - 1]); + cmp_proc = get_opcode(cmp_op); + if (!RegProcedureIsValid(cmp_proc)) + elog(ERROR, "missing oprcode for operator %u", cmp_op); + + fmgr_info(cmp_proc, &flinfo); + + Assert(nelems > 0); + result = elems[0]; + for (i = 1; i < nelems; i++) + { + if (DatumGetBool(FunctionCall2Coll(&flinfo, + skey->sk_collation, + elems[i], + result))) + result = elems[i]; + } + + return result; +} + +/* + * _bt_sort_array_elements() -- sort and de-dup array elements + * + * The array elements are sorted in-place, and the new number of elements + * after duplicate removal is returned. + * + * scan and skey identify the index column, whose opfamily determines the + * comparison semantics. If reverse is true, we sort in descending order. + */ +static int +_bt_sort_array_elements(IndexScanDesc scan, ScanKey skey, + bool reverse, + Datum *elems, int nelems) +{ + Relation rel = scan->indexRelation; + Oid elemtype; + RegProcedure cmp_proc; + BTSortArrayContext cxt; + int last_non_dup; + int i; + + if (nelems <= 1) + return nelems; /* no work to do */ + + /* + * Determine the nominal datatype of the array elements. We have to + * support the convention that sk_subtype == InvalidOid means the opclass + * input type; this is a hack to simplify life for ScanKeyInit(). + */ + elemtype = skey->sk_subtype; + if (elemtype == InvalidOid) + elemtype = rel->rd_opcintype[skey->sk_attno - 1]; + + /* + * Look up the appropriate comparison function in the opfamily. + * + * Note: it's possible that this would fail, if the opfamily is + * incomplete, but it seems quite unlikely that an opfamily would omit + * non-cross-type support functions for any datatype that it supports at + * all. + */ + cmp_proc = get_opfamily_proc(rel->rd_opfamily[skey->sk_attno - 1], + elemtype, + elemtype, + BTORDER_PROC); + if (!RegProcedureIsValid(cmp_proc)) + elog(ERROR, "missing support function %d(%u,%u) in opfamily %u", + BTORDER_PROC, elemtype, elemtype, + rel->rd_opfamily[skey->sk_attno - 1]); + + /* Sort the array elements */ + fmgr_info(cmp_proc, &cxt.flinfo); + cxt.collation = skey->sk_collation; + cxt.reverse = reverse; + qsort_arg((void *) elems, nelems, sizeof(Datum), + _bt_compare_array_elements, (void *) &cxt); + + /* Now scan the sorted elements and remove duplicates */ + last_non_dup = 0; + for (i = 1; i < nelems; i++) + { + int32 compare; + + compare = DatumGetInt32(FunctionCall2Coll(&cxt.flinfo, + cxt.collation, + elems[last_non_dup], + elems[i])); + if (compare != 0) + elems[++last_non_dup] = elems[i]; } - return skey; + return last_non_dup + 1; } /* - * free a scan key made by either _bt_mkscankey or _bt_mkscankey_nodata. + * qsort_arg comparator for sorting array elements + */ +static int +_bt_compare_array_elements(const void *a, const void *b, void *arg) +{ + Datum da = *((const Datum *) a); + Datum db = *((const Datum *) b); + BTSortArrayContext *cxt = (BTSortArrayContext *) arg; + int32 compare; + + compare = DatumGetInt32(FunctionCall2Coll(&cxt->flinfo, + cxt->collation, + da, db)); + if (cxt->reverse) + INVERT_COMPARE_RESULT(compare); + return compare; +} + +/* + * _bt_start_array_keys() -- Initialize array keys at start of a scan + * + * Set up the cur_elem counters and fill in the first sk_argument value for + * each array scankey. We can't do this until we know the scan direction. */ void -_bt_freeskey(ScanKey skey) +_bt_start_array_keys(IndexScanDesc scan, ScanDirection dir) { - pfree(skey); + BTScanOpaque so = (BTScanOpaque) scan->opaque; + int i; + + for (i = 0; i < so->numArrayKeys; i++) + { + BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i]; + ScanKey skey = &so->arrayKeyData[curArrayKey->scan_key]; + + Assert(curArrayKey->num_elems > 0); + if (ScanDirectionIsBackward(dir)) + curArrayKey->cur_elem = curArrayKey->num_elems - 1; + else + curArrayKey->cur_elem = 0; + skey->sk_argument = curArrayKey->elem_values[curArrayKey->cur_elem]; + } } /* - * free a retracement stack made by _bt_search. + * _bt_advance_array_keys() -- Advance to next set of array elements + * + * Returns true if there is another set of values to consider, false if not. + * On true result, the scankeys are initialized with the next set of values. + */ +bool +_bt_advance_array_keys(IndexScanDesc scan, ScanDirection dir) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + bool found = false; + int i; + + /* + * We must advance the last array key most quickly, since it will + * correspond to the lowest-order index column among the available + * qualifications. This is necessary to ensure correct ordering of output + * when there are multiple array keys. + */ + for (i = so->numArrayKeys - 1; i >= 0; i--) + { + BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i]; + ScanKey skey = &so->arrayKeyData[curArrayKey->scan_key]; + int cur_elem = curArrayKey->cur_elem; + int num_elems = curArrayKey->num_elems; + + if (ScanDirectionIsBackward(dir)) + { + if (--cur_elem < 0) + { + cur_elem = num_elems - 1; + found = false; /* need to advance next array key */ + } + else + found = true; + } + else + { + if (++cur_elem >= num_elems) + { + cur_elem = 0; + found = false; /* need to advance next array key */ + } + else + found = true; + } + + curArrayKey->cur_elem = cur_elem; + skey->sk_argument = curArrayKey->elem_values[cur_elem]; + if (found) + break; + } + + /* advance parallel scan */ + if (scan->parallel_scan != NULL) + _bt_parallel_advance_array_keys(scan); + + return found; +} + +/* + * _bt_mark_array_keys() -- Handle array keys during btmarkpos + * + * Save the current state of the array keys as the "mark" position. */ void -_bt_freestack(BTStack stack) +_bt_mark_array_keys(IndexScanDesc scan) { - BTStack ostack; + BTScanOpaque so = (BTScanOpaque) scan->opaque; + int i; - while (stack != NULL) + for (i = 0; i < so->numArrayKeys; i++) { - ostack = stack; - stack = stack->bts_parent; - pfree(ostack); + BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i]; + + curArrayKey->mark_elem = curArrayKey->cur_elem; + } +} + +/* + * _bt_restore_array_keys() -- Handle array keys during btrestrpos + * + * Restore the array keys to where they were when the mark was set. + */ +void +_bt_restore_array_keys(IndexScanDesc scan) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + bool changed = false; + int i; + + /* Restore each array key to its position when the mark was set */ + for (i = 0; i < so->numArrayKeys; i++) + { + BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i]; + ScanKey skey = &so->arrayKeyData[curArrayKey->scan_key]; + int mark_elem = curArrayKey->mark_elem; + + if (curArrayKey->cur_elem != mark_elem) + { + curArrayKey->cur_elem = mark_elem; + skey->sk_argument = curArrayKey->elem_values[mark_elem]; + changed = true; + } + } + + /* + * If we changed any keys, we must redo _bt_preprocess_keys. That might + * sound like overkill, but in cases with multiple keys per index column + * it seems necessary to do the full set of pushups. + */ + if (changed) + { + _bt_preprocess_keys(scan); + /* The mark should have been set on a consistent set of keys... */ + Assert(so->qual_ok); } } -/*---------- +/* * _bt_preprocess_keys() -- Preprocess scan keys * - * The caller-supplied search-type keys (in scan->keyData[]) are copied to - * so->keyData[] with possible transformation. scan->numberOfKeys is - * the number of input keys, so->numberOfKeys gets the number of output - * keys (possibly less, never greater). + * The given search-type keys (in scan->keyData[] or so->arrayKeyData[]) + * are copied to so->keyData[] with possible transformation. + * scan->numberOfKeys is the number of input keys, so->numberOfKeys gets + * the number of output keys (possibly less, never greater). + * + * The output keys are marked with additional sk_flag bits beyond the + * system-standard bits supplied by the caller. The DESC and NULLS_FIRST + * indoption bits for the relevant index attribute are copied into the flags. + * Also, for a DESC column, we commute (flip) all the sk_strategy numbers + * so that the index sorts in the desired direction. * - * The primary purpose of this routine is to discover how many scan keys - * must be satisfied to continue the scan. It also attempts to eliminate - * redundant keys and detect contradictory keys. At present, redundant and - * contradictory keys can only be detected for same-data-type comparisons, - * but that's the usual case so it seems worth doing. + * One key purpose of this routine is to discover which scan keys must be + * satisfied to continue the scan. It also attempts to eliminate redundant + * keys and detect contradictory keys. (If the index opfamily provides + * incomplete sets of cross-type operators, we may fail to detect redundant + * or contradictory keys, but we can survive that.) * * The output keys must be sorted by index attribute. Presently we expect * (but verify) that the input keys are already so sorted --- this is done - * by group_clauses_by_indexkey() in indxpath.c. Some reordering of the keys + * by match_clauses_to_index() in indxpath.c. Some reordering of the keys * within each attribute may be done as a byproduct of the processing here, * but no other code depends on that. * @@ -179,54 +697,58 @@ _bt_freestack(BTStack stack) * If possible, redundant keys are eliminated: we keep only the tightest * >/>= bound and the tightest />= or both + * 4::int AND x > 10::bigint", and we are unable to determine + * which key is more restrictive for lack of a suitable cross-type operator. + * _bt_first will arbitrarily pick one of the keys to do the initial + * positioning with. If it picks x > 4, then the x > 10 condition will fail + * until we reach index entries > 10; but we can't stop the scan just because + * x > 10 is failing. On the other hand, if we are scanning backwards, then + * failure of either key is indeed enough to stop the scan. (In general, when + * inequality keys are present, the initial-positioning code only promises to + * position before the first possible match, not exactly at the first match, + * for a forward scan; or after the last match for a backward scan.) * * As a byproduct of this work, we can detect contradictory quals such - * as "x = 1 AND x > 2". If we see that, we return so->quals_ok = FALSE, + * as "x = 1 AND x > 2". If we see that, we return so->qual_ok = false, * indicating the scan need not be run at all since no tuples can match. - * Again though, only keys with RHS datatype equal to the index datatype - * can be checked for contradictions. - * - * Furthermore, we detect the case where the index is unique and we have - * equality quals for all columns. In this case there can be at most one - * (visible) matching tuple. index_getnext uses this to avoid uselessly - * continuing the scan after finding one match. + * (In this case we do not bother completing the output key array!) + * Again, missing cross-type operators might cause us to fail to prove the + * quals contradictory when they really are, but the scan will work correctly. * - * Row comparison keys are treated the same as comparisons to nondefault - * datatypes: we just transfer them into the preprocessed array without any + * Row comparison keys are currently also treated without any smarts: + * we just transfer them into the preprocessed array without any * editorialization. We can treat them the same as an ordinary inequality * comparison on the row's first index column, for the purposes of the logic * about required keys. * * Note: the reason we have to copy the preprocessed scan keys into private * storage is that we are modifying the array based on comparisons of the - * key argument values, which could change on a rescan. Therefore we can't - * overwrite the caller's data structure. - *---------- + * key argument values, which could change on a rescan or after moving to + * new elements of array keys. Therefore we can't overwrite the source data. */ void _bt_preprocess_keys(IndexScanDesc scan) { - Relation relation = scan->indexRelation; BTScanOpaque so = (BTScanOpaque) scan->opaque; int numberOfKeys = scan->numberOfKeys; + int16 *indoption = scan->indexRelation->rd_indoption; int new_numberOfKeys; int numberOfEqualCols; ScanKey inkeys; ScanKey outkeys; ScanKey cur; ScanKey xform[BTMaxStrategyNumber]; - bool hasOtherTypeEqual; - Datum test; + bool test_result; int i, j; AttrNumber attno; @@ -234,12 +756,18 @@ _bt_preprocess_keys(IndexScanDesc scan) /* initialize result variables */ so->qual_ok = true; so->numberOfKeys = 0; - scan->keys_are_unique = false; if (numberOfKeys < 1) return; /* done if qual-less scan */ - inkeys = scan->keyData; + /* + * Read so->arrayKeyData if array keys are present, else scan->keyData + */ + if (so->arrayKeyData != NULL) + inkeys = so->arrayKeyData; + else + inkeys = scan->keyData; + outkeys = so->keyData; cur = &inkeys[0]; /* we check that input keys are correctly ordered */ @@ -249,21 +777,10 @@ _bt_preprocess_keys(IndexScanDesc scan) /* We can short-circuit most of the work if there's just one key */ if (numberOfKeys == 1) { - /* - * We don't use indices for 'A is null' and 'A is not null' currently - * and 'A < = > <> NULL' will always fail - so qual is not OK if - * comparison value is NULL. - vadim 03/21/97 - */ - if (cur->sk_flags & SK_ISNULL) + /* Apply indoption to scankey (might change sk_strategy!) */ + if (!_bt_fix_scankey_strategy(cur, indoption)) so->qual_ok = false; - else if (relation->rd_index->indisunique && - relation->rd_rel->relnatts == 1) - { - /* it's a unique index, do we have an equality qual? */ - if (cur->sk_strategy == BTEqualStrategyNumber) - scan->keys_are_unique = true; - } - memcpy(outkeys, inkeys, sizeof(ScanKeyData)); + memcpy(outkeys, cur, sizeof(ScanKeyData)); so->numberOfKeys = 1; /* We can mark the qual as required if it's for first index col */ if (cur->sk_attno == 1) @@ -280,15 +797,11 @@ _bt_preprocess_keys(IndexScanDesc scan) /* * Initialize for processing of keys for attr 1. * - * xform[i] points to the currently best scan key of strategy type i+1, if - * any is found with a default operator subtype; it is NULL if we haven't - * yet found such a key for this attr. Scan keys of nondefault subtypes - * are transferred to the output with no processing except for noting if - * they are of "=" type. + * xform[i] points to the currently best scan key of strategy type i+1; it + * is NULL if we haven't yet found such a key for this attr. */ attno = 1; memset(xform, 0, sizeof(xform)); - hasOtherTypeEqual = false; /* * Loop iterates from 0 to numberOfKeys inclusive; we use the last pass to @@ -299,16 +812,11 @@ _bt_preprocess_keys(IndexScanDesc scan) { if (i < numberOfKeys) { - /* See comments above: any NULL implies cannot match qual */ - /* Note: we assume SK_ISNULL is never set in a row header key */ - if (cur->sk_flags & SK_ISNULL) + /* Apply indoption to scankey (might change sk_strategy!) */ + if (!_bt_fix_scankey_strategy(cur, indoption)) { + /* NULL can't be matched, so give up */ so->qual_ok = false; - - /* - * Quit processing so we don't try to invoke comparison - * routines on NULLs. - */ return; } } @@ -326,9 +834,15 @@ _bt_preprocess_keys(IndexScanDesc scan) elog(ERROR, "btree index keys must be ordered by attribute"); /* - * If = has been specified, no other key will be used. In case of - * key > 2 && key == 1 and so on we have to set qual_ok to false - * before discarding the other keys. + * If = has been specified, all other keys can be eliminated as + * redundant. If we have a case like key = 1 AND key > 2, we can + * set qual_ok to false and abandon further processing. + * + * We also have to deal with the case of "key IS NULL", which is + * unsatisfiable in combination with any other index condition. By + * the time we get here, that's been classified as an equality + * check, and we've rejected any combination of it with a regular + * equality condition; but not with other types of conditions. */ if (xform[BTEqualStrategyNumber - 1]) { @@ -340,59 +854,64 @@ _bt_preprocess_keys(IndexScanDesc scan) if (!chk || j == (BTEqualStrategyNumber - 1)) continue; - test = FunctionCall2(&chk->sk_func, - eq->sk_argument, - chk->sk_argument); - if (!DatumGetBool(test)) + + if (eq->sk_flags & SK_SEARCHNULL) { + /* IS NULL is contradictory to anything else */ so->qual_ok = false; - break; + return; } + + if (_bt_compare_scankey_args(scan, chk, eq, chk, + &test_result)) + { + if (!test_result) + { + /* keys proven mutually contradictory */ + so->qual_ok = false; + return; + } + /* else discard the redundant non-equality key */ + xform[j] = NULL; + } + /* else, cannot determine redundancy, keep both keys */ } - xform[BTLessStrategyNumber - 1] = NULL; - xform[BTLessEqualStrategyNumber - 1] = NULL; - xform[BTGreaterEqualStrategyNumber - 1] = NULL; - xform[BTGreaterStrategyNumber - 1] = NULL; /* track number of attrs for which we have "=" keys */ numberOfEqualCols++; } - else - { - /* track number of attrs for which we have "=" keys */ - if (hasOtherTypeEqual) - numberOfEqualCols++; - } - /* keep only one of <, <= */ + /* try to keep only one of <, <= */ if (xform[BTLessStrategyNumber - 1] && xform[BTLessEqualStrategyNumber - 1]) { ScanKey lt = xform[BTLessStrategyNumber - 1]; ScanKey le = xform[BTLessEqualStrategyNumber - 1]; - test = FunctionCall2(&le->sk_func, - lt->sk_argument, - le->sk_argument); - if (DatumGetBool(test)) - xform[BTLessEqualStrategyNumber - 1] = NULL; - else - xform[BTLessStrategyNumber - 1] = NULL; + if (_bt_compare_scankey_args(scan, le, lt, le, + &test_result)) + { + if (test_result) + xform[BTLessEqualStrategyNumber - 1] = NULL; + else + xform[BTLessStrategyNumber - 1] = NULL; + } } - /* keep only one of >, >= */ + /* try to keep only one of >, >= */ if (xform[BTGreaterStrategyNumber - 1] && xform[BTGreaterEqualStrategyNumber - 1]) { ScanKey gt = xform[BTGreaterStrategyNumber - 1]; ScanKey ge = xform[BTGreaterEqualStrategyNumber - 1]; - test = FunctionCall2(&ge->sk_func, - gt->sk_argument, - ge->sk_argument); - if (DatumGetBool(test)) - xform[BTGreaterEqualStrategyNumber - 1] = NULL; - else - xform[BTGreaterStrategyNumber - 1] = NULL; + if (_bt_compare_scankey_args(scan, ge, gt, ge, + &test_result)) + { + if (test_result) + xform[BTGreaterEqualStrategyNumber - 1] = NULL; + else + xform[BTGreaterStrategyNumber - 1] = NULL; + } } /* @@ -421,57 +940,341 @@ _bt_preprocess_keys(IndexScanDesc scan) /* Re-initialize for new attno */ attno = cur->sk_attno; memset(xform, 0, sizeof(xform)); - hasOtherTypeEqual = false; } /* check strategy this key's operator corresponds to */ j = cur->sk_strategy - 1; - /* if row comparison or wrong RHS data type, punt */ - if ((cur->sk_flags & SK_ROW_HEADER) || cur->sk_subtype != InvalidOid) + /* if row comparison, push it directly to the output array */ + if (cur->sk_flags & SK_ROW_HEADER) { ScanKey outkey = &outkeys[new_numberOfKeys++]; memcpy(outkey, cur, sizeof(ScanKeyData)); if (numberOfEqualCols == attno - 1) _bt_mark_scankey_required(outkey); - if (j == (BTEqualStrategyNumber - 1)) - hasOtherTypeEqual = true; + + /* + * We don't support RowCompare using equality; such a qual would + * mess up the numberOfEqualCols tracking. + */ + Assert(j != (BTEqualStrategyNumber - 1)); continue; } /* have we seen one of these before? */ - if (xform[j]) - { - /* yup, keep the more restrictive key */ - test = FunctionCall2(&cur->sk_func, - cur->sk_argument, - xform[j]->sk_argument); - if (DatumGetBool(test)) - xform[j] = cur; - else if (j == (BTEqualStrategyNumber - 1)) - { - /* key == a && key == b, but a != b */ - so->qual_ok = false; - return; - } - } - else + if (xform[j] == NULL) { /* nope, so remember this scankey */ xform[j] = cur; } + else + { + /* yup, keep only the more restrictive key */ + if (_bt_compare_scankey_args(scan, cur, cur, xform[j], + &test_result)) + { + if (test_result) + xform[j] = cur; + else if (j == (BTEqualStrategyNumber - 1)) + { + /* key == a && key == b, but a != b */ + so->qual_ok = false; + return; + } + /* else old key is more restrictive, keep it */ + } + else + { + /* + * We can't determine which key is more restrictive. Keep the + * previous one in xform[j] and push this one directly to the + * output array. + */ + ScanKey outkey = &outkeys[new_numberOfKeys++]; + + memcpy(outkey, cur, sizeof(ScanKeyData)); + if (numberOfEqualCols == attno - 1) + _bt_mark_scankey_required(outkey); + } + } } so->numberOfKeys = new_numberOfKeys; - - /* - * If unique index and we have equality keys for all columns, set - * keys_are_unique flag for higher levels. - */ - if (relation->rd_index->indisunique && - relation->rd_rel->relnatts == numberOfEqualCols) - scan->keys_are_unique = true; +} + +/* + * Compare two scankey values using a specified operator. + * + * The test we want to perform is logically "leftarg op rightarg", where + * leftarg and rightarg are the sk_argument values in those ScanKeys, and + * the comparison operator is the one in the op ScanKey. However, in + * cross-data-type situations we may need to look up the correct operator in + * the index's opfamily: it is the one having amopstrategy = op->sk_strategy + * and amoplefttype/amoprighttype equal to the two argument datatypes. + * + * If the opfamily doesn't supply a complete set of cross-type operators we + * may not be able to make the comparison. If we can make the comparison + * we store the operator result in *result and return true. We return false + * if the comparison could not be made. + * + * Note: op always points at the same ScanKey as either leftarg or rightarg. + * Since we don't scribble on the scankeys, this aliasing should cause no + * trouble. + * + * Note: this routine needs to be insensitive to any DESC option applied + * to the index column. For example, "x < 4" is a tighter constraint than + * "x < 5" regardless of which way the index is sorted. + */ +static bool +_bt_compare_scankey_args(IndexScanDesc scan, ScanKey op, + ScanKey leftarg, ScanKey rightarg, + bool *result) +{ + Relation rel = scan->indexRelation; + Oid lefttype, + righttype, + optype, + opcintype, + cmp_op; + StrategyNumber strat; + + /* + * First, deal with cases where one or both args are NULL. This should + * only happen when the scankeys represent IS NULL/NOT NULL conditions. + */ + if ((leftarg->sk_flags | rightarg->sk_flags) & SK_ISNULL) + { + bool leftnull, + rightnull; + + if (leftarg->sk_flags & SK_ISNULL) + { + Assert(leftarg->sk_flags & (SK_SEARCHNULL | SK_SEARCHNOTNULL)); + leftnull = true; + } + else + leftnull = false; + if (rightarg->sk_flags & SK_ISNULL) + { + Assert(rightarg->sk_flags & (SK_SEARCHNULL | SK_SEARCHNOTNULL)); + rightnull = true; + } + else + rightnull = false; + + /* + * We treat NULL as either greater than or less than all other values. + * Since true > false, the tests below work correctly for NULLS LAST + * logic. If the index is NULLS FIRST, we need to flip the strategy. + */ + strat = op->sk_strategy; + if (op->sk_flags & SK_BT_NULLS_FIRST) + strat = BTCommuteStrategyNumber(strat); + + switch (strat) + { + case BTLessStrategyNumber: + *result = (leftnull < rightnull); + break; + case BTLessEqualStrategyNumber: + *result = (leftnull <= rightnull); + break; + case BTEqualStrategyNumber: + *result = (leftnull == rightnull); + break; + case BTGreaterEqualStrategyNumber: + *result = (leftnull >= rightnull); + break; + case BTGreaterStrategyNumber: + *result = (leftnull > rightnull); + break; + default: + elog(ERROR, "unrecognized StrategyNumber: %d", (int) strat); + *result = false; /* keep compiler quiet */ + break; + } + return true; + } + + /* + * The opfamily we need to worry about is identified by the index column. + */ + Assert(leftarg->sk_attno == rightarg->sk_attno); + + opcintype = rel->rd_opcintype[leftarg->sk_attno - 1]; + + /* + * Determine the actual datatypes of the ScanKey arguments. We have to + * support the convention that sk_subtype == InvalidOid means the opclass + * input type; this is a hack to simplify life for ScanKeyInit(). + */ + lefttype = leftarg->sk_subtype; + if (lefttype == InvalidOid) + lefttype = opcintype; + righttype = rightarg->sk_subtype; + if (righttype == InvalidOid) + righttype = opcintype; + optype = op->sk_subtype; + if (optype == InvalidOid) + optype = opcintype; + + /* + * If leftarg and rightarg match the types expected for the "op" scankey, + * we can use its already-looked-up comparison function. + */ + if (lefttype == opcintype && righttype == optype) + { + *result = DatumGetBool(FunctionCall2Coll(&op->sk_func, + op->sk_collation, + leftarg->sk_argument, + rightarg->sk_argument)); + return true; + } + + /* + * Otherwise, we need to go to the syscache to find the appropriate + * operator. (This cannot result in infinite recursion, since no + * indexscan initiated by syscache lookup will use cross-data-type + * operators.) + * + * If the sk_strategy was flipped by _bt_fix_scankey_strategy, we have to + * un-flip it to get the correct opfamily member. + */ + strat = op->sk_strategy; + if (op->sk_flags & SK_BT_DESC) + strat = BTCommuteStrategyNumber(strat); + + cmp_op = get_opfamily_member(rel->rd_opfamily[leftarg->sk_attno - 1], + lefttype, + righttype, + strat); + if (OidIsValid(cmp_op)) + { + RegProcedure cmp_proc = get_opcode(cmp_op); + + if (RegProcedureIsValid(cmp_proc)) + { + *result = DatumGetBool(OidFunctionCall2Coll(cmp_proc, + op->sk_collation, + leftarg->sk_argument, + rightarg->sk_argument)); + return true; + } + } + + /* Can't make the comparison */ + *result = false; /* suppress compiler warnings */ + return false; +} + +/* + * Adjust a scankey's strategy and flags setting as needed for indoptions. + * + * We copy the appropriate indoption value into the scankey sk_flags + * (shifting to avoid clobbering system-defined flag bits). Also, if + * the DESC option is set, commute (flip) the operator strategy number. + * + * A secondary purpose is to check for IS NULL/NOT NULL scankeys and set up + * the strategy field correctly for them. + * + * Lastly, for ordinary scankeys (not IS NULL/NOT NULL), we check for a + * NULL comparison value. Since all btree operators are assumed strict, + * a NULL means that the qual cannot be satisfied. We return true if the + * comparison value isn't NULL, or false if the scan should be abandoned. + * + * This function is applied to the *input* scankey structure; therefore + * on a rescan we will be looking at already-processed scankeys. Hence + * we have to be careful not to re-commute the strategy if we already did it. + * It's a bit ugly to modify the caller's copy of the scankey but in practice + * there shouldn't be any problem, since the index's indoptions are certainly + * not going to change while the scankey survives. + */ +static bool +_bt_fix_scankey_strategy(ScanKey skey, int16 *indoption) +{ + int addflags; + + addflags = indoption[skey->sk_attno - 1] << SK_BT_INDOPTION_SHIFT; + + /* + * We treat all btree operators as strict (even if they're not so marked + * in pg_proc). This means that it is impossible for an operator condition + * with a NULL comparison constant to succeed, and we can reject it right + * away. + * + * However, we now also support "x IS NULL" clauses as search conditions, + * so in that case keep going. The planner has not filled in any + * particular strategy in this case, so set it to BTEqualStrategyNumber + * --- we can treat IS NULL as an equality operator for purposes of search + * strategy. + * + * Likewise, "x IS NOT NULL" is supported. We treat that as either "less + * than NULL" in a NULLS LAST index, or "greater than NULL" in a NULLS + * FIRST index. + * + * Note: someday we might have to fill in sk_collation from the index + * column's collation. At the moment this is a non-issue because we'll + * never actually call the comparison operator on a NULL. + */ + if (skey->sk_flags & SK_ISNULL) + { + /* SK_ISNULL shouldn't be set in a row header scankey */ + Assert(!(skey->sk_flags & SK_ROW_HEADER)); + + /* Set indoption flags in scankey (might be done already) */ + skey->sk_flags |= addflags; + + /* Set correct strategy for IS NULL or NOT NULL search */ + if (skey->sk_flags & SK_SEARCHNULL) + { + skey->sk_strategy = BTEqualStrategyNumber; + skey->sk_subtype = InvalidOid; + skey->sk_collation = InvalidOid; + } + else if (skey->sk_flags & SK_SEARCHNOTNULL) + { + if (skey->sk_flags & SK_BT_NULLS_FIRST) + skey->sk_strategy = BTGreaterStrategyNumber; + else + skey->sk_strategy = BTLessStrategyNumber; + skey->sk_subtype = InvalidOid; + skey->sk_collation = InvalidOid; + } + else + { + /* regular qual, so it cannot be satisfied */ + return false; + } + + /* Needn't do the rest */ + return true; + } + + /* Adjust strategy for DESC, if we didn't already */ + if ((addflags & SK_BT_DESC) && !(skey->sk_flags & SK_BT_DESC)) + skey->sk_strategy = BTCommuteStrategyNumber(skey->sk_strategy); + skey->sk_flags |= addflags; + + /* If it's a row header, fix row member flags and strategies similarly */ + if (skey->sk_flags & SK_ROW_HEADER) + { + ScanKey subkey = (ScanKey) DatumGetPointer(skey->sk_argument); + + for (;;) + { + Assert(subkey->sk_flags & SK_ROW_MEMBER); + addflags = indoption[subkey->sk_attno - 1] << SK_BT_INDOPTION_SHIFT; + if ((addflags & SK_BT_DESC) && !(subkey->sk_flags & SK_BT_DESC)) + subkey->sk_strategy = BTCommuteStrategyNumber(subkey->sk_strategy); + subkey->sk_flags |= addflags; + if (subkey->sk_flags & SK_ROW_END) + break; + subkey++; + } + } + + return true; } /* @@ -479,11 +1282,9 @@ _bt_preprocess_keys(IndexScanDesc scan) * * Depending on the operator type, the key may be required for both scan * directions or just one. Also, if the key is a row comparison header, - * we have to mark the appropriate subsidiary ScanKeys as required. In - * such cases, the first subsidiary key is required, but subsequent ones - * are required only as long as they correspond to successive index columns. - * Otherwise the row comparison ordering is different from the index ordering - * and so we can't stop the scan on the basis of those lower-order columns. + * we have to mark its first subsidiary ScanKey as required. (Subsequent + * subsidiary ScanKeys are normally for lower-order columns, and thus + * cannot be required, since they're after the first non-equality scankey.) * * Note: when we set required-key flag bits in a subsidiary scankey, we are * scribbling on a data structure belonging to the index AM's caller, not on @@ -494,7 +1295,7 @@ _bt_preprocess_keys(IndexScanDesc scan) static void _bt_mark_scankey_required(ScanKey skey) { - int addflags; + int addflags; switch (skey->sk_strategy) { @@ -520,95 +1321,47 @@ _bt_mark_scankey_required(ScanKey skey) if (skey->sk_flags & SK_ROW_HEADER) { - ScanKey subkey = (ScanKey) DatumGetPointer(skey->sk_argument); - AttrNumber attno = skey->sk_attno; - - /* First subkey should be same as the header says */ - Assert(subkey->sk_attno == attno); + ScanKey subkey = (ScanKey) DatumGetPointer(skey->sk_argument); - for (;;) - { - Assert(subkey->sk_flags & SK_ROW_MEMBER); - Assert(subkey->sk_strategy == skey->sk_strategy); - if (subkey->sk_attno != attno) - break; /* non-adjacent key, so not required */ - subkey->sk_flags |= addflags; - if (subkey->sk_flags & SK_ROW_END) - break; - subkey++; - attno++; - } + /* First subkey should be same column/operator as the header */ + Assert(subkey->sk_flags & SK_ROW_MEMBER); + Assert(subkey->sk_attno == skey->sk_attno); + Assert(subkey->sk_strategy == skey->sk_strategy); + subkey->sk_flags |= addflags; } } /* * Test whether an indextuple satisfies all the scankey conditions. * - * If so, copy its TID into scan->xs_ctup.t_self, and return TRUE. - * If not, return FALSE (xs_ctup is not changed). + * Return true if so, false if not. If the tuple fails to pass the qual, + * we also determine whether there's any need to continue the scan beyond + * this tuple, and set *continuescan accordingly. See comments for + * _bt_preprocess_keys(), above, about how this is done. * - * If the tuple fails to pass the qual, we also determine whether there's - * any need to continue the scan beyond this tuple, and set *continuescan - * accordingly. See comments for _bt_preprocess_keys(), above, about how - * this is done. + * Forward scan callers can pass a high key tuple in the hopes of having + * us set *continuescanthat to false, and avoiding an unnecessary visit to + * the page to the right. * * scan: index scan descriptor (containing a search-type scankey) - * page: buffer page containing index tuple - * offnum: offset number of index tuple (must be a valid item!) + * tuple: index tuple to test + * tupnatts: number of attributes in tupnatts (high key may be truncated) * dir: direction we are scanning in * continuescan: output parameter (will be set correctly in all cases) */ bool -_bt_checkkeys(IndexScanDesc scan, - Page page, OffsetNumber offnum, +_bt_checkkeys(IndexScanDesc scan, IndexTuple tuple, int tupnatts, ScanDirection dir, bool *continuescan) { - ItemId iid = PageGetItemId(page, offnum); - bool tuple_valid; - IndexTuple tuple; TupleDesc tupdesc; BTScanOpaque so; int keysz; int ikey; ScanKey key; - *continuescan = true; /* default assumption */ - - /* - * If the scan specifies not to return killed tuples, then we treat - * a killed tuple as not passing the qual. Most of the time, it's a - * win to not bother examining the tuple's index keys, but just return - * immediately with continuescan = true to proceed to the next tuple. - * However, if this is the last tuple on the page, we should check - * the index keys to prevent uselessly advancing to the next page. - */ - if (scan->ignore_killed_tuples && ItemIdDeleted(iid)) - { - /* return immediately if there are more tuples on the page */ - if (ScanDirectionIsForward(dir)) - { - if (offnum < PageGetMaxOffsetNumber(page)) - return false; - } - else - { - BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page); - - if (offnum > P_FIRSTDATAKEY(opaque)) - return false; - } - /* - * OK, we want to check the keys, but we'll return FALSE even - * if the tuple passes the key tests. - */ - tuple_valid = false; - } - else - tuple_valid = true; - - tuple = (IndexTuple) PageGetItem(page, iid); + Assert(BTreeTupleGetNAtts(tuple, scan->indexRelation) == tupnatts); - IncrIndexProcessed(); + *continuescan = true; /* default assumption */ tupdesc = RelationGetDescr(scan->indexRelation); so = (BTScanOpaque) scan->opaque; @@ -620,10 +1373,23 @@ _bt_checkkeys(IndexScanDesc scan, bool isNull; Datum test; + if (key->sk_attno > tupnatts) + { + /* + * This attribute is truncated (must be high key). The value for + * this attribute in the first non-pivot tuple on the page to the + * right could be any possible value. Assume that truncated + * attribute passes the qual. + */ + Assert(ScanDirectionIsForward(dir)); + continue; + } + /* row-comparison keys need special processing */ if (key->sk_flags & SK_ROW_HEADER) { - if (_bt_check_rowcompare(key, tuple, tupdesc, dir, continuescan)) + if (_bt_check_rowcompare(key, tuple, tupnatts, tupdesc, dir, + continuescan)) continue; return false; } @@ -633,25 +1399,73 @@ _bt_checkkeys(IndexScanDesc scan, tupdesc, &isNull); - /* btree doesn't support 'A is null' clauses, yet */ if (key->sk_flags & SK_ISNULL) { - /* we shouldn't get here, really; see _bt_preprocess_keys() */ - *continuescan = false; + /* Handle IS NULL/NOT NULL tests */ + if (key->sk_flags & SK_SEARCHNULL) + { + if (isNull) + continue; /* tuple satisfies this qual */ + } + else + { + Assert(key->sk_flags & SK_SEARCHNOTNULL); + if (!isNull) + continue; /* tuple satisfies this qual */ + } + + /* + * Tuple fails this qual. If it's a required qual for the current + * scan direction, then we can conclude no further tuples will + * pass, either. + */ + if ((key->sk_flags & SK_BT_REQFWD) && + ScanDirectionIsForward(dir)) + *continuescan = false; + else if ((key->sk_flags & SK_BT_REQBKWD) && + ScanDirectionIsBackward(dir)) + *continuescan = false; + + /* + * In any case, this indextuple doesn't match the qual. + */ return false; } if (isNull) { - /* - * Since NULLs are sorted after non-NULLs, we know we have reached - * the upper limit of the range of values for this index attr. On - * a forward scan, we can stop if this qual is one of the "must - * match" subset. On a backward scan, however, we should keep - * going. - */ - if ((key->sk_flags & SK_BT_REQFWD) && ScanDirectionIsForward(dir)) - *continuescan = false; + if (key->sk_flags & SK_BT_NULLS_FIRST) + { + /* + * Since NULLs are sorted before non-NULLs, we know we have + * reached the lower limit of the range of values for this + * index attr. On a backward scan, we can stop if this qual + * is one of the "must match" subset. We can stop regardless + * of whether the qual is > or <, so long as it's required, + * because it's not possible for any future tuples to pass. On + * a forward scan, however, we must keep going, because we may + * have initially positioned to the start of the index. + */ + if ((key->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) && + ScanDirectionIsBackward(dir)) + *continuescan = false; + } + else + { + /* + * Since NULLs are sorted after non-NULLs, we know we have + * reached the upper limit of the range of values for this + * index attr. On a forward scan, we can stop if this qual is + * one of the "must match" subset. We can stop regardless of + * whether the qual is > or <, so long as it's required, + * because it's not possible for any future tuples to pass. On + * a backward scan, however, we must keep going, because we + * may have initially positioned to the end of the index. + */ + if ((key->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) && + ScanDirectionIsForward(dir)) + *continuescan = false; + } /* * In any case, this indextuple doesn't match the qual. @@ -659,7 +1473,8 @@ _bt_checkkeys(IndexScanDesc scan, return false; } - test = FunctionCall2(&key->sk_func, datum, key->sk_argument); + test = FunctionCall2Coll(&key->sk_func, key->sk_collation, + datum, key->sk_argument); if (!DatumGetBool(test)) { @@ -688,10 +1503,7 @@ _bt_checkkeys(IndexScanDesc scan, } /* If we get here, the tuple passes all index quals. */ - if (tuple_valid) - scan->xs_ctup.t_self = tuple->t_tid; - - return tuple_valid; + return true; } /* @@ -704,8 +1516,8 @@ _bt_checkkeys(IndexScanDesc scan, * This is a subroutine for _bt_checkkeys, which see for more info. */ static bool -_bt_check_rowcompare(ScanKey skey, IndexTuple tuple, TupleDesc tupdesc, - ScanDirection dir, bool *continuescan) +_bt_check_rowcompare(ScanKey skey, IndexTuple tuple, int tupnatts, + TupleDesc tupdesc, ScanDirection dir, bool *continuescan) { ScanKey subkey = (ScanKey) DatumGetPointer(skey->sk_argument); int32 cmpresult = 0; @@ -721,7 +1533,22 @@ _bt_check_rowcompare(ScanKey skey, IndexTuple tuple, TupleDesc tupdesc, bool isNull; Assert(subkey->sk_flags & SK_ROW_MEMBER); - Assert(subkey->sk_strategy == skey->sk_strategy); + + if (subkey->sk_attno > tupnatts) + { + /* + * This attribute is truncated (must be high key). The value for + * this attribute in the first non-pivot tuple on the page to the + * right could be any possible value. Assume that truncated + * attribute passes the qual. + */ + Assert(ScanDirectionIsForward(dir)); + cmpresult = 0; + if (subkey->sk_flags & SK_ROW_END) + break; + subkey++; + continue; + } datum = index_getattr(tuple, subkey->sk_attno, @@ -730,16 +1557,38 @@ _bt_check_rowcompare(ScanKey skey, IndexTuple tuple, TupleDesc tupdesc, if (isNull) { - /* - * Since NULLs are sorted after non-NULLs, we know we have reached - * the upper limit of the range of values for this index attr. On - * a forward scan, we can stop if this qual is one of the "must - * match" subset. On a backward scan, however, we should keep - * going. - */ - if ((subkey->sk_flags & SK_BT_REQFWD) && - ScanDirectionIsForward(dir)) - *continuescan = false; + if (subkey->sk_flags & SK_BT_NULLS_FIRST) + { + /* + * Since NULLs are sorted before non-NULLs, we know we have + * reached the lower limit of the range of values for this + * index attr. On a backward scan, we can stop if this qual + * is one of the "must match" subset. We can stop regardless + * of whether the qual is > or <, so long as it's required, + * because it's not possible for any future tuples to pass. On + * a forward scan, however, we must keep going, because we may + * have initially positioned to the start of the index. + */ + if ((subkey->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) && + ScanDirectionIsBackward(dir)) + *continuescan = false; + } + else + { + /* + * Since NULLs are sorted after non-NULLs, we know we have + * reached the upper limit of the range of values for this + * index attr. On a forward scan, we can stop if this qual is + * one of the "must match" subset. We can stop regardless of + * whether the qual is > or <, so long as it's required, + * because it's not possible for any future tuples to pass. On + * a backward scan, however, we must keep going, because we + * may have initially positioned to the end of the index. + */ + if ((subkey->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) && + ScanDirectionIsForward(dir)) + *continuescan = false; + } /* * In any case, this indextuple doesn't match the qual. @@ -752,9 +1601,8 @@ _bt_check_rowcompare(ScanKey skey, IndexTuple tuple, TupleDesc tupdesc, /* * Unlike the simple-scankey case, this isn't a disallowed case. * But it can never match. If all the earlier row comparison - * columns are required for the scan direction, we can stop - * the scan, because there can't be another tuple that will - * succeed. + * columns are required for the scan direction, we can stop the + * scan, because there can't be another tuple that will succeed. */ if (subkey != (ScanKey) DatumGetPointer(skey->sk_argument)) subkey--; @@ -768,9 +1616,13 @@ _bt_check_rowcompare(ScanKey skey, IndexTuple tuple, TupleDesc tupdesc, } /* Perform the test --- three-way comparison not bool operator */ - cmpresult = DatumGetInt32(FunctionCall2(&subkey->sk_func, - datum, - subkey->sk_argument)); + cmpresult = DatumGetInt32(FunctionCall2Coll(&subkey->sk_func, + subkey->sk_collation, + datum, + subkey->sk_argument)); + + if (subkey->sk_flags & SK_BT_DESC) + INVERT_COMPARE_RESULT(cmpresult); /* Done comparing if unequal, else advance to next column */ if (cmpresult != 0) @@ -788,7 +1640,7 @@ _bt_check_rowcompare(ScanKey skey, IndexTuple tuple, TupleDesc tupdesc, */ switch (subkey->sk_strategy) { - /* EQ and NE cases aren't allowed here */ + /* EQ and NE cases aren't allowed here */ case BTLessStrategyNumber: result = (cmpresult < 0); break; @@ -812,8 +1664,8 @@ _bt_check_rowcompare(ScanKey skey, IndexTuple tuple, TupleDesc tupdesc, { /* * Tuple fails this qual. If it's a required qual for the current - * scan direction, then we can conclude no further tuples will - * pass, either. Note we have to look at the deciding column, not + * scan direction, then we can conclude no further tuples will pass, + * either. Note we have to look at the deciding column, not * necessarily the first or last column of the row condition. */ if ((subkey->sk_flags & SK_BT_REQFWD) && @@ -826,3 +1678,896 @@ _bt_check_rowcompare(ScanKey skey, IndexTuple tuple, TupleDesc tupdesc, return result; } + +/* + * _bt_killitems - set LP_DEAD state for items an indexscan caller has + * told us were killed + * + * scan->opaque, referenced locally through so, contains information about the + * current page and killed tuples thereon (generally, this should only be + * called if so->numKilled > 0). + * + * The caller does not have a lock on the page and may or may not have the + * page pinned in a buffer. Note that read-lock is sufficient for setting + * LP_DEAD status (which is only a hint). + * + * We match items by heap TID before assuming they are the right ones to + * delete. We cope with cases where items have moved right due to insertions. + * If an item has moved off the current page due to a split, we'll fail to + * find it and do nothing (this is not an error case --- we assume the item + * will eventually get marked in a future indexscan). + * + * Note that if we hold a pin on the target page continuously from initially + * reading the items until applying this function, VACUUM cannot have deleted + * any items from the page, and so there is no need to search left from the + * recorded offset. (This observation also guarantees that the item is still + * the right one to delete, which might otherwise be questionable since heap + * TIDs can get recycled.) This holds true even if the page has been modified + * by inserts and page splits, so there is no need to consult the LSN. + * + * If the pin was released after reading the page, then we re-read it. If it + * has been modified since we read it (as determined by the LSN), we dare not + * flag any entries because it is possible that the old entry was vacuumed + * away and the TID was re-used by a completely different heap tuple. + */ +void +_bt_killitems(IndexScanDesc scan) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + Page page; + BTPageOpaque opaque; + OffsetNumber minoff; + OffsetNumber maxoff; + int i; + int numKilled = so->numKilled; + bool killedsomething = false; + + Assert(BTScanPosIsValid(so->currPos)); + + /* + * Always reset the scan state, so we don't look for same items on other + * pages. + */ + so->numKilled = 0; + + if (BTScanPosIsPinned(so->currPos)) + { + /* + * We have held the pin on this page since we read the index tuples, + * so all we need to do is lock it. The pin will have prevented + * re-use of any TID on the page, so there is no need to check the + * LSN. + */ + LockBuffer(so->currPos.buf, BT_READ); + + page = BufferGetPage(so->currPos.buf); + } + else + { + Buffer buf; + + /* Attempt to re-read the buffer, getting pin and lock. */ + buf = _bt_getbuf(scan->indexRelation, so->currPos.currPage, BT_READ); + + /* It might not exist anymore; in which case we can't hint it. */ + if (!BufferIsValid(buf)) + return; + + page = BufferGetPage(buf); + if (BufferGetLSNAtomic(buf) == so->currPos.lsn) + so->currPos.buf = buf; + else + { + /* Modified while not pinned means hinting is not safe. */ + _bt_relbuf(scan->indexRelation, buf); + return; + } + } + + opaque = (BTPageOpaque) PageGetSpecialPointer(page); + minoff = P_FIRSTDATAKEY(opaque); + maxoff = PageGetMaxOffsetNumber(page); + + for (i = 0; i < numKilled; i++) + { + int itemIndex = so->killedItems[i]; + BTScanPosItem *kitem = &so->currPos.items[itemIndex]; + OffsetNumber offnum = kitem->indexOffset; + + Assert(itemIndex >= so->currPos.firstItem && + itemIndex <= so->currPos.lastItem); + if (offnum < minoff) + continue; /* pure paranoia */ + while (offnum <= maxoff) + { + ItemId iid = PageGetItemId(page, offnum); + IndexTuple ituple = (IndexTuple) PageGetItem(page, iid); + + if (ItemPointerEquals(&ituple->t_tid, &kitem->heapTid)) + { + /* found the item */ + ItemIdMarkDead(iid); + killedsomething = true; + break; /* out of inner search loop */ + } + offnum = OffsetNumberNext(offnum); + } + } + + /* + * Since this can be redone later if needed, mark as dirty hint. + * + * Whenever we mark anything LP_DEAD, we also set the page's + * BTP_HAS_GARBAGE flag, which is likewise just a hint. + */ + if (killedsomething) + { + opaque->btpo_flags |= BTP_HAS_GARBAGE; + MarkBufferDirtyHint(so->currPos.buf, true); + } + + LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK); +} + + +/* + * The following routines manage a shared-memory area in which we track + * assignment of "vacuum cycle IDs" to currently-active btree vacuuming + * operations. There is a single counter which increments each time we + * start a vacuum to assign it a cycle ID. Since multiple vacuums could + * be active concurrently, we have to track the cycle ID for each active + * vacuum; this requires at most MaxBackends entries (usually far fewer). + * We assume at most one vacuum can be active for a given index. + * + * Access to the shared memory area is controlled by BtreeVacuumLock. + * In principle we could use a separate lmgr locktag for each index, + * but a single LWLock is much cheaper, and given the short time that + * the lock is ever held, the concurrency hit should be minimal. + */ + +typedef struct BTOneVacInfo +{ + LockRelId relid; /* global identifier of an index */ + BTCycleId cycleid; /* cycle ID for its active VACUUM */ +} BTOneVacInfo; + +typedef struct BTVacInfo +{ + BTCycleId cycle_ctr; /* cycle ID most recently assigned */ + int num_vacuums; /* number of currently active VACUUMs */ + int max_vacuums; /* allocated length of vacuums[] array */ + BTOneVacInfo vacuums[FLEXIBLE_ARRAY_MEMBER]; +} BTVacInfo; + +static BTVacInfo *btvacinfo; + + +/* + * _bt_vacuum_cycleid --- get the active vacuum cycle ID for an index, + * or zero if there is no active VACUUM + * + * Note: for correct interlocking, the caller must already hold pin and + * exclusive lock on each buffer it will store the cycle ID into. This + * ensures that even if a VACUUM starts immediately afterwards, it cannot + * process those pages until the page split is complete. + */ +BTCycleId +_bt_vacuum_cycleid(Relation rel) +{ + BTCycleId result = 0; + int i; + + /* Share lock is enough since this is a read-only operation */ + LWLockAcquire(BtreeVacuumLock, LW_SHARED); + + for (i = 0; i < btvacinfo->num_vacuums; i++) + { + BTOneVacInfo *vac = &btvacinfo->vacuums[i]; + + if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId && + vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId) + { + result = vac->cycleid; + break; + } + } + + LWLockRelease(BtreeVacuumLock); + return result; +} + +/* + * _bt_start_vacuum --- assign a cycle ID to a just-starting VACUUM operation + * + * Note: the caller must guarantee that it will eventually call + * _bt_end_vacuum, else we'll permanently leak an array slot. To ensure + * that this happens even in elog(FATAL) scenarios, the appropriate coding + * is not just a PG_TRY, but + * PG_ENSURE_ERROR_CLEANUP(_bt_end_vacuum_callback, PointerGetDatum(rel)) + */ +BTCycleId +_bt_start_vacuum(Relation rel) +{ + BTCycleId result; + int i; + BTOneVacInfo *vac; + + LWLockAcquire(BtreeVacuumLock, LW_EXCLUSIVE); + + /* + * Assign the next cycle ID, being careful to avoid zero as well as the + * reserved high values. + */ + result = ++(btvacinfo->cycle_ctr); + if (result == 0 || result > MAX_BT_CYCLE_ID) + result = btvacinfo->cycle_ctr = 1; + + /* Let's just make sure there's no entry already for this index */ + for (i = 0; i < btvacinfo->num_vacuums; i++) + { + vac = &btvacinfo->vacuums[i]; + if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId && + vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId) + { + /* + * Unlike most places in the backend, we have to explicitly + * release our LWLock before throwing an error. This is because + * we expect _bt_end_vacuum() to be called before transaction + * abort cleanup can run to release LWLocks. + */ + LWLockRelease(BtreeVacuumLock); + elog(ERROR, "multiple active vacuums for index \"%s\"", + RelationGetRelationName(rel)); + } + } + + /* OK, add an entry */ + if (btvacinfo->num_vacuums >= btvacinfo->max_vacuums) + { + LWLockRelease(BtreeVacuumLock); + elog(ERROR, "out of btvacinfo slots"); + } + vac = &btvacinfo->vacuums[btvacinfo->num_vacuums]; + vac->relid = rel->rd_lockInfo.lockRelId; + vac->cycleid = result; + btvacinfo->num_vacuums++; + + LWLockRelease(BtreeVacuumLock); + return result; +} + +/* + * _bt_end_vacuum --- mark a btree VACUUM operation as done + * + * Note: this is deliberately coded not to complain if no entry is found; + * this allows the caller to put PG_TRY around the start_vacuum operation. + */ +void +_bt_end_vacuum(Relation rel) +{ + int i; + + LWLockAcquire(BtreeVacuumLock, LW_EXCLUSIVE); + + /* Find the array entry */ + for (i = 0; i < btvacinfo->num_vacuums; i++) + { + BTOneVacInfo *vac = &btvacinfo->vacuums[i]; + + if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId && + vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId) + { + /* Remove it by shifting down the last entry */ + *vac = btvacinfo->vacuums[btvacinfo->num_vacuums - 1]; + btvacinfo->num_vacuums--; + break; + } + } + + LWLockRelease(BtreeVacuumLock); +} + +/* + * _bt_end_vacuum wrapped as an on_shmem_exit callback function + */ +void +_bt_end_vacuum_callback(int code, Datum arg) +{ + _bt_end_vacuum((Relation) DatumGetPointer(arg)); +} + +/* + * BTreeShmemSize --- report amount of shared memory space needed + */ +Size +BTreeShmemSize(void) +{ + Size size; + + size = offsetof(BTVacInfo, vacuums); + size = add_size(size, mul_size(MaxBackends, sizeof(BTOneVacInfo))); + return size; +} + +/* + * BTreeShmemInit --- initialize this module's shared memory + */ +void +BTreeShmemInit(void) +{ + bool found; + + btvacinfo = (BTVacInfo *) ShmemInitStruct("BTree Vacuum State", + BTreeShmemSize(), + &found); + + if (!IsUnderPostmaster) + { + /* Initialize shared memory area */ + Assert(!found); + + /* + * It doesn't really matter what the cycle counter starts at, but + * having it always start the same doesn't seem good. Seed with + * low-order bits of time() instead. + */ + btvacinfo->cycle_ctr = (BTCycleId) time(NULL); + + btvacinfo->num_vacuums = 0; + btvacinfo->max_vacuums = MaxBackends; + } + else + Assert(found); +} + +bytea * +btoptions(Datum reloptions, bool validate) +{ + return default_reloptions(reloptions, validate, RELOPT_KIND_BTREE); +} + +/* + * btproperty() -- Check boolean properties of indexes. + * + * This is optional, but handling AMPROP_RETURNABLE here saves opening the rel + * to call btcanreturn. + */ +bool +btproperty(Oid index_oid, int attno, + IndexAMProperty prop, const char *propname, + bool *res, bool *isnull) +{ + switch (prop) + { + case AMPROP_RETURNABLE: + /* answer only for columns, not AM or whole index */ + if (attno == 0) + return false; + /* otherwise, btree can always return data */ + *res = true; + return true; + + default: + return false; /* punt to generic code */ + } +} + +/* + * btbuildphasename() -- Return name of index build phase. + */ +char * +btbuildphasename(int64 phasenum) +{ + switch (phasenum) + { + case PROGRESS_CREATEIDX_SUBPHASE_INITIALIZE: + return "initializing"; + case PROGRESS_BTREE_PHASE_INDEXBUILD_TABLESCAN: + return "scanning table"; + case PROGRESS_BTREE_PHASE_PERFORMSORT_1: + return "sorting live tuples"; + case PROGRESS_BTREE_PHASE_PERFORMSORT_2: + return "sorting dead tuples"; + case PROGRESS_BTREE_PHASE_LEAF_LOAD: + return "loading tuples in tree"; + default: + return NULL; + } +} + +/* + * _bt_truncate() -- create tuple without unneeded suffix attributes. + * + * Returns truncated pivot index tuple allocated in caller's memory context, + * with key attributes copied from caller's firstright argument. If rel is + * an INCLUDE index, non-key attributes will definitely be truncated away, + * since they're not part of the key space. More aggressive suffix + * truncation can take place when it's clear that the returned tuple does not + * need one or more suffix key attributes. We only need to keep firstright + * attributes up to and including the first non-lastleft-equal attribute. + * Caller's insertion scankey is used to compare the tuples; the scankey's + * argument values are not considered here. + * + * Sometimes this routine will return a new pivot tuple that takes up more + * space than firstright, because a new heap TID attribute had to be added to + * distinguish lastleft from firstright. This should only happen when the + * caller is in the process of splitting a leaf page that has many logical + * duplicates, where it's unavoidable. + * + * Note that returned tuple's t_tid offset will hold the number of attributes + * present, so the original item pointer offset is not represented. Caller + * should only change truncated tuple's downlink. Note also that truncated + * key attributes are treated as containing "minus infinity" values by + * _bt_compare(). + * + * In the worst case (when a heap TID is appended) the size of the returned + * tuple is the size of the first right tuple plus an additional MAXALIGN()'d + * item pointer. This guarantee is important, since callers need to stay + * under the 1/3 of a page restriction on tuple size. If this routine is ever + * taught to truncate within an attribute/datum, it will need to avoid + * returning an enlarged tuple to caller when truncation + TOAST compression + * ends up enlarging the final datum. + */ +IndexTuple +_bt_truncate(Relation rel, IndexTuple lastleft, IndexTuple firstright, + BTScanInsert itup_key) +{ + TupleDesc itupdesc = RelationGetDescr(rel); + int16 natts = IndexRelationGetNumberOfAttributes(rel); + int16 nkeyatts = IndexRelationGetNumberOfKeyAttributes(rel); + int keepnatts; + IndexTuple pivot; + ItemPointer pivotheaptid; + Size newsize; + + /* + * We should only ever truncate leaf index tuples. It's never okay to + * truncate a second time. + */ + Assert(BTreeTupleGetNAtts(lastleft, rel) == natts); + Assert(BTreeTupleGetNAtts(firstright, rel) == natts); + + /* Determine how many attributes must be kept in truncated tuple */ + keepnatts = _bt_keep_natts(rel, lastleft, firstright, itup_key); + +#ifdef DEBUG_NO_TRUNCATE + /* Force truncation to be ineffective for testing purposes */ + keepnatts = nkeyatts + 1; +#endif + + if (keepnatts <= natts) + { + IndexTuple tidpivot; + + pivot = index_truncate_tuple(itupdesc, firstright, keepnatts); + + /* + * If there is a distinguishing key attribute within new pivot tuple, + * there is no need to add an explicit heap TID attribute + */ + if (keepnatts <= nkeyatts) + { + BTreeTupleSetNAtts(pivot, keepnatts); + return pivot; + } + + /* + * Only truncation of non-key attributes was possible, since key + * attributes are all equal. It's necessary to add a heap TID + * attribute to the new pivot tuple. + */ + Assert(natts != nkeyatts); + newsize = IndexTupleSize(pivot) + MAXALIGN(sizeof(ItemPointerData)); + tidpivot = palloc0(newsize); + memcpy(tidpivot, pivot, IndexTupleSize(pivot)); + /* cannot leak memory here */ + pfree(pivot); + pivot = tidpivot; + } + else + { + /* + * No truncation was possible, since key attributes are all equal. + * It's necessary to add a heap TID attribute to the new pivot tuple. + */ + Assert(natts == nkeyatts); + newsize = IndexTupleSize(firstright) + MAXALIGN(sizeof(ItemPointerData)); + pivot = palloc0(newsize); + memcpy(pivot, firstright, IndexTupleSize(firstright)); + } + + /* + * We have to use heap TID as a unique-ifier in the new pivot tuple, since + * no non-TID key attribute in the right item readily distinguishes the + * right side of the split from the left side. Use enlarged space that + * holds a copy of first right tuple; place a heap TID value within the + * extra space that remains at the end. + * + * nbtree conceptualizes this case as an inability to truncate away any + * key attribute. We must use an alternative representation of heap TID + * within pivots because heap TID is only treated as an attribute within + * nbtree (e.g., there is no pg_attribute entry). + */ + Assert(itup_key->heapkeyspace); + pivot->t_info &= ~INDEX_SIZE_MASK; + pivot->t_info |= newsize; + + /* + * Lehman & Yao use lastleft as the leaf high key in all cases, but don't + * consider suffix truncation. It seems like a good idea to follow that + * example in cases where no truncation takes place -- use lastleft's heap + * TID. (This is also the closest value to negative infinity that's + * legally usable.) + */ + pivotheaptid = (ItemPointer) ((char *) pivot + newsize - + sizeof(ItemPointerData)); + ItemPointerCopy(&lastleft->t_tid, pivotheaptid); + + /* + * Lehman and Yao require that the downlink to the right page, which is to + * be inserted into the parent page in the second phase of a page split be + * a strict lower bound on items on the right page, and a non-strict upper + * bound for items on the left page. Assert that heap TIDs follow these + * invariants, since a heap TID value is apparently needed as a + * tiebreaker. + */ +#ifndef DEBUG_NO_TRUNCATE + Assert(ItemPointerCompare(&lastleft->t_tid, &firstright->t_tid) < 0); + Assert(ItemPointerCompare(pivotheaptid, &lastleft->t_tid) >= 0); + Assert(ItemPointerCompare(pivotheaptid, &firstright->t_tid) < 0); +#else + + /* + * Those invariants aren't guaranteed to hold for lastleft + firstright + * heap TID attribute values when they're considered here only because + * DEBUG_NO_TRUNCATE is defined (a heap TID is probably not actually + * needed as a tiebreaker). DEBUG_NO_TRUNCATE must therefore use a heap + * TID value that always works as a strict lower bound for items to the + * right. In particular, it must avoid using firstright's leading key + * attribute values along with lastleft's heap TID value when lastleft's + * TID happens to be greater than firstright's TID. + */ + ItemPointerCopy(&firstright->t_tid, pivotheaptid); + + /* + * Pivot heap TID should never be fully equal to firstright. Note that + * the pivot heap TID will still end up equal to lastleft's heap TID when + * that's the only usable value. + */ + ItemPointerSetOffsetNumber(pivotheaptid, + OffsetNumberPrev(ItemPointerGetOffsetNumber(pivotheaptid))); + Assert(ItemPointerCompare(pivotheaptid, &firstright->t_tid) < 0); +#endif + + BTreeTupleSetNAtts(pivot, nkeyatts); + BTreeTupleSetAltHeapTID(pivot); + + return pivot; +} + +/* + * _bt_keep_natts - how many key attributes to keep when truncating. + * + * Caller provides two tuples that enclose a split point. Caller's insertion + * scankey is used to compare the tuples; the scankey's argument values are + * not considered here. + * + * This can return a number of attributes that is one greater than the + * number of key attributes for the index relation. This indicates that the + * caller must use a heap TID as a unique-ifier in new pivot tuple. + */ +static int +_bt_keep_natts(Relation rel, IndexTuple lastleft, IndexTuple firstright, + BTScanInsert itup_key) +{ + int nkeyatts = IndexRelationGetNumberOfKeyAttributes(rel); + TupleDesc itupdesc = RelationGetDescr(rel); + int keepnatts; + ScanKey scankey; + + /* + * Be consistent about the representation of BTREE_VERSION 2/3 tuples + * across Postgres versions; don't allow new pivot tuples to have + * truncated key attributes there. _bt_compare() treats truncated key + * attributes as having the value minus infinity, which would break + * searches within !heapkeyspace indexes. + */ + if (!itup_key->heapkeyspace) + { + Assert(nkeyatts != IndexRelationGetNumberOfAttributes(rel)); + return nkeyatts; + } + + scankey = itup_key->scankeys; + keepnatts = 1; + for (int attnum = 1; attnum <= nkeyatts; attnum++, scankey++) + { + Datum datum1, + datum2; + bool isNull1, + isNull2; + + datum1 = index_getattr(lastleft, attnum, itupdesc, &isNull1); + datum2 = index_getattr(firstright, attnum, itupdesc, &isNull2); + + if (isNull1 != isNull2) + break; + + if (!isNull1 && + DatumGetInt32(FunctionCall2Coll(&scankey->sk_func, + scankey->sk_collation, + datum1, + datum2)) != 0) + break; + + keepnatts++; + } + + return keepnatts; +} + +/* + * _bt_keep_natts_fast - fast bitwise variant of _bt_keep_natts. + * + * This is exported so that a candidate split point can have its effect on + * suffix truncation inexpensively evaluated ahead of time when finding a + * split location. A naive bitwise approach to datum comparisons is used to + * save cycles. + * + * The approach taken here usually provides the same answer as _bt_keep_natts + * will (for the same pair of tuples from a heapkeyspace index), since the + * majority of btree opclasses can never indicate that two datums are equal + * unless they're bitwise equal (once detoasted). Similarly, result may + * differ from the _bt_keep_natts result when either tuple has TOASTed datums, + * though this is barely possible in practice. + * + * These issues must be acceptable to callers, typically because they're only + * concerned about making suffix truncation as effective as possible without + * leaving excessive amounts of free space on either side of page split. + * Callers can rely on the fact that attributes considered equal here are + * definitely also equal according to _bt_keep_natts. + */ +int +_bt_keep_natts_fast(Relation rel, IndexTuple lastleft, IndexTuple firstright) +{ + TupleDesc itupdesc = RelationGetDescr(rel); + int keysz = IndexRelationGetNumberOfKeyAttributes(rel); + int keepnatts; + + keepnatts = 1; + for (int attnum = 1; attnum <= keysz; attnum++) + { + Datum datum1, + datum2; + bool isNull1, + isNull2; + Form_pg_attribute att; + + datum1 = index_getattr(lastleft, attnum, itupdesc, &isNull1); + datum2 = index_getattr(firstright, attnum, itupdesc, &isNull2); + att = TupleDescAttr(itupdesc, attnum - 1); + + if (isNull1 != isNull2) + break; + + if (!isNull1 && + !datumIsEqual(datum1, datum2, att->attbyval, att->attlen)) + break; + + keepnatts++; + } + + return keepnatts; +} + +/* + * _bt_check_natts() -- Verify tuple has expected number of attributes. + * + * Returns value indicating if the expected number of attributes were found + * for a particular offset on page. This can be used as a general purpose + * sanity check. + * + * Testing a tuple directly with BTreeTupleGetNAtts() should generally be + * preferred to calling here. That's usually more convenient, and is always + * more explicit. Call here instead when offnum's tuple may be a negative + * infinity tuple that uses the pre-v11 on-disk representation, or when a low + * context check is appropriate. This routine is as strict as possible about + * what is expected on each version of btree. + */ +bool +_bt_check_natts(Relation rel, bool heapkeyspace, Page page, OffsetNumber offnum) +{ + int16 natts = IndexRelationGetNumberOfAttributes(rel); + int16 nkeyatts = IndexRelationGetNumberOfKeyAttributes(rel); + BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page); + IndexTuple itup; + int tupnatts; + + /* + * We cannot reliably test a deleted or half-deleted page, since they have + * dummy high keys + */ + if (P_IGNORE(opaque)) + return true; + + Assert(offnum >= FirstOffsetNumber && + offnum <= PageGetMaxOffsetNumber(page)); + + /* + * Mask allocated for number of keys in index tuple must be able to fit + * maximum possible number of index attributes + */ + StaticAssertStmt(BT_N_KEYS_OFFSET_MASK >= INDEX_MAX_KEYS, + "BT_N_KEYS_OFFSET_MASK can't fit INDEX_MAX_KEYS"); + + itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum)); + tupnatts = BTreeTupleGetNAtts(itup, rel); + + if (P_ISLEAF(opaque)) + { + if (offnum >= P_FIRSTDATAKEY(opaque)) + { + /* + * Non-pivot tuples currently never use alternative heap TID + * representation -- even those within heapkeyspace indexes + */ + if ((itup->t_info & INDEX_ALT_TID_MASK) != 0) + return false; + + /* + * Leaf tuples that are not the page high key (non-pivot tuples) + * should never be truncated. (Note that tupnatts must have been + * inferred, rather than coming from an explicit on-disk + * representation.) + */ + return tupnatts == natts; + } + else + { + /* + * Rightmost page doesn't contain a page high key, so tuple was + * checked above as ordinary leaf tuple + */ + Assert(!P_RIGHTMOST(opaque)); + + /* + * !heapkeyspace high key tuple contains only key attributes. Note + * that tupnatts will only have been explicitly represented in + * !heapkeyspace indexes that happen to have non-key attributes. + */ + if (!heapkeyspace) + return tupnatts == nkeyatts; + + /* Use generic heapkeyspace pivot tuple handling */ + } + } + else /* !P_ISLEAF(opaque) */ + { + if (offnum == P_FIRSTDATAKEY(opaque)) + { + /* + * The first tuple on any internal page (possibly the first after + * its high key) is its negative infinity tuple. Negative + * infinity tuples are always truncated to zero attributes. They + * are a particular kind of pivot tuple. + */ + if (heapkeyspace) + return tupnatts == 0; + + /* + * The number of attributes won't be explicitly represented if the + * negative infinity tuple was generated during a page split that + * occurred with a version of Postgres before v11. There must be + * a problem when there is an explicit representation that is + * non-zero, or when there is no explicit representation and the + * tuple is evidently not a pre-pg_upgrade tuple. + * + * Prior to v11, downlinks always had P_HIKEY as their offset. Use + * that to decide if the tuple is a pre-v11 tuple. + */ + return tupnatts == 0 || + ((itup->t_info & INDEX_ALT_TID_MASK) == 0 && + ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY); + } + else + { + /* + * !heapkeyspace downlink tuple with separator key contains only + * key attributes. Note that tupnatts will only have been + * explicitly represented in !heapkeyspace indexes that happen to + * have non-key attributes. + */ + if (!heapkeyspace) + return tupnatts == nkeyatts; + + /* Use generic heapkeyspace pivot tuple handling */ + } + + } + + /* Handle heapkeyspace pivot tuples (excluding minus infinity items) */ + Assert(heapkeyspace); + + /* + * Explicit representation of the number of attributes is mandatory with + * heapkeyspace index pivot tuples, regardless of whether or not there are + * non-key attributes. + */ + if ((itup->t_info & INDEX_ALT_TID_MASK) == 0) + return false; + + /* + * Heap TID is a tiebreaker key attribute, so it cannot be untruncated + * when any other key attribute is truncated + */ + if (BTreeTupleGetHeapTID(itup) != NULL && tupnatts != nkeyatts) + return false; + + /* + * Pivot tuple must have at least one untruncated key attribute (minus + * infinity pivot tuples are the only exception). Pivot tuples can never + * represent that there is a value present for a key attribute that + * exceeds pg_index.indnkeyatts for the index. + */ + return tupnatts > 0 && tupnatts <= nkeyatts; +} + +/* + * + * _bt_check_third_page() -- check whether tuple fits on a btree page at all. + * + * We actually need to be able to fit three items on every page, so restrict + * any one item to 1/3 the per-page available space. Note that itemsz should + * not include the ItemId overhead. + * + * It might be useful to apply TOAST methods rather than throw an error here. + * Using out of line storage would break assumptions made by suffix truncation + * and by contrib/amcheck, though. + */ +void +_bt_check_third_page(Relation rel, Relation heap, bool needheaptidspace, + Page page, IndexTuple newtup) +{ + Size itemsz; + BTPageOpaque opaque; + + itemsz = MAXALIGN(IndexTupleSize(newtup)); + + /* Double check item size against limit */ + if (itemsz <= BTMaxItemSize(page)) + return; + + /* + * Tuple is probably too large to fit on page, but it's possible that the + * index uses version 2 or version 3, or that page is an internal page, in + * which case a slightly higher limit applies. + */ + if (!needheaptidspace && itemsz <= BTMaxItemSizeNoHeapTid(page)) + return; + + /* + * Internal page insertions cannot fail here, because that would mean that + * an earlier leaf level insertion that should have failed didn't + */ + opaque = (BTPageOpaque) PageGetSpecialPointer(page); + if (!P_ISLEAF(opaque)) + elog(ERROR, "cannot insert oversized tuple of size %zu on internal page of index \"%s\"", + itemsz, RelationGetRelationName(rel)); + + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("index row size %zu exceeds btree version %u maximum %zu for index \"%s\"", + itemsz, + needheaptidspace ? BTREE_VERSION : BTREE_NOVAC_VERSION, + needheaptidspace ? BTMaxItemSize(page) : + BTMaxItemSizeNoHeapTid(page), + RelationGetRelationName(rel)), + errdetail("Index row references tuple (%u,%u) in relation \"%s\".", + ItemPointerGetBlockNumber(&newtup->t_tid), + ItemPointerGetOffsetNumber(&newtup->t_tid), + RelationGetRelationName(heap)), + errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n" + "Consider a function index of an MD5 hash of the value, " + "or use full text indexing."), + errtableconstraint(heap, RelationGetRelationName(rel)))); +}