From: Robert Haas Date: Fri, 7 Nov 2014 20:50:09 +0000 (-0500) Subject: Use the sortsupport infrastructure in more cases. X-Git-Tag: REL9_5_ALPHA1~1251 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=5ea86e6e6;p=postgresql Use the sortsupport infrastructure in more cases. This removes some fmgr overhead from cases such as btree index builds. Peter Geoghegan, reviewed by Andreas Karlsson and me. --- diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index 31d48210b2..e15784e1ba 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -686,6 +686,7 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2) int i, keysz = RelationGetNumberOfAttributes(wstate->index); ScanKey indexScanKey = NULL; + SortSupport sortKeys; if (merge) { @@ -701,6 +702,31 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2) true, &should_free2); indexScanKey = _bt_mkscankey_nodata(wstate->index); + /* Prepare SortSupport data for each column */ + sortKeys = (SortSupport) palloc0(keysz * sizeof(SortSupportData)); + + for (i = 0; i < keysz; i++) + { + SortSupport sortKey = sortKeys + i; + ScanKey scanKey = indexScanKey + i; + int16 strategy; + + sortKey->ssup_cxt = CurrentMemoryContext; + sortKey->ssup_collation = scanKey->sk_collation; + sortKey->ssup_nulls_first = + (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0; + sortKey->ssup_attno = scanKey->sk_attno; + + AssertState(sortKey->ssup_attno != 0); + + strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ? + BTGreaterStrategyNumber : BTLessStrategyNumber; + + PrepareSortSupportFromIndexRel(wstate->index, strategy, sortKey); + } + + _bt_freeskey(indexScanKey); + for (;;) { load1 = true; /* load BTSpool next ? */ @@ -713,43 +739,20 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2) { for (i = 1; i <= keysz; i++) { - ScanKey entry; + SortSupport entry; Datum attrDatum1, attrDatum2; bool isNull1, isNull2; int32 compare; - entry = indexScanKey + i - 1; + entry = sortKeys + i - 1; attrDatum1 = index_getattr(itup, i, tupdes, &isNull1); attrDatum2 = index_getattr(itup2, i, tupdes, &isNull2); - if (isNull1) - { - if (isNull2) - compare = 0; /* NULL "=" NULL */ - else if (entry->sk_flags & SK_BT_NULLS_FIRST) - compare = -1; /* NULL "<" NOT_NULL */ - else - compare = 1; /* NULL ">" NOT_NULL */ - } - else if (isNull2) - { - if (entry->sk_flags & SK_BT_NULLS_FIRST) - compare = 1; /* NOT_NULL ">" NULL */ - else - compare = -1; /* NOT_NULL "<" NULL */ - } - else - { - compare = - DatumGetInt32(FunctionCall2Coll(&entry->sk_func, - entry->sk_collation, - attrDatum1, - attrDatum2)); - - if (entry->sk_flags & SK_BT_DESC) - compare = -compare; - } + + compare = ApplySortComparator(attrDatum1, isNull1, + attrDatum2, isNull2, + entry); if (compare > 0) { load1 = false; @@ -783,7 +786,7 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2) true, &should_free2); } } - _bt_freeskey(indexScanKey); + pfree(sortKeys); } else { diff --git a/src/backend/utils/sort/sortsupport.c b/src/backend/utils/sort/sortsupport.c index 2240fd0100..16342f388d 100644 --- a/src/backend/utils/sort/sortsupport.c +++ b/src/backend/utils/sort/sortsupport.c @@ -21,6 +21,7 @@ #include "access/nbtree.h" #include "fmgr.h" #include "utils/lsyscache.h" +#include "utils/rel.h" #include "utils/sortsupport.h" @@ -86,28 +87,14 @@ PrepareSortSupportComparisonShim(Oid cmpFunc, SortSupport ssup) } /* - * Fill in SortSupport given an ordering operator (btree "<" or ">" operator). - * - * Caller must previously have zeroed the SortSupportData structure and then - * filled in ssup_cxt, ssup_collation, and ssup_nulls_first. This will fill - * in ssup_reverse as well as the comparator function pointer. + * Look up and call sortsupport function to setup SortSupport comparator; + * or if no such function exists or it declines to set up the appropriate + * state, prepare a suitable shim. */ -void -PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup) +static void +FinishSortSupportFunction(Oid opfamily, Oid opcintype, SortSupport ssup) { Oid sortSupportFunction; - Oid opfamily; - Oid opcintype; - int16 strategy; - - Assert(ssup->comparator == NULL); - - /* Find the operator in pg_amop */ - if (!get_ordering_op_properties(orderingOp, &opfamily, &opcintype, - &strategy)) - elog(ERROR, "operator %u is not a valid ordering operator", - orderingOp); - ssup->ssup_reverse = (strategy == BTGreaterStrategyNumber); /* Look for a sort support function */ sortSupportFunction = get_opfamily_proc(opfamily, opcintype, opcintype, @@ -136,3 +123,57 @@ PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup) PrepareSortSupportComparisonShim(sortFunction, ssup); } } + +/* + * Fill in SortSupport given an ordering operator (btree "<" or ">" operator). + * + * Caller must previously have zeroed the SortSupportData structure and then + * filled in ssup_cxt, ssup_collation, and ssup_nulls_first. This will fill + * in ssup_reverse as well as the comparator function pointer. + */ +void +PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup) +{ + Oid opfamily; + Oid opcintype; + int16 strategy; + + Assert(ssup->comparator == NULL); + + /* Find the operator in pg_amop */ + if (!get_ordering_op_properties(orderingOp, &opfamily, &opcintype, + &strategy)) + elog(ERROR, "operator %u is not a valid ordering operator", + orderingOp); + ssup->ssup_reverse = (strategy == BTGreaterStrategyNumber); + + FinishSortSupportFunction(opfamily, opcintype, ssup); +} + +/* + * Fill in SortSupport given an index relation, attribute, and strategy. + * + * Caller must previously have zeroed the SortSupportData structure and then + * filled in ssup_cxt, ssup_attno, ssup_collation, and ssup_nulls_first. This + * will fill in ssup_reverse (based on the supplied strategy), as well as the + * comparator function pointer. + */ +void +PrepareSortSupportFromIndexRel(Relation indexRel, int16 strategy, + SortSupport ssup) +{ + Oid opfamily = indexRel->rd_opfamily[ssup->ssup_attno - 1]; + Oid opcintype = indexRel->rd_opcintype[ssup->ssup_attno - 1]; + + Assert(ssup->comparator == NULL); + + /* Find the operator in pg_amop */ + if (indexRel->rd_rel->relam != BTREE_AM_OID) + elog(ERROR, "unexpected non-btree AM: %u", indexRel->rd_rel->relam); + if (strategy != BTGreaterStrategyNumber && + strategy != BTLessStrategyNumber) + elog(ERROR, "unexpected sort support strategy: %d", strategy); + ssup->ssup_reverse = (strategy == BTGreaterStrategyNumber); + + FinishSortSupportFunction(opfamily, opcintype, ssup); +} diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c index 2d15da0bfa..b68a85df22 100644 --- a/src/backend/utils/sort/tuplesort.c +++ b/src/backend/utils/sort/tuplesort.c @@ -256,13 +256,6 @@ struct Tuplesortstate void (*readtup) (Tuplesortstate *state, SortTuple *stup, int tapenum, unsigned int len); - /* - * Function to reverse the sort direction from its current state. (We - * could dispense with this if we wanted to enforce that all variants - * represent the sort key information alike.) - */ - void (*reversedirection) (Tuplesortstate *state); - /* * This array holds the tuples now in sort memory. If we are in state * INITIAL, the tuples are in no particular order; if we are in state @@ -340,8 +333,9 @@ struct Tuplesortstate bool markpos_eof; /* saved "eof_reached" */ /* - * These variables are specific to the MinimalTuple case; they are set by - * tuplesort_begin_heap and used only by the MinimalTuple routines. + * The sortKeys variable is used by every case other than the hash index + * case; it is set by tuplesort_begin_xxx. tupDesc is only used by the + * MinimalTuple and CLUSTER routines, though. */ TupleDesc tupDesc; SortSupport sortKeys; /* array of length nKeys */ @@ -354,8 +348,7 @@ struct Tuplesortstate /* * These variables are specific to the CLUSTER case; they are set by - * tuplesort_begin_cluster. Note CLUSTER also uses tupDesc and - * indexScanKey. + * tuplesort_begin_cluster. */ IndexInfo *indexInfo; /* info about index being used for reference */ EState *estate; /* for evaluating index expressions */ @@ -368,7 +361,6 @@ struct Tuplesortstate Relation indexRel; /* index being built */ /* These are specific to the index_btree subcase: */ - ScanKey indexScanKey; bool enforceUnique; /* complain if we find duplicate tuples */ /* These are specific to the index_hash subcase: */ @@ -395,7 +387,6 @@ struct Tuplesortstate #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup)) #define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup)) #define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len)) -#define REVERSEDIRECTION(state) ((*(state)->reversedirection) (state)) #define LACKMEM(state) ((state)->availMem < 0) #define USEMEM(state,amt) ((state)->availMem -= (amt)) #define FREEMEM(state,amt) ((state)->availMem += (amt)) @@ -464,6 +455,7 @@ static void sort_bounded_heap(Tuplesortstate *state); static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple, int tupleindex, bool checkIndex); static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex); +static void reversedirection(Tuplesortstate *state); static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK); static void markrunend(Tuplesortstate *state, int tapenum); static int comparetup_heap(const SortTuple *a, const SortTuple *b, @@ -473,7 +465,6 @@ static void writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup); static void readtup_heap(Tuplesortstate *state, SortTuple *stup, int tapenum, unsigned int len); -static void reversedirection_heap(Tuplesortstate *state); static int comparetup_cluster(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup); @@ -490,8 +481,6 @@ static void writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup); static void readtup_index(Tuplesortstate *state, SortTuple *stup, int tapenum, unsigned int len); -static void reversedirection_index_btree(Tuplesortstate *state); -static void reversedirection_index_hash(Tuplesortstate *state); static int comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup); @@ -499,7 +488,6 @@ static void writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup); static void readtup_datum(Tuplesortstate *state, SortTuple *stup, int tapenum, unsigned int len); -static void reversedirection_datum(Tuplesortstate *state); static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup); /* @@ -629,7 +617,6 @@ tuplesort_begin_heap(TupleDesc tupDesc, state->copytup = copytup_heap; state->writetup = writetup_heap; state->readtup = readtup_heap; - state->reversedirection = reversedirection_heap; state->tupDesc = tupDesc; /* assume we need not copy tupDesc */ @@ -665,7 +652,9 @@ tuplesort_begin_cluster(TupleDesc tupDesc, int workMem, bool randomAccess) { Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess); + ScanKey indexScanKey; MemoryContext oldcontext; + int i; Assert(indexRel->rd_rel->relam == BTREE_AM_OID); @@ -691,13 +680,13 @@ tuplesort_begin_cluster(TupleDesc tupDesc, state->copytup = copytup_cluster; state->writetup = writetup_cluster; state->readtup = readtup_cluster; - state->reversedirection = reversedirection_index_btree; state->indexInfo = BuildIndexInfo(indexRel); - state->indexScanKey = _bt_mkscankey_nodata(indexRel); state->tupDesc = tupDesc; /* assume we need not copy tupDesc */ + indexScanKey = _bt_mkscankey_nodata(indexRel); + if (state->indexInfo->ii_Expressions != NULL) { TupleTableSlot *slot; @@ -715,6 +704,32 @@ tuplesort_begin_cluster(TupleDesc tupDesc, econtext->ecxt_scantuple = slot; } + /* Prepare SortSupport data for each column */ + state->sortKeys = (SortSupport) palloc0(state->nKeys * + sizeof(SortSupportData)); + + for (i = 0; i < state->nKeys; i++) + { + SortSupport sortKey = state->sortKeys + i; + ScanKey scanKey = indexScanKey + i; + int16 strategy; + + sortKey->ssup_cxt = CurrentMemoryContext; + sortKey->ssup_collation = scanKey->sk_collation; + sortKey->ssup_nulls_first = + (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0; + sortKey->ssup_attno = scanKey->sk_attno; + + AssertState(sortKey->ssup_attno != 0); + + strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ? + BTGreaterStrategyNumber : BTLessStrategyNumber; + + PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey); + } + + _bt_freeskey(indexScanKey); + MemoryContextSwitchTo(oldcontext); return state; @@ -727,7 +742,9 @@ tuplesort_begin_index_btree(Relation heapRel, int workMem, bool randomAccess) { Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess); + ScanKey indexScanKey; MemoryContext oldcontext; + int i; oldcontext = MemoryContextSwitchTo(state->sortcontext); @@ -751,13 +768,40 @@ tuplesort_begin_index_btree(Relation heapRel, state->copytup = copytup_index; state->writetup = writetup_index; state->readtup = readtup_index; - state->reversedirection = reversedirection_index_btree; state->heapRel = heapRel; state->indexRel = indexRel; - state->indexScanKey = _bt_mkscankey_nodata(indexRel); state->enforceUnique = enforceUnique; + indexScanKey = _bt_mkscankey_nodata(indexRel); + state->nKeys = RelationGetNumberOfAttributes(indexRel); + + /* Prepare SortSupport data for each column */ + state->sortKeys = (SortSupport) palloc0(state->nKeys * + sizeof(SortSupportData)); + + for (i = 0; i < state->nKeys; i++) + { + SortSupport sortKey = state->sortKeys + i; + ScanKey scanKey = indexScanKey + i; + int16 strategy; + + sortKey->ssup_cxt = CurrentMemoryContext; + sortKey->ssup_collation = scanKey->sk_collation; + sortKey->ssup_nulls_first = + (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0; + sortKey->ssup_attno = scanKey->sk_attno; + + AssertState(sortKey->ssup_attno != 0); + + strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ? + BTGreaterStrategyNumber : BTLessStrategyNumber; + + PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey); + } + + _bt_freeskey(indexScanKey); + MemoryContextSwitchTo(oldcontext); return state; @@ -788,7 +832,6 @@ tuplesort_begin_index_hash(Relation heapRel, state->copytup = copytup_index; state->writetup = writetup_index; state->readtup = readtup_index; - state->reversedirection = reversedirection_index_hash; state->heapRel = heapRel; state->indexRel = indexRel; @@ -831,7 +874,6 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation, state->copytup = copytup_datum; state->writetup = writetup_datum; state->readtup = readtup_datum; - state->reversedirection = reversedirection_datum; state->datumType = datumType; @@ -2599,7 +2641,7 @@ make_bounded_heap(Tuplesortstate *state) Assert(tupcount >= state->bound); /* Reverse sort direction so largest entry will be at root */ - REVERSEDIRECTION(state); + reversedirection(state); state->memtupcount = 0; /* make the heap empty */ for (i = 0; i < tupcount; i++) @@ -2663,7 +2705,7 @@ sort_bounded_heap(Tuplesortstate *state) * Reverse sort direction back to the original state. This is not * actually necessary but seems like a good idea for tidiness. */ - REVERSEDIRECTION(state); + reversedirection(state); state->status = TSS_SORTEDINMEM; state->boundUsed = true; @@ -2753,6 +2795,24 @@ tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex) memtuples[i] = *tuple; } +/* + * Function to reverse the sort direction from its current state + * + * It is not safe to call this when performing hash tuplesorts + */ +static void +reversedirection(Tuplesortstate *state) +{ + SortSupport sortKey = state->sortKeys; + int nkey; + + for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++) + { + sortKey->ssup_reverse = !sortKey->ssup_reverse; + sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first; + } +} + /* * Tape interface routines @@ -2780,73 +2840,6 @@ markrunend(Tuplesortstate *state, int tapenum) } -/* - * Inline-able copy of FunctionCall2Coll() to save some cycles in sorting. - */ -static inline Datum -myFunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2) -{ - FunctionCallInfoData fcinfo; - Datum result; - - InitFunctionCallInfoData(fcinfo, flinfo, 2, collation, NULL, NULL); - - fcinfo.arg[0] = arg1; - fcinfo.arg[1] = arg2; - fcinfo.argnull[0] = false; - fcinfo.argnull[1] = false; - - result = FunctionCallInvoke(&fcinfo); - - /* Check for null result, since caller is clearly not expecting one */ - if (fcinfo.isnull) - elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid); - - return result; -} - -/* - * Apply a sort function (by now converted to fmgr lookup form) - * and return a 3-way comparison result. This takes care of handling - * reverse-sort and NULLs-ordering properly. We assume that DESC and - * NULLS_FIRST options are encoded in sk_flags the same way btree does it. - */ -static inline int32 -inlineApplySortFunction(FmgrInfo *sortFunction, int sk_flags, Oid collation, - Datum datum1, bool isNull1, - Datum datum2, bool isNull2) -{ - int32 compare; - - if (isNull1) - { - if (isNull2) - compare = 0; /* NULL "=" NULL */ - else if (sk_flags & SK_BT_NULLS_FIRST) - compare = -1; /* NULL "<" NOT_NULL */ - else - compare = 1; /* NULL ">" NOT_NULL */ - } - else if (isNull2) - { - if (sk_flags & SK_BT_NULLS_FIRST) - compare = 1; /* NOT_NULL ">" NULL */ - else - compare = -1; /* NOT_NULL "<" NULL */ - } - else - { - compare = DatumGetInt32(myFunctionCall2Coll(sortFunction, collation, - datum1, datum2)); - - if (sk_flags & SK_BT_DESC) - compare = -compare; - } - - return compare; -} - - /* * Routines specialized for HeapTuple (actually MinimalTuple) case */ @@ -2972,20 +2965,6 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup, &stup->isnull1); } -static void -reversedirection_heap(Tuplesortstate *state) -{ - SortSupport sortKey = state->sortKeys; - int nkey; - - for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++) - { - sortKey->ssup_reverse = !sortKey->ssup_reverse; - sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first; - } -} - - /* * Routines specialized for the CLUSTER case (HeapTuple data, with * comparisons per a btree index definition) @@ -2995,7 +2974,7 @@ static int comparetup_cluster(const SortTuple *a, const SortTuple *b, Tuplesortstate *state) { - ScanKey scanKey = state->indexScanKey; + SortSupport sortKey = state->sortKeys; HeapTuple ltup; HeapTuple rtup; TupleDesc tupDesc; @@ -3005,14 +2984,13 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b, /* Compare the leading sort key, if it's simple */ if (state->indexInfo->ii_KeyAttrNumbers[0] != 0) { - compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags, - scanKey->sk_collation, - a->datum1, a->isnull1, - b->datum1, b->isnull1); + compare = ApplySortComparator(a->datum1, a->isnull1, + b->datum1, b->isnull1, + sortKey); if (compare != 0 || state->nKeys == 1) return compare; /* Compare additional columns the hard way */ - scanKey++; + sortKey++; nkey = 1; } else @@ -3030,7 +3008,7 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b, /* If not expression index, just compare the proper heap attrs */ tupDesc = state->tupDesc; - for (; nkey < state->nKeys; nkey++, scanKey++) + for (; nkey < state->nKeys; nkey++, sortKey++) { AttrNumber attno = state->indexInfo->ii_KeyAttrNumbers[nkey]; Datum datum1, @@ -3041,11 +3019,9 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b, datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1); datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2); - compare = inlineApplySortFunction(&scanKey->sk_func, - scanKey->sk_flags, - scanKey->sk_collation, - datum1, isnull1, - datum2, isnull2); + compare = ApplySortComparator(datum1, isnull1, + datum2, isnull2, + sortKey); if (compare != 0) return compare; } @@ -3077,15 +3053,13 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b, FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate, r_index_values, r_index_isnull); - for (; nkey < state->nKeys; nkey++, scanKey++) + for (; nkey < state->nKeys; nkey++, sortKey++) { - compare = inlineApplySortFunction(&scanKey->sk_func, - scanKey->sk_flags, - scanKey->sk_collation, - l_index_values[nkey], - l_index_isnull[nkey], - r_index_values[nkey], - r_index_isnull[nkey]); + compare = ApplySortComparator(l_index_values[nkey], + l_index_isnull[nkey], + r_index_values[nkey], + r_index_isnull[nkey], + sortKey); if (compare != 0) return compare; } @@ -3180,7 +3154,7 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b, * is also special handling for enforcing uniqueness, and special treatment * for equal keys at the end. */ - ScanKey scanKey = state->indexScanKey; + SortSupport sortKey = state->sortKeys; IndexTuple tuple1; IndexTuple tuple2; int keysz; @@ -3190,10 +3164,9 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b, int32 compare; /* Compare the leading sort key */ - compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags, - scanKey->sk_collation, - a->datum1, a->isnull1, - b->datum1, b->isnull1); + compare = ApplySortComparator(a->datum1, a->isnull1, + b->datum1, b->isnull1, + sortKey); if (compare != 0) return compare; @@ -3206,8 +3179,8 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b, tuple2 = (IndexTuple) b->tuple; keysz = state->nKeys; tupDes = RelationGetDescr(state->indexRel); - scanKey++; - for (nkey = 2; nkey <= keysz; nkey++, scanKey++) + sortKey++; + for (nkey = 2; nkey <= keysz; nkey++, sortKey++) { Datum datum1, datum2; @@ -3217,10 +3190,9 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b, datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1); datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2); - compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags, - scanKey->sk_collation, - datum1, isnull1, - datum2, isnull2); + compare = ApplySortComparator(datum1, isnull1, + datum2, isnull2, + sortKey); if (compare != 0) return compare; /* done when we find unequal attributes */ @@ -3394,26 +3366,6 @@ readtup_index(Tuplesortstate *state, SortTuple *stup, &stup->isnull1); } -static void -reversedirection_index_btree(Tuplesortstate *state) -{ - ScanKey scanKey = state->indexScanKey; - int nkey; - - for (nkey = 0; nkey < state->nKeys; nkey++, scanKey++) - { - scanKey->sk_flags ^= (SK_BT_DESC | SK_BT_NULLS_FIRST); - } -} - -static void -reversedirection_index_hash(Tuplesortstate *state) -{ - /* We don't support reversing direction in a hash index sort */ - elog(ERROR, "reversedirection_index_hash is not implemented"); -} - - /* * Routines specialized for DatumTuple case */ @@ -3512,13 +3464,6 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup, &tuplen, sizeof(tuplen)); } -static void -reversedirection_datum(Tuplesortstate *state) -{ - state->onlyKey->ssup_reverse = !state->onlyKey->ssup_reverse; - state->onlyKey->ssup_nulls_first = !state->onlyKey->ssup_nulls_first; -} - /* * Convenience routine to free a tuple previously loaded into sort memory */ diff --git a/src/include/utils/sortsupport.h b/src/include/utils/sortsupport.h index 4417143c34..faae703e0e 100644 --- a/src/include/utils/sortsupport.h +++ b/src/include/utils/sortsupport.h @@ -48,6 +48,7 @@ #define SORTSUPPORT_H #include "access/attnum.h" +#include "utils/relcache.h" typedef struct SortSupportData *SortSupport; @@ -152,5 +153,7 @@ ApplySortComparator(Datum datum1, bool isNull1, /* Other functions in utils/sort/sortsupport.c */ extern void PrepareSortSupportComparisonShim(Oid cmpFunc, SortSupport ssup); extern void PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup); +extern void PrepareSortSupportFromIndexRel(Relation indexRel, int16 strategy, + SortSupport ssup); #endif /* SORTSUPPORT_H */