]> granicus.if.org Git - postgresql/commitdiff
Use the sortsupport infrastructure in more cases.
authorRobert Haas <rhaas@postgresql.org>
Fri, 7 Nov 2014 20:50:09 +0000 (15:50 -0500)
committerRobert Haas <rhaas@postgresql.org>
Fri, 7 Nov 2014 20:50:55 +0000 (15:50 -0500)
This removes some fmgr overhead from cases such as btree index builds.

Peter Geoghegan, reviewed by Andreas Karlsson and me.

src/backend/access/nbtree/nbtsort.c
src/backend/utils/sort/sortsupport.c
src/backend/utils/sort/tuplesort.c
src/include/utils/sortsupport.h

index 31d48210b2231a83b3841612c5bc2bbf229b6af0..e15784e1ba9f415139ad5553794d5ffbdf3ab00c 100644 (file)
@@ -686,6 +686,7 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
        int                     i,
                                keysz = RelationGetNumberOfAttributes(wstate->index);
        ScanKey         indexScanKey = NULL;
+       SortSupport sortKeys;
 
        if (merge)
        {
@@ -701,6 +702,31 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
                                                                                true, &should_free2);
                indexScanKey = _bt_mkscankey_nodata(wstate->index);
 
+               /* Prepare SortSupport data for each column */
+               sortKeys = (SortSupport) palloc0(keysz * sizeof(SortSupportData));
+
+               for (i = 0; i < keysz; i++)
+               {
+                       SortSupport sortKey = sortKeys + i;
+                       ScanKey         scanKey = indexScanKey + i;
+                       int16           strategy;
+
+                       sortKey->ssup_cxt = CurrentMemoryContext;
+                       sortKey->ssup_collation = scanKey->sk_collation;
+                       sortKey->ssup_nulls_first =
+                               (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
+                       sortKey->ssup_attno = scanKey->sk_attno;
+
+                       AssertState(sortKey->ssup_attno != 0);
+
+                       strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
+                               BTGreaterStrategyNumber : BTLessStrategyNumber;
+
+                       PrepareSortSupportFromIndexRel(wstate->index, strategy, sortKey);
+               }
+
+               _bt_freeskey(indexScanKey);
+
                for (;;)
                {
                        load1 = true;           /* load BTSpool next ? */
@@ -713,43 +739,20 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
                        {
                                for (i = 1; i <= keysz; i++)
                                {
-                                       ScanKey         entry;
+                                       SortSupport     entry;
                                        Datum           attrDatum1,
                                                                attrDatum2;
                                        bool            isNull1,
                                                                isNull2;
                                        int32           compare;
 
-                                       entry = indexScanKey + i - 1;
+                                       entry = sortKeys + i - 1;
                                        attrDatum1 = index_getattr(itup, i, tupdes, &isNull1);
                                        attrDatum2 = index_getattr(itup2, i, tupdes, &isNull2);
-                                       if (isNull1)
-                                       {
-                                               if (isNull2)
-                                                       compare = 0;            /* NULL "=" NULL */
-                                               else if (entry->sk_flags & SK_BT_NULLS_FIRST)
-                                                       compare = -1;           /* NULL "<" NOT_NULL */
-                                               else
-                                                       compare = 1;            /* NULL ">" NOT_NULL */
-                                       }
-                                       else if (isNull2)
-                                       {
-                                               if (entry->sk_flags & SK_BT_NULLS_FIRST)
-                                                       compare = 1;            /* NOT_NULL ">" NULL */
-                                               else
-                                                       compare = -1;           /* NOT_NULL "<" NULL */
-                                       }
-                                       else
-                                       {
-                                               compare =
-                                                       DatumGetInt32(FunctionCall2Coll(&entry->sk_func,
-                                                                                                                entry->sk_collation,
-                                                                                                                       attrDatum1,
-                                                                                                                       attrDatum2));
-
-                                               if (entry->sk_flags & SK_BT_DESC)
-                                                       compare = -compare;
-                                       }
+
+                                       compare = ApplySortComparator(attrDatum1, isNull1,
+                                                                                                 attrDatum2, isNull2,
+                                                                                                 entry);
                                        if (compare > 0)
                                        {
                                                load1 = false;
@@ -783,7 +786,7 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
                                                                                                true, &should_free2);
                        }
                }
-               _bt_freeskey(indexScanKey);
+               pfree(sortKeys);
        }
        else
        {
index 2240fd01001ecdc1f390a0c73d87879d207980ba..16342f388db5f0bc8c4da2aaddfe7dab15d634d9 100644 (file)
@@ -21,6 +21,7 @@
 #include "access/nbtree.h"
 #include "fmgr.h"
 #include "utils/lsyscache.h"
+#include "utils/rel.h"
 #include "utils/sortsupport.h"
 
 
@@ -86,28 +87,14 @@ PrepareSortSupportComparisonShim(Oid cmpFunc, SortSupport ssup)
 }
 
 /*
- * Fill in SortSupport given an ordering operator (btree "<" or ">" operator).
- *
- * Caller must previously have zeroed the SortSupportData structure and then
- * filled in ssup_cxt, ssup_collation, and ssup_nulls_first.  This will fill
- * in ssup_reverse as well as the comparator function pointer.
+ * Look up and call sortsupport function to setup SortSupport comparator;
+ * or if no such function exists or it declines to set up the appropriate
+ * state, prepare a suitable shim.
  */
-void
-PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup)
+static void
+FinishSortSupportFunction(Oid opfamily, Oid opcintype, SortSupport ssup)
 {
        Oid                     sortSupportFunction;
-       Oid                     opfamily;
-       Oid                     opcintype;
-       int16           strategy;
-
-       Assert(ssup->comparator == NULL);
-
-       /* Find the operator in pg_amop */
-       if (!get_ordering_op_properties(orderingOp, &opfamily, &opcintype,
-                                                                       &strategy))
-               elog(ERROR, "operator %u is not a valid ordering operator",
-                        orderingOp);
-       ssup->ssup_reverse = (strategy == BTGreaterStrategyNumber);
 
        /* Look for a sort support function */
        sortSupportFunction = get_opfamily_proc(opfamily, opcintype, opcintype,
@@ -136,3 +123,57 @@ PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup)
                PrepareSortSupportComparisonShim(sortFunction, ssup);
        }
 }
