]> granicus.if.org Git - postgresql/commitdiff
During ALTER TABLE ADD FOREIGN KEY, try to check the existing rows using
authorTom Lane <tgl@sss.pgh.pa.us>
Mon, 6 Oct 2003 16:38:28 +0000 (16:38 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Mon, 6 Oct 2003 16:38:28 +0000 (16:38 +0000)
a single LEFT JOIN query instead of firing the check trigger for each
row individually.  Stephan Szabo, with some kibitzing from Tom Lane and
Jan Wieck.

src/backend/commands/tablecmds.c
src/backend/utils/adt/ri_triggers.c
src/include/commands/trigger.h

index 3ece4f80f9586f275a95dc7f6aec9956d02bab31..395077081a7f595385ab311b23af33b664511f18 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/commands/tablecmds.c,v 1.85 2003/10/02 06:36:37 petere Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/commands/tablecmds.c,v 1.86 2003/10/06 16:38:27 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -3454,6 +3454,13 @@ validateForeignKeyConstraint(FkConstraint *fkconstraint,
        List       *list;
        int                     count;
 
+       /*
+        * See if we can do it with a single LEFT JOIN query.  A FALSE result
+        * indicates we must proceed with the fire-the-trigger method.
+        */
+       if (RI_Initial_Check(fkconstraint, rel, pkrel))
+               return;
+
        /*
         * Scan through each tuple, calling RI_FKey_check_ins (insert trigger)
         * as if that tuple had just been inserted.  If any of those fail, it
index 11b7e84df03767bcc12556163bb3c5d99312a67e..306dda0aa4ebde4e4e0dd10929d2793ea81df8c2 100644 (file)
@@ -17,7 +17,7 @@
  *
  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
  *
- * $Header: /cvsroot/pgsql/src/backend/utils/adt/ri_triggers.c,v 1.61 2003/10/01 21:30:52 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/utils/adt/ri_triggers.c,v 1.62 2003/10/06 16:38:28 tgl Exp $
  *
  * ----------
  */
@@ -40,6 +40,7 @@
 #include "rewrite/rewriteHandler.h"
 #include "utils/lsyscache.h"
 #include "utils/typcache.h"
+#include "utils/acl.h"
 #include "miscadmin.h"
 
 
@@ -164,7 +165,8 @@ static void ri_ExtractValues(RI_QueryKey *qkey, int key_idx,
                                 Datum *vals, char *nulls);
 static void ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
                                   Relation pk_rel, Relation fk_rel,
-                                  HeapTuple violator, bool spi_err);
+                                  HeapTuple violator, TupleDesc tupdesc,
+                                  bool spi_err);
 
 
 /* ----------
@@ -2540,7 +2542,205 @@ RI_FKey_keyequal_upd(TriggerData *trigdata)
 }
 
 
+/* ----------
+ * RI_Initial_Check -
+ *
+ *     Check an entire table for non-matching values using a single query.
+ *     This is not a trigger procedure, but is called during ALTER TABLE
+ *     ADD FOREIGN KEY to validate the initial table contents.
+ *
+ *     We expect that an exclusive lock has been taken on rel and pkrel;
+ *     hence, we do not need to lock individual rows for the check.
+ *
+ *     If the check fails because the current user doesn't have permissions
+ *     to read both tables, return false to let our caller know that they will
+ *     need to do something else to check the constraint.
+ * ----------
+ */
+bool
+RI_Initial_Check(FkConstraint *fkconstraint, Relation rel, Relation pkrel)
+{
+       const char *constrname = fkconstraint->constr_name;
+       char            querystr[MAX_QUOTED_REL_NAME_LEN * 2 + 250 +
+                                               (MAX_QUOTED_NAME_LEN + 32) * ((RI_MAX_NUMKEYS * 4)+1)];
+       char            pkrelname[MAX_QUOTED_REL_NAME_LEN];
+       char            relname[MAX_QUOTED_REL_NAME_LEN];
+       char            attname[MAX_QUOTED_NAME_LEN];
+       char            fkattname[MAX_QUOTED_NAME_LEN];
+       const char *sep;
+       List            *list;
+       List            *list2;
+       int                     spi_result;
+       void            *qplan;
+
+       /*
+        * Check to make sure current user has enough permissions to do the
+        * test query.  (If not, caller can fall back to the trigger method,
+        * which works because it changes user IDs on the fly.)
+        *
+        * XXX are there any other show-stopper conditions to check?
+        */
+       if (pg_class_aclcheck(RelationGetRelid(rel), GetUserId(), ACL_SELECT) != ACLCHECK_OK)
+               return false;
+       if (pg_class_aclcheck(RelationGetRelid(pkrel), GetUserId(), ACL_SELECT) != ACLCHECK_OK) 
+               return false;
+
+       /*----------
+        * The query string built is:
+        *  SELECT fk.keycols FROM ONLY relname fk 
+        *   LEFT OUTER JOIN ONLY pkrelname pk 
+        *   ON (pk.pkkeycol1=fk.keycol1 [AND ...])
+        *   WHERE pk.pkkeycol1 IS NULL AND
+        * For MATCH unspecified:
+        *   (fk.keycol1 IS NOT NULL [AND ...])
+        * For MATCH FULL:
+        *   (fk.keycol1 IS NOT NULL [OR ...])
+        *----------
+        */
+
+       sprintf(querystr, "SELECT ");
+       sep="";
+       foreach(list, fkconstraint->fk_attrs)
+       {
+               quoteOneName(attname, strVal(lfirst(list)));
+               snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+                                "%sfk.%s", sep, attname);
+               sep = ", ";
+       }
+
+       quoteRelationName(pkrelname, pkrel);
+       quoteRelationName(relname, rel);
+       snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+                        " FROM ONLY %s fk LEFT OUTER JOIN ONLY %s pk ON (",
+                        relname, pkrelname);
+
+       sep="";
+       for (list=fkconstraint->pk_attrs, list2=fkconstraint->fk_attrs; 
+                list != NIL && list2 != NIL; 
+                list=lnext(list), list2=lnext(list2))
+       {
+               quoteOneName(attname, strVal(lfirst(list)));
+               quoteOneName(fkattname, strVal(lfirst(list2)));
+               snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+                                "%spk.%s=fk.%s",
+                                sep, attname, fkattname);
+               sep = " AND ";
+       }
+       /*
+        * It's sufficient to test any one pk attribute for null to detect a
+        * join failure.
+        */
+       quoteOneName(attname, strVal(lfirst(fkconstraint->pk_attrs)));
+       snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+                        ") WHERE pk.%s IS NULL AND (", attname);
+
+       sep="";
+       foreach(list, fkconstraint->fk_attrs)
+       {
+               quoteOneName(attname, strVal(lfirst(list)));
+               snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+                                "%sfk.%s IS NOT NULL",
+                                sep, attname);
+               switch (fkconstraint->fk_matchtype)
+               {
+                       case FKCONSTR_MATCH_UNSPECIFIED:
+                               sep=" AND ";
+                               break;
+                       case FKCONSTR_MATCH_FULL:
+                               sep=" OR ";
+                               break;
+                       case FKCONSTR_MATCH_PARTIAL:
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                errmsg("MATCH PARTIAL not yet implemented")));
+                               break;
+                       default:
+                               elog(ERROR, "unrecognized match type: %d",
+                                        fkconstraint->fk_matchtype);
+                               break;
+               }
+       }
+       snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+                        ")");
+
+       if (SPI_connect() != SPI_OK_CONNECT)
+               elog(ERROR, "SPI_connect failed");
 
