]> granicus.if.org Git - postgresql/commitdiff
Fix reporting of constraint violations for table partitioning.
authorRobert Haas <rhaas@postgresql.org>
Wed, 4 Jan 2017 19:36:34 +0000 (14:36 -0500)
committerRobert Haas <rhaas@postgresql.org>
Wed, 4 Jan 2017 19:36:34 +0000 (14:36 -0500)
After a tuple is routed to a partition, it has been converted from the
root table's row type to the partition's row type.  ExecConstraints
needs to report the failure using the original tuple and the parent's
tuple descriptor rather than the ones for the selected partition.

Amit Langote

src/backend/commands/copy.c
src/backend/commands/tablecmds.c
src/backend/executor/execMain.c
src/backend/executor/nodeModifyTable.c
src/include/executor/executor.h
src/include/nodes/execnodes.h
src/test/regress/expected/insert.out
src/test/regress/sql/insert.sql

index 071b97dee132a0356ed98c710ea248ab2fb138a1..f56b2ac49b0c5c3fd4b7dccb7a0c4da33c2ec58c 100644 (file)
@@ -2426,6 +2426,7 @@ CopyFrom(CopyState cstate)
                                          cstate->rel,
                                          1,            /* dummy rangetable index */
                                          true,         /* do load partition check expression */
+                                         NULL,
                                          0);
 
        ExecOpenIndices(resultRelInfo, false);
@@ -2491,7 +2492,7 @@ CopyFrom(CopyState cstate)
        for (;;)
        {
                TupleTableSlot *slot,
-                                          *oldslot = NULL;
+                                          *oldslot;
                bool            skip_tuple;
                Oid                     loaded_oid = InvalidOid;
 
@@ -2533,6 +2534,7 @@ CopyFrom(CopyState cstate)
                ExecStoreTuple(tuple, slot, InvalidBuffer, false);
 
                /* Determine the partition to heap_insert the tuple into */
+               oldslot = slot;
                if (cstate->partition_dispatch_info)
                {
                        int                     leaf_part_index;
@@ -2587,7 +2589,6 @@ CopyFrom(CopyState cstate)
                                 * point on.  Use a dedicated slot from this point on until
                                 * we're finished dealing with the partition.
                                 */
-                               oldslot = slot;
                                slot = cstate->partition_tuple_slot;
                                Assert(slot != NULL);
                                ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
@@ -2624,7 +2625,7 @@ CopyFrom(CopyState cstate)
                                /* Check the constraints of the tuple */
                                if (cstate->rel->rd_att->constr ||
                                        resultRelInfo->ri_PartitionCheck)
-                                       ExecConstraints(resultRelInfo, slot, estate);
+                                       ExecConstraints(resultRelInfo, slot, oldslot, estate);
 
                                if (useHeapMultiInsert)
                                {
@@ -2686,10 +2687,6 @@ CopyFrom(CopyState cstate)
                        {
                                resultRelInfo = saved_resultRelInfo;
                                estate->es_result_relation_info = resultRelInfo;
-
-                               /* Switch back to the slot corresponding to the root table */
-                               Assert(oldslot != NULL);
-                               slot = oldslot;
                        }
                }
        }
index 25620f5a7ea4fd256775c6554bfb2f05c4b90f96..7a5a574bbe2184d8e395451c12fa421f80cd4f33 100644 (file)
@@ -1324,6 +1324,7 @@ ExecuteTruncate(TruncateStmt *stmt)
                                                  rel,
                                                  0,    /* dummy rangetable index */
                                                  false,
+                                                 NULL,
                                                  0);
                resultRelInfo++;
        }
index eb9b528c4e4800df885c185f1d2bd4371f1137cf..d9ff5fa28bc80ac3919aa10e15b532d384b7c73d 100644 (file)
@@ -828,6 +828,7 @@ InitPlan(QueryDesc *queryDesc, int eflags)
                                                          resultRelation,
                                                          resultRelationIndex,
                                                          true,
