]> granicus.if.org Git - postgresql/commitdiff
Prevent BEFORE triggers from violating partitioning constraints.
authorRobert Haas <rhaas@postgresql.org>
Wed, 7 Jun 2017 16:45:32 +0000 (12:45 -0400)
committerRobert Haas <rhaas@postgresql.org>
Wed, 7 Jun 2017 16:50:45 +0000 (12:50 -0400)
Since tuple-routing implicitly checks the partitioning constraints
at least for the levels of the partitioning hierarchy it traverses,
there's normally no need to revalidate the partitioning constraint
after performing tuple routing.  However, if there's a BEFORE trigger
on the target partition, it could modify the tuple, causing the
partitioning constraint to be violated.  Catch that case.

Also, instead of checking the root table's partition constraint after
tuple-routing, check it beforehand.  Otherwise, the rules for when
the partitioning constraint gets checked get too complicated, because
you sometimes have to check part of the constraint but not all of it.
This effectively reverts commit 39162b2030fb0a35a6bb28dc636b5a71b8df8d1c
in favor of a different approach altogether.

Report by me.  Initial debugging by Jeevan Ladhe.  Patch by Amit
Langote, reviewed by me.

Discussion: http://postgr.es/m/CA+Tgmoa9DTgeVOqopieV8d1QRpddmP65aCdxyjdYDoEO5pS5KA@mail.gmail.com

src/backend/commands/copy.c
src/backend/executor/execMain.c
src/backend/executor/nodeModifyTable.c
src/test/regress/expected/insert.out
src/test/regress/sql/insert.sql

