]> granicus.if.org Git - postgresql/blobdiff - src/backend/utils/cache/lsyscache.c
Create a "sort support" interface API for faster sorting.
[postgresql] / src / backend / utils / cache / lsyscache.c
index 4769f6f35eb37b2b5d6251920f465b0f4ee67fa1..7a4306e93f5545764cbd02e6aa042120b385790c 100644 (file)
@@ -3,11 +3,11 @@
  * lsyscache.c
  *       Convenience routines for common queries in the system catalog cache.
  *
- * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/utils/cache/lsyscache.c,v 1.167 2010/02/14 18:42:17 rhaas Exp $
+ *       src/backend/utils/cache/lsyscache.c
  *
  * NOTES
  *       Eventually, the index information should go through here, too.
 #include "bootstrap/bootstrap.h"
 #include "catalog/pg_amop.h"
 #include "catalog/pg_amproc.h"
+#include "catalog/pg_collation.h"
 #include "catalog/pg_constraint.h"
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_opclass.h"
 #include "catalog/pg_operator.h"
 #include "catalog/pg_proc.h"
+#include "catalog/pg_range.h"
 #include "catalog/pg_statistic.h"
 #include "catalog/pg_type.h"
 #include "miscadmin.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/datum.h"
+#include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
+#include "utils/rel.h"
 #include "utils/syscache.h"
+#include "utils/typcache.h"
 
 /* Hook for plugins to get control in get_attavgwidth() */
 get_attavgwidth_hook_type get_attavgwidth_hook = NULL;
@@ -45,12 +50,15 @@ get_attavgwidth_hook_type get_attavgwidth_hook = NULL;
  * op_in_opfamily
  *
  *             Return t iff operator 'opno' is in operator family 'opfamily'.
