]> granicus.if.org Git - postgresql/commitdiff
Refactor check_functional_grouping() to use get_primary_key_attnos().
authorTom Lane <tgl@sss.pgh.pa.us>
Thu, 11 Feb 2016 22:52:03 +0000 (17:52 -0500)
committerTom Lane <tgl@sss.pgh.pa.us>
Thu, 11 Feb 2016 22:52:03 +0000 (17:52 -0500)
If we ever get around to allowing functional dependency to be proven
from other things besides simple primary keys, this code will need to
be rethought, but that was true anyway.  In the meantime, we might as
well not have two very-similar routines for scanning pg_constraint.

David Rowley, reviewed by Julien Rouhaud

src/backend/catalog/pg_constraint.c

index 74007856f35c50be8f2e9c231bc15067844d7c55..8fabe6899f65796e9c4495c6d27dd0ffdc93f7be 100644 (file)
@@ -986,93 +986,35 @@ check_functional_grouping(Oid relid,
                                                  List *grouping_columns,
                                                  List **constraintDeps)
 {
-       bool            result = false;
-       Relation        pg_constraint;
-       HeapTuple       tuple;
-       SysScanDesc scan;
-       ScanKeyData skey[1];
-
-       /* Scan pg_constraint for constraints of the target rel */
-       pg_constraint = heap_open(ConstraintRelationId, AccessShareLock);
-
-       ScanKeyInit(&skey[0],
-                               Anum_pg_constraint_conrelid,
-                               BTEqualStrategyNumber, F_OIDEQ,
-                               ObjectIdGetDatum(relid));
-
-       scan = systable_beginscan(pg_constraint, ConstraintRelidIndexId, true,
-                                                         NULL, 1, skey);
-
-       while (HeapTupleIsValid(tuple = systable_getnext(scan)))
+       Bitmapset  *pkattnos;
+       Bitmapset  *groupbyattnos;
+       Oid                     constraintOid;
+       ListCell   *gl;
+
+       /* If the rel has no PK, then we can't prove functional dependency */
+       pkattnos = get_primary_key_attnos(relid, false, &constraintOid);
+       if (pkattnos == NULL)
+               return false;
+
+       /* Identify all the rel's columns that appear in grouping_columns */
+       groupbyattnos = NULL;
+       foreach(gl, grouping_columns)
        {
-               Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
-               Datum           adatum;
-               bool            isNull;
-               ArrayType  *arr;
-               int16      *attnums;
-               int                     numkeys;
-               int                     i;
-               bool            found_col;
+               Var                *gvar = (Var *) lfirst(gl);
 
-               /* Only PK constraints are of interest for now, see comment above */
-               if (con->contype != CONSTRAINT_PRIMARY)
-                       continue;
-               /* Constraint must be non-deferrable */
-               if (con->condeferrable)
-                       continue;
-
-               /* Extract the conkey array, ie, attnums of PK's columns */
-               adatum = heap_getattr(tuple, Anum_pg_constraint_conkey,
-                                                         RelationGetDescr(pg_constraint), &isNull);
-               if (isNull)
-                       elog(ERROR, "null conkey for constraint %u",
-                                HeapTupleGetOid(tuple));
-               arr = DatumGetArrayTypeP(adatum);               /* ensure not toasted */
-               numkeys = ARR_DIMS(arr)[0];
-               if (ARR_NDIM(arr) != 1 ||
-                       numkeys < 0 ||
-                       ARR_HASNULL(arr) ||
-                       ARR_ELEMTYPE(arr) != INT2OID)
-                       elog(ERROR, "conkey is not a 1-D smallint array");
-               attnums = (int16 *) ARR_DATA_PTR(arr);
-
-               found_col = false;
-               for (i = 0; i < numkeys; i++)
-               {
-                       AttrNumber      attnum = attnums[i];
-                       ListCell   *gl;
-
-                       found_col = false;
-                       foreach(gl, grouping_columns)
-                       {
-                               Var                *gvar = (Var *) lfirst(gl);
-
-                               if (IsA(gvar, Var) &&
-                                       gvar->varno == varno &&
-                                       gvar->varlevelsup == varlevelsup &&
-                                       gvar->varattno == attnum)
-                               {
-                                       found_col = true;
-                                       break;
-                               }
-                       }
-                       if (!found_col)
-                               break;
-               }
-
-               if (found_col)
-               {
-                       /* The PK is a subset of grouping_columns, so we win */
-                       *constraintDeps = lappend_oid(*constraintDeps,
-                                                                                 HeapTupleGetOid(tuple));
-                       result = true;
-                       break;
-               }
+               if (IsA(gvar, Var) &&
+                       gvar->varno == varno &&
+                       gvar->varlevelsup == varlevelsup)
+                       groupbyattnos = bms_add_member(groupbyattnos,
+                                               gvar->varattno - FirstLowInvalidHeapAttributeNumber);
        }
 
-       systable_endscan(scan);
-
-       heap_close(pg_constraint, AccessShareLock);
+       if (bms_is_subset(pkattnos, groupbyattnos))
+       {
+               /* The PK is a subset of grouping_columns, so we win */
+               *constraintDeps = lappend_oid(*constraintDeps, constraintOid);
+               return true;
+       }
 
-       return result;
+       return false;
 }