]> granicus.if.org Git - postgresql/commitdiff
Support foreign keys that reference partitioned tables
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Wed, 3 Apr 2019 17:38:20 +0000 (14:38 -0300)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Wed, 3 Apr 2019 17:40:21 +0000 (14:40 -0300)
Previously, while primary keys could be made on partitioned tables, it
was not possible to define foreign keys that reference those primary
keys.  Now it is possible to do that.

Author: Álvaro Herrera
Reviewed-by: Amit Langote, Jesper Pedersen
Discussion: https://postgr.es/m/20181102234158.735b3fevta63msbj@alvherre.pgsql

15 files changed:
doc/src/sgml/ref/create_table.sgml
src/backend/commands/tablecmds.c
src/backend/utils/adt/ri_triggers.c
src/backend/utils/adt/ruleutils.c
src/bin/psql/describe.c
src/include/commands/tablecmds.h
src/include/commands/trigger.h
src/include/utils/ruleutils.h
src/test/isolation/expected/fk-partitioned-1.out [new file with mode: 0644]
src/test/isolation/expected/fk-partitioned-2.out [new file with mode: 0644]
src/test/isolation/isolation_schedule
src/test/isolation/specs/fk-partitioned-1.spec [new file with mode: 0644]
src/test/isolation/specs/fk-partitioned-2.spec [new file with mode: 0644]
src/test/regress/expected/foreign_key.out
src/test/regress/sql/foreign_key.sql

index 0fcbc660b319b668accb03803ce73c6760e912a2..99b95bbdb43684744ba3bb02b1bf7e4f6db5d43c 100644 (file)
@@ -379,9 +379,6 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
      <para>
       Partitioned tables do not support <literal>EXCLUDE</literal> constraints;
       however, you can define these constraints on individual partitions.
-      Also, while it's possible to define <literal>PRIMARY KEY</literal>
-      constraints on partitioned tables, creating foreign keys that
-      reference a partitioned table is not yet supported.
      </para>
 
      <para>
@@ -1028,9 +1025,7 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
       addition of a foreign key constraint requires a
       <literal>SHARE ROW EXCLUSIVE</literal> lock on the referenced table.
       Note that foreign key constraints cannot be defined between temporary
-      tables and permanent tables.  Also note that while it is possible to
-      define a foreign key on a partitioned table, it is not possible to
-      declare a foreign key that references a partitioned table.
+      tables and permanent tables.
      </para>
 
      <para>
index 654179297cfe927e1ca7dd9ab30751f319971c3c..978b6bec44a19518ccd5d0dbef1dc45ccbe1244e 100644 (file)
@@ -415,10 +415,32 @@ static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *
                                                  Relation rel, Constraint *fkconstraint, Oid parentConstr,
                                                  bool recurse, bool recursing,
                                                  LOCKMODE lockmode);
-static void CloneForeignKeyConstraints(Oid parentId, Oid relationId,
-                                                  List **cloned);
-static void CloneFkReferencing(Relation pg_constraint, Relation parentRel,
-                                  Relation partRel, List *clone, List **cloned);
+static ObjectAddress addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint,
+                                          Relation rel, Relation pkrel, Oid indexOid, Oid parentConstr,
+                                          int numfks, int16 *pkattnum, int16 *fkattnum,
+                                          Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators,
+                                          bool old_check_ok);
+static void addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint,
+                                               Relation rel, Relation pkrel, Oid indexOid, Oid parentConstr,
+                                               int numfks, int16 *pkattnum, int16 *fkattnum,
+                                               Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators,
+                                               bool old_check_ok, LOCKMODE lockmode);
+static void CloneForeignKeyConstraints(List **wqueue, Relation parentRel,
+                                                  Relation partitionRel);
+static void CloneFkReferenced(Relation parentRel, Relation partitionRel);
+static void CloneFkReferencing(List **wqueue, Relation parentRel,
+                                  Relation partRel);
+static void createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid,
+                                                         Constraint *fkconstraint, Oid constraintOid,
+                                                         Oid indexOid);
+static void createForeignKeyActionTriggers(Relation rel, Oid refRelOid,
+                                                          Constraint *fkconstraint, Oid constraintOid,
+                                                          Oid indexOid);
+static bool tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
+                                                        Oid partRelid,
+                                                        Oid parentConstrOid, int numfks,
+                                                        AttrNumber *mapped_conkey, AttrNumber *confkey,
+                                                        Oid *conpfeqop);
 static void ATExecDropConstraint(Relation rel, const char *constrName,
                                         DropBehavior behavior,
                                         bool recurse, bool recursing,
@@ -501,6 +523,8 @@ static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx,
                                          Relation partitionTbl);
 static void update_relispartition(Relation classRel, Oid relationId,
                                          bool newval);
+static List *GetParentedForeignKeyRefs(Relation partition);
+static void ATDetachCheckNoForeignKeyRefs(Relation partition);
 
 
 /* ----------------------------------------------------------------
@@ -1083,7 +1107,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
                 * And foreign keys too.  Note that because we're freshly creating the
                 * table, there is no need to verify these new constraints.
                 */
-               CloneForeignKeyConstraints(parentId, relationId, NULL);
+               CloneForeignKeyConstraints(NULL, parent, rel);
 
                table_close(parent, NoLock);
        }
@@ -3563,7 +3587,8 @@ AlterTableGetLockLevel(List *cmds)
 
                                /*
                                 * Removing constraints can affect SELECTs that have been
-                                * optimised assuming the constraint holds true.
+                                * optimised assuming the constraint holds true. See also
+                                * CloneFkReferenced.
                                 */
                        case AT_DropConstraint: /* as DROP INDEX */
                        case AT_DropNotNull:    /* may change some SQL plans */
@@ -7224,9 +7249,6 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
                case CONSTR_FOREIGN:
 
                        /*
-                        * Note that we currently never recurse for FK constraints, so the
-                        * "recurse" flag is silently ignored.
-                        *
                         * Assign or validate constraint name
                         */
                        if (newConstraint->conname)
@@ -7444,6 +7466,13 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
  * Subroutine for ATExecAddConstraint.  Must already hold exclusive
  * lock on the rel, and have done appropriate validity checks for it.
  * We do permissions checks here, however.
+ *
+ * When the referenced or referencing tables (or both) are partitioned,
+ * multiple pg_constraint rows are required -- one for each partitioned table
+ * and each partition on each side (fortunately, not one for every combination
+ * thereof).  We also need action triggers on each leaf partition on the
+ * referenced side, and check triggers on each leaf partition on the
+ * referencing side.
  */
 static ObjectAddress
 ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
@@ -7459,12 +7488,10 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
        Oid                     pfeqoperators[INDEX_MAX_KEYS];
        Oid                     ppeqoperators[INDEX_MAX_KEYS];
        Oid                     ffeqoperators[INDEX_MAX_KEYS];
-       bool            connoinherit;
        int                     i;
        int                     numfks,
                                numpks;
        Oid                     indexOid;
-       Oid                     constrOid;
        bool            old_check_ok;
        ObjectAddress address;
        ListCell   *old_pfeqop_item = list_head(fkconstraint->old_conpfeqop);
@@ -7482,12 +7509,6 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
         * Validity checks (permission checks wait till we have the column
         * numbers)
         */
-       if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
-               ereport(ERROR,
-                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                                errmsg("cannot reference partitioned table \"%s\"",
-                                               RelationGetRelationName(pkrel))));
-
        if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
        {
                if (!recurse)
@@ -7505,7 +7526,8 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
                                         errdetail("This feature is not yet supported on partitioned tables.")));
        }
 
-       if (pkrel->rd_rel->relkind != RELKIND_RELATION)
+       if (pkrel->rd_rel->relkind != RELKIND_RELATION &&
+               pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
                ereport(ERROR,
                                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                 errmsg("referenced relation \"%s\" is not a table",
@@ -7741,8 +7763,7 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
                if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop)))
                        ereport(ERROR,
                                        (errcode(ERRCODE_DATATYPE_MISMATCH),
-                                        errmsg("foreign key constraint \"%s\" "
-                                                       "cannot be implemented",
+                                        errmsg("foreign key constraint \"%s\" cannot be implemented",
                                                        fkconstraint->conname),
                                         errdetail("Key columns \"%s\" and \"%s\" "
                                                           "are of incompatible types: %s and %s.",
@@ -7830,15 +7851,126 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
        }
 
        /*
-        * FKs always inherit for partitioned tables, and never for legacy
-        * inheritance.
+        * Create all the constraint and trigger objects, recursing to partitions
+        * as necessary.  First handle the referenced side.
+        */
+       address = addFkRecurseReferenced(wqueue, fkconstraint, rel, pkrel,
+                                                                        indexOid,
+                                                                        InvalidOid,    /* no parent constraint */
+                                                                        numfks,
+                                                                        pkattnum,
+                                                                        fkattnum,
+                                                                        pfeqoperators,
+                                                                        ppeqoperators,
+                                                                        ffeqoperators,
+                                                                        old_check_ok);
+
+       /* Now handle the referencing side. */
+       addFkRecurseReferencing(wqueue, fkconstraint, rel, pkrel,
+                                                       indexOid,
+                                                       address.objectId,
+                                                       numfks,
+                                                       pkattnum,
+                                                       fkattnum,
+                                                       pfeqoperators,
+                                                       ppeqoperators,
+                                                       ffeqoperators,
+                                                       old_check_ok,
+                                                       lockmode);
+
+       /*
+        * Done.  Close pk table, but keep lock until we've committed.
+        */
+       table_close(pkrel, NoLock);
+
+       return address;
+}
+
+/*
+ * addFkRecurseReferenced
+ *             subroutine for ATAddForeignKeyConstraint; recurses on the referenced
+ *             side of the constraint
+ *
+ * Create pg_constraint rows for the referenced side of the constraint,
+ * referencing the parent of the referencing side; also create action triggers
+ * on leaf partitions.  If the table is partitioned, recurse to handle each
+ * partition.
+ *
+ * wqueue is the ALTER TABLE work queue; can be NULL when not running as part
+ * of an ALTER TABLE sequence.
+ * fkconstraint is the constraint being added.
+ * rel is the root referencing relation.
+ * pkrel is the referenced relation; might be a partition, if recursing.
+ * indexOid is the OID of the index (on pkrel) implementing this constraint.
+ * parentConstr is the OID of a parent constraint; InvalidOid if this is a
+ * top-level constraint.
+ * numfks is the number of columns in the foreign key
+ * pkattnum is the attnum array of referenced attributes.
+ * fkattnum is the attnum array of referencing attributes.
+ * pf/pp/ffeqoperators are OID array of operators between columns.
+ * old_check_ok signals that this constraint replaces an existing one that
+ * was already validated (thus this one doesn't need validation).
+ */
+static ObjectAddress
+addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel,
+                                          Relation pkrel, Oid indexOid, Oid parentConstr,
+                                          int numfks,
+                                          int16 *pkattnum, int16 *fkattnum, Oid *pfeqoperators,
+                                          Oid *ppeqoperators, Oid *ffeqoperators, bool old_check_ok)
+{
+       ObjectAddress address;
+       Oid                     constrOid;
+       char       *conname;
+       bool            conislocal;
+       int                     coninhcount;
+       bool            connoinherit;
+
+       /*
+        * Verify relkind for each referenced partition.  At the top level, this
+        * is redundant with a previous check, but we need it when recursing.
         */
-       connoinherit = rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE;
+       if (pkrel->rd_rel->relkind != RELKIND_RELATION &&
+               pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+               ereport(ERROR,
+                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                errmsg("referenced relation \"%s\" is not a table",
+                                               RelationGetRelationName(pkrel))));
+
+       /*
+        * Caller supplies us with a constraint name; however, it may be used in
+        * this partition, so come up with a different one in that case.
+        */
+       if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
+                                                        RelationGetRelid(rel),
+                                                        fkconstraint->conname))
+               conname = ChooseConstraintName(RelationGetRelationName(rel),
+                                                                          ChooseForeignKeyConstraintNameAddition(fkconstraint->fk_attrs),
+                                                                          "fkey",
+                                                                          RelationGetNamespace(rel), NIL);
+       else
+               conname = fkconstraint->conname;
+
+       if (OidIsValid(parentConstr))
+       {
+               conislocal = false;
+               coninhcount = 1;
+               connoinherit = false;
+       }
+       else
+       {
+               conislocal = true;
+               coninhcount = 0;
+
+               /*
+                * always inherit for partitioned tables, never for legacy inheritance
+                */
+               connoinherit = rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE;
+       }
 
        /*
         * Record the FK constraint in pg_constraint.
         */