+
+/*
+ * Fill in SortSupport given an ordering operator (btree "<" or ">" operator).
+ *
+ * Caller must previously have zeroed the SortSupportData structure and then
+ * filled in ssup_cxt, ssup_collation, and ssup_nulls_first.  This will fill
+ * in ssup_reverse as well as the comparator function pointer.
+ */
+void
+PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup)
+{
+       Oid                     opfamily;
+       Oid                     opcintype;
+       int16           strategy;
+
+       Assert(ssup->comparator == NULL);
+
+       /* Find the operator in pg_amop */
+       if (!get_ordering_op_properties(orderingOp, &opfamily, &opcintype,
+                                                                       &strategy))
+               elog(ERROR, "operator %u is not a valid ordering operator",
+                        orderingOp);
+       ssup->ssup_reverse = (strategy == BTGreaterStrategyNumber);
+
+       FinishSortSupportFunction(opfamily, opcintype, ssup);
+}
+
+/*
+ * Fill in SortSupport given an index relation, attribute, and strategy.
+ *
+ * Caller must previously have zeroed the SortSupportData structure and then
+ * filled in ssup_cxt, ssup_attno, ssup_collation, and ssup_nulls_first.  This
+ * will fill in ssup_reverse (based on the supplied strategy), as well as the
+ * comparator function pointer.
+ */
+void
+PrepareSortSupportFromIndexRel(Relation indexRel, int16 strategy,
+                                                          SortSupport ssup)
+{
+       Oid                     opfamily = indexRel->rd_opfamily[ssup->ssup_attno - 1];
+       Oid                     opcintype = indexRel->rd_opcintype[ssup->ssup_attno - 1];
+
+       Assert(ssup->comparator == NULL);
+
+       /* Find the operator in pg_amop */
+       if (indexRel->rd_rel->relam != BTREE_AM_OID)
+               elog(ERROR, "unexpected non-btree AM: %u", indexRel->rd_rel->relam);
+       if (strategy != BTGreaterStrategyNumber &&
+               strategy != BTLessStrategyNumber)
+               elog(ERROR, "unexpected sort support strategy: %d", strategy);
+       ssup->ssup_reverse = (strategy == BTGreaterStrategyNumber);
+
+       FinishSortSupportFunction(opfamily, opcintype, ssup);
+}
index 2d15da0bfa796d9303e9597f496e19e40c3e50aa..b68a85df223fd2a726db94267f2b0be237150335 100644 (file)
@@ -256,13 +256,6 @@ struct Tuplesortstate
        void            (*readtup) (Tuplesortstate *state, SortTuple *stup,
                                                                                int tapenum, unsigned int len);
 