+ *
+ * This function only considers search operators, not ordering operators.
  */
 bool
 op_in_opfamily(Oid opno, Oid opfamily)
 {
-       return SearchSysCacheExists2(AMOPOPID,
+       return SearchSysCacheExists3(AMOPOPID,
                                                                 ObjectIdGetDatum(opno),
+                                                                CharGetDatum(AMOP_SEARCH),
                                                                 ObjectIdGetDatum(opfamily));
 }
 
@@ -59,6 +67,8 @@ op_in_opfamily(Oid opno, Oid opfamily)
  *
  *             Get the operator's strategy number within the specified opfamily,
  *             or 0 if it's not a member of the opfamily.
+ *
+ * This function only considers search operators, not ordering operators.
  */
 int
 get_op_opfamily_strategy(Oid opno, Oid opfamily)
@@ -67,8 +77,9 @@ get_op_opfamily_strategy(Oid opno, Oid opfamily)
        Form_pg_amop amop_tup;
        int                     result;
 
-       tp = SearchSysCache2(AMOPOPID,
+       tp = SearchSysCache3(AMOPOPID,
                                                 ObjectIdGetDatum(opno),
+                                                CharGetDatum(AMOP_SEARCH),
                                                 ObjectIdGetDatum(opfamily));
        if (!HeapTupleIsValid(tp))
                return 0;
@@ -78,6 +89,31 @@ get_op_opfamily_strategy(Oid opno, Oid opfamily)
        return result;
 }
 
+/*
+ * get_op_opfamily_sortfamily
+ *
+ *             If the operator is an ordering operator within the specified opfamily,
+ *             return its amopsortfamily OID; else return InvalidOid.
+ */
+Oid
+get_op_opfamily_sortfamily(Oid opno, Oid opfamily)
+{
+       HeapTuple       tp;
+       Form_pg_amop amop_tup;
+       Oid                     result;
+
+       tp = SearchSysCache3(AMOPOPID,
+                                                ObjectIdGetDatum(opno),
+                                                CharGetDatum(AMOP_ORDER),
+                                                ObjectIdGetDatum(opfamily));
+       if (!HeapTupleIsValid(tp))
+               return InvalidOid;
+       amop_tup = (Form_pg_amop) GETSTRUCT(tp);
+       result = amop_tup->amopsortfamily;
+       ReleaseSysCache(tp);
+       return result;
+}
+
 /*
  * get_op_opfamily_properties
  *
@@ -88,7 +124,7 @@ get_op_opfamily_strategy(Oid opno, Oid opfamily)
  * therefore we raise an error if the tuple is not found.
  */
 void
-get_op_opfamily_properties(Oid opno, Oid opfamily,
+get_op_opfamily_properties(Oid opno, Oid opfamily, bool ordering_op,
                                                   int *strategy,
                                                   Oid *lefttype,
                                                   Oid *righttype)
@@ -96,8 +132,9 @@ get_op_opfamily_properties(Oid opno, Oid opfamily,
        HeapTuple       tp;
        Form_pg_amop amop_tup;
 
-       tp = SearchSysCache2(AMOPOPID,
+       tp = SearchSysCache3(AMOPOPID,
                                                 ObjectIdGetDatum(opno),
+                                                CharGetDatum(ordering_op ? AMOP_ORDER : AMOP_SEARCH),
                                                 ObjectIdGetDatum(opfamily));
        if (!HeapTupleIsValid(tp))
                elog(ERROR, "operator %u is not a member of opfamily %u",
@@ -207,19 +244,22 @@ get_ordering_op_properties(Oid opno,
 }
 
 /*
- * get_compare_function_for_ordering_op
- *             Get the OID of the datatype-specific btree comparison function
+ * get_sort_function_for_ordering_op
+ *             Get the OID of the datatype-specific btree sort support function,
+ *             or if there is none, the btree comparison function,
  *             associated with an ordering operator (a "<" or ">" operator).
  *
- * *cmpfunc receives the comparison function OID.
+ * *sortfunc receives the support or comparison function OID.
+ * *issupport is set TRUE if it's a support func, FALSE if a comparison func.
  * *reverse is set FALSE if the operator is "<", TRUE if it's ">"
- * (indicating the comparison result must be negated before use).
+ * (indicating that comparison results must be negated before use).
  *
  * Returns TRUE if successful, FALSE if no btree function can be found.
  * (This indicates that the operator is not a valid ordering operator.)
  */
 bool
-get_compare_function_for_ordering_op(Oid opno, Oid *cmpfunc, bool *reverse)
+get_sort_function_for_ordering_op(Oid opno, Oid *sortfunc,
+                                                                 bool *issupport, bool *reverse)
 {
        Oid                     opfamily;
        Oid                     opcintype;
@@ -230,21 +270,31 @@ get_compare_function_for_ordering_op(Oid opno, Oid *cmpfunc, bool *reverse)
                                                                   &opfamily, &opcintype, &strategy))
        {
                /* Found a suitable opfamily, get matching support function */
-               *cmpfunc = get_opfamily_proc(opfamily,
-                                                                        opcintype,
-                                                                        opcintype,
-                                                                        BTORDER_PROC);
-
-               if (!OidIsValid(*cmpfunc))              /* should not happen */
-                       elog(ERROR, "missing support function %d(%u,%u) in opfamily %u",
-                                BTORDER_PROC, opcintype, opcintype, opfamily);
+               *sortfunc = get_opfamily_proc(opfamily,
+                                                                         opcintype,
+                                                                         opcintype,
+                                                                         BTSORTSUPPORT_PROC);
+               if (OidIsValid(*sortfunc))
+                       *issupport = true;
+               else
+               {
+                       /* opfamily doesn't provide sort support, get comparison func */
+                       *sortfunc = get_opfamily_proc(opfamily,
+                                                                                 opcintype,
+                                                                                 opcintype,
+                                                                                 BTORDER_PROC);
+                       if (!OidIsValid(*sortfunc))             /* should not happen */
+                               elog(ERROR, "missing support function %d(%u,%u) in opfamily %u",
+                                        BTORDER_PROC, opcintype, opcintype, opfamily);
+                       *issupport = false;
+               }
                *reverse = (strategy == BTGreaterStrategyNumber);
                return true;
        }
 
        /* ensure outputs are set on failure */
-       *cmpfunc = InvalidOid;
-
+       *sortfunc = InvalidOid;
+       *issupport = false;
        *reverse = false;
        return false;
 }
@@ -587,52 +637,30 @@ get_op_hash_functions(Oid opno,
 /*
  * get_op_btree_interpretation
  *             Given an operator's OID, find out which btree opfamilies it belongs to,
- *             and what strategy number it has within each one.  The results are
- *             returned as an OID list and a parallel integer list.
+ *             and what properties it has within each one.  The results are returned
+ *             as a palloc'd list of OpBtreeInterpretation structs.
  *
  * In addition to the normal btree operators, we consider a <> operator to be
  * a "member" of an opfamily if its negator is an equality operator of the
  * opfamily.  ROWCOMPARE_NE is returned as the strategy number for this case.
  */
-void
-get_op_btree_interpretation(Oid opno, List **opfamilies, List **opstrats)
+List *
+get_op_btree_interpretation(Oid opno)
 {
+       List       *result = NIL;
+       OpBtreeInterpretation *thisresult;
        CatCList   *catlist;
-       bool            op_negated;
        int                     i;
 
-       *opfamilies = NIL;
-       *opstrats = NIL;
-
        /*
         * Find all the pg_amop entries containing the operator.
         */
        catlist = SearchSysCacheList1(AMOPOPID, ObjectIdGetDatum(opno));
 
-       /*
-        * If we can't find any opfamily containing the op, perhaps it is a <>
-        * operator.  See if it has a negator that is in an opfamily.
-        */
-       op_negated = false;
-       if (catlist->n_members == 0)
-       {
-               Oid                     op_negator = get_negator(opno);
-
-               if (OidIsValid(op_negator))
-               {
-                       op_negated = true;
-                       ReleaseSysCacheList(catlist);
-                       catlist = SearchSysCacheList1(AMOPOPID, 
-                                                                                 ObjectIdGetDatum(op_negator));
-               }
-       }
-
-       /* Now search the opfamilies */
        for (i = 0; i < catlist->n_members; i++)
        {
                HeapTuple       op_tuple = &catlist->members[i]->tuple;
                Form_pg_amop op_form = (Form_pg_amop) GETSTRUCT(op_tuple);
-               Oid                     opfamily_id;
                StrategyNumber op_strategy;
 
                /* must be btree */
@@ -640,23 +668,66 @@ get_op_btree_interpretation(Oid opno, List **opfamilies, List **opstrats)
                        continue;
 
                /* Get the operator's btree strategy number */
-               opfamily_id = op_form->amopfamily;
                op_strategy = (StrategyNumber) op_form->amopstrategy;
                Assert(op_strategy >= 1 && op_strategy <= 5);
 
-               if (op_negated)
+               thisresult = (OpBtreeInterpretation *)
+                       palloc(sizeof(OpBtreeInterpretation));
+               thisresult->opfamily_id = op_form->amopfamily;
+               thisresult->strategy = op_strategy;
+               thisresult->oplefttype = op_form->amoplefttype;
+               thisresult->oprighttype = op_form->amoprighttype;
+               result = lappend(result, thisresult);
+       }
+
+       ReleaseSysCacheList(catlist);
+
+       /*
+        * If we didn't find any btree opfamily containing the operator, perhaps
+        * it is a <> operator.  See if it has a negator that is in an opfamily.
+        */
+       if (result == NIL)
+       {
+               Oid                     op_negator = get_negator(opno);
+
+               if (OidIsValid(op_negator))
                {
-                       /* Only consider negators that are = */
-                       if (op_strategy != BTEqualStrategyNumber)
-                               continue;
-                       op_strategy = ROWCOMPARE_NE;
-               }
+                       catlist = SearchSysCacheList1(AMOPOPID,
+                                                                                 ObjectIdGetDatum(op_negator));
+
+                       for (i = 0; i < catlist->n_members; i++)
+                       {
+                               HeapTuple       op_tuple = &catlist->members[i]->tuple;
+                               Form_pg_amop op_form = (Form_pg_amop) GETSTRUCT(op_tuple);
+                               StrategyNumber op_strategy;
 
-               *opfamilies = lappend_oid(*opfamilies, opfamily_id);
-               *opstrats = lappend_int(*opstrats, op_strategy);
+                               /* must be btree */
+                               if (op_form->amopmethod != BTREE_AM_OID)
+                                       continue;
+
+                               /* Get the operator's btree strategy number */
+                               op_strategy = (StrategyNumber) op_form->amopstrategy;
+                               Assert(op_strategy >= 1 && op_strategy <= 5);
+
+                               /* Only consider negators that are = */
+                               if (op_strategy != BTEqualStrategyNumber)
+                                       continue;
+
+                               /* OK, report it with "strategy" ROWCOMPARE_NE */
+                               thisresult = (OpBtreeInterpretation *)
+                                       palloc(sizeof(OpBtreeInterpretation));
+                               thisresult->opfamily_id = op_form->amopfamily;
+                               thisresult->strategy = ROWCOMPARE_NE;
+                               thisresult->oplefttype = op_form->amoplefttype;
+                               thisresult->oprighttype = op_form->amoprighttype;
+                               result = lappend(result, thisresult);
+                       }
+
+                       ReleaseSysCacheList(catlist);
+               }
        }
 
-       ReleaseSysCacheList(catlist);
+       return result;
 }
 
 /*
@@ -870,17 +941,17 @@ get_atttypmod(Oid relid, AttrNumber attnum)
 }
 
 /*
- * get_atttypetypmod
+ * get_atttypetypmodcoll
  *
- *             A two-fer: given the relation id and the attribute number,
- *             fetch both type OID and atttypmod in a single cache lookup.
+ *             A three-fer: given the relation id and the attribute number,
+ *             fetch atttypid, atttypmod, and attcollation in a single cache lookup.
  *
  * Unlike the otherwise-similar get_atttype/get_atttypmod, this routine
  * raises an error if it can't obtain the information.
  */
 void
-get_atttypetypmod(Oid relid, AttrNumber attnum,
-                                 Oid *typid, int32 *typmod)
+get_atttypetypmodcoll(Oid relid, AttrNumber attnum,
+                                         Oid *typid, int32 *typmod, Oid *collid)
 {
        HeapTuple       tp;
        Form_pg_attribute att_tup;
@@ -895,9 +966,40 @@ get_atttypetypmod(Oid relid, AttrNumber attnum,
 
        *typid = att_tup->atttypid;
        *typmod = att_tup->atttypmod;
+       *collid = att_tup->attcollation;
        ReleaseSysCache(tp);
 }
 
+/*                             ---------- COLLATION CACHE ----------                                    */
+
+/*
+ * get_collation_name
+ *             Returns the name of a given pg_collation entry.
+ *
+ * Returns a palloc'd copy of the string, or NULL if no such constraint.
+ *
+ * NOTE: since collation name is not unique, be wary of code that uses this
+ * for anything except preparing error messages.
+ */
+char *
+get_collation_name(Oid colloid)
+{
+       HeapTuple       tp;
+
+       tp = SearchSysCache1(COLLOID, ObjectIdGetDatum(colloid));
+       if (HeapTupleIsValid(tp))
+       {
+               Form_pg_collation colltup = (Form_pg_collation) GETSTRUCT(tp);
+               char       *result;
+
+               result = pstrdup(NameStr(colltup->collname));
+               ReleaseSysCache(tp);
+               return result;
+       }
+       else
+               return NULL;
+}
+
 /*                             ---------- CONSTRAINT CACHE ----------                                   */
 
 /*
@@ -1054,20 +1156,48 @@ op_input_types(Oid opno, Oid *lefttype, Oid *righttype)
  * will fail to find any mergejoin plans unless there are suitable btree
  * opfamily entries for this operator and associated sortops.  The pg_operator
  * flag is just a hint to tell the planner whether to bother looking.)
+ *
+ * In some cases (currently only array_eq and record_eq), mergejoinability
+ * depends on the specific input data type the operator is invoked for, so
+ * that must be passed as well. We currently assume that only one input's type
+ * is needed to check this --- by convention, pass the left input's data type.
  */
 bool
-op_mergejoinable(Oid opno)
+op_mergejoinable(Oid opno, Oid inputtype)
 {
-       HeapTuple       tp;
        bool            result = false;
+       HeapTuple       tp;
+       TypeCacheEntry *typentry;
 
-       tp = SearchSysCache1(OPEROID, ObjectIdGetDatum(opno));
-       if (HeapTupleIsValid(tp))
+       /*
+        * For array_eq or record_eq, we can sort if the element or field types
+        * are all sortable.  We could implement all the checks for that here, but
+        * the typcache already does that and caches the results too, so let's
+        * rely on the typcache.
+        */
+       if (opno == ARRAY_EQ_OP)
        {
-               Form_pg_operator optup = (Form_pg_operator) GETSTRUCT(tp);
+               typentry = lookup_type_cache(inputtype, TYPECACHE_CMP_PROC);
+               if (typentry->cmp_proc == F_BTARRAYCMP)
+                       result = true;
+       }
+       else if (opno == RECORD_EQ_OP)
+       {
+               typentry = lookup_type_cache(inputtype, TYPECACHE_CMP_PROC);
+               if (typentry->cmp_proc == F_BTRECORDCMP)
+                       result = true;
+       }
+       else
+       {
+               /* For all other operators, rely on pg_operator.oprcanmerge */
+               tp = SearchSysCache1(OPEROID, ObjectIdGetDatum(opno));
+               if (HeapTupleIsValid(tp))
+               {
+                       Form_pg_operator optup = (Form_pg_operator) GETSTRUCT(tp);
 
-               result = optup->oprcanmerge;
-               ReleaseSysCache(tp);
+                       result = optup->oprcanmerge;
+                       ReleaseSysCache(tp);
+               }
        }
        return result;
 }
@@ -1077,20 +1207,38 @@ op_mergejoinable(Oid opno)
  *
  * Returns true if the operator is hashjoinable.  (There must be a suitable
  * hash opfamily entry for this operator if it is so marked.)
+ *
+ * In some cases (currently only array_eq), hashjoinability depends on the
+ * specific input data type the operator is invoked for, so that must be
+ * passed as well.     We currently assume that only one input's type is needed
+ * to check this --- by convention, pass the left input's data type.
  */
 bool
-op_hashjoinable(Oid opno)
+op_hashjoinable(Oid opno, Oid inputtype)
 {
-       HeapTuple       tp;
        bool            result = false;
+       HeapTuple       tp;
+       TypeCacheEntry *typentry;
 
-       tp = SearchSysCache1(OPEROID, ObjectIdGetDatum(opno));
-       if (HeapTupleIsValid(tp))
+       /* As in op_mergejoinable, let the typcache handle the hard cases */
+       /* Eventually we'll need a similar case for record_eq ... */
+       if (opno == ARRAY_EQ_OP)
        {
-               Form_pg_operator optup = (Form_pg_operator) GETSTRUCT(tp);
+               typentry = lookup_type_cache(inputtype, TYPECACHE_HASH_PROC);
+               if (typentry->hash_proc == F_HASH_ARRAY)
+                       result = true;
+       }
+       else
+       {
+               /* For all other operators, rely on pg_operator.oprcanhash */
+               tp = SearchSysCache1(OPEROID, ObjectIdGetDatum(opno));
+               if (HeapTupleIsValid(tp))
+               {
+                       Form_pg_operator optup = (Form_pg_operator) GETSTRUCT(tp);
 
-               result = optup->oprcanhash;
-               ReleaseSysCache(tp);
+                       result = optup->oprcanhash;
+                       ReleaseSysCache(tp);
+               }
        }
        return result;
 }
@@ -1755,10 +1903,9 @@ getTypeIOParam(HeapTuple typeTuple)
 
        /*
         * Array types get their typelem as parameter; everybody else gets their
-        * own type OID as parameter.  (As of 8.2, domains must get their own OID
-        * even if their base type is an array.)
+        * own type OID as parameter.
         */
-       if (typeStruct->typtype == TYPTYPE_BASE && OidIsValid(typeStruct->typelem))
+       if (OidIsValid(typeStruct->typelem))
                return typeStruct->typelem;
        else
                return HeapTupleGetOid(typeTuple);
@@ -1943,6 +2090,7 @@ get_typdefault(Oid typid)
                        /* Build a Const node containing the value */
                        expr = (Node *) makeConst(typid,
                                                                          -1,
+                                                                         type->typcollation,
                                                                          type->typlen,
                                                                          datum,
                                                                          false,
@@ -2115,6 +2263,16 @@ type_is_enum(Oid typid)
        return (get_typtype(typid) == TYPTYPE_ENUM);
 }
 
+/*
+ * type_is_range
+ *       Returns true if the given type is a range type.
+ */
+bool
+type_is_range(Oid typid)
+{
+       return (get_typtype(typid) == TYPTYPE_RANGE);
+}
+
 /*
  * get_type_category_preferred
  *
@@ -2212,6 +2370,52 @@ get_array_type(Oid typid)
        return result;
 }
 
+/*
+ * get_base_element_type
+ *             Given the type OID, get the typelem, looking "through" any domain
+ *             to its underlying array type.
+ *
+ * This is equivalent to get_element_type(getBaseType(typid)), but avoids
+ * an extra cache lookup.  Note that it fails to provide any information
+ * about the typmod of the array.
+ */
+Oid
+get_base_element_type(Oid typid)
+{
+       /*
+        * We loop to find the bottom base type in a stack of domains.
+        */
+       for (;;)
+       {
+               HeapTuple       tup;
+               Form_pg_type typTup;
+
+               tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid));
+               if (!HeapTupleIsValid(tup))
+                       break;
+               typTup = (Form_pg_type) GETSTRUCT(tup);
+               if (typTup->typtype != TYPTYPE_DOMAIN)
+               {
+                       /* Not a domain, so stop descending */
+                       Oid                     result;
+
+                       /* This test must match get_element_type */
+                       if (typTup->typlen == -1)
+                               result = typTup->typelem;
+                       else
+                               result = InvalidOid;
+                       ReleaseSysCache(tup);
+                       return result;
+               }
+
+               typid = typTup->typbasetype;
+               ReleaseSysCache(tup);
+       }
+
+       /* Like get_element_type, silently return InvalidOid for bogus input */
+       return InvalidOid;
+}
+
 /*
  * getTypeInputInfo
  *
@@ -2394,6 +2598,42 @@ get_typmodout(Oid typid)
 }
 #endif   /* NOT_USED */
 
+/*
+ * get_typcollation
+ *
+ *             Given the type OID, return the type's typcollation attribute.
+ */
+Oid
+get_typcollation(Oid typid)
+{
+       HeapTuple       tp;
+
+       tp = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid));
+       if (HeapTupleIsValid(tp))
+       {
+               Form_pg_type typtup = (Form_pg_type) GETSTRUCT(tp);
+               Oid                     result;
+
+               result = typtup->typcollation;
+               ReleaseSysCache(tp);
+               return result;
+       }
+       else
+               return InvalidOid;
+}
+
+
+/*
+ * type_is_collatable
+ *
+ *             Return whether the type cares about collations
+ */
+bool
+type_is_collatable(Oid typid)
+{
+       return OidIsValid(get_typcollation(typid));
+}
+
 
 /*                             ---------- STATISTICS CACHE ----------                                   */
 