-       constrOid = CreateConstraintEntry(fkconstraint->conname,
+       constrOid = CreateConstraintEntry(conname,
                                                                          RelationGetNamespace(rel),
                                                                          CONSTRAINT_FOREIGN,
                                                                          fkconstraint->deferrable,
@@ -7856,108 +7988,317 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
                                                                          pfeqoperators,
                                                                          ppeqoperators,
                                                                          ffeqoperators,
-                                                                         numpks,
+                                                                         numfks,
                                                                          fkconstraint->fk_upd_action,
                                                                          fkconstraint->fk_del_action,
                                                                          fkconstraint->fk_matchtype,
                                                                          NULL, /* no exclusion constraint */
                                                                          NULL, /* no check constraint */
                                                                          NULL,
-                                                                         true, /* islocal */
-                                                                         0,    /* inhcount */
-                                                                         connoinherit, /* conNoInherit */
+                                                                         conislocal,   /* islocal */
+                                                                         coninhcount,  /* inhcount */
+                                                                         connoinherit, /* conNoInherit */
                                                                          false);       /* is_internal */
+
        ObjectAddressSet(address, ConstraintRelationId, constrOid);
 
        /*
-        * Create the triggers that will enforce the constraint.  We only want the
-        * action triggers to appear for the parent partitioned relation, even
-        * though the constraints also exist below.
+        * Mark the child constraint as part of the parent constraint; it must not
+        * be dropped on its own.  (This constraint is deleted when the partition
+        * is detached, but a special check needs to occur that the partition
+        * contains no referenced values.)
         */
-       createForeignKeyTriggers(rel, RelationGetRelid(pkrel), fkconstraint,
-                                                        constrOid, indexOid, !recursing);
+       if (OidIsValid(parentConstr))
+       {
+               ObjectAddress referenced;
+
+               ObjectAddressSet(referenced, ConstraintRelationId, parentConstr);
+               recordDependencyOn(&address, &referenced, DEPENDENCY_INTERNAL);
+       }
+
+       /* make new constraint visible, in case we add more */
+       CommandCounterIncrement();
 
        /*
-        * Tell Phase 3 to check that the constraint is satisfied by existing
-        * rows. We can skip this during table creation, when requested explicitly
-        * by specifying NOT VALID in an ADD FOREIGN KEY command, and when we're
-        * recreating a constraint following a SET DATA TYPE operation that did
-        * not impugn its validity.
+        * If the referenced table is a plain relation, create the action triggers
+        * that enforce the constraint.
         */
-       if (!old_check_ok && !fkconstraint->skip_validation)
+       if (pkrel->rd_rel->relkind == RELKIND_RELATION)
        {
-               NewConstraint *newcon;
-
-               newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
-               newcon->name = fkconstraint->conname;
-               newcon->contype = CONSTR_FOREIGN;
-               newcon->refrelid = RelationGetRelid(pkrel);
-               newcon->refindid = indexOid;
-               newcon->conid = constrOid;
-               newcon->qual = (Node *) fkconstraint;
-
-               tab->constraints = lappend(tab->constraints, newcon);
+               createForeignKeyActionTriggers(rel, RelationGetRelid(pkrel),
+                                                                          fkconstraint,
+                                                                          constrOid, indexOid);
        }
 
        /*
-        * When called on a partitioned table, recurse to create the constraint on
-        * the partitions also.
+        * If the referenced table is partitioned, recurse on ourselves to handle
+        * each partition.  We need one pg_constraint row created for each
+        * partition in addition to the pg_constraint row for the parent table.
         */
-       if (recurse && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+       if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
        {
-               PartitionDesc partdesc;
-               Relation        pg_constraint;
-               List       *cloned = NIL;
-               ListCell   *cell;
-
-               pg_constraint = table_open(ConstraintRelationId, RowExclusiveLock);
-               partdesc = RelationGetPartitionDesc(rel);
+               PartitionDesc pd = RelationGetPartitionDesc(pkrel);
 
-               for (i = 0; i < partdesc->nparts; i++)
+               for (int i = 0; i < pd->nparts; i++)
                {
-                       Oid                     partitionId = partdesc->oids[i];
-                       Relation        partition = table_open(partitionId, lockmode);
-
-                       CheckTableNotInUse(partition, "ALTER TABLE");
+                       Relation        partRel;
+                       AttrNumber *map;
+                       AttrNumber *mapped_pkattnum;
+                       Oid                     partIndexId;
 
-                       CloneFkReferencing(pg_constraint, rel, partition,
-                                                          list_make1_oid(constrOid),
-                                                          &cloned);
+                       partRel = table_open(pd->oids[i], ShareRowExclusiveLock);
 
-                       table_close(partition, NoLock);
+                       /*
+                        * Map the attribute numbers in the referenced side of the FK
+                        * definition to match the partition's column layout.
+                        */
+                       map = convert_tuples_by_name_map_if_req(RelationGetDescr(partRel),
+                                                                                                       RelationGetDescr(pkrel),
+                                                                                                       gettext_noop("could not convert row type"));
+                       if (map)
+                       {
+                               mapped_pkattnum = palloc(sizeof(AttrNumber) * numfks);
+                               for (int j = 0; j < numfks; j++)
+                                       mapped_pkattnum[j] = map[pkattnum[j] - 1];
+                       }
+                       else
+                               mapped_pkattnum = pkattnum;
+
+                       /* do the deed */
+                       partIndexId = index_get_partition(partRel, indexOid);
+                       if (!OidIsValid(partIndexId))
+                               elog(ERROR, "index for %u not found in partition %s",
+                                        indexOid, RelationGetRelationName(partRel));
+                       addFkRecurseReferenced(wqueue, fkconstraint, rel, partRel,
+                                                                  partIndexId, constrOid, numfks,
+                                                                  mapped_pkattnum, fkattnum,
+                                                                  pfeqoperators, ppeqoperators, ffeqoperators,
+                                                                  old_check_ok);
+
+                       /* Done -- clean up (but keep the lock) */
+                       table_close(partRel, NoLock);
+                       if (map)
+                       {
+                               pfree(mapped_pkattnum);
+                               pfree(map);
+                       }
                }
-               table_close(pg_constraint, RowExclusiveLock);
+       }
+
+       return address;
+}
+
+/*
+ * addFkRecurseReferencing
+ *             subroutine for ATAddForeignKeyConstraint and CloneFkReferencing
+ *
+ * If the referencing relation is a plain relation, create the necessary check
+ * triggers that implement the constraint, and set up for Phase 3 constraint
+ * verification.  If the referencing relation is a partitioned table, then
+ * we create a pg_constraint row for it and recurse on this routine for each
+ * partition.
+ *
+ * We assume that the referenced relation is locked against concurrent
+ * deletions.  If it's a partitioned relation, every partition must be so
+ * locked.
+ *
+ * wqueue is the ALTER TABLE work queue; can be NULL when not running as part
+ * of an ALTER TABLE sequence.
+ * fkconstraint is the constraint being added.
+ * rel is the referencing relation; might be a partition, if recursing.
+ * pkrel is the root referenced relation.
+ * indexOid is the OID of the index (on pkrel) implementing this constraint.
+ * parentConstr is the OID of the parent constraint (there is always one).
+ * numfks is the number of columns in the foreign key
+ * pkattnum is the attnum array of referenced attributes.
+ * fkattnum is the attnum array of referencing attributes.
+ * pf/pp/ffeqoperators are OID array of operators between columns.
+ * old_check_ok signals that this constraint replaces an existing one that
+ *             was already validated (thus this one doesn't need validation).
+ * lockmode is the lockmode to acquire on partitions when recursing.
+ */
+static void
+addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel,
+                                               Relation pkrel, Oid indexOid, Oid parentConstr,
+                                               int numfks, int16 *pkattnum, int16 *fkattnum,
+                                               Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators,
+                                               bool old_check_ok, LOCKMODE lockmode)
+{
+       AssertArg(OidIsValid(parentConstr));
+
+       if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+               ereport(ERROR,
+                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                errmsg("foreign keys constraints are not supported on foreign tables")));
+
+       /*
+        * If the referencing relation is a plain table, add the check triggers to
+        * it and, if necessary, schedule it to be checked in Phase 3.
+        *
+        * If the relation is partitioned, drill down to do it to its partitions.
+        */
+       if (rel->rd_rel->relkind == RELKIND_RELATION)
+       {
+               createForeignKeyCheckTriggers(RelationGetRelid(rel),
+                                                                         RelationGetRelid(pkrel),
+                                                                         fkconstraint,
+                                                                         parentConstr,
+                                                                         indexOid);
 
-               foreach(cell, cloned)
+               /*
+                * Tell Phase 3 to check that the constraint is satisfied by existing
+                * rows. We can skip this during table creation, when requested
+                * explicitly by specifying NOT VALID in an ADD FOREIGN KEY command,
+                * and when we're recreating a constraint following a SET DATA TYPE
+                * operation that did not impugn its validity.
+                */
+               if (wqueue && !old_check_ok && !fkconstraint->skip_validation)
                {
-                       ClonedConstraint *cc = (ClonedConstraint *) lfirst(cell);
-                       Relation    partition = table_open(cc->relid, lockmode);
-                       AlteredTableInfo *childtab;
                        NewConstraint *newcon;
+                       AlteredTableInfo *tab;
 
-                       /* Find or create work queue entry for this partition */
-                       childtab = ATGetQueueEntry(wqueue, partition);
+                       tab = ATGetQueueEntry(wqueue, rel);
 
                        newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
-                       newcon->name = cc->constraint->conname;
+                       newcon->name = get_constraint_name(parentConstr);
                        newcon->contype = CONSTR_FOREIGN;
-                       newcon->refrelid = cc->refrelid;
-                       newcon->refindid = cc->conindid;
-                       newcon->conid = cc->conid;
+                       newcon->refrelid = RelationGetRelid(pkrel);
+                       newcon->refindid = indexOid;
+                       newcon->conid = parentConstr;
                        newcon->qual = (Node *) fkconstraint;
 
-                       childtab->constraints = lappend(childtab->constraints, newcon);
-
-                       table_close(partition, lockmode);
+                       tab->constraints = lappend(tab->constraints, newcon);
                }
        }
+       else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+       {
+               PartitionDesc pd = RelationGetPartitionDesc(rel);
 
-       /*
-        * Close pk table, but keep lock until we've committed.
-        */
-       table_close(pkrel, NoLock);
+               /*
+                * Recurse to take appropriate action on each partition; either we
+                * find an existing constraint to reparent to ours, or we create a new
+                * one.
+                */
+               for (int i = 0; i < pd->nparts; i++)
+               {
+                       Oid                     partitionId = pd->oids[i];
+                       Relation        partition = table_open(partitionId, lockmode);
+                       List       *partFKs;
+                       AttrNumber *attmap;
+                       AttrNumber      mapped_fkattnum[INDEX_MAX_KEYS];
+                       bool            attached;
+                       char       *conname;
+                       Oid                     constrOid;
+                       ObjectAddress address,
+                                               referenced;
+                       ListCell   *cell;
 
-       return address;
+                       CheckTableNotInUse(partition, "ALTER TABLE");
+
+                       attmap = convert_tuples_by_name_map(RelationGetDescr(partition),
+                                                                                               RelationGetDescr(rel),
+                                                                                               gettext_noop("could not convert row type"));
+                       for (int j = 0; j < numfks; j++)
+                               mapped_fkattnum[j] = attmap[fkattnum[j] - 1];
+
+                       /* Check whether an existing constraint can be repurposed */
+                       partFKs = copyObject(RelationGetFKeyList(partition));
+                       attached = false;
+                       foreach(cell, partFKs)
+                       {
+                               ForeignKeyCacheInfo *fk;
+
+                               fk = lfirst_node(ForeignKeyCacheInfo, cell);
+                               if (tryAttachPartitionForeignKey(fk,
+                                                                                                partitionId,
+                                                                                                parentConstr,
+                                                                                                numfks,
+                                                                                                mapped_fkattnum,
+                                                                                                pkattnum,
+                                                                                                pfeqoperators))
+                               {
+                                       attached = true;
+                                       break;
+                               }
+                       }
+                       if (attached)
+                       {
+                               table_close(partition, NoLock);
+                               continue;
+                       }
+
+                       /*
+                        * No luck finding a good constraint to reuse; create our own.
+                        */
+                       if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
+                                                                        RelationGetRelid(partition),
+                                                                        fkconstraint->conname))
+                               conname = ChooseConstraintName(RelationGetRelationName(partition),
+                                                                                          ChooseForeignKeyConstraintNameAddition(fkconstraint->fk_attrs),
+                                                                                          "fkey",
+                                                                                          RelationGetNamespace(partition), NIL);
+                       else
+                               conname = fkconstraint->conname;
+                       constrOid =
+                               CreateConstraintEntry(conname,
+                                                                         RelationGetNamespace(partition),
+                                                                         CONSTRAINT_FOREIGN,
+                                                                         fkconstraint->deferrable,
+                                                                         fkconstraint->initdeferred,
+                                                                         fkconstraint->initially_valid,
+                                                                         parentConstr,
+                                                                         partitionId,
+                                                                         mapped_fkattnum,
+                                                                         numfks,
+                                                                         numfks,
+                                                                         InvalidOid,
+                                                                         indexOid,
+                                                                         RelationGetRelid(pkrel),
+                                                                         pkattnum,
+                                                                         pfeqoperators,
+                                                                         ppeqoperators,
+                                                                         ffeqoperators,
+                                                                         numfks,
+                                                                         fkconstraint->fk_upd_action,
+                                                                         fkconstraint->fk_del_action,
+                                                                         fkconstraint->fk_matchtype,
+                                                                         NULL,
+                                                                         NULL,
+                                                                         NULL,
+                                                                         false,
+                                                                         1,
+                                                                         false,
+                                                                         false);
+
+                       /*
+                        * Give this constraint partition-type dependencies on the parent
+                        * constraint as well as the table.
+                        */
+                       ObjectAddressSet(address, ConstraintRelationId, constrOid);
+                       ObjectAddressSet(referenced, ConstraintRelationId, parentConstr);
+                       recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_PRI);
+                       ObjectAddressSet(referenced, RelationRelationId, partitionId);
+                       recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_SEC);
+
+                       /* Make all this visible before recursing */
+                       CommandCounterIncrement();
+
+                       /* call ourselves to finalize the creation and we're done */
+                       addFkRecurseReferencing(wqueue, fkconstraint, partition, pkrel,
+                                                                       indexOid,
+                                                                       constrOid,
+                                                                       numfks,
+                                                                       pkattnum,
+                                                                       mapped_fkattnum,
+                                                                       pfeqoperators,
+                                                                       ppeqoperators,
+                                                                       ffeqoperators,
+                                                                       old_check_ok,
+                                                                       lockmode);
+
+                       table_close(partition, NoLock);
+               }
+       }
 }
 
 /*
@@ -7965,77 +8306,219 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
  *             Clone foreign keys from a partitioned table to a newly acquired
  *             partition.
  *
- * relationId is a partition of parentId, so we can be certain that it has the
- * same columns with the same datatypes.  The columns may be in different
+ * partitionRel is a partition of parentRel, so we can be certain that it has
+ * the same columns with the same datatypes.  The columns may be in different
  * order, though.
  *
- * The *cloned list is appended ClonedConstraint elements describing what was
- * created, for the purposes of validating the constraint in ALTER TABLE's
- * Phase 3.
+ * wqueue must be passed to set up phase 3 constraint checking, unless the
+ * referencing-side partition is known to be empty (such as in CREATE TABLE /
+ * PARTITION OF).
  */
 static void
-CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
+CloneForeignKeyConstraints(List **wqueue, Relation parentRel,
+                                                  Relation partitionRel)
+{
+       /* This only works for declarative partitioning */
+       Assert(parentRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
+
+       /*
+        * Clone constraints for which the parent is on the referenced side.
+        */
+       CloneFkReferenced(parentRel, partitionRel);
+
+       /*
+        * Now clone constraints where the parent is on the referencing side.
+        */
+       CloneFkReferencing(wqueue, parentRel, partitionRel);
+}
+
+/*
+ * CloneFkReferenced
+ *             Subroutine for CloneForeignKeyConstraints
+ *
+ * Find all the FKs that have the parent relation on the referenced side;
+ * clone those constraints to the given partition.  This is to be called
+ * when the partition is being created or attached.
+ *
+ * This recurses to partitions, if the relation being attached is partitioned.
+ * Recursion is done by calling addFkRecurseReferenced.
+ */
+static void
+CloneFkReferenced(Relation parentRel, Relation partitionRel)
 {
        Relation        pg_constraint;
-       Relation        parentRel;
-       Relation        rel;
-       ScanKeyData key;
+       AttrNumber *attmap;
+       ListCell   *cell;
        SysScanDesc scan;
+       ScanKeyData key[2];
        HeapTuple       tuple;
        List       *clone = NIL;
 
-       parentRel = table_open(parentId, NoLock);       /* already got lock */
-       /* see ATAddForeignKeyConstraint about lock level */
-       rel = table_open(relationId, AccessExclusiveLock);
+       /*
+        * Search for any constraints where this partition is in the referenced
+        * side.  However, we must ignore any constraint whose parent constraint
+        * is also going to be cloned, to avoid duplicates.  So do it in two
+        * steps: first construct the list of constraints to clone, then go over
+        * that list cloning those whose parents are not in the list.  (We must
+        * not rely on the parent being seen first, since the catalog scan could
+        * return children first.)
+        */
        pg_constraint = table_open(ConstraintRelationId, RowShareLock);
-
-       /* Obtain the list of constraints to clone or attach */
-       ScanKeyInit(&key,
-                               Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
-                               F_OIDEQ, ObjectIdGetDatum(parentId));
-       scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
-                                                         NULL, 1, &key);
+       ScanKeyInit(&key[0],
+                               Anum_pg_constraint_confrelid, BTEqualStrategyNumber,
+                               F_OIDEQ, ObjectIdGetDatum(RelationGetRelid(parentRel)));
+       ScanKeyInit(&key[1],
+                               Anum_pg_constraint_contype, BTEqualStrategyNumber,
+                               F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN));
+       /* This is a seqscan, as we don't have a usable index ... */
+       scan = systable_beginscan(pg_constraint, InvalidOid, true,
+                                                         NULL, 2, key);
        while ((tuple = systable_getnext(scan)) != NULL)
        {
-               Oid             oid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
+               Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
 
-               clone = lappend_oid(clone, oid);
+               /* Only try to clone the top-level constraint; skip child ones. */
+               if (constrForm->conparentid != InvalidOid)
+                       continue;
+
+               clone = lappend_oid(clone, constrForm->oid);
        }
        systable_endscan(scan);
+       table_close(pg_constraint, RowShareLock);
 