-       /*
-        * Function to reverse the sort direction from its current state. (We
-        * could dispense with this if we wanted to enforce that all variants
-        * represent the sort key information alike.)
-        */
-       void            (*reversedirection) (Tuplesortstate *state);
-
        /*
         * This array holds the tuples now in sort memory.  If we are in state
         * INITIAL, the tuples are in no particular order; if we are in state
@@ -340,8 +333,9 @@ struct Tuplesortstate
        bool            markpos_eof;    /* saved "eof_reached" */
 
        /*
-        * These variables are specific to the MinimalTuple case; they are set by
-        * tuplesort_begin_heap and used only by the MinimalTuple routines.
+        * The sortKeys variable is used by every case other than the hash index
+        * case; it is set by tuplesort_begin_xxx.  tupDesc is only used by the
+        * MinimalTuple and CLUSTER routines, though.
         */
        TupleDesc       tupDesc;
        SortSupport sortKeys;           /* array of length nKeys */
@@ -354,8 +348,7 @@ struct Tuplesortstate
 
        /*
         * These variables are specific to the CLUSTER case; they are set by
-        * tuplesort_begin_cluster.  Note CLUSTER also uses tupDesc and
-        * indexScanKey.
+        * tuplesort_begin_cluster.
         */
        IndexInfo  *indexInfo;          /* info about index being used for reference */
        EState     *estate;                     /* for evaluating index expressions */
@@ -368,7 +361,6 @@ struct Tuplesortstate
        Relation        indexRel;               /* index being built */
 
        /* These are specific to the index_btree subcase: */
-       ScanKey         indexScanKey;
        bool            enforceUnique;  /* complain if we find duplicate tuples */
 
        /* These are specific to the index_hash subcase: */
@@ -395,7 +387,6 @@ struct Tuplesortstate
 #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
 #define WRITETUP(state,tape,stup)      ((*(state)->writetup) (state, tape, stup))
 #define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
-#define REVERSEDIRECTION(state) ((*(state)->reversedirection) (state))
 #define LACKMEM(state)         ((state)->availMem < 0)
 #define USEMEM(state,amt)      ((state)->availMem -= (amt))
 #define FREEMEM(state,amt)     ((state)->availMem += (amt))
@@ -464,6 +455,7 @@ static void sort_bounded_heap(Tuplesortstate *state);
 static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
                                          int tupleindex, bool checkIndex);
 static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
+static void reversedirection(Tuplesortstate *state);
 static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
 static void markrunend(Tuplesortstate *state, int tapenum);
 static int comparetup_heap(const SortTuple *a, const SortTuple *b,
@@ -473,7 +465,6 @@ static void writetup_heap(Tuplesortstate *state, int tapenum,
                          SortTuple *stup);
 static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
                         int tapenum, unsigned int len);
-static void reversedirection_heap(Tuplesortstate *state);
 static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
                                   Tuplesortstate *state);
 static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
@@ -490,8 +481,6 @@ static void writetup_index(Tuplesortstate *state, int tapenum,
                           SortTuple *stup);
 static void readtup_index(Tuplesortstate *state, SortTuple *stup,
                          int tapenum, unsigned int len);
