]> granicus.if.org Git - postgresql/commitdiff
Refactor validation of new partitions a little bit.
authorRobert Haas <rhaas@postgresql.org>
Thu, 17 Aug 2017 18:49:45 +0000 (14:49 -0400)
committerRobert Haas <rhaas@postgresql.org>
Thu, 17 Aug 2017 18:49:45 +0000 (14:49 -0400)
Move some logic that is currently in ATExecAttachPartition to
separate functions to facilitate future code reuse.

Ashutosh Bapat and Jeevan Ladhe

Discussion: http://postgr.es/m/CA+Tgmobbnamyvii0pRdg9pp_jLHSUvq7u5SiRrVV0tEFFU58Tg@mail.gmail.com

src/backend/commands/tablecmds.c

index 513a9ec485759e16783c0f87edbfde3e5829719b..83cb46016410d96ae36b014950140ba832b370cf 100644 (file)
@@ -473,6 +473,11 @@ static void CreateInheritance(Relation child_rel, Relation parent_rel);
 static void RemoveInheritance(Relation child_rel, Relation parent_rel);
 static ObjectAddress ATExecAttachPartition(List **wqueue, Relation rel,
                                          PartitionCmd *cmd);
+static bool PartConstraintImpliedByRelConstraint(Relation scanrel,
+                                                                        List *partConstraint);
+static void ValidatePartitionConstraints(List **wqueue, Relation scanrel,
+                                                        List *scanrel_children,
+                                                        List *partConstraint);
 static ObjectAddress ATExecDetachPartition(Relation rel, RangeVar *name);
 
 
@@ -13424,6 +13429,169 @@ ComputePartitionAttrs(Relation rel, List *partParams, AttrNumber *partattrs,
        }
 }
 