-       /* Do the actual work, recursing to partitions as needed */
-       CloneFkReferencing(pg_constraint, parentRel, rel, clone, cloned);
+       attmap = convert_tuples_by_name_map(RelationGetDescr(partitionRel),
+                                                                               RelationGetDescr(parentRel),
+                                                                               gettext_noop("could not convert row type"));
+       foreach(cell, clone)
+       {
+               Oid                     constrOid = lfirst_oid(cell);
+               Form_pg_constraint constrForm;
+               Relation        fkRel;
+               Oid                     indexOid;
+               Oid                     partIndexId;
+               int                     numfks;
+               AttrNumber      conkey[INDEX_MAX_KEYS];
+               AttrNumber      mapped_confkey[INDEX_MAX_KEYS];
+               AttrNumber      confkey[INDEX_MAX_KEYS];
+               Oid                     conpfeqop[INDEX_MAX_KEYS];
+               Oid                     conppeqop[INDEX_MAX_KEYS];
+               Oid                     conffeqop[INDEX_MAX_KEYS];
+               Constraint *fkconstraint;
 
-       /* We're done.  Clean up */
-       table_close(parentRel, NoLock);
-       table_close(rel, NoLock);       /* keep lock till commit */
-       table_close(pg_constraint, RowShareLock);
+               tuple = SearchSysCache1(CONSTROID, constrOid);
+               if (!HeapTupleIsValid(tuple))
+                       elog(ERROR, "cache lookup failed for constraint %u", constrOid);
+               constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+
+               /*
+                * Because we're only expanding the key space at the referenced side,
+                * we don't need to prevent any operation in the referencing table, so
+                * AccessShareLock suffices (assumes that dropping the constraint
+                * acquires AEL).
+                */
+               fkRel = table_open(constrForm->conrelid, AccessShareLock);
+
+               indexOid = constrForm->conindid;
+               DeconstructFkConstraintRow(tuple,
+                                                                  &numfks,
+                                                                  conkey,
+                                                                  confkey,
+                                                                  conpfeqop,
+                                                                  conppeqop,
+                                                                  conffeqop);
+               for (int i = 0; i < numfks; i++)
+                       mapped_confkey[i] = attmap[confkey[i] - 1];
+
+               fkconstraint = makeNode(Constraint);
+               /* for now this is all we need */
+               fkconstraint->conname = NameStr(constrForm->conname);
+               fkconstraint->fk_upd_action = constrForm->confupdtype;
+               fkconstraint->fk_del_action = constrForm->confdeltype;
+               fkconstraint->deferrable = constrForm->condeferrable;
+               fkconstraint->initdeferred = constrForm->condeferred;
+               fkconstraint->initially_valid = true;
+               fkconstraint->fk_matchtype = constrForm->confmatchtype;
+
+               /* set up colnames that are used to generate the constraint name */
+               for (int i = 0; i < numfks; i++)
+               {
+                       Form_pg_attribute att;
+
+                       att = TupleDescAttr(RelationGetDescr(fkRel),
+                                                               conkey[i] - 1);
+                       fkconstraint->fk_attrs = lappend(fkconstraint->fk_attrs,
+                                                                                        makeString(NameStr(att->attname)));
+               }
+
+               /*
+                * Add the new foreign key constraint pointing to the new partition.
+                * Because this new partition appears in the referenced side of the
+                * constraint, we don't need to set up for Phase 3 check.
+                */
+               partIndexId = index_get_partition(partitionRel, indexOid);
+               if (!OidIsValid(partIndexId))
+                       elog(ERROR, "index for %u not found in partition %s",
+                                indexOid, RelationGetRelationName(partitionRel));
+               addFkRecurseReferenced(NULL,
+                                                          fkconstraint,
+                                                          fkRel,
+                                                          partitionRel,
+                                                          partIndexId,
+                                                          constrOid,
+                                                          numfks,
+                                                          mapped_confkey,
+                                                          conkey,
+                                                          conpfeqop,
+                                                          conppeqop,
+                                                          conffeqop,
+                                                          true);
+
+               table_close(fkRel, NoLock);
+               ReleaseSysCache(tuple);
+       }
 }
 
 /*
  * CloneFkReferencing
- *             Recursive subroutine for CloneForeignKeyConstraints, referencing side
- *
- * Clone the given list of FK constraints when a partition is attached on the
- * referencing side of those constraints.
+ *             Subroutine for CloneForeignKeyConstraints
  *
- * When cloning foreign keys to a partition, it may happen that equivalent
- * constraints already exist in the partition for some of them.  We can skip
- * creating a clone in that case, and instead just attach the existing
- * constraint to the one in the parent.
+ * For each FK constraint of the parent relation in the given list, find an
+ * equivalent constraint in its partition relation that can be reparented;
+ * if one cannot be found, create a new constraint in the partition as its
+ * child.
  *
- * This function recurses to partitions, if the new partition is partitioned;
- * of course, only do this for FKs that were actually cloned.
+ * If wqueue is given, it is used to set up phase-3 verification for each
+ * cloned constraint; if omitted, we assume that such verification is not
+ * needed (example: the partition is being created anew).
  */
 static void
-CloneFkReferencing(Relation pg_constraint, Relation parentRel,
-                                  Relation partRel, List *clone, List **cloned)
+CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel)
 {
        AttrNumber *attmap;
        List       *partFKs;
-       List       *subclone = NIL;
+       List       *clone = NIL;
        ListCell   *cell;
 
+       /* obtain a list of constraints that we need to clone */
+       foreach(cell, RelationGetFKeyList(parentRel))
+       {
+               ForeignKeyCacheInfo *fk = lfirst(cell);
+
+               clone = lappend_oid(clone, fk->conoid);
+       }
+
+       /*
+        * Silently do nothing if there's nothing to do.  In particular, this
+        * avoids throwing a spurious error for foreign tables.
+        */
+       if (clone == NIL)
+               return;
+
+       if (partRel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+               ereport(ERROR,
+                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                errmsg("foreign keys constraints are not supported on foreign tables")));
+
        /*
         * The constraint key may differ, if the columns in the partition are
         * different.  This map is used to convert them.
@@ -8050,6 +8533,7 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
        {
                Oid                     parentConstrOid = lfirst_oid(cell);
                Form_pg_constraint constrForm;
+               Relation        pkrel;
                HeapTuple       tuple;
                int                     numfks;
                AttrNumber      conkey[INDEX_MAX_KEYS];
@@ -8059,13 +8543,12 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
                Oid                     conppeqop[INDEX_MAX_KEYS];
                Oid                     conffeqop[INDEX_MAX_KEYS];
                Constraint *fkconstraint;
-               bool            attach_it;
+               bool            attached;
+               Oid                     indexOid;
                Oid                     constrOid;
-               ObjectAddress parentAddr,
-                                       childAddr,
-                                       childTableAddr;
+               ObjectAddress address,
+                                       referenced;
                ListCell   *cell;
-               int                     i;
 
                tuple = SearchSysCache1(CONSTROID, parentConstrOid);
                if (!tuple)
@@ -8073,161 +8556,92 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
                                 parentConstrOid);
                constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
 
-               /* only foreign keys */
-               if (constrForm->contype != CONSTRAINT_FOREIGN)
+               /* Don't clone constraints whose parents are being cloned */
+               if (list_member_oid(clone, constrForm->conparentid))
                {
                        ReleaseSysCache(tuple);
                        continue;
                }
 
-               ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid);
+               /*
+                * Need to prevent concurrent deletions.  If pkrel is a partitioned
+                * relation, that means to lock all partitions.
+                */
+               pkrel = table_open(constrForm->confrelid, ShareRowExclusiveLock);
+               if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+                       (void) find_all_inheritors(RelationGetRelid(pkrel),
+                                                                          ShareRowExclusiveLock, NULL);
 
                DeconstructFkConstraintRow(tuple, &numfks, conkey, confkey,
                                                                   conpfeqop, conppeqop, conffeqop);
-               for (i = 0; i < numfks; i++)
+               for (int i = 0; i < numfks; i++)
                        mapped_conkey[i] = attmap[conkey[i] - 1];
 
                /*
                 * Before creating a new constraint, see whether any existing FKs are
-                * fit for the purpose.  If one is, attach the parent constraint to it,
-                * and don't clone anything.  This way we avoid the expensive
-                * verification step and don't end up with a duplicate FK.  This also
-                * means we don't consider this constraint when recursing to
-                * partitions.
+                * fit for the purpose.  If one is, attach the parent constraint to
+                * it, and don't clone anything.  This way we avoid the expensive
+                * verification step and don't end up with a duplicate FK, and we
+                * don't need to recurse to partitions for this constraint.
                 */
-               attach_it = false;
+               attached = false;
                foreach(cell, partFKs)
                {
                        ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell);
-                       Form_pg_constraint partConstr;
-                       HeapTuple       partcontup;
-                       Relation        trigrel;
-                       HeapTuple       trigtup;
-                       SysScanDesc scan;
-                       ScanKeyData key;
-
-                       attach_it = true;
-
-                       /*
-                        * Do some quick & easy initial checks.  If any of these fail, we
-                        * cannot use this constraint, but keep looking.
-                        */
-                       if (fk->confrelid != constrForm->confrelid || fk->nkeys != numfks)
-                       {
-                               attach_it = false;
-                               continue;
-                       }
-                       for (i = 0; i < numfks; i++)
-                       {
-                               if (fk->conkey[i] != mapped_conkey[i] ||
-                                       fk->confkey[i] != confkey[i] ||
-                                       fk->conpfeqop[i] != conpfeqop[i])
-                               {
-                                       attach_it = false;
-                                       break;
-                               }
-                       }
-                       if (!attach_it)
-                               continue;
 
-                       /*
-                        * Looks good so far; do some more extensive checks.  Presumably
-                        * the check for 'convalidated' could be dropped, since we don't
-                        * really care about that, but let's be careful for now.
-                        */
-                       partcontup = SearchSysCache1(CONSTROID,
-                                                                                ObjectIdGetDatum(fk->conoid));
-                       if (!partcontup)
-                               elog(ERROR, "cache lookup failed for constraint %u",
-                                        fk->conoid);
-                       partConstr = (Form_pg_constraint) GETSTRUCT(partcontup);
-                       if (OidIsValid(partConstr->conparentid) ||
-                               !partConstr->convalidated ||
-                               partConstr->condeferrable != constrForm->condeferrable ||
-                               partConstr->condeferred != constrForm->condeferred ||
-                               partConstr->confupdtype != constrForm->confupdtype ||
-                               partConstr->confdeltype != constrForm->confdeltype ||
-                               partConstr->confmatchtype != constrForm->confmatchtype)
+                       if (tryAttachPartitionForeignKey(fk,
+                                                                                        RelationGetRelid(partRel),
+                                                                                        parentConstrOid,
+                                                                                        numfks,
+                                                                                        mapped_conkey,
+                                                                                        confkey,
+                                                                                        conpfeqop))
                        {
-                               ReleaseSysCache(partcontup);
-                               attach_it = false;
-                               continue;
-                       }
-
-                       ReleaseSysCache(partcontup);
-
-                       /*
-                        * Looks good!  Attach this constraint.  The action triggers in
-                        * the new partition become redundant -- the parent table already
-                        * has equivalent ones, and those will be able to reach the
-                        * partition.  Remove the ones in the partition.  We identify them
-                        * because they have our constraint OID, as well as being on the
-                        * referenced rel.
-                        */
-                       trigrel = heap_open(TriggerRelationId, RowExclusiveLock);
-                       ScanKeyInit(&key,
-                                               Anum_pg_trigger_tgconstraint,
-                                               BTEqualStrategyNumber, F_OIDEQ,
-                                               ObjectIdGetDatum(fk->conoid));
-
-                       scan = systable_beginscan(trigrel, TriggerConstraintIndexId, true,
-                                                                         NULL, 1, &key);
-                       while ((trigtup = systable_getnext(scan)) != NULL)
-                       {
-                               Form_pg_trigger trgform = (Form_pg_trigger) GETSTRUCT(trigtup);
-                               ObjectAddress   trigger;
-
-                               if (trgform->tgconstrrelid != fk->conrelid)
-                                       continue;
-                               if (trgform->tgrelid != fk->confrelid)
-                                       continue;
-
-                               /*
-                                * The constraint is originally set up to contain this trigger
-                                * as an implementation object, so there's a dependency record
-                                * that links the two; however, since the trigger is no longer
-                                * needed, we remove the dependency link in order to be able
-                                * to drop the trigger while keeping the constraint intact.
-                                */
-                               deleteDependencyRecordsFor(TriggerRelationId,
-                                                                                  trgform->oid,
-                                                                                  false);
-                               /* make dependency deletion visible to performDeletion */
-                               CommandCounterIncrement();
-                               ObjectAddressSet(trigger, TriggerRelationId,
-                                                                trgform->oid);
-                               performDeletion(&trigger, DROP_RESTRICT, 0);
-                               /* make trigger drop visible, in case the loop iterates */
-                               CommandCounterIncrement();
+                               attached = true;
+                               table_close(pkrel, NoLock);
+                               break;
                        }
-
-                       systable_endscan(scan);
-                       table_close(trigrel, RowExclusiveLock);
-
-                       ConstraintSetParentConstraint(fk->conoid, parentConstrOid,
-                                                                                 RelationGetRelid(partRel));
-                       CommandCounterIncrement();
-                       attach_it = true;
-                       break;
                }
-
-               /*
-                * If we attached to an existing constraint, there is no need to
-                * create a new one.  In fact, there's no need to recurse for this
-                * constraint to partitions, either.
-                */
-               if (attach_it)
+               if (attached)
                {
                        ReleaseSysCache(tuple);
                        continue;
                }
 
+               /* No dice.  Set up to create our own constraint */
+               fkconstraint = makeNode(Constraint);
+               if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
+                                                                RelationGetRelid(partRel),
+                                                                NameStr(constrForm->conname)))
+                       fkconstraint->conname =
+                               ChooseConstraintName(RelationGetRelationName(partRel),
+                                                                        ChooseForeignKeyConstraintNameAddition(fkconstraint->fk_attrs),
+                                                                        "fkey",
+                                                                        RelationGetNamespace(partRel), NIL);
+               else
+                       fkconstraint->conname = NameStr(constrForm->conname);
+               fkconstraint->fk_upd_action = constrForm->confupdtype;
+               fkconstraint->fk_del_action = constrForm->confdeltype;
+               fkconstraint->deferrable = constrForm->condeferrable;
+               fkconstraint->initdeferred = constrForm->condeferred;
+               fkconstraint->fk_matchtype = constrForm->confmatchtype;
+               for (int i = 0; i < numfks; i++)
+               {
+                       Form_pg_attribute att;
+
+                       att = TupleDescAttr(RelationGetDescr(partRel),
+                                                               mapped_conkey[i] - 1);
+                       fkconstraint->fk_attrs = lappend(fkconstraint->fk_attrs,
+                                                                                        makeString(NameStr(att->attname)));
+               }
+
+               indexOid = constrForm->conindid;
                constrOid =
-                       CreateConstraintEntry(NameStr(constrForm->conname),
+                       CreateConstraintEntry(fkconstraint->conname,
                                                                  constrForm->connamespace,
                                                                  CONSTRAINT_FOREIGN,
-                                                                 constrForm->condeferrable,
-                                                                 constrForm->condeferred,
+                                                                 fkconstraint->deferrable,
+                                                                 fkconstraint->initdeferred,
                                                                  constrForm->convalidated,
                                                                  parentConstrOid,
                                                                  RelationGetRelid(partRel),
@@ -8235,92 +8649,191 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
                                                                  numfks,
                                                                  numfks,
                                                                  InvalidOid,   /* not a domain constraint */
-                                                                 constrForm->conindid, /* same index */
+                                                                 indexOid,
                                                                  constrForm->confrelid,        /* same foreign rel */
                                                                  confkey,
                                                                  conpfeqop,
                                                                  conppeqop,
                                                                  conffeqop,
                                                                  numfks,
-                                                                 constrForm->confupdtype,
-                                                                 constrForm->confdeltype,
-                                                                 constrForm->confmatchtype,
+                                                                 fkconstraint->fk_upd_action,
+                                                                 fkconstraint->fk_del_action,
+                                                                 fkconstraint->fk_matchtype,
                                                                  NULL,
                                                                  NULL,
                                                                  NULL,
-                                                                 false,
-                                                                 1, false, true);
-               subclone = lappend_oid(subclone, constrOid);
+                                                                 false,        /* islocal */
+                                                                 1,    /* inhcount */
+                                                                 false,        /* conNoInherit */
+                                                                 true);
 
                /* Set up partition dependencies for the new constraint */
-               ObjectAddressSet(childAddr, ConstraintRelationId, constrOid);
-               recordDependencyOn(&childAddr, &parentAddr,
-                                                  DEPENDENCY_PARTITION_PRI);
-               ObjectAddressSet(childTableAddr, RelationRelationId,
+               ObjectAddressSet(address, ConstraintRelationId, constrOid);
+               ObjectAddressSet(referenced, ConstraintRelationId, parentConstrOid);
+               recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_PRI);
+               ObjectAddressSet(referenced, RelationRelationId,
                                                 RelationGetRelid(partRel));
-               recordDependencyOn(&childAddr, &childTableAddr,
-                                                  DEPENDENCY_PARTITION_SEC);
+               recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_SEC);
 
-               fkconstraint = makeNode(Constraint);
-               /* for now this is all we need */
-               fkconstraint->conname = pstrdup(NameStr(constrForm->conname));
-               fkconstraint->fk_upd_action = constrForm->confupdtype;
-               fkconstraint->fk_del_action = constrForm->confdeltype;
-               fkconstraint->deferrable = constrForm->condeferrable;
-               fkconstraint->initdeferred = constrForm->condeferred;
+               /* Done with the cloned constraint's tuple */
+               ReleaseSysCache(tuple);
 
-               createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint,
-                                                                constrOid, constrForm->conindid, false);
+               /* Make all this visible before recursing */
+               CommandCounterIncrement();
 
-               if (cloned)
-               {
-                       ClonedConstraint *newc;
+               addFkRecurseReferencing(wqueue,
+                                                               fkconstraint,
+                                                               partRel,
+                                                               pkrel,
+                                                               indexOid,
+                                                               constrOid,
+                                                               numfks,
+                                                               confkey,
+                                                               mapped_conkey,
+                                                               conpfeqop,
+                                                               conppeqop,
+                                                               conffeqop,
+                                                               false,  /* no old check exists */
+                                                               AccessExclusiveLock);
+               table_close(pkrel, NoLock);
+       }
+}
 