-static void reversedirection_index_btree(Tuplesortstate *state);
-static void reversedirection_index_hash(Tuplesortstate *state);
 static int comparetup_datum(const SortTuple *a, const SortTuple *b,
                                 Tuplesortstate *state);
 static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
@@ -499,7 +488,6 @@ static void writetup_datum(Tuplesortstate *state, int tapenum,
                           SortTuple *stup);
 static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
                          int tapenum, unsigned int len);
-static void reversedirection_datum(Tuplesortstate *state);
 static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
 
 /*
@@ -629,7 +617,6 @@ tuplesort_begin_heap(TupleDesc tupDesc,
        state->copytup = copytup_heap;
        state->writetup = writetup_heap;
        state->readtup = readtup_heap;
-       state->reversedirection = reversedirection_heap;
 
        state->tupDesc = tupDesc;       /* assume we need not copy tupDesc */
 
@@ -665,7 +652,9 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
                                                int workMem, bool randomAccess)
 {
        Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+       ScanKey                 indexScanKey;
        MemoryContext oldcontext;
+       int                             i;
 
        Assert(indexRel->rd_rel->relam == BTREE_AM_OID);
 
@@ -691,13 +680,13 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
        state->copytup = copytup_cluster;
        state->writetup = writetup_cluster;
        state->readtup = readtup_cluster;
-       state->reversedirection = reversedirection_index_btree;
 
        state->indexInfo = BuildIndexInfo(indexRel);
-       state->indexScanKey = _bt_mkscankey_nodata(indexRel);
 
        state->tupDesc = tupDesc;       /* assume we need not copy tupDesc */
 
+       indexScanKey = _bt_mkscankey_nodata(indexRel);
+
        if (state->indexInfo->ii_Expressions != NULL)
        {
                TupleTableSlot *slot;
@@ -715,6 +704,32 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
                econtext->ecxt_scantuple = slot;
        }
 
+       /* Prepare SortSupport data for each column */
+       state->sortKeys = (SortSupport) palloc0(state->nKeys *
+                                                                                       sizeof(SortSupportData));
+
+       for (i = 0; i < state->nKeys; i++)
+       {
+               SortSupport sortKey = state->sortKeys + i;
+               ScanKey         scanKey = indexScanKey + i;
+               int16           strategy;
+
+               sortKey->ssup_cxt = CurrentMemoryContext;
+               sortKey->ssup_collation = scanKey->sk_collation;
+               sortKey->ssup_nulls_first =
+                       (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
+               sortKey->ssup_attno = scanKey->sk_attno;
+
+               AssertState(sortKey->ssup_attno != 0);
+
+               strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
+                       BTGreaterStrategyNumber : BTLessStrategyNumber;
+
+               PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
+       }
+
+       _bt_freeskey(indexScanKey);
+
        MemoryContextSwitchTo(oldcontext);
 
        return state;
@@ -727,7 +742,9 @@ tuplesort_begin_index_btree(Relation heapRel,
                                                        int workMem, bool randomAccess)
 {
        Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+       ScanKey                 indexScanKey;
        MemoryContext oldcontext;
+       int                             i;
 
        oldcontext = MemoryContextSwitchTo(state->sortcontext);
 
@@ -751,13 +768,40 @@ tuplesort_begin_index_btree(Relation heapRel,
        state->copytup = copytup_index;
        state->writetup = writetup_index;
        state->readtup = readtup_index;
-       state->reversedirection = reversedirection_index_btree;
 
        state->heapRel = heapRel;
        state->indexRel = indexRel;
-       state->indexScanKey = _bt_mkscankey_nodata(indexRel);
        state->enforceUnique = enforceUnique;
 
+       indexScanKey = _bt_mkscankey_nodata(indexRel);
+       state->nKeys = RelationGetNumberOfAttributes(indexRel);
+
+       /* Prepare SortSupport data for each column */
+       state->sortKeys = (SortSupport) palloc0(state->nKeys *
+                                                                                       sizeof(SortSupportData));
+
+       for (i = 0; i < state->nKeys; i++)
+       {
+               SortSupport sortKey = state->sortKeys + i;
+               ScanKey         scanKey = indexScanKey + i;
+               int16           strategy;
+
+               sortKey->ssup_cxt = CurrentMemoryContext;
+               sortKey->ssup_collation = scanKey->sk_collation;
+               sortKey->ssup_nulls_first =
+                       (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
+               sortKey->ssup_attno = scanKey->sk_attno;
+
+               AssertState(sortKey->ssup_attno != 0);
+
+               strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
+                       BTGreaterStrategyNumber : BTLessStrategyNumber;
+
+               PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
+       }
+
+       _bt_freeskey(indexScanKey);
+
        MemoryContextSwitchTo(oldcontext);
 
        return state;
@@ -788,7 +832,6 @@ tuplesort_begin_index_hash(Relation heapRel,
        state->copytup = copytup_index;
        state->writetup = writetup_index;
        state->readtup = readtup_index;
-       state->reversedirection = reversedirection_index_hash;
 
        state->heapRel = heapRel;
        state->indexRel = indexRel;
@@ -831,7 +874,6 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
        state->copytup = copytup_datum;
        state->writetup = writetup_datum;
        state->readtup = readtup_datum;
-       state->reversedirection = reversedirection_datum;
 
        state->datumType = datumType;
 
@@ -2599,7 +2641,7 @@ make_bounded_heap(Tuplesortstate *state)
        Assert(tupcount >= state->bound);
 
        /* Reverse sort direction so largest entry will be at root */
-       REVERSEDIRECTION(state);
+       reversedirection(state);
 
        state->memtupcount = 0;         /* make the heap empty */
        for (i = 0; i < tupcount; i++)
@@ -2663,7 +2705,7 @@ sort_bounded_heap(Tuplesortstate *state)
         * Reverse sort direction back to the original state.  This is not
         * actually necessary but seems like a good idea for tidiness.
         */
-       REVERSEDIRECTION(state);
+       reversedirection(state);
 
        state->status = TSS_SORTEDINMEM;
        state->boundUsed = true;
@@ -2753,6 +2795,24 @@ tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
        memtuples[i] = *tuple;
 }
 
+/*
+ * Function to reverse the sort direction from its current state
+ *
+ * It is not safe to call this when performing hash tuplesorts
+ */
+static void
+reversedirection(Tuplesortstate *state)
+{
+       SortSupport sortKey = state->sortKeys;
+       int                     nkey;
+
+       for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
+       {
+               sortKey->ssup_reverse = !sortKey->ssup_reverse;
+               sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
+       }
+}
+
 
 /*
  * Tape interface routines
@@ -2780,73 +2840,6 @@ markrunend(Tuplesortstate *state, int tapenum)
 }
 
 
-/*
- * Inline-able copy of FunctionCall2Coll() to save some cycles in sorting.
- */
-static inline Datum
-myFunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
-{
-       FunctionCallInfoData fcinfo;
-       Datum           result;
-
-       InitFunctionCallInfoData(fcinfo, flinfo, 2, collation, NULL, NULL);
-
-       fcinfo.arg[0] = arg1;
-       fcinfo.arg[1] = arg2;
-       fcinfo.argnull[0] = false;
-       fcinfo.argnull[1] = false;
-
-       result = FunctionCallInvoke(&fcinfo);
-
-       /* Check for null result, since caller is clearly not expecting one */
-       if (fcinfo.isnull)
-               elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);
-
-       return result;
-}
-
-/*
- * Apply a sort function (by now converted to fmgr lookup form)
- * and return a 3-way comparison result.  This takes care of handling
- * reverse-sort and NULLs-ordering properly.  We assume that DESC and
- * NULLS_FIRST options are encoded in sk_flags the same way btree does it.
- */
-static inline int32
-inlineApplySortFunction(FmgrInfo *sortFunction, int sk_flags, Oid collation,
-                                               Datum datum1, bool isNull1,
-                                               Datum datum2, bool isNull2)
-{
-       int32           compare;
-
-       if (isNull1)
-       {
-               if (isNull2)
-                       compare = 0;            /* NULL "=" NULL */
-               else if (sk_flags & SK_BT_NULLS_FIRST)
-                       compare = -1;           /* NULL "<" NOT_NULL */
-               else
-                       compare = 1;            /* NULL ">" NOT_NULL */
-       }
-       else if (isNull2)
-       {
-               if (sk_flags & SK_BT_NULLS_FIRST)
-                       compare = 1;            /* NOT_NULL ">" NULL */
-               else
-                       compare = -1;           /* NOT_NULL "<" NULL */
-       }
-       else
-       {
-               compare = DatumGetInt32(myFunctionCall2Coll(sortFunction, collation,
-                                                                                                       datum1, datum2));
-
-               if (sk_flags & SK_BT_DESC)
-                       compare = -compare;
-       }
-
-       return compare;
-}
-
-
 /*
  * Routines specialized for HeapTuple (actually MinimalTuple) case
  */
@@ -2972,20 +2965,6 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
                                                                &stup->isnull1);
 }
 
-static void
-reversedirection_heap(Tuplesortstate *state)
-{
-       SortSupport sortKey = state->sortKeys;
-       int                     nkey;
-
-       for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
-       {
-               sortKey->ssup_reverse = !sortKey->ssup_reverse;
-               sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
-       }
-}
-
-
 /*
  * Routines specialized for the CLUSTER case (HeapTuple data, with
  * comparisons per a btree index definition)
@@ -2995,7 +2974,7 @@ static int
 comparetup_cluster(const SortTuple *a, const SortTuple *b,
                                   Tuplesortstate *state)
 {
-       ScanKey         scanKey = state->indexScanKey;
+       SortSupport     sortKey = state->sortKeys;
        HeapTuple       ltup;
        HeapTuple       rtup;
        TupleDesc       tupDesc;
@@ -3005,14 +2984,13 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b,
        /* Compare the leading sort key, if it's simple */
        if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
        {
-               compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
-                                                                                 scanKey->sk_collation,
-                                                                                 a->datum1, a->isnull1,
-                                                                                 b->datum1, b->isnull1);
+               compare = ApplySortComparator(a->datum1, a->isnull1,
+                                                                         b->datum1, b->isnull1,
+                                                                         sortKey);
                if (compare != 0 || state->nKeys == 1)
                        return compare;
                /* Compare additional columns the hard way */
-               scanKey++;
+               sortKey++;
                nkey = 1;
        }
        else
@@ -3030,7 +3008,7 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b,
                /* If not expression index, just compare the proper heap attrs */
                tupDesc = state->tupDesc;
 
-               for (; nkey < state->nKeys; nkey++, scanKey++)
+               for (; nkey < state->nKeys; nkey++, sortKey++)
                {
                        AttrNumber      attno = state->indexInfo->ii_KeyAttrNumbers[nkey];
                        Datum           datum1,
@@ -3041,11 +3019,9 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b,
                        datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
                        datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
 
-                       compare = inlineApplySortFunction(&scanKey->sk_func,
-                                                                                         scanKey->sk_flags,
-                                                                                         scanKey->sk_collation,
-                                                                                         datum1, isnull1,
-                                                                                         datum2, isnull2);
+                       compare = ApplySortComparator(datum1, isnull1,
+                                                                                 datum2, isnull2,
+                                                                                 sortKey);
                        if (compare != 0)
                                return compare;
                }
@@ -3077,15 +3053,13 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b,
                FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
                                           r_index_values, r_index_isnull);
 
