From: Alvaro Herrera Date: Fri, 18 Jan 2019 17:49:27 +0000 (-0300) Subject: Move CloneForeignKeyConstraints to tablecmds.c X-Git-Tag: REL_11_2~66 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=fca6cabed17c4960224408d44e3d384b560b78f5;p=postgresql Move CloneForeignKeyConstraints to tablecmds.c My commit 3de241dba86f introduced some code to create a clone of a foreign key to a partition, but I put it in pg_constraint.c because it was too close to the contents of the pg_constraint row. With the previous commit that split out the constraint tuple deconstruction into its own routine, it makes more sense to have the FK-cloning function in tablecmds.c, mostly because its static subroutine can then be used by a future bugfix. My initial posting of this patch had this routine as static in tablecmds.c, but sadly this function is already part of the Postgres 11 ABI as exported from pg_constraint.c, so keep it as exported also just to avoid breaking any possible users of it. --- diff --git a/src/backend/catalog/pg_constraint.c b/src/backend/catalog/pg_constraint.c index 6235729f69..5720c652b2 100644 --- a/src/backend/catalog/pg_constraint.c +++ b/src/backend/catalog/pg_constraint.c @@ -38,10 +38,6 @@ #include "utils/tqual.h" -static void clone_fk_constraints(Relation pg_constraint, Relation parentRel, - Relation partRel, List *clone, List **cloned); - - /* * CreateConstraintEntry * Create a constraint table entry. @@ -385,304 +381,6 @@ CreateConstraintEntry(const char *constraintName, return conOid; } -/* - * CloneForeignKeyConstraints - * Clone foreign keys from a partitioned table to a newly acquired - * partition. - * - * relationId is a partition of parentId, so we can be certain that it has the - * same columns with the same datatypes. The columns may be in different - * order, though. - * - * The *cloned list is appended ClonedConstraint elements describing what was - * created. - */ -void -CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned) -{ - Relation pg_constraint; - Relation parentRel; - Relation rel; - ScanKeyData key; - SysScanDesc scan; - HeapTuple tuple; - List *clone = NIL; - - parentRel = heap_open(parentId, NoLock); /* already got lock */ - /* see ATAddForeignKeyConstraint about lock level */ - rel = heap_open(relationId, AccessExclusiveLock); - pg_constraint = heap_open(ConstraintRelationId, RowShareLock); - - /* Obtain the list of constraints to clone or attach */ - ScanKeyInit(&key, - Anum_pg_constraint_conrelid, BTEqualStrategyNumber, - F_OIDEQ, ObjectIdGetDatum(parentId)); - scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true, - NULL, 1, &key); - while ((tuple = systable_getnext(scan)) != NULL) - clone = lappend_oid(clone, HeapTupleGetOid(tuple)); - systable_endscan(scan); - - /* Do the actual work, recursing to partitions as needed */ - clone_fk_constraints(pg_constraint, parentRel, rel, clone, cloned); - - /* We're done. Clean up */ - heap_close(parentRel, NoLock); - heap_close(rel, NoLock); /* keep lock till commit */ - heap_close(pg_constraint, RowShareLock); -} - -/* - * clone_fk_constraints - * Recursive subroutine for CloneForeignKeyConstraints - * - * Clone the given list of FK constraints when a partition is attached. - * - * When cloning foreign keys to a partition, it may happen that equivalent - * constraints already exist in the partition for some of them. We can skip - * creating a clone in that case, and instead just attach the existing - * constraint to the one in the parent. - * - * This function recurses to partitions, if the new partition is partitioned; - * of course, only do this for FKs that were actually cloned. - */ -static void -clone_fk_constraints(Relation pg_constraint, Relation parentRel, - Relation partRel, List *clone, List **cloned) -{ - AttrNumber *attmap; - List *partFKs; - List *subclone = NIL; - ListCell *cell; - - /* - * The constraint key may differ, if the columns in the partition are - * different. This map is used to convert them. - */ - attmap = convert_tuples_by_name_map(RelationGetDescr(partRel), - RelationGetDescr(parentRel), - gettext_noop("could not convert row type")); - - partFKs = copyObject(RelationGetFKeyList(partRel)); - - foreach(cell, clone) - { - Oid parentConstrOid = lfirst_oid(cell); - Form_pg_constraint constrForm; - HeapTuple tuple; - AttrNumber conkey[INDEX_MAX_KEYS]; - AttrNumber mapped_conkey[INDEX_MAX_KEYS]; - AttrNumber confkey[INDEX_MAX_KEYS]; - Oid conpfeqop[INDEX_MAX_KEYS]; - Oid conppeqop[INDEX_MAX_KEYS]; - Oid conffeqop[INDEX_MAX_KEYS]; - Constraint *fkconstraint; - bool attach_it; - Oid constrOid; - ObjectAddress parentAddr, - childAddr; - int nelem; - ListCell *cell; - int i; - - tuple = SearchSysCache1(CONSTROID, parentConstrOid); - if (!tuple) - elog(ERROR, "cache lookup failed for constraint %u", - parentConstrOid); - constrForm = (Form_pg_constraint) GETSTRUCT(tuple); - - /* only foreign keys */ - if (constrForm->contype != CONSTRAINT_FOREIGN) - { - ReleaseSysCache(tuple); - continue; - } - - ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid); - - DeconstructFkConstraintRow(tuple, &nelem, conkey, confkey, - conpfeqop, conppeqop, conffeqop); - for (i = 0; i < nelem; i++) - mapped_conkey[i] = attmap[conkey[i] - 1]; - - /* - * Before creating a new constraint, see whether any existing FKs are - * fit for the purpose. If one is, attach the parent constraint to it, - * and don't clone anything. This way we avoid the expensive - * verification step and don't end up with a duplicate FK. This also - * means we don't consider this constraint when recursing to - * partitions. - */ - attach_it = false; - foreach(cell, partFKs) - { - ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell); - Form_pg_constraint partConstr; - HeapTuple partcontup; - - attach_it = true; - - /* - * Do some quick & easy initial checks. If any of these fail, we - * cannot use this constraint, but keep looking. - */ - if (fk->confrelid != constrForm->confrelid || fk->nkeys != nelem) - { - attach_it = false; - continue; - } - for (i = 0; i < nelem; i++) - { - if (fk->conkey[i] != mapped_conkey[i] || - fk->confkey[i] != confkey[i] || - fk->conpfeqop[i] != conpfeqop[i]) - { - attach_it = false; - break; - } - } - if (!attach_it) - continue; - - /* - * Looks good so far; do some more extensive checks. Presumably - * the check for 'convalidated' could be dropped, since we don't - * really care about that, but let's be careful for now. - */ - partcontup = SearchSysCache1(CONSTROID, - ObjectIdGetDatum(fk->conoid)); - if (!partcontup) - elog(ERROR, "cache lookup failed for constraint %u", - fk->conoid); - partConstr = (Form_pg_constraint) GETSTRUCT(partcontup); - if (OidIsValid(partConstr->conparentid) || - !partConstr->convalidated || - partConstr->condeferrable != constrForm->condeferrable || - partConstr->condeferred != constrForm->condeferred || - partConstr->confupdtype != constrForm->confupdtype || - partConstr->confdeltype != constrForm->confdeltype || - partConstr->confmatchtype != constrForm->confmatchtype) - { - ReleaseSysCache(partcontup); - attach_it = false; - continue; - } - - ReleaseSysCache(partcontup); - - /* looks good! Attach this constraint */ - ConstraintSetParentConstraint(fk->conoid, - HeapTupleGetOid(tuple)); - CommandCounterIncrement(); - attach_it = true; - break; - } - - /* - * If we attached to an existing constraint, there is no need to - * create a new one. In fact, there's no need to recurse for this - * constraint to partitions, either. - */ - if (attach_it) - { - ReleaseSysCache(tuple); - continue; - } - - constrOid = - CreateConstraintEntry(NameStr(constrForm->conname), - constrForm->connamespace, - CONSTRAINT_FOREIGN, - constrForm->condeferrable, - constrForm->condeferred, - constrForm->convalidated, - HeapTupleGetOid(tuple), - RelationGetRelid(partRel), - mapped_conkey, - nelem, - nelem, - InvalidOid, /* not a domain constraint */ - constrForm->conindid, /* same index */ - constrForm->confrelid, /* same foreign rel */ - confkey, - conpfeqop, - conppeqop, - conffeqop, - nelem, - constrForm->confupdtype, - constrForm->confdeltype, - constrForm->confmatchtype, - NULL, - NULL, - NULL, - NULL, - false, - 1, false, true); - subclone = lappend_oid(subclone, constrOid); - - ObjectAddressSet(childAddr, ConstraintRelationId, constrOid); - recordDependencyOn(&childAddr, &parentAddr, DEPENDENCY_INTERNAL_AUTO); - - fkconstraint = makeNode(Constraint); - /* for now this is all we need */ - fkconstraint->conname = pstrdup(NameStr(constrForm->conname)); - fkconstraint->fk_upd_action = constrForm->confupdtype; - fkconstraint->fk_del_action = constrForm->confdeltype; - fkconstraint->deferrable = constrForm->condeferrable; - fkconstraint->initdeferred = constrForm->condeferred; - - createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint, - constrOid, constrForm->conindid, false); - - if (cloned) - { - ClonedConstraint *newc; - - /* - * Feed back caller about the constraints we created, so that they - * can set up constraint verification. - */ - newc = palloc(sizeof(ClonedConstraint)); - newc->relid = RelationGetRelid(partRel); - newc->refrelid = constrForm->confrelid; - newc->conindid = constrForm->conindid; - newc->conid = constrOid; - newc->constraint = fkconstraint; - - *cloned = lappend(*cloned, newc); - } - - ReleaseSysCache(tuple); - } - - pfree(attmap); - list_free_deep(partFKs); - - /* - * If the partition is partitioned, recurse to handle any constraints that - * were cloned. - */ - if (partRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE && - subclone != NIL) - { - PartitionDesc partdesc = RelationGetPartitionDesc(partRel); - int i; - - for (i = 0; i < partdesc->nparts; i++) - { - Relation childRel; - - childRel = heap_open(partdesc->oids[i], AccessExclusiveLock); - clone_fk_constraints(pg_constraint, - partRel, - childRel, - subclone, - cloned); - heap_close(childRel, NoLock); /* keep lock till commit */ - } - } -} - /* * Test whether given name is currently used as a constraint name * for the given object (relation or domain). diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 71b2e3f134..b1b7fe11ac 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -412,6 +412,8 @@ static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo * Relation rel, Constraint *fkconstraint, Oid parentConstr, bool recurse, bool recursing, LOCKMODE lockmode); +static void CloneFkReferencing(Relation pg_constraint, Relation parentRel, + Relation partRel, List *clone, List **cloned); static void ATExecDropConstraint(Relation rel, const char *constrName, DropBehavior behavior, bool recurse, bool recursing, @@ -7784,6 +7786,309 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, return address; } +/* + * CloneForeignKeyConstraints + * Clone foreign keys from a partitioned table to a newly acquired + * partition. + * + * relationId is a partition of parentId, so we can be certain that it has the + * same columns with the same datatypes. The columns may be in different + * order, though. + * + * The *cloned list is appended ClonedConstraint elements describing what was + * created, for the purposes of validating the constraint in ALTER TABLE's + * Phase 3. + */ +void +CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned) +{ + Relation pg_constraint; + Relation parentRel; + Relation rel; + ScanKeyData key; + SysScanDesc scan; + HeapTuple tuple; + List *clone = NIL; + + parentRel = heap_open(parentId, NoLock); /* already got lock */ + /* see ATAddForeignKeyConstraint about lock level */ + rel = heap_open(relationId, AccessExclusiveLock); + pg_constraint = heap_open(ConstraintRelationId, RowShareLock); + + /* Obtain the list of constraints to clone or attach */ + ScanKeyInit(&key, + Anum_pg_constraint_conrelid, BTEqualStrategyNumber, + F_OIDEQ, ObjectIdGetDatum(parentId)); + scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true, + NULL, 1, &key); + while ((tuple = systable_getnext(scan)) != NULL) + { + Oid oid = HeapTupleGetOid(tuple); + + clone = lappend_oid(clone, oid); + } + systable_endscan(scan); + + /* Do the actual work, recursing to partitions as needed */ + CloneFkReferencing(pg_constraint, parentRel, rel, clone, cloned); + + /* We're done. Clean up */ + heap_close(parentRel, NoLock); + heap_close(rel, NoLock); /* keep lock till commit */ + heap_close(pg_constraint, RowShareLock); +} + +/* + * CloneFkReferencing + * Recursive subroutine for CloneForeignKeyConstraints, referencing side + * + * Clone the given list of FK constraints when a partition is attached on the + * referencing side of those constraints. + * + * When cloning foreign keys to a partition, it may happen that equivalent + * constraints already exist in the partition for some of them. We can skip + * creating a clone in that case, and instead just attach the existing + * constraint to the one in the parent. + * + * This function recurses to partitions, if the new partition is partitioned; + * of course, only do this for FKs that were actually cloned. + */ +static void +CloneFkReferencing(Relation pg_constraint, Relation parentRel, + Relation partRel, List *clone, List **cloned) +{ + AttrNumber *attmap; + List *partFKs; + List *subclone = NIL; + ListCell *cell; + + /* + * The constraint key may differ, if the columns in the partition are + * different. This map is used to convert them. + */ + attmap = convert_tuples_by_name_map(RelationGetDescr(partRel), + RelationGetDescr(parentRel), + gettext_noop("could not convert row type")); + + partFKs = copyObject(RelationGetFKeyList(partRel)); + + foreach(cell, clone) + { + Oid parentConstrOid = lfirst_oid(cell); + Form_pg_constraint constrForm; + HeapTuple tuple; + int numfks; + AttrNumber conkey[INDEX_MAX_KEYS]; + AttrNumber mapped_conkey[INDEX_MAX_KEYS]; + AttrNumber confkey[INDEX_MAX_KEYS]; + Oid conpfeqop[INDEX_MAX_KEYS]; + Oid conppeqop[INDEX_MAX_KEYS]; + Oid conffeqop[INDEX_MAX_KEYS]; + Constraint *fkconstraint; + bool attach_it; + Oid constrOid; + ObjectAddress parentAddr, + childAddr; + ListCell *cell; + int i; + + tuple = SearchSysCache1(CONSTROID, parentConstrOid); + if (!tuple) + elog(ERROR, "cache lookup failed for constraint %u", + parentConstrOid); + constrForm = (Form_pg_constraint) GETSTRUCT(tuple); + + /* only foreign keys */ + if (constrForm->contype != CONSTRAINT_FOREIGN) + { + ReleaseSysCache(tuple); + continue; + } + + ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid); + + DeconstructFkConstraintRow(tuple, &numfks, conkey, confkey, + conpfeqop, conppeqop, conffeqop); + for (i = 0; i < numfks; i++) + mapped_conkey[i] = attmap[conkey[i] - 1]; + + /* + * Before creating a new constraint, see whether any existing FKs are + * fit for the purpose. If one is, attach the parent constraint to it, + * and don't clone anything. This way we avoid the expensive + * verification step and don't end up with a duplicate FK. This also + * means we don't consider this constraint when recursing to + * partitions. + */ + attach_it = false; + foreach(cell, partFKs) + { + ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell); + Form_pg_constraint partConstr; + HeapTuple partcontup; + + attach_it = true; + + /* + * Do some quick & easy initial checks. If any of these fail, we + * cannot use this constraint, but keep looking. + */ + if (fk->confrelid != constrForm->confrelid || fk->nkeys != numfks) + { + attach_it = false; + continue; + } + for (i = 0; i < numfks; i++) + { + if (fk->conkey[i] != mapped_conkey[i] || + fk->confkey[i] != confkey[i] || + fk->conpfeqop[i] != conpfeqop[i]) + { + attach_it = false; + break; + } + } + if (!attach_it) + continue; + + /* + * Looks good so far; do some more extensive checks. Presumably + * the check for 'convalidated' could be dropped, since we don't + * really care about that, but let's be careful for now. + */ + partcontup = SearchSysCache1(CONSTROID, + ObjectIdGetDatum(fk->conoid)); + if (!partcontup) + elog(ERROR, "cache lookup failed for constraint %u", + fk->conoid); + partConstr = (Form_pg_constraint) GETSTRUCT(partcontup); + if (OidIsValid(partConstr->conparentid) || + !partConstr->convalidated || + partConstr->condeferrable != constrForm->condeferrable || + partConstr->condeferred != constrForm->condeferred || + partConstr->confupdtype != constrForm->confupdtype || + partConstr->confdeltype != constrForm->confdeltype || + partConstr->confmatchtype != constrForm->confmatchtype) + { + ReleaseSysCache(partcontup); + attach_it = false; + continue; + } + + ReleaseSysCache(partcontup); + + /* looks good! Attach this constraint */ + ConstraintSetParentConstraint(fk->conoid, parentConstrOid); + CommandCounterIncrement(); + attach_it = true; + break; + } + + /* + * If we attached to an existing constraint, there is no need to + * create a new one. In fact, there's no need to recurse for this + * constraint to partitions, either. + */ + if (attach_it) + { + ReleaseSysCache(tuple); + continue; + } + + constrOid = + CreateConstraintEntry(NameStr(constrForm->conname), + constrForm->connamespace, + CONSTRAINT_FOREIGN, + constrForm->condeferrable, + constrForm->condeferred, + constrForm->convalidated, + parentConstrOid, + RelationGetRelid(partRel), + mapped_conkey, + numfks, + numfks, + InvalidOid, /* not a domain constraint */ + constrForm->conindid, /* same index */ + constrForm->confrelid, /* same foreign rel */ + confkey, + conpfeqop, + conppeqop, + conffeqop, + numfks, + constrForm->confupdtype, + constrForm->confdeltype, + constrForm->confmatchtype, + NULL, + NULL, + NULL, + NULL, + false, + 1, false, true); + subclone = lappend_oid(subclone, constrOid); + + ObjectAddressSet(childAddr, ConstraintRelationId, constrOid); + recordDependencyOn(&childAddr, &parentAddr, DEPENDENCY_INTERNAL_AUTO); + + fkconstraint = makeNode(Constraint); + /* for now this is all we need */ + fkconstraint->conname = pstrdup(NameStr(constrForm->conname)); + fkconstraint->fk_upd_action = constrForm->confupdtype; + fkconstraint->fk_del_action = constrForm->confdeltype; + fkconstraint->deferrable = constrForm->condeferrable; + fkconstraint->initdeferred = constrForm->condeferred; + + createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint, + constrOid, constrForm->conindid, false); + + if (cloned) + { + ClonedConstraint *newc; + + /* + * Feed back caller about the constraints we created, so that they + * can set up constraint verification. + */ + newc = palloc(sizeof(ClonedConstraint)); + newc->relid = RelationGetRelid(partRel); + newc->refrelid = constrForm->confrelid; + newc->conindid = constrForm->conindid; + newc->conid = constrOid; + newc->constraint = fkconstraint; + + *cloned = lappend(*cloned, newc); + } + + ReleaseSysCache(tuple); + } + + pfree(attmap); + list_free_deep(partFKs); + + /* + * If the partition is partitioned, recurse to handle any constraints that + * were cloned. + */ + if (partRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE && + subclone != NIL) + { + PartitionDesc partdesc = RelationGetPartitionDesc(partRel); + int i; + + for (i = 0; i < partdesc->nparts; i++) + { + Relation childRel; + + childRel = heap_open(partdesc->oids[i], AccessExclusiveLock); + CloneFkReferencing(pg_constraint, + partRel, + childRel, + subclone, + cloned); + heap_close(childRel, NoLock); /* keep lock till commit */ + } + } +} + /* * ALTER TABLE ALTER CONSTRAINT * diff --git a/src/include/catalog/pg_constraint.h b/src/include/catalog/pg_constraint.h index 028e11a795..cdea401baf 100644 --- a/src/include/catalog/pg_constraint.h +++ b/src/include/catalog/pg_constraint.h @@ -230,9 +230,6 @@ extern Oid CreateConstraintEntry(const char *constraintName, bool conNoInherit, bool is_internal); -extern void CloneForeignKeyConstraints(Oid parentId, Oid relationId, - List **cloned); - extern void RemoveConstraintById(Oid conId); extern void RenameConstraintById(Oid conId, const char *newname); diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h index 138de84e83..ac93bd3de8 100644 --- a/src/include/commands/tablecmds.h +++ b/src/include/commands/tablecmds.h @@ -78,6 +78,8 @@ extern void check_of_type(HeapTuple typetuple); extern void createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint, Oid constraintOid, Oid indexOid, bool create_action); +extern void CloneForeignKeyConstraints(Oid parentId, Oid relationId, + List **cloned); extern void register_on_commit_action(Oid relid, OnCommitAction action); extern void remove_on_commit_action(Oid relid);