]> granicus.if.org Git - postgresql/commitdiff
If we're going to advertise the array overlap/containment operators,
authorTom Lane <tgl@sss.pgh.pa.us>
Sun, 10 Sep 2006 20:14:20 +0000 (20:14 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Sun, 10 Sep 2006 20:14:20 +0000 (20:14 +0000)
we probably should make them work reliably for all arrays.  Fix code
to handle NULLs and multidimensional arrays, move it into arrayfuncs.c.
GIN is still restricted to indexing arrays with no null elements, however.

src/backend/access/gin/ginarrayproc.c
src/backend/utils/adt/arrayfuncs.c
src/include/access/gin.h
src/include/utils/array.h

index 82f487316fe57c067efc92fb5069887f7f6472f7..911cf629832d0a8cf0cc5d3e57ab157f1be45612 100644 (file)
@@ -1,23 +1,22 @@
 /*-------------------------------------------------------------------------
  *
- * ginvacuum.c
- *    support function for GIN's indexing of any array
+ * ginarrayproc.c
+ *    support functions for GIN's indexing of any array
  *
  *
  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *          $PostgreSQL: pgsql/src/backend/access/gin/ginarrayproc.c,v 1.4 2006/07/14 14:52:16 momjian Exp $
+ *       $PostgreSQL: pgsql/src/backend/access/gin/ginarrayproc.c,v 1.5 2006/09/10 20:14:20 tgl Exp $
  *-------------------------------------------------------------------------
  */
-
 #include "postgres.h"
+
+#include "access/gin.h"
 #include "utils/array.h"
-#include "utils/builtins.h"
 #include "utils/lsyscache.h"
-#include "utils/typcache.h"
-#include "access/gin.h"
+
 
 #define GinOverlapStrategy             1
 #define GinContainsStrategy            2
                ereport(ERROR,                                                                          \
                        (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),               \
                         errmsg("array must not contain nulls")));              \
-                                                                                                                       \
-       if ( ARR_NDIM(x) != 1 && ARR_NDIM(x) != 0 )                     \
-               ereport(ERROR,                                                                          \
-                       (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),                \
-                        errmsg("array must be one-dimensional")));     \
 } while(0) 
 
+
 /*
  * Function used as extractValue and extractQuery both
  */