-               for (; nkey < state->nKeys; nkey++, scanKey++)
+               for (; nkey < state->nKeys; nkey++, sortKey++)
                {
-                       compare = inlineApplySortFunction(&scanKey->sk_func,
-                                                                                         scanKey->sk_flags,
-                                                                                         scanKey->sk_collation,
-                                                                                         l_index_values[nkey],
-                                                                                         l_index_isnull[nkey],
-                                                                                         r_index_values[nkey],
-                                                                                         r_index_isnull[nkey]);
+                       compare = ApplySortComparator(l_index_values[nkey],
+                                                                                 l_index_isnull[nkey],
+                                                                                 r_index_values[nkey],
+                                                                                 r_index_isnull[nkey],
+                                                                                 sortKey);
                        if (compare != 0)
                                return compare;
                }
@@ -3180,7 +3154,7 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b,
         * is also special handling for enforcing uniqueness, and special treatment
         * for equal keys at the end.
         */
-       ScanKey         scanKey = state->indexScanKey;
+       SortSupport     sortKey = state->sortKeys;
        IndexTuple      tuple1;
        IndexTuple      tuple2;
        int                     keysz;
@@ -3190,10 +3164,9 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b,
        int32           compare;
 
        /* Compare the leading sort key */
-       compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
-                                                                         scanKey->sk_collation,
-                                                                         a->datum1, a->isnull1,
-                                                                         b->datum1, b->isnull1);
+       compare = ApplySortComparator(a->datum1, a->isnull1,
+                                                                 b->datum1, b->isnull1,
+                                                                 sortKey);
        if (compare != 0)
                return compare;
 