index 810bae5dadcf10427537f403fd6b3571d33065ba..0a33c40c17f4041988826257bdf9ee5285d82882 100644 (file)
@@ -2640,9 +2640,24 @@ CopyFrom(CopyState cstate)
                        }
                        else
                        {
+                               /*
+                                * We always check the partition constraint, including when
+                                * the tuple got here via tuple-routing.  However we don't
+                                * need to in the latter case if no BR trigger is defined on
+                                * the partition.  Note that a BR trigger might modify the
+                                * tuple such that the partition constraint is no longer
+                                * satisfied, so we need to check in that case.
+                                */
+                               bool    check_partition_constr =
+                                                                       (resultRelInfo->ri_PartitionCheck != NIL);
+
+                               if (saved_resultRelInfo != NULL &&
+                                       !(resultRelInfo->ri_TrigDesc &&
+                                         resultRelInfo->ri_TrigDesc->trig_insert_before_row))
+                                       check_partition_constr = false;
+
                                /* Check the constraints of the tuple */
-                               if (cstate->rel->rd_att->constr ||
-                                       resultRelInfo->ri_PartitionCheck)
+                               if (cstate->rel->rd_att->constr || check_partition_constr)
                                        ExecConstraints(resultRelInfo, slot, estate);
 
                                if (useHeapMultiInsert)
index 4a899f1eb567c74d2e8b73b2912b9b31b3c154d7..3caeeac708c56d3ccdbfcc0e305e3e9d9ea4e9f7 100644 (file)
@@ -103,6 +103,8 @@ static char *ExecBuildSlotPartitionKeyDescription(Relation rel,
                                                                         int maxfieldlen);
 static void EvalPlanQualStart(EPQState *epqstate, EState *parentestate,
                                  Plan *planTree);
+static void ExecPartitionCheck(ResultRelInfo *resultRelInfo,
+                                 TupleTableSlot *slot, EState *estate);
 
 /*
  * Note that GetUpdatedColumns() also exists in commands/trigger.c.  There does
@@ -1339,34 +1341,19 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
        resultRelInfo->ri_projectReturning = NULL;
 
        /*
-        * If partition_root has been specified, that means we are building the
-        * ResultRelInfo for one of its leaf partitions.  In that case, we need
-        * *not* initialize the leaf partition's constraint, but rather the
-        * partition_root's (if any).  We must do that explicitly like this,
-        * because implicit partition constraints are not inherited like user-
-        * defined constraints and would fail to be enforced by ExecConstraints()
-        * after a tuple is routed to a leaf partition.
+        * Partition constraint, which also includes the partition constraint of
+        * all the ancestors that are partitions.  Note that it will be checked
+        * even in the case of tuple-routing where this table is the target leaf
+        * partition, if there any BR triggers defined on the table.  Although
+        * tuple-routing implicitly preserves the partition constraint of the
+        * target partition for a given row, the BR triggers may change the row
+        * such that the constraint is no longer satisfied, which we must fail
+        * for by checking it explicitly.
+        *
+        * If this is a partitioned table, the partition constraint (if any) of a
+        * given row will be checked just before performing tuple-routing.
         */
-       if (partition_root)
-       {
-               /*
-                * Root table itself may or may not be a partition; partition_check
-                * would be NIL in the latter case.
-                */
-               partition_check = RelationGetPartitionQual(partition_root);
-
-               /*
-                * This is not our own partition constraint, but rather an ancestor's.
-                * So any Vars in it bear the ancestor's attribute numbers.  We must
-                * switch them to our own. (dummy varno = 1)
-                */
-               if (partition_check != NIL)
-                       partition_check = map_partition_varattnos(partition_check, 1,
-                                                                                                         resultRelationDesc,
-                                                                                                         partition_root);
-       }
-       else
-               partition_check = RelationGetPartitionQual(resultRelationDesc);
+       partition_check = RelationGetPartitionQual(resultRelationDesc);
 
        resultRelInfo->ri_PartitionCheck = partition_check;
        resultRelInfo->ri_PartitionRoot = partition_root;
@@ -1835,13 +1822,16 @@ ExecRelCheck(ResultRelInfo *resultRelInfo,
 
 /*
  * ExecPartitionCheck --- check that tuple meets the partition constraint.
- *
- * Note: This is called *iff* resultRelInfo is the main target table.
  */
-static bool
+static void
 ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
                                   EState *estate)
 {
+       Relation        rel = resultRelInfo->ri_RelationDesc;
+       TupleDesc       tupdesc = RelationGetDescr(rel);
+       Bitmapset  *modifiedCols;
+       Bitmapset  *insertedCols;
+       Bitmapset  *updatedCols;
        ExprContext *econtext;
 
        /*
@@ -1869,7 +1859,44 @@ ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
         * As in case of the catalogued constraints, we treat a NULL result as
         * success here, not a failure.
         */
-       return ExecCheck(resultRelInfo->ri_PartitionCheckExpr, econtext);
+       if (!ExecCheck(resultRelInfo->ri_PartitionCheckExpr, econtext))
+       {
+               char       *val_desc;
+               Relation        orig_rel = rel;
+
+               /* See the comment above. */
+               if (resultRelInfo->ri_PartitionRoot)
+               {
+                       HeapTuple       tuple = ExecFetchSlotTuple(slot);
+                       TupleDesc       old_tupdesc = RelationGetDescr(rel);
+                       TupleConversionMap *map;
+
+                       rel = resultRelInfo->ri_PartitionRoot;
+                       tupdesc = RelationGetDescr(rel);
+                       /* a reverse map */
+                       map = convert_tuples_by_name(old_tupdesc, tupdesc,
+                                                                gettext_noop("could not convert row type"));
+                       if (map != NULL)
+                       {
+                               tuple = do_convert_tuple(tuple, map);
+                               ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+                       }
+               }
+
+               insertedCols = GetInsertedColumns(resultRelInfo, estate);
+               updatedCols = GetUpdatedColumns(resultRelInfo, estate);
+               modifiedCols = bms_union(insertedCols, updatedCols);
+               val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel),
+                                                                                                slot,
+                                                                                                tupdesc,
+                                                                                                modifiedCols,
+                                                                                                64);
+               ereport(ERROR,
+                               (errcode(ERRCODE_CHECK_VIOLATION),
+                 errmsg("new row for relation \"%s\" violates partition constraint",
+                                RelationGetRelationName(orig_rel)),
+                       val_desc ? errdetail("Failing row contains %s.", val_desc) : 0));
+       }
 }
 
 /*
@@ -1997,47 +2024,11 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
                }
        }
 
-       if (resultRelInfo->ri_PartitionCheck &&
-               !ExecPartitionCheck(resultRelInfo, slot, estate))
-       {
-               char       *val_desc;
-               Relation        orig_rel = rel;
-
-               /* See the comment above. */
-               if (resultRelInfo->ri_PartitionRoot)
-               {
-                       HeapTuple       tuple = ExecFetchSlotTuple(slot);
-                       TupleDesc       old_tupdesc = RelationGetDescr(rel);
-                       TupleConversionMap *map;
-
-                       rel = resultRelInfo->ri_PartitionRoot;
-                       tupdesc = RelationGetDescr(rel);
-                       /* a reverse map */
-                       map = convert_tuples_by_name(old_tupdesc, tupdesc,
-                                                                gettext_noop("could not convert row type"));
-                       if (map != NULL)
-                       {
-                               tuple = do_convert_tuple(tuple, map);
-                               ExecStoreTuple(tuple, slot, InvalidBuffer, false);
-                       }
-               }
-
-               insertedCols = GetInsertedColumns(resultRelInfo, estate);
-               updatedCols = GetUpdatedColumns(resultRelInfo, estate);
-               modifiedCols = bms_union(insertedCols, updatedCols);
-               val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel),
-                                                                                                slot,
-                                                                                                tupdesc,
-                                                                                                modifiedCols,
-                                                                                                64);
-               ereport(ERROR,
-                               (errcode(ERRCODE_CHECK_VIOLATION),
-                 errmsg("new row for relation \"%s\" violates partition constraint",
-                                RelationGetRelationName(orig_rel)),
-                       val_desc ? errdetail("Failing row contains %s.", val_desc) : 0));
-       }
+       if (resultRelInfo->ri_PartitionCheck)
+               ExecPartitionCheck(resultRelInfo, slot, estate);
 }
 