@@ -70,9 +65,9 @@ ginarrayconsistent(PG_FUNCTION_ARGS) {
        bool    *check = (bool*)PG_GETARG_POINTER(0);
        StrategyNumber  strategy = PG_GETARG_UINT16(1);
        ArrayType   *query = PG_GETARG_ARRAYTYPE_P(2);
-       int res=FALSE, i, nentries=ArrayGetNItems(ARR_NDIM(query), ARR_DIMS(query));
+       int res, i, nentries;
 
-       /* we can do not check array carefully, it's done by previous ginarrayextract call */
+       /* ARRAYCHECK was already done by previous ginarrayextract call */
 
        switch( strategy ) {
                case GinOverlapStrategy:
@@ -82,6 +77,7 @@ ginarrayconsistent(PG_FUNCTION_ARGS) {
                        break;
                case GinContainsStrategy:
                case GinEqualStrategy:
+                       nentries=ArrayGetNItems(ARR_NDIM(query), ARR_DIMS(query));
                        res = TRUE;
                        for(i=0;i<nentries;i++)
                                if ( !check[i] ) {
@@ -90,168 +86,10 @@ ginarrayconsistent(PG_FUNCTION_ARGS) {
                                }
                        break;
                default:
-                       elog(ERROR, "ginarrayconsistent: unknown strategy number: %d", strategy);
+                       elog(ERROR, "ginarrayconsistent: unknown strategy number: %d",
+                                strategy);
+                       res = FALSE;
        }
 
        PG_RETURN_BOOL(res);
 }
-
-static TypeCacheEntry*
-fillTypeCacheEntry( TypeCacheEntry *typentry, Oid element_type ) {
-       if ( typentry && typentry->type_id == element_type )
-               return typentry;
-
-       typentry = lookup_type_cache(element_type,      TYPECACHE_EQ_OPR_FINFO);
-       if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
-               ereport(ERROR,
-                               (errcode(ERRCODE_UNDEFINED_FUNCTION),
-                               errmsg("could not identify an equality operator for type %s", format_type_be(element_type))));
-
-       return typentry;
-}
-
-static bool
-typeEQ(FunctionCallInfoData *locfcinfo, Datum a, Datum b) {
-       locfcinfo->arg[0] = a;
-       locfcinfo->arg[1] = b;
-       locfcinfo->argnull[0] = false;
-       locfcinfo->argnull[1] = false;
-       locfcinfo->isnull = false;
-
-       return DatumGetBool(FunctionCallInvoke(locfcinfo));
-}
-
-static bool
-ginArrayOverlap(TypeCacheEntry *typentry, ArrayType *a, ArrayType *b) {
-       Datum   *da, *db;
-       int             na, nb, j, i;
-       FunctionCallInfoData locfcinfo;
-       
-       if ( ARR_ELEMTYPE(a) != ARR_ELEMTYPE(b) )
-               ereport(ERROR,
-                       (errcode(ERRCODE_DATATYPE_MISMATCH),
-                       errmsg("cannot compare arrays of different element types")));
-
-       ARRAYCHECK(a);
-       ARRAYCHECK(b);
-
-       deconstruct_array(a,
-               ARR_ELEMTYPE(a),
-               typentry->typlen, typentry->typbyval, typentry->typalign,
-               &da, NULL, &na);
-       deconstruct_array(b,
-               ARR_ELEMTYPE(b),
-               typentry->typlen, typentry->typbyval, typentry->typalign,
-               &db, NULL, &nb);
-
-       InitFunctionCallInfoData(locfcinfo, &typentry->eq_opr_finfo, 2,
-               NULL, NULL);
-
-       for(i=0;i<na;i++) {
-               for(j=0;j<nb;j++) {
-                       if ( typeEQ(&locfcinfo, da[i], db[j]) ) {
-                                       pfree( da );
-                                       pfree( db );
-                                       return TRUE;
-                       }
-               }
-       }
-
-       pfree( da );
-       pfree( db );
-
-       return FALSE;
-}
-
-static bool
-ginArrayContains(TypeCacheEntry *typentry, ArrayType *a, ArrayType *b) {
-       Datum   *da, *db;
-       int             na, nb, j, i, n = 0;
-       FunctionCallInfoData locfcinfo;
-       
-       if ( ARR_ELEMTYPE(a) != ARR_ELEMTYPE(b) )
-               ereport(ERROR,
-                       (errcode(ERRCODE_DATATYPE_MISMATCH),
-                       errmsg("cannot compare arrays of different element types")));
-
-       ARRAYCHECK(a);
-       ARRAYCHECK(b);
-
-       deconstruct_array(a,
-               ARR_ELEMTYPE(a),
-               typentry->typlen, typentry->typbyval, typentry->typalign,
-               &da, NULL, &na);
-       deconstruct_array(b,
-               ARR_ELEMTYPE(b),
-               typentry->typlen, typentry->typbyval, typentry->typalign,
-               &db, NULL, &nb);
-
-       InitFunctionCallInfoData(locfcinfo, &typentry->eq_opr_finfo, 2,
-               NULL, NULL);
-
-       for(i=0;i<nb;i++) {
-               for(j=0;j<na;j++) {
-                       if ( typeEQ(&locfcinfo, db[i], da[j]) ) {
-                               n++;
-                               break;
-                       }
-               }
-       }
-
-       pfree( da );
-       pfree( db );
-
-       return ( n==nb ) ? TRUE : FALSE;
-}
-
-Datum
-arrayoverlap(PG_FUNCTION_ARGS) {
-       ArrayType   *a = PG_GETARG_ARRAYTYPE_P(0);
-       ArrayType   *b = PG_GETARG_ARRAYTYPE_P(1);
-       TypeCacheEntry *typentry = fillTypeCacheEntry( fcinfo->flinfo->fn_extra, ARR_ELEMTYPE(a) ); 
-       bool res;
-
-       fcinfo->flinfo->fn_extra = (void*)typentry;
-
-       res = ginArrayOverlap( typentry, a, b ); 
-
-       PG_FREE_IF_COPY(a,0);
-       PG_FREE_IF_COPY(b,1);
-
-       PG_RETURN_BOOL(res);
-}
-
-Datum
-arraycontains(PG_FUNCTION_ARGS) {
-       ArrayType   *a = PG_GETARG_ARRAYTYPE_P(0);
-       ArrayType   *b = PG_GETARG_ARRAYTYPE_P(1);
-       TypeCacheEntry *typentry = fillTypeCacheEntry( fcinfo->flinfo->fn_extra, ARR_ELEMTYPE(a) ); 
-       bool res;
-
-       fcinfo->flinfo->fn_extra = (void*)typentry;
-
-       res = ginArrayContains( typentry, a, b ); 
-
-       PG_FREE_IF_COPY(a,0);
-       PG_FREE_IF_COPY(b,1);
-
-       PG_RETURN_BOOL(res);
-}
-
-Datum
-arraycontained(PG_FUNCTION_ARGS) {
-       ArrayType   *a = PG_GETARG_ARRAYTYPE_P(0);
-       ArrayType   *b = PG_GETARG_ARRAYTYPE_P(1);
-       TypeCacheEntry *typentry = fillTypeCacheEntry( fcinfo->flinfo->fn_extra, ARR_ELEMTYPE(a) ); 
-       bool res;
-
-       fcinfo->flinfo->fn_extra = (void*)typentry;
-
-       res = ginArrayContains( typentry, b, a ); 
-
-       PG_FREE_IF_COPY(a,0);
-       PG_FREE_IF_COPY(b,1);
-
-       PG_RETURN_BOOL(res);
-}
-
index 58dfb0fc085f3d03287eaba468cf3d35c001d659..8b0c11c163dd5029dcd011acaa739896a598c0c8 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/utils/adt/arrayfuncs.c,v 1.130 2006/07/14 14:52:23 momjian Exp $
+ *       $PostgreSQL: pgsql/src/backend/utils/adt/arrayfuncs.c,v 1.131 2006/09/10 20:14:20 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -3345,6 +3345,232 @@ array_cmp(FunctionCallInfo fcinfo)
 }
 
 
+/*-----------------------------------------------------------------------------
+ * array overlap/containment comparisons
+ *             These use the same methods of comparing array elements as array_eq.
+ *             We consider only the elements of the arrays, ignoring dimensionality.
+ *----------------------------------------------------------------------------
+ */
+
+/*
+ * array_contain_compare :
+ *               compares two arrays for overlap/containment
+ *
+ * When matchall is true, return true if all members of array1 are in array2.
+ * When matchall is false, return true if any members of array1 are in array2.
+ */
+static bool
+array_contain_compare(ArrayType *array1, ArrayType *array2, bool matchall,
+                                         void **fn_extra)
+{
+       bool            result = matchall;
+       Oid                     element_type = ARR_ELEMTYPE(array1);
+       TypeCacheEntry *typentry;
+       int                     nelems1;
+       Datum      *values2;
+       bool       *nulls2;
+       int                     nelems2;
+       int                     typlen;
+       bool            typbyval;
+       char            typalign;
+       char       *ptr1;
+       bits8      *bitmap1;
+       int                     bitmask;
+       int                     i;
+       int                     j;
+       FunctionCallInfoData locfcinfo;
+
+       if (element_type != ARR_ELEMTYPE(array2))
+               ereport(ERROR,
+                               (errcode(ERRCODE_DATATYPE_MISMATCH),
+                                errmsg("cannot compare arrays of different element types")));
+
+       /*
+        * We arrange to look up the equality function only once per series of
+        * calls, assuming the element type doesn't change underneath us.  The
+        * typcache is used so that we have no memory leakage when being used
+        * as an index support function.
+        */
+       typentry = (TypeCacheEntry *) *fn_extra;
+       if (typentry == NULL ||
+               typentry->type_id != element_type)
+       {
+               typentry = lookup_type_cache(element_type,
+                                                                        TYPECACHE_EQ_OPR_FINFO);
+               if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_UNDEFINED_FUNCTION),
+                                        errmsg("could not identify an equality operator for type %s",
+                                                       format_type_be(element_type))));
+               *fn_extra = (void *) typentry;
+       }
+       typlen = typentry->typlen;
+       typbyval = typentry->typbyval;
+       typalign = typentry->typalign;
+
+       /*
+        * Since we probably will need to scan array2 multiple times, it's
+        * worthwhile to use deconstruct_array on it.  We scan array1 the hard way
+        * however, since we very likely won't need to look at all of it.
+        */
+       deconstruct_array(array2, element_type, typlen, typbyval, typalign,
+                                         &values2, &nulls2, &nelems2);
+
+       /*
+        * Apply the comparison operator to each pair of array elements.
+        */
+       InitFunctionCallInfoData(locfcinfo, &typentry->eq_opr_finfo, 2,
+                                                        NULL, NULL);
+
+       /* Loop over source data */
+       nelems1 = ArrayGetNItems(ARR_NDIM(array1), ARR_DIMS(array1));
+       ptr1 = ARR_DATA_PTR(array1);
+       bitmap1 = ARR_NULLBITMAP(array1);
+       bitmask = 1;
+
+       for (i = 0; i < nelems1; i++)
+       {
+               Datum           elt1;
+               bool            isnull1;
+
+               /* Get element, checking for NULL */
+               if (bitmap1 && (*bitmap1 & bitmask) == 0)
+               {
+                       isnull1 = true;
+                       elt1 = (Datum) 0;
+               }
+               else
+               {
+                       isnull1 = false;
+                       elt1 = fetch_att(ptr1, typbyval, typlen);
+                       ptr1 = att_addlength(ptr1, typlen, PointerGetDatum(ptr1));
+                       ptr1 = (char *) att_align(ptr1, typalign);
+               }
+
+               /* advance bitmap pointer if any */
+               bitmask <<= 1;
+               if (bitmask == 0x100)
+               {
+                       if (bitmap1)
+                               bitmap1++;
+                       bitmask = 1;
+               }
+
+               /*
+                * We assume that the comparison operator is strict, so a NULL
+                * can't match anything.  XXX this diverges from the "NULL=NULL"
+                * behavior of array_eq, should we act like that?
+                */
+               if (isnull1)
+               {
+                       if (matchall)
+                       {
+                               result = false;
+                               break;
+                       }
+                       continue;
+               }
+
+               for (j = 0; j < nelems2; j++)
+               {
+                       Datum           elt2 = values2[j];
+                       bool            isnull2 = nulls2[j];
+                       bool            oprresult;
+
+                       if (isnull2)
+                               continue;               /* can't match */
+
+                       /*
+                        * Apply the operator to the element pair
+                        */
+                       locfcinfo.arg[0] = elt1;
+                       locfcinfo.arg[1] = elt2;
+                       locfcinfo.argnull[0] = false;
+                       locfcinfo.argnull[1] = false;
+                       locfcinfo.isnull = false;
+                       oprresult = DatumGetBool(FunctionCallInvoke(&locfcinfo));
+                       if (oprresult)
+                               break;
+               }
+
+               if (j < nelems2)
+               {
+                       /* found a match for elt1 */
+                       if (!matchall)
+                       {
+                               result = true;
+                               break;
+                       }
+               }
+               else
+               {
+                       /* no match for elt1 */
+                       if (matchall)
+                       {
+                               result = false;
+                               break;
+                       }
+               }
+       }
+
+       pfree(values2);
+       pfree(nulls2);
+
+       return result;
+}
+
+Datum
+arrayoverlap(PG_FUNCTION_ARGS)
+{
+       ArrayType  *array1 = PG_GETARG_ARRAYTYPE_P(0);
+       ArrayType  *array2 = PG_GETARG_ARRAYTYPE_P(1);
+       bool            result;
+
+       result = array_contain_compare(array1, array2, false,
+                                                                  &fcinfo->flinfo->fn_extra);
+
+       /* Avoid leaking memory when handed toasted input. */
+       PG_FREE_IF_COPY(array1, 0);
+       PG_FREE_IF_COPY(array2, 1);
+
+       PG_RETURN_BOOL(result);
+}
+
+Datum
+arraycontains(PG_FUNCTION_ARGS)
+{
+       ArrayType  *array1 = PG_GETARG_ARRAYTYPE_P(0);
+       ArrayType  *array2 = PG_GETARG_ARRAYTYPE_P(1);
+       bool            result;
+
+       result = array_contain_compare(array2, array1, true,
+                                                                  &fcinfo->flinfo->fn_extra);
+
+       /* Avoid leaking memory when handed toasted input. */
+       PG_FREE_IF_COPY(array1, 0);
+       PG_FREE_IF_COPY(array2, 1);
+
+       PG_RETURN_BOOL(result);
+}
+
+Datum
+arraycontained(PG_FUNCTION_ARGS)
+{
+       ArrayType  *array1 = PG_GETARG_ARRAYTYPE_P(0);
+       ArrayType  *array2 = PG_GETARG_ARRAYTYPE_P(1);
+       bool            result;
+
+       result = array_contain_compare(array1, array2, true,
+                                                                  &fcinfo->flinfo->fn_extra);
+
+       /* Avoid leaking memory when handed toasted input. */
+       PG_FREE_IF_COPY(array1, 0);
+       PG_FREE_IF_COPY(array2, 1);
+
+       PG_RETURN_BOOL(result);
+}
+
+
 /***************************************************************************/
 /******************|             Support  Routines                       |*****************/
 /***************************************************************************/