@@ -3206,8 +3179,8 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b,
        tuple2 = (IndexTuple) b->tuple;
        keysz = state->nKeys;
        tupDes = RelationGetDescr(state->indexRel);
-       scanKey++;
-       for (nkey = 2; nkey <= keysz; nkey++, scanKey++)
+       sortKey++;
+       for (nkey = 2; nkey <= keysz; nkey++, sortKey++)
        {
                Datum           datum1,
                                        datum2;
@@ -3217,10 +3190,9 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b,
                datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
                datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);
 
-               compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
-                                                                                 scanKey->sk_collation,
-                                                                                 datum1, isnull1,
-                                                                                 datum2, isnull2);
+               compare = ApplySortComparator(datum1, isnull1,
+                                                                         datum2, isnull2,
+                                                                         sortKey);
                if (compare != 0)
                        return compare;         /* done when we find unequal attributes */
 
@@ -3394,26 +3366,6 @@ readtup_index(Tuplesortstate *state, SortTuple *stup,
                                                                 &stup->isnull1);
 }
 
-static void
-reversedirection_index_btree(Tuplesortstate *state)
-{
-       ScanKey         scanKey = state->indexScanKey;
-       int                     nkey;
-
-       for (nkey = 0; nkey < state->nKeys; nkey++, scanKey++)
-       {
-               scanKey->sk_flags ^= (SK_BT_DESC | SK_BT_NULLS_FIRST);
-       }
-}
-
-static void
-reversedirection_index_hash(Tuplesortstate *state)
-{
-       /* We don't support reversing direction in a hash index sort */
-       elog(ERROR, "reversedirection_index_hash is not implemented");
-}
-
-
 /*
  * Routines specialized for DatumTuple case
  */