@@ -2463,6 +2703,10 @@ get_attavgwidth(Oid relid, AttrNumber attnum)
  * If the attribute type is pass-by-reference, the values referenced by
  * the values array are themselves palloc'd.  The palloc'd stuff can be
  * freed by calling free_attstatsslot.
+ *
+ * Note: at present, atttype/atttypmod aren't actually used here at all.
+ * But the caller must have the correct (or at least binary-compatible)
+ * type ID to pass to free_attstatsslot later.
  */
 bool
 get_attstatsslot(HeapTuple statstuple,
@@ -2478,6 +2722,7 @@ get_attstatsslot(HeapTuple statstuple,
        Datum           val;
        bool            isnull;
        ArrayType  *statarray;
+       Oid                     arrayelemtype;
        int                     narrayelem;
        HeapTuple       typeTuple;
        Form_pg_type typeForm;
@@ -2503,15 +2748,22 @@ get_attstatsslot(HeapTuple statstuple,
                        elog(ERROR, "stavalues is null");
                statarray = DatumGetArrayTypeP(val);
 
-               /* Need to get info about the array element type */
-               typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(atttype));
+               /*
+                * Need to get info about the array element type.  We look at the
+                * actual element type embedded in the array, which might be only
+                * binary-compatible with the passed-in atttype.  The info we extract
+                * here should be the same either way, but deconstruct_array is picky
+                * about having an exact type OID match.
+                */
+               arrayelemtype = ARR_ELEMTYPE(statarray);
+               typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(arrayelemtype));
                if (!HeapTupleIsValid(typeTuple))
-                       elog(ERROR, "cache lookup failed for type %u", atttype);
+                       elog(ERROR, "cache lookup failed for type %u", arrayelemtype);
                typeForm = (Form_pg_type) GETSTRUCT(typeTuple);
 
                /* Deconstruct array into Datum elements; NULLs not expected */
                deconstruct_array(statarray,
-                                                 atttype,
+                                                 arrayelemtype,
                                                  typeForm->typlen,
                                                  typeForm->typbyval,
                                                  typeForm->typalign,
@@ -2627,33 +2879,29 @@ get_namespace_name(Oid nspid)
                return NULL;
 }
 
-/*                             ---------- PG_AUTHID CACHE ----------                                    */
+/*                             ---------- PG_RANGE CACHE ----------                             */
 
 /*
- * get_roleid
- *       Given a role name, look up the role's OID.
- *       Returns InvalidOid if no such role.
+ * get_range_subtype
+ *             Returns the subtype of a given range type
+ *
+ * Returns InvalidOid if the type is not a range type.
  */
 Oid
-get_roleid(const char *rolname)
+get_range_subtype(Oid rangeOid)
 {
-       return GetSysCacheOid1(AUTHNAME, PointerGetDatum(rolname));
-}
+       HeapTuple       tp;
 
-/*
- * get_roleid_checked
- *       Given a role name, look up the role's OID.
- *       ereports if no such role.
- */
-Oid
-get_roleid_checked(const char *rolname)
-{
-       Oid                     roleid;
+       tp = SearchSysCache1(RANGETYPE, ObjectIdGetDatum(rangeOid));
+       if (HeapTupleIsValid(tp))
+       {
+               Form_pg_range   rngtup = (Form_pg_range) GETSTRUCT(tp);
+               Oid                             result;
 
-       roleid = get_roleid(rolname);
-       if (!OidIsValid(roleid))
-               ereport(ERROR,
-                               (errcode(ERRCODE_UNDEFINED_OBJECT),
-                                errmsg("role \"%s\" does not exist", rolname)));
-       return roleid;
+               result = rngtup->rngsubtype;
+               ReleaseSysCache(tp);
+               return result;
+       }
+       else
+               return InvalidOid;
 }