+       /*
+        * Generate the plan.  We don't need to cache it, and there are no
+        * arguments to the plan. 
+        */
+       qplan = SPI_prepare(querystr, 0, NULL);
+
+       if (qplan == NULL)
+               elog(ERROR, "SPI_prepare returned %d for %s", SPI_result, querystr);
+
+       /*
+        * Run the plan.  For safety we force a current query snapshot to be
+        * used.  (In serializable mode, this arguably violates serializability,
+        * but we really haven't got much choice.)  We need at most one tuple
+        * returned, so pass limit = 1.
+        */
+       spi_result = SPI_execp_current(qplan, NULL, NULL, true, 1);
+
+       /* Check result */
+       if (spi_result != SPI_OK_SELECT)
+               elog(ERROR, "SPI_execp_current returned %d", spi_result);
+
+       /* Did we find a tuple violating the constraint? */
+       if (SPI_processed > 0)
+       {
+               HeapTuple       tuple = SPI_tuptable->vals[0];
+               TupleDesc       tupdesc = SPI_tuptable->tupdesc;
+               int                     nkeys = length(fkconstraint->fk_attrs);
+               int                     i;
+               RI_QueryKey     qkey;
+
+               /*
+                * If it's MATCH FULL, and there are any nulls in the FK keys,
+                * complain about that rather than the lack of a match.  MATCH FULL
+                * disallows partially-null FK rows.
+                */
+               if (fkconstraint->fk_matchtype == FKCONSTR_MATCH_FULL)
+               {
+                       bool    isnull = false;
+
+                       for (i = 1; i <= nkeys; i++)
+                       {
+                               (void) SPI_getbinval(tuple, tupdesc, i, &isnull);
+                               if (isnull)
+                                       break;
+                       }
+                       if (isnull)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
+                                                errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
+                                                               RelationGetRelationName(rel),
+                                                               constrname),
+                                                errdetail("MATCH FULL does not allow mixing of null and nonnull key values.")));
+               }
+
+               /*
+                * Although we didn't cache the query, we need to set up a fake
+                * query key to pass to ri_ReportViolation.
+                */
+               MemSet(&qkey, 0, sizeof(qkey));
+               qkey.constr_queryno = RI_PLAN_CHECK_LOOKUPPK;
+               qkey.nkeypairs = nkeys;
+               for (i = 0; i < nkeys; i++)
+                       qkey.keypair[i][RI_KEYPAIR_FK_IDX] = i + 1;
+
+               ri_ReportViolation(&qkey, constrname,
+                                                  pkrel, rel,
+                                                  tuple, tupdesc,
+                                                  false);
+       }
+
+       if (SPI_finish() != SPI_OK_FINISH)
+               elog(ERROR, "SPI_finish failed");
+
+       return true;
+}
 
 
 /* ----------
@@ -2782,6 +2982,9 @@ ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
        /* Create the plan */
        qplan = SPI_prepare(querystr, nargs, argtypes);
 