-                       /*
-                        * Feed back caller about the constraints we created, so that they
-                        * can set up constraint verification.
-                        */
-                       newc = palloc(sizeof(ClonedConstraint));
-                       newc->relid = RelationGetRelid(partRel);
-                       newc->refrelid = constrForm->confrelid;
-                       newc->conindid = constrForm->conindid;
-                       newc->conid = constrOid;
-                       newc->constraint = fkconstraint;
+/*
+ * When the parent of a partition receives [the referencing side of] a foreign
+ * key, we must propagate that foreign key to the partition.  However, the
+ * partition might already have an equivalent foreign key; this routine
+ * compares the given ForeignKeyCacheInfo (in the partition) to the FK defined
+ * by the other parameters.  If they are equivalent, create the link between
+ * the two constraints and return true.
+ *
+ * If the given FK does not match the one defined by rest of the params,
+ * return false.
+ */
+static bool
+tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
+                                                        Oid partRelid,
+                                                        Oid parentConstrOid,
+                                                        int numfks,
+                                                        AttrNumber *mapped_conkey,
+                                                        AttrNumber *confkey,
+                                                        Oid *conpfeqop)
+{
+       HeapTuple       parentConstrTup;
+       Form_pg_constraint parentConstr;
+       HeapTuple       partcontup;
+       Form_pg_constraint partConstr;
+       Relation        trigrel;
+       ScanKeyData key;
+       SysScanDesc scan;
+       HeapTuple       trigtup;
+
+       parentConstrTup = SearchSysCache1(CONSTROID,
+                                                                         ObjectIdGetDatum(parentConstrOid));
+       if (!parentConstrTup)
+               elog(ERROR, "cache lookup failed for constraint %u", parentConstrOid);
+       parentConstr = (Form_pg_constraint) GETSTRUCT(parentConstrTup);
 
-                       *cloned = lappend(*cloned, newc);
+       /*
+        * Do some quick & easy initial checks.  If any of these fail, we cannot
+        * use this constraint.
+        */
+       if (fk->confrelid != parentConstr->confrelid || fk->nkeys != numfks)
+       {
+               ReleaseSysCache(parentConstrTup);
+               return false;
+       }
+       for (int i = 0; i < numfks; i++)
+       {
+               if (fk->conkey[i] != mapped_conkey[i] ||
+                       fk->confkey[i] != confkey[i] ||
+                       fk->conpfeqop[i] != conpfeqop[i])
+               {
+                       ReleaseSysCache(parentConstrTup);
+                       return false;
                }
+       }
 
-               ReleaseSysCache(tuple);
+       /*
+        * Looks good so far; do some more extensive checks.  Presumably the check
+        * for 'convalidated' could be dropped, since we don't really care about
+        * that, but let's be careful for now.
+        */
+       partcontup = SearchSysCache1(CONSTROID,
+                                                                ObjectIdGetDatum(fk->conoid));
+       if (!partcontup)
+               elog(ERROR, "cache lookup failed for constraint %u",
+                        fk->conoid);
+       partConstr = (Form_pg_constraint) GETSTRUCT(partcontup);
+       if (OidIsValid(partConstr->conparentid) ||
+               !partConstr->convalidated ||
+               partConstr->condeferrable != parentConstr->condeferrable ||
+               partConstr->condeferred != parentConstr->condeferred ||
+               partConstr->confupdtype != parentConstr->confupdtype ||
+               partConstr->confdeltype != parentConstr->confdeltype ||
+               partConstr->confmatchtype != parentConstr->confmatchtype)
+       {
+               ReleaseSysCache(parentConstrTup);
+               ReleaseSysCache(partcontup);
+               return false;
        }
 
-       pfree(attmap);
-       list_free_deep(partFKs);
+       ReleaseSysCache(partcontup);
+       ReleaseSysCache(parentConstrTup);
 
        /*
-        * If the partition is partitioned, recurse to handle any constraints that
-        * were cloned.
+        * Looks good!  Attach this constraint.  The action triggers in the new
+        * partition become redundant -- the parent table already has equivalent
+        * ones, and those will be able to reach the partition.  Remove the ones
+        * in the partition.  We identify them because they have our constraint
+        * OID, as well as being on the referenced rel.
         */
-       if (partRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
-               subclone != NIL)
+       trigrel = table_open(TriggerRelationId, RowExclusiveLock);
+       ScanKeyInit(&key,
+                               Anum_pg_trigger_tgconstraint,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(fk->conoid));
+
+       scan = systable_beginscan(trigrel, TriggerConstraintIndexId, true,
+                                                         NULL, 1, &key);
+       while ((trigtup = systable_getnext(scan)) != NULL)
        {
-               PartitionDesc partdesc = RelationGetPartitionDesc(partRel);
-               int                     i;
+               Form_pg_trigger trgform = (Form_pg_trigger) GETSTRUCT(trigtup);
+               ObjectAddress trigger;
 
-               for (i = 0; i < partdesc->nparts; i++)
-               {
-                       Relation        childRel;
+               if (trgform->tgconstrrelid != fk->conrelid)
+                       continue;
+               if (trgform->tgrelid != fk->confrelid)
+                       continue;
 
-                       childRel = table_open(partdesc->oids[i], AccessExclusiveLock);
-                       CloneFkReferencing(pg_constraint,
-                                                          partRel,
-                                                          childRel,
-                                                          subclone,
-                                                          cloned);
-                       table_close(childRel, NoLock);  /* keep lock till commit */
-               }
+               /*
+                * The constraint is originally set up to contain this trigger as an
+                * implementation object, so there's a dependency record that links
+                * the two; however, since the trigger is no longer needed, we remove
+                * the dependency link in order to be able to drop the trigger while
+                * keeping the constraint intact.
+                */
+               deleteDependencyRecordsFor(TriggerRelationId,
+                                                                  trgform->oid,
+                                                                  false);
+               /* make dependency deletion visible to performDeletion */
+               CommandCounterIncrement();
+               ObjectAddressSet(trigger, TriggerRelationId,
+                                                trgform->oid);
+               performDeletion(&trigger, DROP_RESTRICT, 0);
+               /* make trigger drop visible, in case the loop iterates */
+               CommandCounterIncrement();
        }
+
+       systable_endscan(scan);
+       table_close(trigrel, RowExclusiveLock);
+
+       ConstraintSetParentConstraint(fk->conoid, parentConstrOid, partRelid);
+       CommandCounterIncrement();
+       return true;
 }
 
+
 /*
  * ALTER TABLE ALTER CONSTRAINT
  *
@@ -8440,8 +8953,8 @@ ATExecAlterConstraint(Relation rel, AlterTableCmd *cmd,
                        /*
                         * Update deferrability of RI_FKey_noaction_del,
                         * RI_FKey_noaction_upd, RI_FKey_check_ins and RI_FKey_check_upd
-                        * triggers, but not others; see createForeignKeyTriggers and
-                        * CreateFKCheckTrigger.
+                        * triggers, but not others; see createForeignKeyActionTriggers
+                        * and CreateFKCheckTrigger.
                         */
                        if (tgform->tgfoid != F_RI_FKEY_NOACTION_DEL &&
                                tgform->tgfoid != F_RI_FKEY_NOACTION_UPD &&
@@ -9348,37 +9861,6 @@ createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid,
                                                 indexOid, false);
 }
 
-/*
- * Create the triggers that implement an FK constraint.
- *
- * NB: if you change any trigger properties here, see also
- * ATExecAlterConstraint.
- */
-void
-createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint,
-                                                Oid constraintOid, Oid indexOid, bool create_action)
-{
-       /*
-        * For the referenced side, create action triggers, if requested.  (If the
-        * referencing side is partitioned, there is still only one trigger, which
-        * runs on the referenced side and points to the top of the referencing
-        * hierarchy.)
-        */
-       if (create_action)
-               createForeignKeyActionTriggers(rel, refRelOid, fkconstraint, constraintOid,
-                                                                          indexOid);
-
-       /*
-        * For the referencing side, create the check triggers.  We only need
-        * these on the partitions.
-        */
-       if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-               createForeignKeyCheckTriggers(RelationGetRelid(rel), refRelOid,
-                                                                         fkconstraint, constraintOid, indexOid);
-
-       CommandCounterIncrement();
-}
-
 /*
  * ALTER TABLE DROP CONSTRAINT
  *
@@ -14778,8 +15260,6 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
        bool            found_whole_row;
        Oid                     defaultPartOid;
        List       *partBoundConstraint;
-       List       *cloned;
-       ListCell   *l;
 
        /*
         * We must lock the default partition if one exists, because attaching a
@@ -14960,33 +15440,10 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
        CloneRowTriggersToPartition(rel, attachrel);
 
        /*
-        * Clone foreign key constraints, and setup for Phase 3 to verify them.
+        * Clone foreign key constraints.  Callee is responsible for setting up
+        * for phase 3 constraint verification.
         */
-       cloned = NIL;
-       CloneForeignKeyConstraints(RelationGetRelid(rel),
-                                                          RelationGetRelid(attachrel), &cloned);
-       foreach(l, cloned)
-       {
-               ClonedConstraint *clonedcon = lfirst(l);
-               NewConstraint *newcon;
-               Relation        clonedrel;
-               AlteredTableInfo *parttab;
-
-               clonedrel = relation_open(clonedcon->relid, NoLock);
-               parttab = ATGetQueueEntry(wqueue, clonedrel);
-
-               newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
-               newcon->name = clonedcon->constraint->conname;
-               newcon->contype = CONSTR_FOREIGN;
-               newcon->refrelid = clonedcon->refrelid;
-               newcon->refindid = clonedcon->conindid;
-               newcon->conid = clonedcon->conid;
-               newcon->qual = (Node *) clonedcon->constraint;
-
-               parttab->constraints = lappend(parttab->constraints, newcon);
-
-               relation_close(clonedrel, NoLock);
-       }
+       CloneForeignKeyConstraints(wqueue, rel, attachrel);
 
        /*
         * Generate partition constraint from the partition bound specification.
@@ -15177,6 +15634,8 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel)
                                                                                                  RelationGetRelid(attachrel));
                                update_relispartition(NULL, cldIdxId, true);
                                found = true;
+
+                               CommandCounterIncrement();
                                break;
                        }
                }
@@ -15368,6 +15827,9 @@ ATExecDetachPartition(Relation rel, RangeVar *name)
 
        partRel = table_openrv(name, ShareUpdateExclusiveLock);
 
+       /* Ensure that foreign keys still hold after this detach */
+       ATDetachCheckNoForeignKeyRefs(partRel);
+
        /* All inheritance related checks are performed within the function */
        RemoveInheritance(partRel, rel);
 
@@ -15486,6 +15948,28 @@ ATExecDetachPartition(Relation rel, RangeVar *name)
        }
        list_free_deep(fks);
 
+       /*
+        * Any sub-constrains that are in the referenced-side of a larger
+        * constraint have to be removed.  This partition is no longer part of the
+        * key space of the constraint.
+        */
+       foreach(cell, GetParentedForeignKeyRefs(partRel))
+       {
+               Oid                     constrOid = lfirst_oid(cell);
+               ObjectAddress constraint;
+
+               ConstraintSetParentConstraint(constrOid, InvalidOid, InvalidOid);
+               deleteDependencyRecordsForClass(ConstraintRelationId,
+                                                                               constrOid,
+                                                                               ConstraintRelationId,
+                                                                               DEPENDENCY_INTERNAL);
+               CommandCounterIncrement();
+
+               ObjectAddressSet(constraint, ConstraintRelationId, constrOid);
+               performDeletion(&constraint, DROP_RESTRICT, 0);
+       }
+       CommandCounterIncrement();
+
        /*
         * Invalidate the parent's relcache so that the partition is no longer
         * included in its partition descriptor.
@@ -15866,3 +16350,107 @@ update_relispartition(Relation classRel, Oid relationId, bool newval)
        if (opened)
                table_close(classRel, RowExclusiveLock);
 }
+
+/*
+ * Return an OID list of constraints that reference the given relation
+ * that are marked as having a parent constraints.
+ */
+static List *
+GetParentedForeignKeyRefs(Relation partition)
+{
+       Relation        pg_constraint;
+       HeapTuple       tuple;
+       SysScanDesc scan;
+       ScanKeyData key[2];
+       List       *constraints = NIL;
+
+       /*
+        * If no indexes, or no columns are referenceable by FKs, we can avoid the
+        * scan.
+        */
+       if (RelationGetIndexList(partition) == NIL ||
+               bms_is_empty(RelationGetIndexAttrBitmap(partition,
+                                                                                               INDEX_ATTR_BITMAP_KEY)))
+               return NIL;
+
+       /* Search for constraints referencing this table */
+       pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
+       ScanKeyInit(&key[0],
+                               Anum_pg_constraint_confrelid, BTEqualStrategyNumber,
+                               F_OIDEQ, ObjectIdGetDatum(RelationGetRelid(partition)));
+       ScanKeyInit(&key[1],
+                               Anum_pg_constraint_contype, BTEqualStrategyNumber,
+                               F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN));
+
+       /* XXX This is a seqscan, as we don't have a usable index */
+       scan = systable_beginscan(pg_constraint, InvalidOid, true, NULL, 2, key);
+       while ((tuple = systable_getnext(scan)) != NULL)
+       {
+               Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+
+               /*
+                * We only need to process constraints that are part of larger ones.
+                */
+               if (!OidIsValid(constrForm->conparentid))
+                       continue;
+
+               constraints = lappend_oid(constraints, constrForm->oid);
+       }
+
+       systable_endscan(scan);
+       table_close(pg_constraint, AccessShareLock);
+
+       return constraints;
+}
+
+/*
+ * During DETACH PARTITION, verify that any foreign keys pointing to the
+ * partitioned table would not become invalid.  An error is raised if any
+ * referenced values exist.
+ */
+static void
+ATDetachCheckNoForeignKeyRefs(Relation partition)
+{
+       List       *constraints;
+       ListCell   *cell;
+
+       constraints = GetParentedForeignKeyRefs(partition);
+
+       foreach(cell, constraints)
+       {
+               Oid                     constrOid = lfirst_oid(cell);
+               HeapTuple       tuple;
+               Form_pg_constraint constrForm;
+               Relation        rel;
+               Trigger         trig;
+
+               tuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constrOid));
+               if (!HeapTupleIsValid(tuple))
+                       elog(ERROR, "cache lookup failed for constraint %u", constrOid);
+               constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+
+               Assert(OidIsValid(constrForm->conparentid));
+               Assert(constrForm->confrelid == RelationGetRelid(partition));
+
+               /* prevent data changes into the referencing table until commit */
+               rel = table_open(constrForm->conrelid, ShareLock);
+
+               MemSet(&trig, 0, sizeof(trig));
+               trig.tgoid = InvalidOid;
+               trig.tgname = NameStr(constrForm->conname);
+               trig.tgenabled = TRIGGER_FIRES_ON_ORIGIN;
+               trig.tgisinternal = true;
+               trig.tgconstrrelid = RelationGetRelid(partition);
+               trig.tgconstrindid = constrForm->conindid;
+               trig.tgconstraint = constrForm->oid;
+               trig.tgdeferrable = false;
+               trig.tginitdeferred = false;
+               /* we needn't fill in remaining fields */
+
+               RI_PartitionRemove_Check(&trig, rel, partition);
+
+               ReleaseSysCache(tuple);
+
+               table_close(rel, NoLock);
+       }
+}
index 72f8a9d69cff69aff2f379120325c9913d88324c..095334b3363491c7ab5bc609b7c1da1247cd2787 100644 (file)
@@ -50,6 +50,7 @@
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/rls.h"
+#include "utils/ruleutils.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 
@@ -220,8 +221,8 @@ static void ri_ExtractValues(Relation rel, TupleTableSlot *slot,
                                 Datum *vals, char *nulls);
 static void ri_ReportViolation(const RI_ConstraintInfo *riinfo,
                                   Relation pk_rel, Relation fk_rel,
-                                  TupleTableSlot *violator, TupleDesc tupdesc,
-                                  int queryno) pg_attribute_noreturn();
+                                  TupleTableSlot *violatorslot, TupleDesc tupdesc,
+                                  int queryno, bool partgone) pg_attribute_noreturn();
 
 
 /*
@@ -348,18 +349,22 @@ RI_FKey_check(TriggerData *trigdata)
                char            paramname[16];
                const char *querysep;
                Oid                     queryoids[RI_MAX_NUMKEYS];
+               const char *pk_only;
 
                /* ----------
                 * The query string built is
-                *      SELECT 1 FROM ONLY <pktable> x WHERE pkatt1 = $1 [AND ...]
+                *      SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
                 *                 FOR KEY SHARE OF x
                 * The type id's for the $ parameters are those of the
                 * corresponding FK attributes.
                 * ----------
                 */
                initStringInfo(&querybuf);