+
 /*
  * ExecWithCheckOptions -- check that tuple satisfies any WITH CHECK OPTIONs
  * of the specified kind.
@@ -3317,6 +3308,13 @@ ExecFindPartition(ResultRelInfo *resultRelInfo, PartitionDispatch *pd,
        PartitionDispatchData *failed_at;
        TupleTableSlot *failed_slot;
 
+       /*
+        * First check the root table's partition constraint, if any.  No point in
+        * routing the tuple it if it doesn't belong in the root table itself.
+        */
+       if (resultRelInfo->ri_PartitionCheck)
+               ExecPartitionCheck(resultRelInfo, slot, estate);
+
        result = get_partition_for_tuple(pd, slot, estate,
                                                                         &failed_at, &failed_slot);
        if (result < 0)
index cf555fe78d91b38f39f8f38594074bf55c4b7d00..bf26488c5103358b785d55445dc8596fc40d955f 100644 (file)
@@ -414,6 +414,16 @@ ExecInsert(ModifyTableState *mtstate,
        }
        else
        {
+               /*
+                * We always check the partition constraint, including when the tuple
+                * got here via tuple-routing.  However we don't need to in the latter
+                * case if no BR trigger is defined on the partition.  Note that a BR
+                * trigger might modify the tuple such that the partition constraint
+                * is no longer satisfied, so we need to check in that case.
+                */
+               bool    check_partition_constr =
+                                                               (resultRelInfo->ri_PartitionCheck != NIL);
+
                /*
                 * Constraints might reference the tableoid column, so initialize
                 * t_tableOid before evaluating them.
@@ -431,9 +441,16 @@ ExecInsert(ModifyTableState *mtstate,
                                                                 resultRelInfo, slot, estate);
 
                /*
-                * Check the constraints of the tuple
+                * No need though if the tuple has been routed, and a BR trigger
+                * doesn't exist.
                 */
-               if (resultRelationDesc->rd_att->constr || resultRelInfo->ri_PartitionCheck)
+               if (saved_resultRelInfo != NULL &&
+                       !(resultRelInfo->ri_TrigDesc &&
+                         resultRelInfo->ri_TrigDesc->trig_insert_before_row))
+                       check_partition_constr = false;
+
+               /* Check the constraints of the tuple */
+               if (resultRelationDesc->rd_att->constr || check_partition_constr)
                        ExecConstraints(resultRelInfo, slot, estate);
 
                if (onconflict != ONCONFLICT_NONE && resultRelInfo->ri_NumIndices > 0)
index 8b0752a0d26b8cb46a6d9dace5a1c3681bd2f5c2..d1153f410baf6fce15da0aa642c9211f9f131765 100644 (file)
@@ -379,7 +379,7 @@ drop function mlparted11_trig_fn();
 -- checking its partition constraint before inserting into the leaf partition
 -- selected by tuple-routing
 insert into mlparted1 (a, b) values (2, 3);
-ERROR:  new row for relation "mlparted11" violates partition constraint
+ERROR:  new row for relation "mlparted1" violates partition constraint
 DETAIL:  Failing row contains (3, 2).
 -- check routing error through a list partitioned table when the key is null
 create table lparted_nonullpart (a int, b char) partition by list (b);
@@ -495,3 +495,16 @@ select tableoid::regclass::text, * from mcrparted order by 1;
 
 -- cleanup
 drop table mcrparted;
+-- check that a BR constraint can't make partition contain violating rows
+create table brtrigpartcon (a int, b text) partition by list (a);
+create table brtrigpartcon1 partition of brtrigpartcon for values in (1);
+create or replace function brtrigpartcon1trigf() returns trigger as $$begin new.a := 2; return new; end$$ language plpgsql;
+create trigger brtrigpartcon1trig before insert on brtrigpartcon1 for each row execute procedure brtrigpartcon1trigf();
+insert into brtrigpartcon values (1, 'hi there');
+ERROR:  new row for relation "brtrigpartcon1" violates partition constraint
+DETAIL:  Failing row contains (2, hi there).
+insert into brtrigpartcon1 values (1, 'hi there');
+ERROR:  new row for relation "brtrigpartcon1" violates partition constraint
+DETAIL:  Failing row contains (2, hi there).
+drop table brtrigpartcon;
+drop function brtrigpartcon1trigf();
index db8967bad7ddffc6480752cf25589b5235bef552..83c3ad8f534954185e1950c471b10f9fc7a5d882 100644 (file)
@@ -332,3 +332,13 @@ select tableoid::regclass::text, * from mcrparted order by 1;
 
 -- cleanup
 drop table mcrparted;
+
+-- check that a BR constraint can't make partition contain violating rows
+create table brtrigpartcon (a int, b text) partition by list (a);
+create table brtrigpartcon1 partition of brtrigpartcon for values in (1);
+create or replace function brtrigpartcon1trigf() returns trigger as $$begin new.a := 2; return new; end$$ language plpgsql;
+create trigger brtrigpartcon1trig before insert on brtrigpartcon1 for each row execute procedure brtrigpartcon1trigf();
+insert into brtrigpartcon values (1, 'hi there');
+insert into brtrigpartcon1 values (1, 'hi there');
+drop table brtrigpartcon;
+drop function brtrigpartcon1trigf();