+       if (qplan == NULL)
+               elog(ERROR, "SPI_prepare returned %d for %s", SPI_result, querystr);
+
        /* Restore UID */
        SetUserId(save_uid);
 
@@ -2905,6 +3108,7 @@ ri_PerformCheck(RI_QueryKey *qkey, void *qplan,
                ri_ReportViolation(qkey, constrname ? constrname : "",
                                                   pk_rel, fk_rel,
                                                   new_tuple ? new_tuple : old_tuple,
+                                                  NULL,
                                                   true);
 
        /* XXX wouldn't it be clearer to do this part at the caller? */
@@ -2913,6 +3117,7 @@ ri_PerformCheck(RI_QueryKey *qkey, void *qplan,
                ri_ReportViolation(qkey, constrname,
                                                   pk_rel, fk_rel,
                                                   new_tuple ? new_tuple : old_tuple,
+                                                  NULL,
                                                   false);
 
        return SPI_processed != 0;
@@ -2950,7 +3155,8 @@ ri_ExtractValues(RI_QueryKey *qkey, int key_idx,
 static void
 ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
                                   Relation pk_rel, Relation fk_rel,
-                                  HeapTuple violator, bool spi_err)
+                                  HeapTuple violator, TupleDesc tupdesc,
+                                  bool spi_err)
 {
 #define BUFLENGTH      512
        char            key_names[BUFLENGTH];
@@ -2958,7 +3164,6 @@ ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
        char       *name_ptr = key_names;
        char       *val_ptr = key_values;
        bool            onfk;
-       Relation        rel;
        int                     idx,
                                key_idx;
 
@@ -2972,18 +3177,21 @@ ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
                                 errhint("This is most likely due to a rule having rewritten the query.")));
 
        /*
-        * rel is set to where the tuple description is coming from.
+        * Determine which relation to complain about.  If tupdesc wasn't
+        * passed by caller, assume the violator tuple came from there.
         */
        onfk = (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK);
        if (onfk)
        {
-               rel = fk_rel;
                key_idx = RI_KEYPAIR_FK_IDX;
+               if (tupdesc == NULL)
+                       tupdesc = fk_rel->rd_att;
        }
        else
        {
-               rel = pk_rel;
                key_idx = RI_KEYPAIR_PK_IDX;
+               if (tupdesc == NULL)
+                       tupdesc = pk_rel->rd_att;
        }
 
        /*
@@ -3008,8 +3216,8 @@ ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
                char       *name,
                                   *val;
 
-               name = SPI_fname(rel->rd_att, fnum);
-               val = SPI_getvalue(violator, rel->rd_att, fnum);
+               name = SPI_fname(tupdesc, fnum);
+               val = SPI_getvalue(violator, tupdesc, fnum);
                if (!val)
                        val = "null";
 
index d3aa9e385af935cc6cebdba53e154c70be1fde44..fe8f6af61d9fef5d5b9d12e4a1234e083eebdb38 100644 (file)
@@ -6,7 +6,7 @@
  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: trigger.h,v 1.43 2003/08/04 02:40:13 momjian Exp $
+ * $Id: trigger.h,v 1.44 2003/10/06 16:38:28 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -197,5 +197,8 @@ extern void DeferredTriggerSetState(ConstraintsSetStmt *stmt);
  * in utils/adt/ri_triggers.c
  */
 extern bool RI_FKey_keyequal_upd(TriggerData *trigdata);
+extern bool RI_Initial_Check(FkConstraint *fkconstraint, 
+                                                        Relation rel, 
+                                                        Relation pkrel);
 
 #endif   /* TRIGGER_H */