@@ -3512,13 +3464,6 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
                                                         &tuplen, sizeof(tuplen));
 }
 
-static void
-reversedirection_datum(Tuplesortstate *state)
-{
-       state->onlyKey->ssup_reverse = !state->onlyKey->ssup_reverse;
-       state->onlyKey->ssup_nulls_first = !state->onlyKey->ssup_nulls_first;
-}
-
 /*
  * Convenience routine to free a tuple previously loaded into sort memory
  */
index 4417143c345e87fdfa73a87651427c07d0a93678..faae703e0e5dc5e5cdadcf208d827554dece7b7b 100644 (file)
@@ -48,6 +48,7 @@
 #define SORTSUPPORT_H
 
 #include "access/attnum.h"
+#include "utils/relcache.h"
 
 typedef struct SortSupportData *SortSupport;
 
@@ -152,5 +153,7 @@ ApplySortComparator(Datum datum1, bool isNull1,
 /* Other functions in utils/sort/sortsupport.c */
 extern void PrepareSortSupportComparisonShim(Oid cmpFunc, SortSupport ssup);
 extern void PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup);
+extern void PrepareSortSupportFromIndexRel(Relation indexRel, int16 strategy,
+                                                                                  SortSupport ssup);
 
 #endif   /* SORTSUPPORT_H */