index 64aef37a94c5bd3f9aac0d6a7aa62ca4c87c46da..8bdb030c208ad6ad93d88f8b1509dd8c678e3042 100644 (file)
@@ -3,7 +3,7 @@
  *    header file for postgres inverted index access method implementation.
  *
  *  Copyright (c) 2006, PostgreSQL Global Development Group
- *  $PostgreSQL: pgsql/src/include/access/gin.h,v 1.6 2006/08/07 16:57:57 tgl Exp $
+ *  $PostgreSQL: pgsql/src/include/access/gin.h,v 1.7 2006/09/10 20:14:20 tgl Exp $
  *--------------------------------------------------------------------------
  */
 
@@ -409,11 +409,6 @@ extern Datum ginvacuumcleanup(PG_FUNCTION_ARGS);
 extern Datum ginarrayextract(PG_FUNCTION_ARGS);
 extern Datum ginarrayconsistent(PG_FUNCTION_ARGS);
 
-/* I'm not sure that is the best place */
-extern Datum arrayoverlap(PG_FUNCTION_ARGS);
-extern Datum arraycontains(PG_FUNCTION_ARGS);
-extern Datum arraycontained(PG_FUNCTION_ARGS);
-
 /* ginbulk.c */
 typedef struct EntryAccumulator {
     Datum       value;
index 67085d6f612110df5198716a8410ef8f1de1cae5..a273ee765e11c2fd486cb6d5de3d34cc94282cf4 100644 (file)
@@ -49,7 +49,7 @@
  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/utils/array.h,v 1.58 2006/03/05 15:59:06 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/utils/array.h,v 1.59 2006/09/10 20:14:20 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -193,6 +193,9 @@ extern Datum array_gt(PG_FUNCTION_ARGS);
 extern Datum array_le(PG_FUNCTION_ARGS);
 extern Datum array_ge(PG_FUNCTION_ARGS);
 extern Datum btarraycmp(PG_FUNCTION_ARGS);
+extern Datum arrayoverlap(PG_FUNCTION_ARGS);
+extern Datum arraycontains(PG_FUNCTION_ARGS);
+extern Datum arraycontained(PG_FUNCTION_ARGS);
 extern Datum array_dims(PG_FUNCTION_ARGS);
 extern Datum array_lower(PG_FUNCTION_ARGS);
 extern Datum array_upper(PG_FUNCTION_ARGS);