+/*
+ * PartConstraintImpliedByRelConstraint
+ *             Does scanrel's existing constraints imply the partition constraint?
+ *
+ * Existing constraints includes its check constraints and column-level
+ * NOT NULL constraints and partConstraint describes the partition constraint.
+ */
+static bool
+PartConstraintImpliedByRelConstraint(Relation scanrel,
+                                                                        List *partConstraint)
+{
+       List       *existConstraint = NIL;
+       TupleConstr *constr = RelationGetDescr(scanrel)->constr;
+       int                     num_check,
+                               i;
+
+       if (constr && constr->has_not_null)
+       {
+               int                     natts = scanrel->rd_att->natts;
+
+               for (i = 1; i <= natts; i++)
+               {
+                       Form_pg_attribute att = scanrel->rd_att->attrs[i - 1];
+
+                       if (att->attnotnull && !att->attisdropped)
+                       {
+                               NullTest   *ntest = makeNode(NullTest);
+
+                               ntest->arg = (Expr *) makeVar(1,
+                                                                                         i,
+                                                                                         att->atttypid,
+                                                                                         att->atttypmod,
+                                                                                         att->attcollation,
+                                                                                         0);
+                               ntest->nulltesttype = IS_NOT_NULL;
+
+                               /*
+                                * argisrow=false is correct even for a composite column,
+                                * because attnotnull does not represent a SQL-spec IS NOT
+                                * NULL test in such a case, just IS DISTINCT FROM NULL.
+                                */
+                               ntest->argisrow = false;
+                               ntest->location = -1;
+                               existConstraint = lappend(existConstraint, ntest);
+                       }
+               }
+       }
+
+       num_check = (constr != NULL) ? constr->num_check : 0;
+       for (i = 0; i < num_check; i++)
+       {
+               Node       *cexpr;
+
+               /*
+                * If this constraint hasn't been fully validated yet, we must ignore
+                * it here.
+                */
+               if (!constr->check[i].ccvalid)
+                       continue;
+
+               cexpr = stringToNode(constr->check[i].ccbin);
+
+               /*
+                * Run each expression through const-simplification and
+                * canonicalization.  It is necessary, because we will be comparing it
+                * to similarly-processed partition constraint expressions, and may
+                * fail to detect valid matches without this.
+                */
+               cexpr = eval_const_expressions(NULL, cexpr);
+               cexpr = (Node *) canonicalize_qual((Expr *) cexpr);
+
+               existConstraint = list_concat(existConstraint,
+                                                                         make_ands_implicit((Expr *) cexpr));
+       }
+
+       if (existConstraint != NIL)
+               existConstraint = list_make1(make_ands_explicit(existConstraint));
+
+       /* And away we go ... */
+       return predicate_implied_by(partConstraint, existConstraint, true);
+}
+
+/*
+ * ValidatePartitionConstraints
+ *
+ * Check whether all rows in the given table obey the given partition
+ * constraint; if so, it can be attached as a partition.  We do this by
+ * scanning the table (or all of its leaf partitions) row by row, except when
+ * the existing constraints are sufficient to prove that the new partitioning
+ * constraint must already hold.
+ */
+static void
+ValidatePartitionConstraints(List **wqueue, Relation scanrel,
+                                                        List *scanrel_children,
+                                                        List *partConstraint)
+{
+       bool            found_whole_row;
+       ListCell   *lc;
+
+       if (partConstraint == NIL)
+               return;
+
+       /*
+        * Based on the table's existing constraints, determine if we can skip
+        * scanning the table to validate the partition constraint.
+        */
+       if (PartConstraintImpliedByRelConstraint(scanrel, partConstraint))
+       {
+               ereport(INFO,
+                               (errmsg("partition constraint for table \"%s\" is implied by existing constraints",
+                                               RelationGetRelationName(scanrel))));
+               return;
+       }
+
+       /* Constraints proved insufficient, so we need to scan the table. */
+       foreach(lc, scanrel_children)
+       {
+               AlteredTableInfo *tab;
+               Oid                     part_relid = lfirst_oid(lc);
+               Relation        part_rel;
+               List       *my_partconstr = partConstraint;
+
+               /* Lock already taken */
+               if (part_relid != RelationGetRelid(scanrel))
+                       part_rel = heap_open(part_relid, NoLock);
+               else
+                       part_rel = scanrel;
+
+               /*
+                * Skip if the partition is itself a partitioned table.  We can only
+                * ever scan RELKIND_RELATION relations.
+                */
+               if (part_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+               {
+                       if (part_rel != scanrel)
+                               heap_close(part_rel, NoLock);
+                       continue;
+               }
+
+               if (part_rel != scanrel)
+               {
+                       /*
+                        * Adjust the constraint for scanrel so that it matches this
+                        * partition's attribute numbers.
+                        */
+                       my_partconstr = map_partition_varattnos(my_partconstr, 1,
+                                                                                                       part_rel, scanrel,
+                                                                                                       &found_whole_row);
+                       /* There can never be a whole-row reference here */
+                       if (found_whole_row)
+                               elog(ERROR, "unexpected whole-row reference found in partition key");
+               }
+
+               /* Grab a work queue entry. */
+               tab = ATGetQueueEntry(wqueue, part_rel);
+               tab->partition_constraint = (Expr *) linitial(my_partconstr);
+
+               /* keep our lock until commit */
+               if (part_rel != scanrel)
+                       heap_close(part_rel, NoLock);
+       }
+}
+
 /*
  * ALTER TABLE <name> ATTACH PARTITION <partition-name> FOR VALUES
  *
@@ -13435,15 +13603,12 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
        Relation        attachrel,
                                catalog;
        List       *attachrel_children;
-       TupleConstr *attachrel_constr;
-       List       *partConstraint,
-                          *existConstraint;
+       List       *partConstraint;
        SysScanDesc scan;
        ScanKeyData skey;
        AttrNumber      attno;
        int                     natts;
        TupleDesc       tupleDesc;
-       bool            skip_validate = false;
        ObjectAddress address;
        const char *trigger_name;
        bool            found_whole_row;
@@ -13637,148 +13802,9 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
        if (found_whole_row)
                elog(ERROR, "unexpected whole-row reference found in partition key");
 
-       /*
-        * Check if we can do away with having to scan the table being attached to
-        * validate the partition constraint, by *proving* that the existing
-        * constraints of the table *imply* the partition predicate.  We include
-        * the table's check constraints and NOT NULL constraints in the list of
-        * clauses passed to predicate_implied_by().
-        *
-        * There is a case in which we cannot rely on just the result of the
-        * proof.
-        */
-       attachrel_constr = tupleDesc->constr;
-       existConstraint = NIL;
-       if (attachrel_constr != NULL)
-       {
-               int                     num_check = attachrel_constr->num_check;
-               int                     i;
-
-               if (attachrel_constr->has_not_null)
-               {
-                       int                     natts = attachrel->rd_att->natts;
-
-                       for (i = 1; i <= natts; i++)
-                       {
-                               Form_pg_attribute att = attachrel->rd_att->attrs[i - 1];
-
-                               if (att->attnotnull && !att->attisdropped)
-                               {
-                                       NullTest   *ntest = makeNode(NullTest);
-
-                                       ntest->arg = (Expr *) makeVar(1,
-                                                                                                 i,
-                                                                                                 att->atttypid,
-                                                                                                 att->atttypmod,
-                                                                                                 att->attcollation,
-                                                                                                 0);
-                                       ntest->nulltesttype = IS_NOT_NULL;
-
-                                       /*
-                                        * argisrow=false is correct even for a composite column,
-                                        * because attnotnull does not represent a SQL-spec IS NOT
-                                        * NULL test in such a case, just IS DISTINCT FROM NULL.
-                                        */
-                                       ntest->argisrow = false;
-                                       ntest->location = -1;
-                                       existConstraint = lappend(existConstraint, ntest);
-                               }
-                       }
-               }
-
-               for (i = 0; i < num_check; i++)
-               {
-                       Node       *cexpr;
-
-                       /*
-                        * If this constraint hasn't been fully validated yet, we must
-                        * ignore it here.
-                        */
-                       if (!attachrel_constr->check[i].ccvalid)
-                               continue;
-
-                       cexpr = stringToNode(attachrel_constr->check[i].ccbin);
-
-                       /*
-                        * Run each expression through const-simplification and
-                        * canonicalization.  It is necessary, because we will be
-                        * comparing it to similarly-processed qual clauses, and may fail
-                        * to detect valid matches without this.
-                        */
-                       cexpr = eval_const_expressions(NULL, cexpr);
-                       cexpr = (Node *) canonicalize_qual((Expr *) cexpr);
-
-                       existConstraint = list_concat(existConstraint,
-                                                                                 make_ands_implicit((Expr *) cexpr));
-               }
-
-               existConstraint = list_make1(make_ands_explicit(existConstraint));
-
-               /* And away we go ... */
-               if (predicate_implied_by(partConstraint, existConstraint, true))
-                       skip_validate = true;
-       }
-
-       if (skip_validate)
-       {
-               /* No need to scan the table after all. */
-               ereport(INFO,
-                               (errmsg("partition constraint for table \"%s\" is implied by existing constraints",
-                                               RelationGetRelationName(attachrel))));
-       }
-       else
-       {
-               /* Constraints proved insufficient, so we need to scan the table. */
-               ListCell   *lc;
-
-               foreach(lc, attachrel_children)
-               {
-                       AlteredTableInfo *tab;
-                       Oid                     part_relid = lfirst_oid(lc);
-                       Relation        part_rel;
-                       List       *my_partconstr = partConstraint;
-
-                       /* Lock already taken */
-                       if (part_relid != RelationGetRelid(attachrel))
-                               part_rel = heap_open(part_relid, NoLock);
-                       else
-                               part_rel = attachrel;
-
-                       /*
-                        * Skip if the partition is itself a partitioned table.  We can
-                        * only ever scan RELKIND_RELATION relations.
-                        */
-                       if (part_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
-                       {
-                               if (part_rel != attachrel)
-                                       heap_close(part_rel, NoLock);
-                               continue;
-                       }
-
-                       if (part_rel != attachrel)
-                       {
-                               /*
-                                * Adjust the constraint that we constructed above for
-                                * attachRel so that it matches this partition's attribute
-                                * numbers.
-                                */
-                               my_partconstr = map_partition_varattnos(my_partconstr, 1,
-                                                                                                               part_rel, attachrel,
-                                                                                                               &found_whole_row);
-                               /* There can never be a whole-row reference here */
-                               if (found_whole_row)
-                                       elog(ERROR, "unexpected whole-row reference found in partition key");
-                       }
-
-                       /* Grab a work queue entry. */
-                       tab = ATGetQueueEntry(wqueue, part_rel);
-                       tab->partition_constraint = (Expr *) linitial(my_partconstr);
-
-                       /* keep our lock until commit */
-                       if (part_rel != attachrel)
-                               heap_close(part_rel, NoLock);
-               }
-       }
+       /* Validate partition constraints against the table being attached. */
+       ValidatePartitionConstraints(wqueue, attachrel, attachrel_children,
+                                                                partConstraint);
 
        ObjectAddressSet(address, RelationRelationId, RelationGetRelid(attachrel));