+                                                         NULL,
                                                          estate->es_instrument);
                        resultRelInfo++;
                }
@@ -1218,6 +1219,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
                                  Relation resultRelationDesc,
                                  Index resultRelationIndex,
                                  bool load_partition_check,
+                                 Relation partition_root,
                                  int instrument_options)
 {
        MemSet(resultRelInfo, 0, sizeof(ResultRelInfo));
@@ -1259,6 +1261,11 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
                resultRelInfo->ri_PartitionCheck =
                                                        RelationGetPartitionQual(resultRelationDesc,
                                                                                                         true);
+       /*
+        * The following gets set to NULL unless we are initializing leaf
+        * partitions for tuple-routing.
+        */
+       resultRelInfo->ri_PartitionRoot = partition_root;
 }
 
 /*
@@ -1322,6 +1329,7 @@ ExecGetTriggerResultRel(EState *estate, Oid relid)
                                          rel,
                                          0,            /* dummy rangetable index */
                                          true,
+                                         NULL,
                                          estate->es_instrument);
        estate->es_trig_target_relations =
                lappend(estate->es_trig_target_relations, rInfo);
@@ -1743,9 +1751,21 @@ ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
        return ExecQual(resultRelInfo->ri_PartitionCheckExpr, econtext, true);
 }
 
+/*
+ * ExecConstraints - check constraints of the tuple in 'slot'
+ *
+ * This checks the traditional NOT NULL and check constraints, as well as
+ * the partition constraint, if any.
+ *
+ * Note: 'slot' contains the tuple to check the constraints of, which may
+ * have been converted from the original input tuple after tuple routing,
+ * while 'orig_slot' contains the original tuple to be shown in the message,
+ * if an error occurs.
+ */
 void
 ExecConstraints(ResultRelInfo *resultRelInfo,
-                               TupleTableSlot *slot, EState *estate)
+                               TupleTableSlot *slot, TupleTableSlot *orig_slot,
+                               EState *estate)
 {
        Relation        rel = resultRelInfo->ri_RelationDesc;
        TupleDesc       tupdesc = RelationGetDescr(rel);
@@ -1767,12 +1787,24 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
                                slot_attisnull(slot, attrChk))
                        {
                                char       *val_desc;
+                               Relation        orig_rel = rel;
+                               TupleDesc       orig_tupdesc = tupdesc;
+
+                               /*
+                                * choose the correct relation to build val_desc from the
+                                * tuple contained in orig_slot
+                                */
+                               if (resultRelInfo->ri_PartitionRoot)
+                               {
+                                       rel = resultRelInfo->ri_PartitionRoot;
+                                       tupdesc = RelationGetDescr(rel);
+                               }
 
                                insertedCols = GetInsertedColumns(resultRelInfo, estate);
                                updatedCols = GetUpdatedColumns(resultRelInfo, estate);
                                modifiedCols = bms_union(insertedCols, updatedCols);
                                val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel),
-                                                                                                                slot,
+                                                                                                                orig_slot,
                                                                                                                 tupdesc,
                                                                                                                 modifiedCols,
                                                                                                                 64);
@@ -1780,9 +1812,9 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
                                ereport(ERROR,
                                                (errcode(ERRCODE_NOT_NULL_VIOLATION),
                                                 errmsg("null value in column \"%s\" violates not-null constraint",
-                                                         NameStr(tupdesc->attrs[attrChk - 1]->attname)),
+                                                 NameStr(orig_tupdesc->attrs[attrChk - 1]->attname)),
                                                 val_desc ? errdetail("Failing row contains %s.", val_desc) : 0,
-                                                errtablecol(rel, attrChk)));
+                                                errtablecol(orig_rel, attrChk)));
                        }
                }
        }