+               pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+                       "" : "ONLY ";
                quoteRelationName(pkrelname, pk_rel);
-               appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname);
+               appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
+                                                pk_only, pkrelname);
                querysep = "WHERE";
                for (int i = 0; i < riinfo->nkeys; i++)
                {
@@ -471,19 +476,23 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
                char            attname[MAX_QUOTED_NAME_LEN];
                char            paramname[16];
                const char *querysep;
+               const char *pk_only;
                Oid                     queryoids[RI_MAX_NUMKEYS];
 
                /* ----------
                 * The query string built is
-                *      SELECT 1 FROM ONLY <pktable> x WHERE pkatt1 = $1 [AND ...]
+                *      SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
                 *                 FOR KEY SHARE OF x
                 * The type id's for the $ parameters are those of the
                 * PK attributes themselves.
                 * ----------
                 */
                initStringInfo(&querybuf);
+               pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+                       "" : "ONLY ";
                quoteRelationName(pkrelname, pk_rel);
-               appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname);
+               appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
+                                                pk_only, pkrelname);
                querysep = "WHERE";
                for (int i = 0; i < riinfo->nkeys; i++)
                {
@@ -1293,6 +1302,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
        RangeTblEntry *fkrte;
        const char *sep;
        const char *fk_only;
+       const char *pk_only;
        int                     save_nestlevel;
        char            workmembuf[32];
        int                     spi_result;
@@ -1350,7 +1360,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
        /*----------
         * The query string built is:
         *      SELECT fk.keycols FROM [ONLY] relname fk
-        *       LEFT OUTER JOIN ONLY pkrelname pk
+        *       LEFT OUTER JOIN [ONLY] pkrelname pk
         *       ON (pk.pkkeycol1=fk.keycol1 [AND ...])
         *       WHERE pk.pkkeycol1 IS NULL AND
         * For MATCH SIMPLE:
@@ -1377,9 +1387,11 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
        quoteRelationName(fkrelname, fk_rel);
        fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
                "" : "ONLY ";
+       pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+               "" : "ONLY ";
        appendStringInfo(&querybuf,
-                                        " FROM %s%s fk LEFT OUTER JOIN ONLY %s pk ON",
-                                        fk_only, fkrelname, pkrelname);
+                                        " FROM %s%s fk LEFT OUTER JOIN %s%s pk ON",
+                                        fk_only, fkrelname, pk_only, pkrelname);
 
        strcpy(pkattname, "pk.");
        strcpy(fkattname, "fk.");
@@ -1530,7 +1542,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
                ri_ReportViolation(&fake_riinfo,
                                                   pk_rel, fk_rel,
                                                   slot, tupdesc,
-                                                  RI_PLAN_CHECK_LOOKUPPK);
+                                                  RI_PLAN_CHECK_LOOKUPPK, false);
 
                ExecDropSingleTupleTableSlot(slot);
        }
@@ -1546,6 +1558,214 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
        return true;
 }
 
+/*
+ * RI_PartitionRemove_Check -
+ *
+ * Verify no referencing values exist, when a partition is detached on
+ * the referenced side of a foreign key constraint.
+ */
+void
+RI_PartitionRemove_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
+{
+       const RI_ConstraintInfo *riinfo;
+       StringInfoData querybuf;
+       char       *constraintDef;
+       char            pkrelname[MAX_QUOTED_REL_NAME_LEN];
+       char            fkrelname[MAX_QUOTED_REL_NAME_LEN];
+       char            pkattname[MAX_QUOTED_NAME_LEN + 3];
+       char            fkattname[MAX_QUOTED_NAME_LEN + 3];
+       const char *sep;
+       const char *fk_only;
+       int                     save_nestlevel;
+       char            workmembuf[32];
+       int                     spi_result;
+       SPIPlanPtr      qplan;
+       int                     i;
+
+       riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false);
+
+       /*
+        * We don't check permissions before displaying the error message, on the
+        * assumption that the user detaching the partition must have enough
+        * privileges to examine the table contents anyhow.
+        */
+
+       /*----------
+        * The query string built is:
+        *  SELECT fk.keycols FROM [ONLY] relname fk
+        *    JOIN pkrelname pk
+        *    ON (pk.pkkeycol1=fk.keycol1 [AND ...])
+        *    WHERE (<partition constraint>) AND
+        * For MATCH SIMPLE:
+        *   (fk.keycol1 IS NOT NULL [AND ...])
+        * For MATCH FULL:
+        *   (fk.keycol1 IS NOT NULL [OR ...])
+        *
+        * We attach COLLATE clauses to the operators when comparing columns
+        * that have different collations.
+        *----------
+        */
+       initStringInfo(&querybuf);
+       appendStringInfoString(&querybuf, "SELECT ");
+       sep = "";
+       for (i = 0; i < riinfo->nkeys; i++)
+       {
+               quoteOneName(fkattname,
+                                        RIAttName(fk_rel, riinfo->fk_attnums[i]));
+               appendStringInfo(&querybuf, "%sfk.%s", sep, fkattname);
+               sep = ", ";
+       }
+
+       quoteRelationName(pkrelname, pk_rel);
+       quoteRelationName(fkrelname, fk_rel);
+       fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+               "" : "ONLY ";
+       appendStringInfo(&querybuf,
+                                        " FROM %s%s fk JOIN %s pk ON",
+                                        fk_only, fkrelname, pkrelname);
+       strcpy(pkattname, "pk.");
+       strcpy(fkattname, "fk.");
+       sep = "(";
+       for (i = 0; i < riinfo->nkeys; i++)
+       {
+               Oid                     pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
+               Oid                     fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
+               Oid                     pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
+               Oid                     fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);
+
+               quoteOneName(pkattname + 3,
+                                        RIAttName(pk_rel, riinfo->pk_attnums[i]));
+               quoteOneName(fkattname + 3,
+                                        RIAttName(fk_rel, riinfo->fk_attnums[i]));
+               ri_GenerateQual(&querybuf, sep,
+                                               pkattname, pk_type,
+                                               riinfo->pf_eq_oprs[i],
+                                               fkattname, fk_type);
+               if (pk_coll != fk_coll)
+                       ri_GenerateQualCollation(&querybuf, pk_coll);
+               sep = "AND";
+       }
+
+       /*
+        * Start the WHERE clause with the partition constraint (except if this is
+        * the default partition and there's no other partition, because the
+        * partition constraint is the empty string in that case.)
+        */
+       constraintDef = pg_get_partconstrdef_string(RelationGetRelid(pk_rel), "pk");
+       if (constraintDef && constraintDef[0] != '\0')
+               appendStringInfo(&querybuf, ") WHERE %s AND (",
+                                                constraintDef);
+       else
+               appendStringInfo(&querybuf, ") WHERE (");
+
+       sep = "";
+       for (i = 0; i < riinfo->nkeys; i++)
+       {
+               quoteOneName(fkattname, RIAttName(fk_rel, riinfo->fk_attnums[i]));
+               appendStringInfo(&querybuf,
+                                                "%sfk.%s IS NOT NULL",
+                                                sep, fkattname);
+               switch (riinfo->confmatchtype)
+               {
+                       case FKCONSTR_MATCH_SIMPLE:
+                               sep = " AND ";
+                               break;
+                       case FKCONSTR_MATCH_FULL:
+                               sep = " OR ";
+                               break;
+               }
+       }
+       appendStringInfoChar(&querybuf, ')');
+
+       /*
+        * Temporarily increase work_mem so that the check query can be executed
+        * more efficiently.  It seems okay to do this because the query is simple
+        * enough to not use a multiple of work_mem, and one typically would not
+        * have many large foreign-key validations happening concurrently.  So
+        * this seems to meet the criteria for being considered a "maintenance"
+        * operation, and accordingly we use maintenance_work_mem.
+        *
+        * We use the equivalent of a function SET option to allow the setting to
+        * persist for exactly the duration of the check query.  guc.c also takes
+        * care of undoing the setting on error.
+        */
+       save_nestlevel = NewGUCNestLevel();
+
+       snprintf(workmembuf, sizeof(workmembuf), "%d", maintenance_work_mem);
+       (void) set_config_option("work_mem", workmembuf,
+                                                        PGC_USERSET, PGC_S_SESSION,
+                                                        GUC_ACTION_SAVE, true, 0, false);
+
+       if (SPI_connect() != SPI_OK_CONNECT)
+               elog(ERROR, "SPI_connect failed");
+
+       /*
+        * Generate the plan.  We don't need to cache it, and there are no
+        * arguments to the plan.
+        */
+       qplan = SPI_prepare(querybuf.data, 0, NULL);
+
+       if (qplan == NULL)
+               elog(ERROR, "SPI_prepare returned %s for %s",
+                        SPI_result_code_string(SPI_result), querybuf.data);
+
+       /*
+        * Run the plan.  For safety we force a current snapshot to be used. (In
+        * transaction-snapshot mode, this arguably violates transaction isolation
+        * rules, but we really haven't got much choice.) We don't need to
+        * register the snapshot, because SPI_execute_snapshot will see to it. We
+        * need at most one tuple returned, so pass limit = 1.
+        */
+       spi_result = SPI_execute_snapshot(qplan,
+                                                                         NULL, NULL,
+                                                                         GetLatestSnapshot(),
+                                                                         InvalidSnapshot,
+                                                                         true, false, 1);
+
+       /* Check result */
+       if (spi_result != SPI_OK_SELECT)
+               elog(ERROR, "SPI_execute_snapshot returned %s", SPI_result_code_string(spi_result));
+
+       /* Did we find a tuple that would violate the constraint? */
+       if (SPI_processed > 0)
+       {
+               TupleTableSlot *slot;
+               HeapTuple       tuple = SPI_tuptable->vals[0];
+               TupleDesc       tupdesc = SPI_tuptable->tupdesc;
+               RI_ConstraintInfo fake_riinfo;
+
+               slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual);
+
+               heap_deform_tuple(tuple, tupdesc,
+                                                 slot->tts_values, slot->tts_isnull);
+               ExecStoreVirtualTuple(slot);
+
+               /*
+                * The columns to look at in the result tuple are 1..N, not whatever
+                * they are in the fk_rel.  Hack up riinfo so that ri_ReportViolation
+                * will behave properly.
+                *
+                * In addition to this, we have to pass the correct tupdesc to
+                * ri_ReportViolation, overriding its normal habit of using the pk_rel
+                * or fk_rel's tupdesc.
+                */
+               memcpy(&fake_riinfo, riinfo, sizeof(RI_ConstraintInfo));
+               for (i = 0; i < fake_riinfo.nkeys; i++)
+                       fake_riinfo.pk_attnums[i] = i + 1;
+
+               ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel,
+                                                  slot, tupdesc, 0, true);
+       }
+
+       if (SPI_finish() != SPI_OK_FINISH)
+               elog(ERROR, "SPI_finish failed");
+
+       /*
+        * Restore work_mem.
+        */
+       AtEOXact_GUC(true, save_nestlevel);
+}
+
 
 /* ----------
  * Local functions below
@@ -2078,7 +2298,7 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
                                                   pk_rel, fk_rel,
                                                   newslot ? newslot : oldslot,
                                                   NULL,
-                                                  qkey->constr_queryno);
+                                                  qkey->constr_queryno, false);
 
        return SPI_processed != 0;
 }
@@ -2119,7 +2339,7 @@ static void
 ri_ReportViolation(const RI_ConstraintInfo *riinfo,
                                   Relation pk_rel, Relation fk_rel,
                                   TupleTableSlot *violatorslot, TupleDesc tupdesc,
-                                  int queryno)
+                                  int queryno, bool partgone)
 {
        StringInfoData key_names;
        StringInfoData key_values;
@@ -2158,9 +2378,13 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo,
         *
         * Check table-level permissions next and, failing that, column-level
         * privileges.
+        *
+        * When a partition at the referenced side is being detached/dropped, we
+        * needn't check, since the user must be the table owner anyway.
         */
-
-       if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED)
+       if (partgone)
+               has_perm = true;
+       else if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED)
        {
                aclresult = pg_class_aclcheck(rel_oid, GetUserId(), ACL_SELECT);
                if (aclresult != ACLCHECK_OK)
@@ -2222,7 +2446,16 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo,
                }
        }
 