@@ -1794,21 +1826,29 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
                if ((failed = ExecRelCheck(resultRelInfo, slot, estate)) != NULL)
                {
                        char       *val_desc;
+                       Relation        orig_rel = rel;
+
+                       /* See the comment above. */
+                       if (resultRelInfo->ri_PartitionRoot)
+                       {
+                               rel = resultRelInfo->ri_PartitionRoot;
+                               tupdesc = RelationGetDescr(rel);
+                       }
 
                        insertedCols = GetInsertedColumns(resultRelInfo, estate);
                        updatedCols = GetUpdatedColumns(resultRelInfo, estate);
                        modifiedCols = bms_union(insertedCols, updatedCols);
                        val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel),
-                                                                                                        slot,
+                                                                                                        orig_slot,
                                                                                                         tupdesc,
                                                                                                         modifiedCols,
                                                                                                         64);
                        ereport(ERROR,
                                        (errcode(ERRCODE_CHECK_VIOLATION),
                                         errmsg("new row for relation \"%s\" violates check constraint \"%s\"",
-                                                       RelationGetRelationName(rel), failed),
+                                                       RelationGetRelationName(orig_rel), failed),
                          val_desc ? errdetail("Failing row contains %s.", val_desc) : 0,
-                                        errtableconstraint(rel, failed)));
+                                        errtableconstraint(orig_rel, failed)));
                }
        }
 
@@ -1816,19 +1856,27 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
                !ExecPartitionCheck(resultRelInfo, slot, estate))
        {
                char       *val_desc;
+               Relation        orig_rel = rel;
+
+               /* See the comment above. */
+               if (resultRelInfo->ri_PartitionRoot)
+               {
+                       rel = resultRelInfo->ri_PartitionRoot;
+                       tupdesc = RelationGetDescr(rel);
+               }
 
                insertedCols = GetInsertedColumns(resultRelInfo, estate);
                updatedCols = GetUpdatedColumns(resultRelInfo, estate);
                modifiedCols = bms_union(insertedCols, updatedCols);
                val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel),
-                                                                                                slot,
+                                                                                                orig_slot,
                                                                                                 tupdesc,
                                                                                                 modifiedCols,
                                                                                                 64);
                ereport(ERROR,
                                (errcode(ERRCODE_CHECK_VIOLATION),
                                 errmsg("new row for relation \"%s\" violates partition constraint",
-                                               RelationGetRelationName(rel)),
+                                               RelationGetRelationName(orig_rel)),
                  val_desc ? errdetail("Failing row contains %s.", val_desc) : 0));
        }
 }
@@ -3086,6 +3134,7 @@ ExecSetupPartitionTupleRouting(Relation rel,
                                                  partrel,
                                                  1,     /* dummy */
                                                  false,
+                                                 rel,
                                                  0);
 
                /*
index aa364707f8d321550eff2d182b349161370095dc..4692427e600319793da9afa5291180c28787e998 100644 (file)
@@ -262,7 +262,7 @@ ExecInsert(ModifyTableState *mtstate,
        Relation        resultRelationDesc;
        Oid                     newId;
        List       *recheckIndexes = NIL;
-       TupleTableSlot *oldslot = NULL;
+       TupleTableSlot *oldslot = slot;
 
        /*
         * get the heap tuple out of the tuple table slot, making sure we have a
@@ -328,7 +328,6 @@ ExecInsert(ModifyTableState *mtstate,
                         * point on, until we're finished dealing with the partition.
                         * Use the dedicated slot for that.
                         */
-                       oldslot = slot;
                        slot = mtstate->mt_partition_tuple_slot;
                        Assert(slot != NULL);
                        ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