-       if (onfk)
+       if (partgone)
+               ereport(ERROR,
+                               (errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
+                                errmsg("removing partition \"%s\" violates foreign key constraint \"%s\"",
+                                               RelationGetRelationName(pk_rel),
+                                               NameStr(riinfo->conname)),
+                                errdetail("Key (%s)=(%s) still referenced from table \"%s\".",
+                                                  key_names.data, key_values.data,
+                                                  RelationGetRelationName(fk_rel))));
+       else if (onfk)
                ereport(ERROR,
                                (errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
                                 errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
index 88dc09cae68e5b60fbe3644c18150e1b6cbd0071..7b142e3b188dee63458c30222a6a4cd53e80b0c9 100644 (file)
@@ -1836,6 +1836,24 @@ pg_get_partition_constraintdef(PG_FUNCTION_ARGS)
        PG_RETURN_TEXT_P(string_to_text(consrc));
 }
 
+/*
+ * pg_get_partconstrdef_string
+ *
+ * Returns the partition constraint as a C-string for the input relation, with
+ * the given alias.  No pretty-printing.
+ */
+char *
+pg_get_partconstrdef_string(Oid partitionId, char *aliasname)
+{
+       Expr       *constr_expr;
+       List       *context;
+
+       constr_expr = get_partition_qual_relid(partitionId);
+       context = deparse_context_for(aliasname, partitionId);
+
+       return deparse_expression((Node *) constr_expr, context, true, false);
+}
+
 /*
  * pg_get_constraintdef
  *
index 036810303a68b2cd4f0cdaade9518ef0cc099c9f..f7f7285acca9fdbc3130d86840c5744f79262394 100644 (file)
@@ -2452,9 +2452,12 @@ describeOneTableDetails(const char *schemaname,
                                                                  "  pg_catalog.pg_get_constraintdef(r.oid, true) as condef,\n"
                                                                  "  conrelid::pg_catalog.regclass AS ontable\n"
                                                                  "FROM pg_catalog.pg_constraint r\n"
-                                                                 "WHERE r.conrelid = '%s' AND r.contype = 'f'\n"
-                                                                 "ORDER BY conname;",
+                                                                 "WHERE r.conrelid = '%s' AND r.contype = 'f'\n",
                                                                  oid);
+
+                               if (pset.sversion >= 120000)
+                                       appendPQExpBuffer(&buf, "     AND conparentid = 0\n");
+                               appendPQExpBuffer(&buf, "ORDER BY conname");
                        }
 
                        result = PSQLexec(buf.data);
index ec3bb90b01b1418dafff57a83e67ec485deeae3f..96927b900d696c2ba3264cac522e0c8281bb0891 100644 (file)
@@ -76,10 +76,6 @@ extern void find_composite_type_dependencies(Oid typeOid,
 
 extern void check_of_type(HeapTuple typetuple);
 
-extern void createForeignKeyTriggers(Relation rel, Oid refRelOid,
-                                                Constraint *fkconstraint, Oid constraintOid,
-                                                Oid indexOid, bool create_action);
-
 extern void register_on_commit_action(Oid relid, OnCommitAction action);
 extern void remove_on_commit_action(Oid relid);
 
index 846679ecc123778fcab4dd9e483f92c4351ab5f0..6d460ffd74229a14f5083400627c1e5b68267eda 100644 (file)
@@ -263,6 +263,8 @@ extern bool RI_FKey_fk_upd_check_required(Trigger *trigger, Relation fk_rel,
                                                          TupleTableSlot *old_slot, TupleTableSlot  *new_slot);
 extern bool RI_Initial_Check(Trigger *trigger,
                                 Relation fk_rel, Relation pk_rel);
+extern void RI_PartitionRemove_Check(Trigger *trigger, Relation fk_rel,
+                                                Relation pk_rel);
 
 /* result values for RI_FKey_trigger_type: */
 #define RI_TRIGGER_PK  1               /* is a trigger on the PK relation */
index 3ebc01e7147f171c71fb1b5cd6a744b9f98bcae0..7c49e9d0a83bb7fade8d6f67afb1a25b7f7365c1 100644 (file)
@@ -22,6 +22,7 @@ extern char *pg_get_indexdef_string(Oid indexrelid);
 extern char *pg_get_indexdef_columns(Oid indexrelid, bool pretty);
 
 extern char *pg_get_partkeydef_columns(Oid relid, bool pretty);
+extern char *pg_get_partconstrdef_string(Oid partitionId, char *aliasname);
 
 extern char *pg_get_constraintdef_command(Oid constraintId);
 extern char *deparse_expression(Node *expr, List *dpcontext,
diff --git a/src/test/isolation/expected/fk-partitioned-1.out b/src/test/isolation/expected/fk-partitioned-1.out
new file mode 100644 (file)
index 0000000..aea2b6d
--- /dev/null
@@ -0,0 +1,133 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1b s1d s1c s2b s2a s2c
+step s1b: begin;
+step s1d: delete from ppk1 where a = 1;
+step s1c: commit;
+step s2b: begin;
+step s2a: alter table pfk attach partition pfk1 for values in (1);
+ERROR:  insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
+step s2c: commit;
+
+starting permutation: s1b s1d s2b s1c s2a s2c
+step s1b: begin;
+step s1d: delete from ppk1 where a = 1;
+step s2b: begin;
+step s1c: commit;
+step s2a: alter table pfk attach partition pfk1 for values in (1);
+ERROR:  insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
+step s2c: commit;
+
+starting permutation: s1b s1d s2b s2a s1c s2c
+step s1b: begin;
+step s1d: delete from ppk1 where a = 1;
+step s2b: begin;
+step s2a: alter table pfk attach partition pfk1 for values in (1); <waiting ...>
+step s1c: commit;
+step s2a: <... completed>
+error in steps s1c s2a: ERROR:  insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
+step s2c: commit;
+
+starting permutation: s1b s2b s1d s1c s2a s2c
+step s1b: begin;
+step s2b: begin;
+step s1d: delete from ppk1 where a = 1;
+step s1c: commit;
+step s2a: alter table pfk attach partition pfk1 for values in (1);
+ERROR:  insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
+step s2c: commit;
+
+starting permutation: s1b s2b s1d s2a s1c s2c
+step s1b: begin;
+step s2b: begin;
+step s1d: delete from ppk1 where a = 1;
+step s2a: alter table pfk attach partition pfk1 for values in (1); <waiting ...>
+step s1c: commit;
+step s2a: <... completed>
+error in steps s1c s2a: ERROR:  insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
+step s2c: commit;
+
+starting permutation: s1b s2b s2a s1d s2c s1c
+step s1b: begin;
+step s2b: begin;
+step s2a: alter table pfk attach partition pfk1 for values in (1);
+step s1d: delete from ppk1 where a = 1; <waiting ...>
+step s2c: commit;
+step s1d: <... completed>
+error in steps s2c s1d: ERROR:  update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
+step s1c: commit;
+
+starting permutation: s1b s2b s2a s2c s1d s1c
+step s1b: begin;
+step s2b: begin;
+step s2a: alter table pfk attach partition pfk1 for values in (1);
+step s2c: commit;
+step s1d: delete from ppk1 where a = 1;
+ERROR:  update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
+step s1c: commit;
+
+starting permutation: s2b s1b s1d s1c s2a s2c
+step s2b: begin;
+step s1b: begin;
+step s1d: delete from ppk1 where a = 1;
+step s1c: commit;
+step s2a: alter table pfk attach partition pfk1 for values in (1);
+ERROR:  insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
+step s2c: commit;
+
+starting permutation: s2b s1b s1d s2a s1c s2c
+step s2b: begin;
+step s1b: begin;
+step s1d: delete from ppk1 where a = 1;
+step s2a: alter table pfk attach partition pfk1 for values in (1); <waiting ...>
+step s1c: commit;
+step s2a: <... completed>
+error in steps s1c s2a: ERROR:  insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
+step s2c: commit;
+
+starting permutation: s2b s1b s2a s1d s2c s1c
+step s2b: begin;
+step s1b: begin;
+step s2a: alter table pfk attach partition pfk1 for values in (1);
+step s1d: delete from ppk1 where a = 1; <waiting ...>
+step s2c: commit;
+step s1d: <... completed>
+error in steps s2c s1d: ERROR:  update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
+step s1c: commit;
+
+starting permutation: s2b s1b s2a s2c s1d s1c
+step s2b: begin;
+step s1b: begin;
+step s2a: alter table pfk attach partition pfk1 for values in (1);
+step s2c: commit;
+step s1d: delete from ppk1 where a = 1;
+ERROR:  update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
+step s1c: commit;
+
+starting permutation: s2b s2a s1b s1d s2c s1c
+step s2b: begin;
+step s2a: alter table pfk attach partition pfk1 for values in (1);
+step s1b: begin;
+step s1d: delete from ppk1 where a = 1; <waiting ...>
+step s2c: commit;
+step s1d: <... completed>
+error in steps s2c s1d: ERROR:  update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
+step s1c: commit;
+
+starting permutation: s2b s2a s1b s2c s1d s1c
+step s2b: begin;
+step s2a: alter table pfk attach partition pfk1 for values in (1);
+step s1b: begin;
+step s2c: commit;
+step s1d: delete from ppk1 where a = 1;
+ERROR:  update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
+step s1c: commit;
+
+starting permutation: s2b s2a s2c s1b s1d s1c
+step s2b: begin;
+step s2a: alter table pfk attach partition pfk1 for values in (1);
+step s2c: commit;
+step s1b: begin;
+step s1d: delete from ppk1 where a = 1;
+ERROR:  update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
+step s1c: commit;
diff --git a/src/test/isolation/expected/fk-partitioned-2.out b/src/test/isolation/expected/fk-partitioned-2.out
new file mode 100644 (file)
index 0000000..722b615
--- /dev/null
@@ -0,0 +1,70 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1b s1d s2b s2i s1c s2c
+step s1b: begin;
+step s1d: delete from ppk where a = 1;
+step s2b: begin;
+step s2i: insert into pfk values (1); <waiting ...>
+step s1c: commit;
+step s2i: <... completed>
+error in steps s1c s2i: ERROR:  insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
+step s2c: commit;
+
+starting permutation: s1b s1d s2bs s2i s1c s2c
+step s1b: begin;
+step s1d: delete from ppk where a = 1;
+step s2bs: begin isolation level serializable; select 1;
+?column?       
+
+1              
+step s2i: insert into pfk values (1); <waiting ...>
+step s1c: commit;
+step s2i: <... completed>
+error in steps s1c s2i: ERROR:  could not serialize access due to concurrent update
+step s2c: commit;
+
+starting permutation: s1b s2b s1d s2i s1c s2c
+step s1b: begin;
+step s2b: begin;
+step s1d: delete from ppk where a = 1;
+step s2i: insert into pfk values (1); <waiting ...>
+step s1c: commit;
+step s2i: <... completed>
+error in steps s1c s2i: ERROR:  insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
+step s2c: commit;
+
+starting permutation: s1b s2bs s1d s2i s1c s2c
+step s1b: begin;
+step s2bs: begin isolation level serializable; select 1;
+?column?       
+
+1              
+step s1d: delete from ppk where a = 1;
+step s2i: insert into pfk values (1); <waiting ...>
+step s1c: commit;
+step s2i: <... completed>
+error in steps s1c s2i: ERROR:  could not serialize access due to concurrent update
+step s2c: commit;
+
+starting permutation: s1b s2b s2i s1d s2c s1c
+step s1b: begin;
+step s2b: begin;
+step s2i: insert into pfk values (1);
+step s1d: delete from ppk where a = 1; <waiting ...>
+step s2c: commit;
+step s1d: <... completed>
+error in steps s2c s1d: ERROR:  update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
+step s1c: commit;
+
+starting permutation: s1b s2bs s2i s1d s2c s1c
+step s1b: begin;
+step s2bs: begin isolation level serializable; select 1;
+?column?       
+
+1              
+step s2i: insert into pfk values (1);
+step s1d: delete from ppk where a = 1; <waiting ...>
+step s2c: commit;
+step s1d: <... completed>
+error in steps s2c s1d: ERROR:  update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
+step s1c: commit;
index f1ae50e5ba8df2d0d7ec72bae09f8e1b562096f7..11cd24fc9812969e878ce9aec2a4ac65b00b68f8 100644 (file)
@@ -24,6 +24,8 @@ test: deadlock-soft-2
 test: fk-contention
 test: fk-deadlock
 test: fk-deadlock2
+test: fk-partitioned-1
+test: fk-partitioned-2
 test: eval-plan-qual
 test: lock-update-delete
 test: lock-update-traversal
diff --git a/src/test/isolation/specs/fk-partitioned-1.spec b/src/test/isolation/specs/fk-partitioned-1.spec
new file mode 100644 (file)
index 0000000..4c760e8
--- /dev/null
@@ -0,0 +1,45 @@
+# Verify that cloning a foreign key constraint to a partition ensures
+# that referenced values exist, even if they're being concurrently
+# deleted.
+setup {
+drop table if exists ppk, pfk, pfk1;
+  create table ppk (a int primary key) partition by list (a);
+  create table ppk1 partition of ppk for values in (1);
+  insert into ppk values (1);
+  create table pfk (a int references ppk) partition by list (a);
+  create table pfk1 (a int not null);
+  insert into pfk1 values (1);
+}
+
+session "s1"
+step "s1b"     {       begin; }
+step "s1d"     {       delete from ppk1 where a = 1; }
+step "s1c"     {       commit; }
+
+session "s2"
+step "s2b"     {       begin; }
+step "s2a"     {       alter table pfk attach partition pfk1 for values in (1); }
+step "s2c"     {       commit; }
+
+teardown       {       drop table ppk, pfk, pfk1; }
+
+permutation "s1b" "s1d" "s1c" "s2b" "s2a" "s2c"
+permutation "s1b" "s1d" "s2b" "s1c" "s2a" "s2c"
+permutation "s1b" "s1d" "s2b" "s2a" "s1c" "s2c"
+#permutation "s1b" "s1d" "s2b" "s2a" "s2c" "s1c"
+permutation "s1b" "s2b" "s1d" "s1c" "s2a" "s2c"
+permutation "s1b" "s2b" "s1d" "s2a" "s1c" "s2c"
+#permutation "s1b" "s2b" "s1d" "s2a" "s2c" "s1c"
+#permutation "s1b" "s2b" "s2a" "s1d" "s1c" "s2c"
+permutation "s1b" "s2b" "s2a" "s1d" "s2c" "s1c"
+permutation "s1b" "s2b" "s2a" "s2c" "s1d" "s1c"
+permutation "s2b" "s1b" "s1d" "s1c" "s2a" "s2c"
+permutation "s2b" "s1b" "s1d" "s2a" "s1c" "s2c"
+#permutation "s2b" "s1b" "s1d" "s2a" "s2c" "s1c"
+#permutation "s2b" "s1b" "s2a" "s1d" "s1c" "s2c"
+permutation "s2b" "s1b" "s2a" "s1d" "s2c" "s1c"
+permutation "s2b" "s1b" "s2a" "s2c" "s1d" "s1c"
+#permutation "s2b" "s2a" "s1b" "s1d" "s1c" "s2c"
+permutation "s2b" "s2a" "s1b" "s1d" "s2c" "s1c"
+permutation "s2b" "s2a" "s1b" "s2c" "s1d" "s1c"
+permutation "s2b" "s2a" "s2c" "s1b" "s1d" "s1c"
diff --git a/src/test/isolation/specs/fk-partitioned-2.spec b/src/test/isolation/specs/fk-partitioned-2.spec
new file mode 100644 (file)
index 0000000..f1a76e8
--- /dev/null
@@ -0,0 +1,29 @@
+# Make sure that FKs referencing partitioned tables actually work.
+setup {
+  drop table if exists ppk, pfk, pfk1;
+  create table ppk (a int primary key) partition by list (a);
+  create table ppk1 partition of ppk for values in (1);
+  insert into ppk values (1);
+  create table pfk (a int references ppk) partition by list (a);
+  create table pfk1 partition of pfk for values in (1);
+}
+
+session "s1"
+step "s1b"     {       begin; }
+step "s1d"     {       delete from ppk where a = 1; }
+step "s1c"     {       commit; }
+
+session "s2"
+step "s2b"     {       begin; }
+step "s2bs"    {       begin isolation level serializable; select 1; }
+step "s2i"     {       insert into pfk values (1); }
+step "s2c"     {       commit; }
+
+teardown       {       drop table ppk, pfk, pfk1; }
+
+permutation "s1b" "s1d"  "s2b"  "s2i" "s1c" "s2c"
+permutation "s1b" "s1d"  "s2bs" "s2i" "s1c" "s2c"
+permutation "s1b" "s2b"  "s1d"  "s2i" "s1c" "s2c"
+permutation "s1b" "s2bs" "s1d"  "s2i" "s1c" "s2c"
+permutation "s1b" "s2b"  "s2i"  "s1d" "s2c" "s1c"
+permutation "s1b" "s2bs" "s2i"  "s1d" "s2c" "s1c"
index 4f7acb9b1efadd563467bbe337f81792ec4d758b..620eb4392bed835d2771e0912e4119d54fdfaedd 100644 (file)
@@ -1532,19 +1532,6 @@ drop table pktable2, fktable2;
 --
 -- Foreign keys and partitioned tables
 --
--- partitioned table in the referenced side are not allowed
-CREATE TABLE fk_partitioned_pk (a int, b int, primary key (a, b))
-  PARTITION BY RANGE (a, b);
--- verify with create table first ...
-CREATE TABLE fk_notpartitioned_fk (a int, b int,
-  FOREIGN KEY (a, b) REFERENCES fk_partitioned_pk);
-ERROR:  cannot reference partitioned table "fk_partitioned_pk"
--- and then with alter table.
-CREATE TABLE fk_notpartitioned_fk_2 (a int, b int);
-ALTER TABLE fk_notpartitioned_fk_2 ADD FOREIGN KEY (a, b)
-  REFERENCES fk_partitioned_pk;
-ERROR:  cannot reference partitioned table "fk_partitioned_pk"
-DROP TABLE fk_partitioned_pk, fk_notpartitioned_fk_2;
 -- Creation of a partitioned hierarchy with irregular definitions
 CREATE TABLE fk_notpartitioned_pk (fdrop1 int, a int, fdrop2 int, b int,
   PRIMARY KEY (a, b));
@@ -1686,7 +1673,7 @@ CREATE TABLE fk_partitioned_fk_full (x int, y int) PARTITION BY RANGE (x);
 CREATE TABLE fk_partitioned_fk_full_1 PARTITION OF fk_partitioned_fk_full DEFAULT;
 INSERT INTO fk_partitioned_fk_full VALUES (1, NULL);
 ALTER TABLE fk_partitioned_fk_full ADD FOREIGN KEY (x, y) REFERENCES fk_notpartitioned_pk MATCH FULL;  -- fails
-ERROR:  insert or update on table "fk_partitioned_fk_full" violates foreign key constraint "fk_partitioned_fk_full_x_y_fkey"
+ERROR:  insert or update on table "fk_partitioned_fk_full_1" violates foreign key constraint "fk_partitioned_fk_full_x_y_fkey"
 DETAIL:  MATCH FULL does not allow mixing of null and nonnull key values.
 TRUNCATE fk_partitioned_fk_full;
 ALTER TABLE fk_partitioned_fk_full ADD FOREIGN KEY (x, y) REFERENCES fk_notpartitioned_pk MATCH FULL;
@@ -1902,7 +1889,7 @@ CREATE TABLE fk_partitioned_fk_2_2 PARTITION OF fk_partitioned_fk_2 FOR VALUES F
 INSERT INTO fk_partitioned_fk_2 VALUES (1600, 601), (1600, 1601);
 ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2
   FOR VALUES IN (1600);
-ERROR:  insert or update on table "fk_partitioned_fk_2" violates foreign key constraint "fk_partitioned_fk_a_b_fkey"
+ERROR:  insert or update on table "fk_partitioned_fk_2_1" violates foreign key constraint "fk_partitioned_fk_a_b_fkey"
 DETAIL:  Key (a, b)=(1600, 601) is not present in table "fk_notpartitioned_pk".
 INSERT INTO fk_notpartitioned_pk VALUES (1600, 601), (1600, 1601);
 ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2
@@ -2037,3 +2024,320 @@ drop cascades to table fkpart1.fk_part
 drop cascades to table fkpart1.fk_part_1
 drop cascades to table fkpart0.pkey
 drop cascades to table fkpart0.fk_part
+-- Test a partitioned table as referenced table.
+-- Verify basic functionality with a regular partition creation and a partition
+-- with a different column layout, as well as partitions added (created and
+-- attached) after creating the foreign key.
+CREATE SCHEMA fkpart3;
+SET search_path TO fkpart3;
+CREATE TABLE pk (a int PRIMARY KEY) PARTITION BY RANGE (a);
+CREATE TABLE pk1 PARTITION OF pk FOR VALUES FROM (0) TO (1000);
+CREATE TABLE pk2 (b int, a int);
+ALTER TABLE pk2 DROP COLUMN b;
+ALTER TABLE pk2 ALTER a SET NOT NULL;
+ALTER TABLE pk ATTACH PARTITION pk2 FOR VALUES FROM (1000) TO (2000);
+CREATE TABLE fk (a int) PARTITION BY RANGE (a);
+CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (0) TO (750);
+ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk;
+CREATE TABLE fk2 (b int, a int) ;
+ALTER TABLE fk2 DROP COLUMN b;
+ALTER TABLE fk ATTACH PARTITION fk2 FOR VALUES FROM (750) TO (3500);
+CREATE TABLE pk3 PARTITION OF pk FOR VALUES FROM (2000) TO (3000);
+CREATE TABLE pk4 (LIKE pk);
+ALTER TABLE pk ATTACH PARTITION pk4 FOR VALUES FROM (3000) TO (4000);
+CREATE TABLE pk5 (c int, b int, a int NOT NULL) PARTITION BY RANGE (a);
+ALTER TABLE pk5 DROP COLUMN b, DROP COLUMN c;
+CREATE TABLE pk51 PARTITION OF pk5 FOR VALUES FROM (4000) TO (4500);
+CREATE TABLE pk52 PARTITION OF pk5 FOR VALUES FROM (4500) TO (5000);
+ALTER TABLE pk ATTACH PARTITION pk5 FOR VALUES FROM (4000) TO (5000);
+CREATE TABLE fk3 PARTITION OF fk FOR VALUES FROM (3500) TO (5000);
+-- these should fail: referenced value not present
+INSERT into fk VALUES (1);
+ERROR:  insert or update on table "fk1" violates foreign key constraint "fk_a_fkey"
+DETAIL:  Key (a)=(1) is not present in table "pk".
+INSERT into fk VALUES (1000);
+ERROR:  insert or update on table "fk2" violates foreign key constraint "fk_a_fkey"
+DETAIL:  Key (a)=(1000) is not present in table "pk".
+INSERT into fk VALUES (2000);
+ERROR:  insert or update on table "fk2" violates foreign key constraint "fk_a_fkey"
+DETAIL:  Key (a)=(2000) is not present in table "pk".
+INSERT into fk VALUES (3000);
+ERROR:  insert or update on table "fk2" violates foreign key constraint "fk_a_fkey"
+DETAIL:  Key (a)=(3000) is not present in table "pk".
+INSERT into fk VALUES (4000);
+ERROR:  insert or update on table "fk3" violates foreign key constraint "fk_a_fkey"
+DETAIL:  Key (a)=(4000) is not present in table "pk".
+INSERT into fk VALUES (4500);
+ERROR:  insert or update on table "fk3" violates foreign key constraint "fk_a_fkey"
+DETAIL:  Key (a)=(4500) is not present in table "pk".
+-- insert into the referenced table, now they should work
+INSERT into pk VALUES (1), (1000), (2000), (3000), (4000), (4500);
+INSERT into fk VALUES (1), (1000), (2000), (3000), (4000), (4500);
+-- should fail: referencing value present
+DELETE FROM pk WHERE a = 1;
+ERROR:  update or delete on table "pk1" violates foreign key constraint "fk_a_fkey1" on table "fk"
+DETAIL:  Key (a)=(1) is still referenced from table "fk".
+DELETE FROM pk WHERE a = 1000;
+ERROR:  update or delete on table "pk2" violates foreign key constraint "fk_a_fkey2" on table "fk"
+DETAIL:  Key (a)=(1000) is still referenced from table "fk".
+DELETE FROM pk WHERE a = 2000;
+ERROR:  update or delete on table "pk3" violates foreign key constraint "fk_a_fkey3" on table "fk"
+DETAIL:  Key (a)=(2000) is still referenced from table "fk".
+DELETE FROM pk WHERE a = 3000;
+ERROR:  update or delete on table "pk4" violates foreign key constraint "fk_a_fkey4" on table "fk"
+DETAIL:  Key (a)=(3000) is still referenced from table "fk".
+DELETE FROM pk WHERE a = 4000;
+ERROR:  update or delete on table "pk51" violates foreign key constraint "fk_a_fkey6" on table "fk"
+DETAIL:  Key (a)=(4000) is still referenced from table "fk".
+DELETE FROM pk WHERE a = 4500;
+ERROR:  update or delete on table "pk52" violates foreign key constraint "fk_a_fkey7" on table "fk"
+DETAIL:  Key (a)=(4500) is still referenced from table "fk".
+UPDATE pk SET a = 2 WHERE a = 1;
+ERROR:  update or delete on table "pk1" violates foreign key constraint "fk_a_fkey1" on table "fk"
+DETAIL:  Key (a)=(1) is still referenced from table "fk".
+UPDATE pk SET a = 1002 WHERE a = 1000;
+ERROR:  update or delete on table "pk2" violates foreign key constraint "fk_a_fkey2" on table "fk"
+DETAIL:  Key (a)=(1000) is still referenced from table "fk".
+UPDATE pk SET a = 2002 WHERE a = 2000;
+ERROR:  update or delete on table "pk3" violates foreign key constraint "fk_a_fkey3" on table "fk"
+DETAIL:  Key (a)=(2000) is still referenced from table "fk".
+UPDATE pk SET a = 3002 WHERE a = 3000;
+ERROR:  update or delete on table "pk4" violates foreign key constraint "fk_a_fkey4" on table "fk"
+DETAIL:  Key (a)=(3000) is still referenced from table "fk".
+UPDATE pk SET a = 4002 WHERE a = 4000;
+ERROR:  update or delete on table "pk51" violates foreign key constraint "fk_a_fkey6" on table "fk"
+DETAIL:  Key (a)=(4000) is still referenced from table "fk".
+UPDATE pk SET a = 4502 WHERE a = 4500;
+ERROR:  update or delete on table "pk52" violates foreign key constraint "fk_a_fkey7" on table "fk"
+DETAIL:  Key (a)=(4500) is still referenced from table "fk".
+-- now they should work
+DELETE FROM fk;
+UPDATE pk SET a = 2 WHERE a = 1;
+DELETE FROM pk WHERE a = 2;
+UPDATE pk SET a = 1002 WHERE a = 1000;
+DELETE FROM pk WHERE a = 1002;
+UPDATE pk SET a = 2002 WHERE a = 2000;
+DELETE FROM pk WHERE a = 2002;
+UPDATE pk SET a = 3002 WHERE a = 3000;
+DELETE FROM pk WHERE a = 3002;
+UPDATE pk SET a = 4002 WHERE a = 4000;
+DELETE FROM pk WHERE a = 4002;
+UPDATE pk SET a = 4502 WHERE a = 4500;
+DELETE FROM pk WHERE a = 4502;
+CREATE SCHEMA fkpart4;
+SET search_path TO fkpart4;
+-- dropping/detaching PARTITIONs is prevented if that would break
+-- a foreign key's existing data
+CREATE TABLE droppk (a int PRIMARY KEY) PARTITION BY RANGE (a);
+CREATE TABLE droppk1 PARTITION OF droppk FOR VALUES FROM (0) TO (1000);
+CREATE TABLE droppk_d PARTITION OF droppk DEFAULT;
+CREATE TABLE droppk2 PARTITION OF droppk FOR VALUES FROM (1000) TO (2000)
+  PARTITION BY RANGE (a);
+CREATE TABLE droppk21 PARTITION OF droppk2 FOR VALUES FROM (1000) TO (1400);
+CREATE TABLE droppk2_d PARTITION OF droppk2 DEFAULT;
+INSERT into droppk VALUES (1), (1000), (1500), (2000);
+CREATE TABLE dropfk (a int REFERENCES droppk);
+INSERT into dropfk VALUES (1), (1000), (1500), (2000);
+-- these should all fail
+ALTER TABLE droppk DETACH PARTITION droppk_d;
+ERROR:  removing partition "droppk_d" violates foreign key constraint "dropfk_a_fkey5"
+DETAIL:  Key (a)=(2000) still referenced from table "dropfk".
+ALTER TABLE droppk2 DETACH PARTITION droppk2_d;
+ERROR:  removing partition "droppk2_d" violates foreign key constraint "dropfk_a_fkey4"
+DETAIL:  Key (a)=(1500) still referenced from table "dropfk".
+ALTER TABLE droppk DETACH PARTITION droppk1;
+ERROR:  removing partition "droppk1" violates foreign key constraint "dropfk_a_fkey1"
+DETAIL:  Key (a)=(1) still referenced from table "dropfk".
+ALTER TABLE droppk DETACH PARTITION droppk2;
+ERROR:  removing partition "droppk2" violates foreign key constraint "dropfk_a_fkey2"
+DETAIL:  Key (a)=(1000) still referenced from table "dropfk".
+ALTER TABLE droppk2 DETACH PARTITION droppk21;
+ERROR:  removing partition "droppk21" violates foreign key constraint "dropfk_a_fkey3"
+DETAIL:  Key (a)=(1000) still referenced from table "dropfk".
+-- dropping partitions is disallowed
+DROP TABLE droppk_d;
+ERROR:  cannot drop table droppk_d because other objects depend on it
+DETAIL:  constraint dropfk_a_fkey on table dropfk depends on table droppk_d
+HINT:  Use DROP ... CASCADE to drop the dependent objects too.
+DROP TABLE droppk2_d;
+ERROR:  cannot drop table droppk2_d because other objects depend on it
+DETAIL:  constraint dropfk_a_fkey on table dropfk depends on table droppk2_d
+HINT:  Use DROP ... CASCADE to drop the dependent objects too.
+DROP TABLE droppk1;
+ERROR:  cannot drop table droppk1 because other objects depend on it
+DETAIL:  constraint dropfk_a_fkey on table dropfk depends on table droppk1
+HINT:  Use DROP ... CASCADE to drop the dependent objects too.
+DROP TABLE droppk2;
+ERROR:  cannot drop table droppk2 because other objects depend on it
+DETAIL:  constraint dropfk_a_fkey on table dropfk depends on table droppk2
+HINT:  Use DROP ... CASCADE to drop the dependent objects too.
+DROP TABLE droppk21;
+ERROR:  cannot drop table droppk21 because other objects depend on it
+DETAIL:  constraint dropfk_a_fkey on table dropfk depends on table droppk21
+HINT:  Use DROP ... CASCADE to drop the dependent objects too.
+DELETE FROM dropfk;
+-- dropping partitions is disallowed, even when no referencing values
+DROP TABLE droppk_d;
+ERROR:  cannot drop table droppk_d because other objects depend on it
+DETAIL:  constraint dropfk_a_fkey on table dropfk depends on table droppk_d
+HINT:  Use DROP ... CASCADE to drop the dependent objects too.
+DROP TABLE droppk2_d;
+ERROR:  cannot drop table droppk2_d because other objects depend on it
+DETAIL:  constraint dropfk_a_fkey on table dropfk depends on table droppk2_d
+HINT:  Use DROP ... CASCADE to drop the dependent objects too.
+DROP TABLE droppk1;
+ERROR:  cannot drop table droppk1 because other objects depend on it
+DETAIL:  constraint dropfk_a_fkey on table dropfk depends on table droppk1
+HINT:  Use DROP ... CASCADE to drop the dependent objects too.
+-- but DETACH is allowed, and DROP afterwards works
+ALTER TABLE droppk2 DETACH PARTITION droppk21;
+DROP TABLE droppk2;
+ERROR:  cannot drop table droppk2 because other objects depend on it
+DETAIL:  constraint dropfk_a_fkey on table dropfk depends on table droppk2
+HINT:  Use DROP ... CASCADE to drop the dependent objects too.
+-- Verify that initial constraint creation and cloning behave correctly
+CREATE SCHEMA fkpart5;
+SET search_path TO fkpart5;
+CREATE TABLE pk (a int PRIMARY KEY) PARTITION BY LIST (a);
+CREATE TABLE pk1 PARTITION OF pk FOR VALUES IN (1) PARTITION BY LIST (a);
+CREATE TABLE pk11 PARTITION OF pk1 FOR VALUES IN (1);
+CREATE TABLE fk (a int) PARTITION BY LIST (a);
+CREATE TABLE fk1 PARTITION OF fk FOR VALUES IN (1) PARTITION BY LIST (a);
+CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES IN (1);
+ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk;
+CREATE TABLE pk2 PARTITION OF pk FOR VALUES IN (2);
+CREATE TABLE pk3 (a int NOT NULL) PARTITION BY LIST (a);
+CREATE TABLE pk31 PARTITION OF pk3 FOR VALUES IN (31);
+CREATE TABLE pk32 (b int, a int NOT NULL);
+ALTER TABLE pk32 DROP COLUMN b;
+ALTER TABLE pk3 ATTACH PARTITION pk32 FOR VALUES IN (32);
+ALTER TABLE pk ATTACH PARTITION pk3 FOR VALUES IN (31, 32);
+CREATE TABLE fk2 PARTITION OF fk FOR VALUES IN (2);
+CREATE TABLE fk3 (b int, a int);
+ALTER TABLE fk3 DROP COLUMN b;
+ALTER TABLE fk ATTACH PARTITION fk3 FOR VALUES IN (3);
+SELECT pg_describe_object('pg_constraint'::regclass, oid, 0), confrelid::regclass,
+       CASE WHEN conparentid <> 0 THEN pg_describe_object('pg_constraint'::regclass, conparentid, 0) ELSE 'TOP' END
+FROM pg_catalog.pg_constraint
+WHERE conrelid IN (SELECT relid FROM pg_partition_tree('fk'))
+ORDER BY conrelid::regclass::text, conname;
+         pg_describe_object         | confrelid |               case                
+------------------------------------+-----------+-----------------------------------
+ constraint fk_a_fkey on table fk   | pk        | TOP
+ constraint fk_a_fkey1 on table fk  | pk1       | constraint fk_a_fkey on table fk
+ constraint fk_a_fkey2 on table fk  | pk11      | constraint fk_a_fkey1 on table fk
+ constraint fk_a_fkey3 on table fk  | pk2       | constraint fk_a_fkey on table fk
+ constraint fk_a_fkey4 on table fk  | pk3       | constraint fk_a_fkey on table fk
+ constraint fk_a_fkey5 on table fk  | pk31      | constraint fk_a_fkey4 on table fk
+ constraint fk_a_fkey6 on table fk  | pk32      | constraint fk_a_fkey4 on table fk
+ constraint fk_a_fkey on table fk1  | pk        | constraint fk_a_fkey on table fk
+ constraint fk_a_fkey on table fk11 | pk        | constraint fk_a_fkey on table fk1
+ constraint fk_a_fkey on table fk2  | pk        | constraint fk_a_fkey on table fk
+ constraint fk_a_fkey on table fk3  | pk        | constraint fk_a_fkey on table fk
+(11 rows)
+
+CREATE TABLE fk4 (LIKE fk);
+INSERT INTO fk4 VALUES (50);
+ALTER TABLE fk ATTACH PARTITION fk4 FOR VALUES IN (50);
+ERROR:  insert or update on table "fk4" violates foreign key constraint "fk_a_fkey"
+DETAIL:  Key (a)=(50) is not present in table "pk".
+-- Verify ON UPDATE/DELETE behavior
+CREATE SCHEMA fkpart6;
+SET search_path TO fkpart6;
+CREATE TABLE pk (a int PRIMARY KEY) PARTITION BY RANGE (a);
+CREATE TABLE pk1 PARTITION OF pk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
+CREATE TABLE pk11 PARTITION OF pk1 FOR VALUES FROM (1) TO (50);
+CREATE TABLE pk12 PARTITION OF pk1 FOR VALUES FROM (50) TO (100);
+CREATE TABLE fk (a int) PARTITION BY RANGE (a);
+CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
+CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES FROM (1) TO (10);
+CREATE TABLE fk12 PARTITION OF fk1 FOR VALUES FROM (10) TO (100);
+ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk ON UPDATE CASCADE ON DELETE CASCADE;
+CREATE TABLE fk_d PARTITION OF fk DEFAULT;
+INSERT INTO pk VALUES (1);
+INSERT INTO fk VALUES (1);
+UPDATE pk SET a = 20;
+SELECT tableoid::regclass, * FROM fk;
+ tableoid | a  
+----------+----
+ fk12     | 20
+(1 row)
+
+DELETE FROM pk WHERE a = 20;
+SELECT tableoid::regclass, * FROM fk;
+ tableoid | a 
+----------+---
+(0 rows)
+
+DROP TABLE fk;
+TRUNCATE TABLE pk;
+INSERT INTO pk VALUES (20), (50);
+CREATE TABLE fk (a int) PARTITION BY RANGE (a);
+CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
+CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES FROM (1) TO (10);
+CREATE TABLE fk12 PARTITION OF fk1 FOR VALUES FROM (10) TO (100);
+ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk ON UPDATE SET NULL ON DELETE SET NULL;
+CREATE TABLE fk_d PARTITION OF fk DEFAULT;
+INSERT INTO fk VALUES (20), (50);
+UPDATE pk SET a = 21 WHERE a = 20;
+DELETE FROM pk WHERE a = 50;
+SELECT tableoid::regclass, * FROM fk;
+ tableoid | a 
+----------+---
+ fk_d     |  
+ fk_d     |  
+(2 rows)
+
+DROP TABLE fk;
+TRUNCATE TABLE pk;
+INSERT INTO pk VALUES (20), (30), (50);
+CREATE TABLE fk (id int, a int DEFAULT 50) PARTITION BY RANGE (a);
+CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
+CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES FROM (1) TO (10);
+CREATE TABLE fk12 PARTITION OF fk1 FOR VALUES FROM (10) TO (100);
+ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk ON UPDATE SET DEFAULT ON DELETE SET DEFAULT;
+CREATE TABLE fk_d PARTITION OF fk DEFAULT;
+INSERT INTO fk VALUES (1, 20), (2, 30);
+DELETE FROM pk WHERE a = 20 RETURNING *;
+ a  
+----
+ 20
+(1 row)
+
+UPDATE pk SET a = 90 WHERE a = 30 RETURNING *;
+ a  
+----
+ 90
+(1 row)
+
+SELECT tableoid::regclass, * FROM fk;
+ tableoid | id | a  
+----------+----+----
+ fk12     |  1 | 50
+ fk12     |  2 | 50
+(2 rows)
+
+DROP TABLE fk;
+TRUNCATE TABLE pk;
+INSERT INTO pk VALUES (20), (30);
+CREATE TABLE fk (a int DEFAULT 50) PARTITION BY RANGE (a);
+CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
+CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES FROM (1) TO (10);
+CREATE TABLE fk12 PARTITION OF fk1 FOR VALUES FROM (10) TO (100);
+ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk ON UPDATE RESTRICT ON DELETE RESTRICT;
+CREATE TABLE fk_d PARTITION OF fk DEFAULT;
+INSERT INTO fk VALUES (20), (30);
+DELETE FROM pk WHERE a = 20;
+ERROR:  update or delete on table "pk11" violates foreign key constraint "fk_a_fkey2" on table "fk"
+DETAIL:  Key (a)=(20) is still referenced from table "fk".
+UPDATE pk SET a = 90 WHERE a = 30;
+ERROR:  update or delete on table "pk11" violates foreign key constraint "fk_a_fkey2" on table "fk"
+DETAIL:  Key (a)=(30) is still referenced from table "fk".
+SELECT tableoid::regclass, * FROM fk;
+ tableoid | a  
+----------+----
+ fk12     | 20
+ fk12     | 30
+(2 rows)
+
+DROP TABLE fk;
index a5c7e147a7f1a1a28b68695138bf2b87e8dd8d18..1a8685099e7c9739da65173d1369c4aba71910a9 100644 (file)
@@ -1145,18 +1145,6 @@ drop table pktable2, fktable2;
 -- Foreign keys and partitioned tables
 --
 
--- partitioned table in the referenced side are not allowed
-CREATE TABLE fk_partitioned_pk (a int, b int, primary key (a, b))
-  PARTITION BY RANGE (a, b);
--- verify with create table first ...
-CREATE TABLE fk_notpartitioned_fk (a int, b int,
-  FOREIGN KEY (a, b) REFERENCES fk_partitioned_pk);
--- and then with alter table.
-CREATE TABLE fk_notpartitioned_fk_2 (a int, b int);
-ALTER TABLE fk_notpartitioned_fk_2 ADD FOREIGN KEY (a, b)
-  REFERENCES fk_partitioned_pk;
-DROP TABLE fk_partitioned_pk, fk_notpartitioned_fk_2;
-
 -- Creation of a partitioned hierarchy with irregular definitions
 CREATE TABLE fk_notpartitioned_pk (fdrop1 int, a int, fdrop2 int, b int,
   PRIMARY KEY (a, b));
@@ -1443,3 +1431,204 @@ alter table fkpart2.fk_part_1 drop constraint fkey;     -- ok
 alter table fkpart2.fk_part_1_1 drop constraint my_fkey;       -- doesn't exist
 
 drop schema fkpart0, fkpart1, fkpart2 cascade;
+
+-- Test a partitioned table as referenced table.
+
+-- Verify basic functionality with a regular partition creation and a partition
+-- with a different column layout, as well as partitions added (created and
+-- attached) after creating the foreign key.
+CREATE SCHEMA fkpart3;
+SET search_path TO fkpart3;
+
+CREATE TABLE pk (a int PRIMARY KEY) PARTITION BY RANGE (a);
+CREATE TABLE pk1 PARTITION OF pk FOR VALUES FROM (0) TO (1000);
+CREATE TABLE pk2 (b int, a int);
+ALTER TABLE pk2 DROP COLUMN b;
+ALTER TABLE pk2 ALTER a SET NOT NULL;
+ALTER TABLE pk ATTACH PARTITION pk2 FOR VALUES FROM (1000) TO (2000);
+
+CREATE TABLE fk (a int) PARTITION BY RANGE (a);
+CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (0) TO (750);
+ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk;
+CREATE TABLE fk2 (b int, a int) ;
+ALTER TABLE fk2 DROP COLUMN b;
+ALTER TABLE fk ATTACH PARTITION fk2 FOR VALUES FROM (750) TO (3500);
+
+CREATE TABLE pk3 PARTITION OF pk FOR VALUES FROM (2000) TO (3000);
+CREATE TABLE pk4 (LIKE pk);
+ALTER TABLE pk ATTACH PARTITION pk4 FOR VALUES FROM (3000) TO (4000);
+
+CREATE TABLE pk5 (c int, b int, a int NOT NULL) PARTITION BY RANGE (a);
+ALTER TABLE pk5 DROP COLUMN b, DROP COLUMN c;
+CREATE TABLE pk51 PARTITION OF pk5 FOR VALUES FROM (4000) TO (4500);
+CREATE TABLE pk52 PARTITION OF pk5 FOR VALUES FROM (4500) TO (5000);
+ALTER TABLE pk ATTACH PARTITION pk5 FOR VALUES FROM (4000) TO (5000);
+
+CREATE TABLE fk3 PARTITION OF fk FOR VALUES FROM (3500) TO (5000);
+
+-- these should fail: referenced value not present
+INSERT into fk VALUES (1);
+INSERT into fk VALUES (1000);
+INSERT into fk VALUES (2000);
+INSERT into fk VALUES (3000);
+INSERT into fk VALUES (4000);
+INSERT into fk VALUES (4500);
+-- insert into the referenced table, now they should work
+INSERT into pk VALUES (1), (1000), (2000), (3000), (4000), (4500);
+INSERT into fk VALUES (1), (1000), (2000), (3000), (4000), (4500);
+
+-- should fail: referencing value present
+DELETE FROM pk WHERE a = 1;
+DELETE FROM pk WHERE a = 1000;
+DELETE FROM pk WHERE a = 2000;
+DELETE FROM pk WHERE a = 3000;
+DELETE FROM pk WHERE a = 4000;
+DELETE FROM pk WHERE a = 4500;
+UPDATE pk SET a = 2 WHERE a = 1;
+UPDATE pk SET a = 1002 WHERE a = 1000;
+UPDATE pk SET a = 2002 WHERE a = 2000;
+UPDATE pk SET a = 3002 WHERE a = 3000;
+UPDATE pk SET a = 4002 WHERE a = 4000;
+UPDATE pk SET a = 4502 WHERE a = 4500;
+-- now they should work
+DELETE FROM fk;
+UPDATE pk SET a = 2 WHERE a = 1;
+DELETE FROM pk WHERE a = 2;
+UPDATE pk SET a = 1002 WHERE a = 1000;
+DELETE FROM pk WHERE a = 1002;
+UPDATE pk SET a = 2002 WHERE a = 2000;
+DELETE FROM pk WHERE a = 2002;
+UPDATE pk SET a = 3002 WHERE a = 3000;
+DELETE FROM pk WHERE a = 3002;
+UPDATE pk SET a = 4002 WHERE a = 4000;
+DELETE FROM pk WHERE a = 4002;
+UPDATE pk SET a = 4502 WHERE a = 4500;
+DELETE FROM pk WHERE a = 4502;
+
+CREATE SCHEMA fkpart4;
+SET search_path TO fkpart4;
+-- dropping/detaching PARTITIONs is prevented if that would break
+-- a foreign key's existing data
+CREATE TABLE droppk (a int PRIMARY KEY) PARTITION BY RANGE (a);
+CREATE TABLE droppk1 PARTITION OF droppk FOR VALUES FROM (0) TO (1000);
+CREATE TABLE droppk_d PARTITION OF droppk DEFAULT;
+CREATE TABLE droppk2 PARTITION OF droppk FOR VALUES FROM (1000) TO (2000)
+  PARTITION BY RANGE (a);
+CREATE TABLE droppk21 PARTITION OF droppk2 FOR VALUES FROM (1000) TO (1400);
+CREATE TABLE droppk2_d PARTITION OF droppk2 DEFAULT;
+INSERT into droppk VALUES (1), (1000), (1500), (2000);
+CREATE TABLE dropfk (a int REFERENCES droppk);
+INSERT into dropfk VALUES (1), (1000), (1500), (2000);
+-- these should all fail
+ALTER TABLE droppk DETACH PARTITION droppk_d;
+ALTER TABLE droppk2 DETACH PARTITION droppk2_d;
+ALTER TABLE droppk DETACH PARTITION droppk1;
+ALTER TABLE droppk DETACH PARTITION droppk2;
+ALTER TABLE droppk2 DETACH PARTITION droppk21;
+-- dropping partitions is disallowed
+DROP TABLE droppk_d;
+DROP TABLE droppk2_d;
+DROP TABLE droppk1;
+DROP TABLE droppk2;
+DROP TABLE droppk21;
+DELETE FROM dropfk;
+-- dropping partitions is disallowed, even when no referencing values
+DROP TABLE droppk_d;
+DROP TABLE droppk2_d;
+DROP TABLE droppk1;
+-- but DETACH is allowed, and DROP afterwards works
+ALTER TABLE droppk2 DETACH PARTITION droppk21;
+DROP TABLE droppk2;
+
+-- Verify that initial constraint creation and cloning behave correctly
+CREATE SCHEMA fkpart5;
+SET search_path TO fkpart5;
+CREATE TABLE pk (a int PRIMARY KEY) PARTITION BY LIST (a);
+CREATE TABLE pk1 PARTITION OF pk FOR VALUES IN (1) PARTITION BY LIST (a);
+CREATE TABLE pk11 PARTITION OF pk1 FOR VALUES IN (1);
+CREATE TABLE fk (a int) PARTITION BY LIST (a);
+CREATE TABLE fk1 PARTITION OF fk FOR VALUES IN (1) PARTITION BY LIST (a);
+CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES IN (1);
+ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk;
+CREATE TABLE pk2 PARTITION OF pk FOR VALUES IN (2);
+CREATE TABLE pk3 (a int NOT NULL) PARTITION BY LIST (a);
+CREATE TABLE pk31 PARTITION OF pk3 FOR VALUES IN (31);
+CREATE TABLE pk32 (b int, a int NOT NULL);
+ALTER TABLE pk32 DROP COLUMN b;
+ALTER TABLE pk3 ATTACH PARTITION pk32 FOR VALUES IN (32);
+ALTER TABLE pk ATTACH PARTITION pk3 FOR VALUES IN (31, 32);
+CREATE TABLE fk2 PARTITION OF fk FOR VALUES IN (2);
+CREATE TABLE fk3 (b int, a int);
+ALTER TABLE fk3 DROP COLUMN b;
+ALTER TABLE fk ATTACH PARTITION fk3 FOR VALUES IN (3);
+SELECT pg_describe_object('pg_constraint'::regclass, oid, 0), confrelid::regclass,
+       CASE WHEN conparentid <> 0 THEN pg_describe_object('pg_constraint'::regclass, conparentid, 0) ELSE 'TOP' END
+FROM pg_catalog.pg_constraint
+WHERE conrelid IN (SELECT relid FROM pg_partition_tree('fk'))
+ORDER BY conrelid::regclass::text, conname;
+CREATE TABLE fk4 (LIKE fk);
+INSERT INTO fk4 VALUES (50);
+ALTER TABLE fk ATTACH PARTITION fk4 FOR VALUES IN (50);
+
+-- Verify ON UPDATE/DELETE behavior
+CREATE SCHEMA fkpart6;
+SET search_path TO fkpart6;
+CREATE TABLE pk (a int PRIMARY KEY) PARTITION BY RANGE (a);
+CREATE TABLE pk1 PARTITION OF pk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
+CREATE TABLE pk11 PARTITION OF pk1 FOR VALUES FROM (1) TO (50);
+CREATE TABLE pk12 PARTITION OF pk1 FOR VALUES FROM (50) TO (100);
+CREATE TABLE fk (a int) PARTITION BY RANGE (a);
+CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
+CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES FROM (1) TO (10);
+CREATE TABLE fk12 PARTITION OF fk1 FOR VALUES FROM (10) TO (100);
+ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk ON UPDATE CASCADE ON DELETE CASCADE;
+CREATE TABLE fk_d PARTITION OF fk DEFAULT;
+INSERT INTO pk VALUES (1);
+INSERT INTO fk VALUES (1);
+UPDATE pk SET a = 20;
+SELECT tableoid::regclass, * FROM fk;
+DELETE FROM pk WHERE a = 20;
+SELECT tableoid::regclass, * FROM fk;
+DROP TABLE fk;
+
+TRUNCATE TABLE pk;
+INSERT INTO pk VALUES (20), (50);
+CREATE TABLE fk (a int) PARTITION BY RANGE (a);
+CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
+CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES FROM (1) TO (10);
+CREATE TABLE fk12 PARTITION OF fk1 FOR VALUES FROM (10) TO (100);
+ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk ON UPDATE SET NULL ON DELETE SET NULL;
+CREATE TABLE fk_d PARTITION OF fk DEFAULT;
+INSERT INTO fk VALUES (20), (50);
+UPDATE pk SET a = 21 WHERE a = 20;
+DELETE FROM pk WHERE a = 50;
+SELECT tableoid::regclass, * FROM fk;
+DROP TABLE fk;
+
+TRUNCATE TABLE pk;
+INSERT INTO pk VALUES (20), (30), (50);
+CREATE TABLE fk (id int, a int DEFAULT 50) PARTITION BY RANGE (a);
+CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
+CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES FROM (1) TO (10);
+CREATE TABLE fk12 PARTITION OF fk1 FOR VALUES FROM (10) TO (100);
+ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk ON UPDATE SET DEFAULT ON DELETE SET DEFAULT;
+CREATE TABLE fk_d PARTITION OF fk DEFAULT;
+INSERT INTO fk VALUES (1, 20), (2, 30);
+DELETE FROM pk WHERE a = 20 RETURNING *;
+UPDATE pk SET a = 90 WHERE a = 30 RETURNING *;
+SELECT tableoid::regclass, * FROM fk;
+DROP TABLE fk;
+
+TRUNCATE TABLE pk;
+INSERT INTO pk VALUES (20), (30);
+CREATE TABLE fk (a int DEFAULT 50) PARTITION BY RANGE (a);
+CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
+CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES FROM (1) TO (10);
+CREATE TABLE fk12 PARTITION OF fk1 FOR VALUES FROM (10) TO (100);
+ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk ON UPDATE RESTRICT ON DELETE RESTRICT;
+CREATE TABLE fk_d PARTITION OF fk DEFAULT;
+INSERT INTO fk VALUES (20), (30);
+DELETE FROM pk WHERE a = 20;
+UPDATE pk SET a = 90 WHERE a = 30;
+SELECT tableoid::regclass, * FROM fk;
+DROP TABLE fk;