@@ -434,7 +433,7 @@ ExecInsert(ModifyTableState *mtstate,
                 * Check the constraints of the tuple
                 */
                if (resultRelationDesc->rd_att->constr || resultRelInfo->ri_PartitionCheck)
-                       ExecConstraints(resultRelInfo, slot, estate);
+                       ExecConstraints(resultRelInfo, slot, oldslot, estate);
 
                if (onconflict != ONCONFLICT_NONE && resultRelInfo->ri_NumIndices > 0)
                {
@@ -579,10 +578,6 @@ ExecInsert(ModifyTableState *mtstate,
        {
                resultRelInfo = saved_resultRelInfo;
                estate->es_result_relation_info = resultRelInfo;
-
-               /* Switch back to the slot corresponding to the root table */
-               Assert(oldslot != NULL);
-               slot = oldslot;
        }
 
        /*
@@ -994,10 +989,12 @@ lreplace:;
                                                                 resultRelInfo, slot, estate);
 
                /*
-                * Check the constraints of the tuple
+                * Check the constraints of the tuple.  Note that we pass the same
+                * slot for the orig_slot argument, because unlike ExecInsert(), no
+                * tuple-routing is performed here, hence the slot remains unchanged.
                 */
                if (resultRelationDesc->rd_att->constr || resultRelInfo->ri_PartitionCheck)
-                       ExecConstraints(resultRelInfo, slot, estate);
+                       ExecConstraints(resultRelInfo, slot, slot, estate);
 
                /*
                 * replace the heap tuple
index 4a7074edd70e9ac00108ec73f4870180ea18a5ed..b9c7f729030b1e3bdf5043a264d1dfb0e2b6b81f 100644 (file)
@@ -190,11 +190,13 @@ extern void InitResultRelInfo(ResultRelInfo *resultRelInfo,
                                  Relation resultRelationDesc,
                                  Index resultRelationIndex,
                                  bool load_partition_check,
+                                 Relation partition_root,
                                  int instrument_options);
 extern ResultRelInfo *ExecGetTriggerResultRel(EState *estate, Oid relid);
 extern bool ExecContextForcesOids(PlanState *planstate, bool *hasoids);
 extern void ExecConstraints(ResultRelInfo *resultRelInfo,
-                               TupleTableSlot *slot, EState *estate);
+                               TupleTableSlot *slot, TupleTableSlot *orig_slot,
+                               EState *estate);
 extern void ExecWithCheckOptions(WCOKind kind, ResultRelInfo *resultRelInfo,
                                         TupleTableSlot *slot, EState *estate);
 extern LockTupleMode ExecUpdateLockMode(EState *estate, ResultRelInfo *relinfo);
index 0e05d472c90728e285429a566f249803855677fc..ce13bf76355805ea741ac9fdb890ae7de7569491 100644 (file)
@@ -349,6 +349,7 @@ typedef struct ResultRelInfo
        List       *ri_onConflictSetWhere;
        List       *ri_PartitionCheck;
        List       *ri_PartitionCheckExpr;
+       Relation        ri_PartitionRoot;
 } ResultRelInfo;
 
 /* ----------------
index 383d884847131d58d5012a05aa27838d8ed47555..ca3134c34c57bf577764e73a78d7784ce5291cfd 100644 (file)
@@ -333,5 +333,12 @@ select tableoid::regclass, * from p;
  p11      | 1 | 2
 (1 row)
 
+truncate p;
+alter table p add constraint check_b check (b = 3);
+-- check that correct input row is shown when constraint check_b fails on p11
+-- after "(1, 2)" is routed to it
+insert into p values (1, 2);
+ERROR:  new row for relation "p11" violates check constraint "check_b"
+DETAIL:  Failing row contains (1, 2).
 -- cleanup
 drop table p, p1, p11;
index 687bd4b50411ee339c8d8b6f973bea1bc56cdd1b..09c9879da12131ef69712d4ae271b42075f69e88 100644 (file)
@@ -195,5 +195,11 @@ alter table p attach partition p1 for values from (1, 2) to (1, 10);
 insert into p values (1, 2);
 select tableoid::regclass, * from p;
 
+truncate p;
+alter table p add constraint check_b check (b = 3);
+-- check that correct input row is shown when constraint check_b fails on p11
+-- after "(1, 2)" is routed to it
+insert into p values (1, 2);
+
 -- cleanup
 drop table p, p1, p11;