* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/catalog/dependency.c,v 1.11 2002/09/19 23:40:56 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/catalog/dependency.c,v 1.12 2002/09/22 00:37:09 tgl Exp $
*
*-------------------------------------------------------------------------
*/
} ObjectClasses;
/* expansible list of ObjectAddresses */
-typedef struct ObjectAddresses
+typedef struct
{
ObjectAddress *refs; /* => palloc'd array */
int numrefs; /* current number of references */
int maxrefs; /* current size of palloc'd array */
- struct ObjectAddresses *link; /* list link for use in recursion */
} ObjectAddresses;
/* for find_expr_references_walker */
static Oid object_classes[MAX_OCLASS];
+static void findAutoDeletableObjects(const ObjectAddress *object,
+ ObjectAddresses *oktodelete,
+ Relation depRel);
static bool recursiveDeletion(const ObjectAddress *object,
DropBehavior behavior,
const ObjectAddress *callingObject,
- ObjectAddresses *pending,
+ ObjectAddresses *oktodelete,
Relation depRel);
static void doDeletion(const ObjectAddress *object);
static bool find_expr_references_walker(Node *node,
ObjectAddresses *addrs);
static void add_exact_object_address(const ObjectAddress *object,
ObjectAddresses *addrs);
-static void del_object_address(const ObjectAddress *object,
+static bool object_address_present(const ObjectAddress *object,
ObjectAddresses *addrs);
-static void del_object_address_by_index(int index, ObjectAddresses *addrs);
static void term_object_addresses(ObjectAddresses *addrs);
static void init_object_classes(void);
static ObjectClasses getObjectClass(const ObjectAddress *object);
{
char *objDescription;
Relation depRel;
+ ObjectAddresses oktodelete;
/*
* Get object description for possible use in failure message. Must do
*/
depRel = heap_openr(DependRelationName, RowExclusiveLock);
- if (!recursiveDeletion(object, behavior, NULL, NULL, depRel))
+ /*
+ * Construct a list of objects that are reachable by AUTO or INTERNAL
+ * dependencies from the target object. These should be deleted silently,
+ * even if the actual deletion pass first reaches one of them via a
+ * non-auto dependency.
+ */
+ init_object_addresses(&oktodelete);
+
+ findAutoDeletableObjects(object, &oktodelete, depRel);
+
+ if (!recursiveDeletion(object, behavior, NULL, &oktodelete, depRel))
elog(ERROR, "Cannot drop %s because other objects depend on it"
"\n\tUse DROP ... CASCADE to drop the dependent objects too",
objDescription);
+ term_object_addresses(&oktodelete);
+
heap_close(depRel, RowExclusiveLock);
pfree(objDescription);
/*
- * recursiveDeletion: delete a single object for performDeletion.
+ * findAutoDeletableObjects: find all objects that are reachable by AUTO or
+ * INTERNAL dependency paths from the given object. Add them all to the
+ * oktodelete list. Note that the originally given object will also be
+ * added to the list.
+ *
+ * depRel is the already-open pg_depend relation.
+ */
+static void
+findAutoDeletableObjects(const ObjectAddress *object,
+ ObjectAddresses *oktodelete,
+ Relation depRel)
+{
+ ScanKeyData key[3];
+ int nkeys;
+ SysScanDesc scan;
+ HeapTuple tup;
+ ObjectAddress otherObject;
+
+ /*
+ * If this object is already in oktodelete, then we already visited it;
+ * don't do so again (this prevents infinite recursion if there's a loop
+ * in pg_depend). Otherwise, add it.
+ */
+ if (object_address_present(object, oktodelete))
+ return;
+ add_exact_object_address(object, oktodelete);
+
+ /*
+ * Scan pg_depend records that link to this object, showing the things
+ * that depend on it. For each one that is AUTO or INTERNAL, visit the
+ * referencing object.
+ *
+ * When dropping a whole object (subId = 0), find pg_depend records for
+ * its sub-objects too.
+ */
+ ScanKeyEntryInitialize(&key[0], 0x0,
+ Anum_pg_depend_refclassid, F_OIDEQ,
+ ObjectIdGetDatum(object->classId));
+ ScanKeyEntryInitialize(&key[1], 0x0,
+ Anum_pg_depend_refobjid, F_OIDEQ,
+ ObjectIdGetDatum(object->objectId));
+ if (object->objectSubId != 0)
+ {
+ ScanKeyEntryInitialize(&key[2], 0x0,
+ Anum_pg_depend_refobjsubid, F_INT4EQ,
+ Int32GetDatum(object->objectSubId));
+ nkeys = 3;
+ }
+ else
+ nkeys = 2;
+
+ scan = systable_beginscan(depRel, DependReferenceIndex, true,
+ SnapshotNow, nkeys, key);
+
+ while (HeapTupleIsValid(tup = systable_getnext(scan)))
+ {
+ Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
+
+ switch (foundDep->deptype)
+ {
+ case DEPENDENCY_NORMAL:
+ /* ignore */
+ break;
+ case DEPENDENCY_AUTO:
+ case DEPENDENCY_INTERNAL:
+ /* recurse */
+ otherObject.classId = foundDep->classid;
+ otherObject.objectId = foundDep->objid;
+ otherObject.objectSubId = foundDep->objsubid;
+ findAutoDeletableObjects(&otherObject, oktodelete, depRel);
+ break;
+ case DEPENDENCY_PIN:
+ /*
+ * For a PIN dependency we just elog immediately; there
+ * won't be any others to examine, and we aren't ever
+ * going to let the user delete it.
+ */
+ elog(ERROR, "Cannot drop %s because it is required by the database system",
+ getObjectDescription(object));
+ break;
+ default:
+ elog(ERROR, "findAutoDeletableObjects: unknown dependency type '%c' for %s",
+ foundDep->deptype, getObjectDescription(object));
+ break;
+ }
+ }
+
+ systable_endscan(scan);
+}
+
+
+/*
+ * recursiveDeletion: delete a single object for performDeletion, plus
+ * (recursively) anything that depends on it.
*
* Returns TRUE if successful, FALSE if not.
*
* callingObject is NULL at the outer level, else identifies the object that
* we recursed from (the reference object that someone else needs to delete).
- * pending is a linked list of objects that outer recursion levels want to
- * delete. We remove the target object from any outer-level list it may
- * appear in.
+ *
+ * oktodelete is a list of objects verified deletable (ie, reachable by one
+ * or more AUTO or INTERNAL dependencies from the original target).
+ *
* depRel is the already-open pg_depend relation.
*
+ *
* In RESTRICT mode, we perform all the deletions anyway, but elog a NOTICE
* and return FALSE if we find a restriction violation. performDeletion
* will then abort the transaction to nullify the deletions. We have to
* while (b) not going into infinite recursion if there's a cycle.
*
* This is even more complex than one could wish, because it is possible for
- * the same pair of objects to be related by both NORMAL and AUTO (or IMPLICIT)
- * dependencies. (Since one or both paths might be indirect, it's very hard
- * to prevent this; we must cope instead.) If there is an AUTO/IMPLICIT
- * deletion path then we should perform the deletion, and not fail because
- * of the NORMAL dependency. So, when we hit a NORMAL dependency we don't
- * immediately decide we've failed; instead we stick the NORMAL dependent
- * object into a list of pending deletions. If we find a legal path to delete
- * that object later on, the recursive call will remove it from our pending
- * list. After we've exhausted all such possibilities, we remove the
- * remaining pending objects anyway, but emit a notice and prepare to return
- * FALSE. (We have to do it this way because the dependent objects *must* be
- * removed before we can remove the object they depend on.)
+ * the same pair of objects to be related by both NORMAL and AUTO/INTERNAL
+ * dependencies. Also, we might have a situation where we've been asked to
+ * delete object A, and objects B and C both have AUTO dependencies on A,
+ * but B also has a NORMAL dependency on C. (Since any of these paths might
+ * be indirect, we can't prevent these scenarios, but must cope instead.)
+ * If we visit C before B then we would mistakenly decide that the B->C link
+ * should prevent the restricted drop from occurring. To handle this, we make
+ * a pre-scan to find all the objects that are auto-deletable from A. If we
+ * visit C first, but B is present in the oktodelete list, then we make no
+ * complaint but recurse to delete B anyway. (Note that in general we must
+ * delete B before deleting C; the drop routine for B may try to access C.)
*
- * Note: in the case where the AUTO path is traversed first, we will never
- * see the NORMAL dependency path because of the pg_depend removals done in
- * recursive executions of step 1. The pending list is necessary essentially
- * just to make the behavior independent of the order in which pg_depend
+ * Note: in the case where the path to B is traversed first, we will not
+ * see the NORMAL dependency when we reach C, because of the pg_depend
+ * removals done in step 1. The oktodelete list is necessary just
+ * to make the behavior independent of the order in which pg_depend
* entries are visited.
*/
static bool
recursiveDeletion(const ObjectAddress *object,
DropBehavior behavior,
const ObjectAddress *callingObject,
- ObjectAddresses *pending,
+ ObjectAddresses *oktodelete,
Relation depRel)
{
bool ok = true;
char *objDescription;
- ObjectAddresses mypending;
ScanKeyData key[3];
int nkeys;
SysScanDesc scan;
*/
objDescription = getObjectDescription(object);
- /*
- * Initialize list of restricted objects, and set up chain link.
- */
- init_object_addresses(&mypending);
- mypending.link = pending;
-
/*
* Step 1: find and remove pg_depend records that link from this
* object to others. We have to do this anyway, and doing it first
* another object. We have three cases:
*
* 1. At the outermost recursion level, disallow the DROP.
- * (We just elog here, rather than considering this drop
- * to be pending, since no other dependencies are likely
- * to be interesting.)
+ * (We just elog here, rather than proceeding, since no
+ * other dependencies are likely to be interesting.)
*/
if (callingObject == NULL)
{
/*
* If we found we are owned by another object, ask it to delete itself
- * instead of proceeding.
+ * instead of proceeding. Complain if RESTRICT mode, unless the other
+ * object is in oktodelete.
*/
if (amOwned)
{
- if (behavior == DROP_RESTRICT)
+ if (object_address_present(&owningObject, oktodelete))
+ elog(DEBUG1, "Drop auto-cascades to %s",
+ getObjectDescription(&owningObject));
+ else if (behavior == DROP_RESTRICT)
{
elog(NOTICE, "%s depends on %s",
getObjectDescription(&owningObject),
if (!recursiveDeletion(&owningObject, behavior,
object,
- pending, depRel))
+ oktodelete, depRel))
ok = false;
pfree(objDescription);
- term_object_addresses(&mypending);
return ok;
}
switch (foundDep->deptype)
{
case DEPENDENCY_NORMAL:
- if (behavior == DROP_RESTRICT)
+ /*
+ * Perhaps there was another dependency path that would
+ * have allowed silent deletion of the otherObject, had
+ * we only taken that path first.
+ * In that case, act like this link is AUTO, too.
+ */
+ if (object_address_present(&otherObject, oktodelete))
+ elog(DEBUG1, "Drop auto-cascades to %s",
+ getObjectDescription(&otherObject));
+ else if (behavior == DROP_RESTRICT)
{
- /*
- * We've found a restricted object (or at least one
- * that's not deletable along this path). Log for
- * later processing. (Note it's okay if the same
- * object gets into mypending multiple times.)
- */
- add_exact_object_address(&otherObject, &mypending);
+ elog(NOTICE, "%s depends on %s",
+ getObjectDescription(&otherObject),
+ objDescription);
+ ok = false;
}
else
- {
elog(NOTICE, "Drop cascades to %s",
getObjectDescription(&otherObject));
- if (!recursiveDeletion(&otherObject, behavior,
- object,
- &mypending, depRel))
- ok = false;
- }
+ if (!recursiveDeletion(&otherObject, behavior,
+ object,
+ oktodelete, depRel))
+ ok = false;
break;
case DEPENDENCY_AUTO:
case DEPENDENCY_INTERNAL:
if (!recursiveDeletion(&otherObject, behavior,
object,
- &mypending, depRel))
+ oktodelete, depRel))
ok = false;
break;
case DEPENDENCY_PIN:
systable_endscan(scan);
- /*
- * If we found no restricted objects, or got rid of them all via other
- * paths, we're in good shape. Otherwise continue step 2 by
- * processing the remaining restricted objects.
- */
- if (mypending.numrefs > 0)
- {
- /*
- * Successively extract and delete each remaining object. Note
- * that the right things will happen if some of these objects
- * depend on others: we'll report/delete each one exactly once.
- */
- while (mypending.numrefs > 0)
- {
- ObjectAddress otherObject = mypending.refs[0];
-
- del_object_address_by_index(0, &mypending);
-
- elog(NOTICE, "%s depends on %s",
- getObjectDescription(&otherObject),
- objDescription);
- if (!recursiveDeletion(&otherObject, behavior,
- object,
- &mypending, depRel))
- ok = false;
- }
-
- ok = false;
- }
-
/*
* We do not need CommandCounterIncrement here, since if step 2 did
* anything then each recursive call will have ended with one.
*/
DeleteComments(object->objectId, object->classId, object->objectSubId);
- /*
- * If this object is mentioned in any caller's pending list, remove
- * it.
- */
- del_object_address(object, pending);
-
/*
* CommandCounterIncrement here to ensure that preceding changes are
* all visible.
* And we're done!
*/
pfree(objDescription);
- term_object_addresses(&mypending);
return ok;
}
addrs->maxrefs = 32; /* arbitrary initial array size */
addrs->refs = (ObjectAddress *)
palloc(addrs->maxrefs * sizeof(ObjectAddress));
- addrs->link = NULL;
/* Initialize object_classes[] if not done yet */
/* This will be needed by add_object_address() */
}
/*
- * If an ObjectAddresses array contains any matches for the given object,
- * remove it/them. Also, do the same in any linked ObjectAddresses arrays.
+ * Test whether an object is present in an ObjectAddresses array.
+ *
+ * We return "true" if object is a subobject of something in the array, too.
*/
-static void
-del_object_address(const ObjectAddress *object,
- ObjectAddresses *addrs)
+static bool
+object_address_present(const ObjectAddress *object,
+ ObjectAddresses *addrs)
{
- for (; addrs != NULL; addrs = addrs->link)
+ int i;
+
+ for (i = addrs->numrefs - 1; i >= 0; i--)
{
- int i;
+ ObjectAddress *thisobj = addrs->refs + i;
- /* Scan backwards to simplify deletion logic. */
- for (i = addrs->numrefs - 1; i >= 0; i--)
+ if (object->classId == thisobj->classId &&
+ object->objectId == thisobj->objectId)
{
- ObjectAddress *thisobj = addrs->refs + i;
-
- if (object->classId == thisobj->classId &&
- object->objectId == thisobj->objectId)
- {
- /*
- * Delete if exact match, or if thisobj is a subobject of
- * the passed-in object.
- */
- if (object->objectSubId == thisobj->objectSubId ||
- object->objectSubId == 0)
- del_object_address_by_index(i, addrs);
- }
+ if (object->objectSubId == thisobj->objectSubId ||
+ thisobj->objectSubId == 0)
+ return true;
}
}
-}
-/*
- * Remove an entry (specified by array index) from an ObjectAddresses array.
- * The end item in the list is moved down to fill the hole.
- */
-static void
-del_object_address_by_index(int index, ObjectAddresses *addrs)
-{
- Assert(index >= 0 && index < addrs->numrefs);
- addrs->refs[index] = addrs->refs[addrs->numrefs - 1];
- addrs->numrefs--;
+ return false;
}
/*
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/catalog/heap.c,v 1.229 2002/09/19 23:40:56 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/catalog/heap.c,v 1.230 2002/09/22 00:37:09 tgl Exp $
*
*
* INTERFACE ROUTINES
' ',
' ',
' ',
+ InvalidOid, /* no associated index */
expr, /* Tree form check constraint */
ccbin, /* Binary form check constraint */
ccsrc); /* Source form check constraint */
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/catalog/index.c,v 1.196 2002/09/04 20:31:14 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/catalog/index.c,v 1.197 2002/09/22 00:37:09 tgl Exp $
*
*
* INTERFACE ROUTINES
' ',
' ',
' ',
+ InvalidOid, /* no associated index */
NULL, /* no check constraint */
NULL,
NULL);
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/catalog/pg_constraint.c,v 1.6 2002/09/04 20:31:14 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/catalog/pg_constraint.c,v 1.7 2002/09/22 00:37:09 tgl Exp $
*
*-------------------------------------------------------------------------
*/
char foreignUpdateType,
char foreignDeleteType,
char foreignMatchType,
+ Oid indexRelId,
Node *conExpr,
const char *conBin,
const char *conSrc)
}
}
+ if (OidIsValid(indexRelId))
+ {
+ /*
+ * Register normal dependency on the unique index that supports
+ * a foreign-key constraint.
+ */
+ ObjectAddress relobject;
+
+ relobject.classId = RelOid_pg_class;
+ relobject.objectId = indexRelId;
+ relobject.objectSubId = 0;
+
+ recordDependencyOn(&conobject, &relobject, DEPENDENCY_NORMAL);
+ }
+
if (conExpr != NULL)
{
/*
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/commands/tablecmds.c,v 1.41 2002/09/12 21:16:42 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/commands/tablecmds.c,v 1.42 2002/09/22 00:37:09 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "parser/gramparse.h"
#include "parser/parse_coerce.h"
#include "parser/parse_expr.h"
+#include "parser/parse_oper.h"
#include "parser/parse_relation.h"
#include "parser/parse_type.h"
#include "utils/acl.h"
static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass);
static void CheckTupleType(Form_pg_class tuple_class);
static bool needs_toast_table(Relation rel);
+static void AlterTableAddCheckConstraint(Relation rel, Constraint *constr);
+static void AlterTableAddForeignKeyConstraint(Relation rel,
+ FkConstraint *fkconstraint);
+static int transformColumnNameList(Oid relId, List *colList,
+ const char *stmtname,
+ int16 *attnums, Oid *atttypids);
+static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
+ List **attnamelist,
+ int16 *attnums, Oid *atttypids);
+static Oid transformFkeyCheckAttrs(Relation pkrel,
+ int numattrs, int16 *attnums);
static void validateForeignKeyConstraint(FkConstraint *fkconstraint,
Relation rel, Relation pkrel);
-static Oid createForeignKeyConstraint(Relation rel, Relation pkrel,
- FkConstraint *fkconstraint);
static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
Oid constrOid);
static char *fkMatchTypeToString(char match_type);
{
Constraint *constr = (Constraint *) newConstraint;
+ /*
+ * Assign or validate constraint name
+ */
+ if (constr->name)
+ {
+ if (ConstraintNameIsUsed(RelationGetRelid(rel),
+ RelationGetNamespace(rel),
+ constr->name))
+ elog(ERROR, "constraint \"%s\" already exists for relation \"%s\"",
+ constr->name, RelationGetRelationName(rel));
+ }
+ else
+ constr->name = GenerateConstraintName(RelationGetRelid(rel),
+ RelationGetNamespace(rel),
+ &counter);
+
/*
* Currently, we only expect to see CONSTR_CHECK nodes
* arriving here (see the preprocessing done in
switch (constr->contype)
{
case CONSTR_CHECK:
- {
- ParseState *pstate;
- bool successful = true;
- HeapScanDesc scan;
- ExprContext *econtext;
- TupleTableSlot *slot;
- HeapTuple tuple;
- RangeTblEntry *rte;
- List *qual;
- Node *expr;
-
- /*
- * Assign or validate constraint name
- */
- if (constr->name)
- {
- if (ConstraintNameIsUsed(RelationGetRelid(rel),
- RelationGetNamespace(rel),
- constr->name))
- elog(ERROR, "constraint \"%s\" already exists for relation \"%s\"",
- constr->name,
- RelationGetRelationName(rel));
- }
- else
- constr->name = GenerateConstraintName(RelationGetRelid(rel),
- RelationGetNamespace(rel),
- &counter);
-
- /*
- * We need to make a parse state and range
- * table to allow us to transformExpr and
- * fix_opids to get a version of the
- * expression we can pass to ExecQual
- */
- pstate = make_parsestate(NULL);
- rte = addRangeTableEntryForRelation(pstate,
- myrelid,
- makeAlias(RelationGetRelationName(rel), NIL),
- false,
- true);
- addRTEtoQuery(pstate, rte, true, true);
-
- /*
- * Convert the A_EXPR in raw_expr into an
- * EXPR
- */
- expr = transformExpr(pstate, constr->raw_expr);
-
- /*
- * Make sure it yields a boolean result.
- */
- expr = coerce_to_boolean(expr, "CHECK");
-
- /*
- * Make sure no outside relations are
- * referred to.
- */
- if (length(pstate->p_rtable) != 1)
- elog(ERROR, "Only relation '%s' can be referenced in CHECK",
- RelationGetRelationName(rel));
-
- /*
- * No subplans or aggregates, either...
- */
- if (contain_subplans(expr))
- elog(ERROR, "cannot use subselect in CHECK constraint expression");
- if (contain_agg_clause(expr))
- elog(ERROR, "cannot use aggregate function in CHECK constraint expression");
-
- /*
- * Might as well try to reduce any
- * constant expressions.
- */
- expr = eval_const_expressions(expr);
-
- /* And fix the opids */
- fix_opids(expr);
-
- qual = makeList1(expr);
-
- /* Make tuple slot to hold tuples */
- slot = MakeTupleTableSlot();
- ExecSetSlotDescriptor(slot, RelationGetDescr(rel), false);
- /* Make an expression context for ExecQual */
- econtext = MakeExprContext(slot, CurrentMemoryContext);
-
- /*
- * Scan through the rows now, checking the
- * expression at each row.
- */
- scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
-
- while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
- {
- ExecStoreTuple(tuple, slot, InvalidBuffer, false);
- if (!ExecQual(qual, econtext, true))
- {
- successful = false;
- break;
- }
- ResetExprContext(econtext);
- }
-
- heap_endscan(scan);
-
- FreeExprContext(econtext);
- pfree(slot);
-
- if (!successful)
- elog(ERROR, "AlterTableAddConstraint: rejected due to CHECK constraint %s",
- constr->name);
-
- /*
- * Call AddRelationRawConstraints to do
- * the real adding -- It duplicates some
- * of the above, but does not check the
- * validity of the constraint against
- * tuples already in the table.
- */
- AddRelationRawConstraints(rel, NIL,
- makeList1(constr));
-
- break;
- }
+ AlterTableAddCheckConstraint(rel, constr);
+ break;
default:
elog(ERROR, "ALTER TABLE / ADD CONSTRAINT is not implemented for that constraint type.");
}
case T_FkConstraint:
{
FkConstraint *fkconstraint = (FkConstraint *) newConstraint;
- Relation pkrel;
- Oid constrOid;
/*
* Assign or validate constraint name
RelationGetNamespace(rel),
&counter);
- /*
- * Grab an exclusive lock on the pk table, so that
- * someone doesn't delete rows out from under us.
- * (Although a lesser lock would do for that purpose,
- * we'll need exclusive lock anyway to add triggers to
- * the pk table; trying to start with a lesser lock
- * will just create a risk of deadlock.)
- */
- pkrel = heap_openrv(fkconstraint->pktable,
- AccessExclusiveLock);
+ AlterTableAddForeignKeyConstraint(rel, fkconstraint);
- /*
- * Validity checks
- */
- if (pkrel->rd_rel->relkind != RELKIND_RELATION)
- elog(ERROR, "referenced relation \"%s\" is not a table",
- RelationGetRelationName(pkrel));
+ break;
+ }
+ default:
+ elog(ERROR, "ALTER TABLE / ADD CONSTRAINT unable to determine type of constraint passed");
+ }
- if (!allowSystemTableMods
- && IsSystemRelation(pkrel))
- elog(ERROR, "ALTER TABLE: relation \"%s\" is a system catalog",
- RelationGetRelationName(pkrel));
+ /* If we have multiple constraints to make, bump CC between 'em */
+ if (lnext(listptr))
+ CommandCounterIncrement();
+ }
- /* XXX shouldn't there be a permission check too? */
+ /* Close rel, but keep lock till commit */
+ heap_close(rel, NoLock);
+}
- if (isTempNamespace(RelationGetNamespace(pkrel)) &&
- !isTempNamespace(RelationGetNamespace(rel)))
- elog(ERROR, "ALTER TABLE / ADD CONSTRAINT: Unable to reference temporary table from permanent table constraint");
+/*
+ * Add a check constraint to a single table
+ *
+ * Subroutine for AlterTableAddConstraint. Must already hold exclusive
+ * lock on the rel, and have done appropriate validity/permissions checks
+ * for it.
+ */
+static void
+AlterTableAddCheckConstraint(Relation rel, Constraint *constr)
+{
+ ParseState *pstate;
+ bool successful = true;
+ HeapScanDesc scan;
+ ExprContext *econtext;
+ TupleTableSlot *slot;
+ HeapTuple tuple;
+ RangeTblEntry *rte;
+ List *qual;
+ Node *expr;
- /*
- * Check that the constraint is satisfied by existing
- * rows (we can skip this during table creation).
- *
- * NOTE: we assume parser has already checked for
- * existence of an appropriate unique index on the
- * referenced relation, and that the column datatypes
- * are comparable.
- */
- if (!fkconstraint->skip_validation)
- validateForeignKeyConstraint(fkconstraint, rel, pkrel);
+ /*
+ * We need to make a parse state and range
+ * table to allow us to transformExpr and
+ * fix_opids to get a version of the
+ * expression we can pass to ExecQual
+ */
+ pstate = make_parsestate(NULL);
+ rte = addRangeTableEntryForRelation(pstate,
+ RelationGetRelid(rel),
+ makeAlias(RelationGetRelationName(rel), NIL),
+ false,
+ true);
+ addRTEtoQuery(pstate, rte, true, true);
- /*
- * Record the FK constraint in pg_constraint.
- */
- constrOid = createForeignKeyConstraint(rel, pkrel,
- fkconstraint);
+ /*
+ * Convert the A_EXPR in raw_expr into an EXPR
+ */
+ expr = transformExpr(pstate, constr->raw_expr);
- /*
- * Create the triggers that will enforce the
- * constraint.
- */
- createForeignKeyTriggers(rel, fkconstraint, constrOid);
+ /*
+ * Make sure it yields a boolean result.
+ */
+ expr = coerce_to_boolean(expr, "CHECK");
- /*
- * Close pk table, but keep lock until we've
- * committed.
- */
- heap_close(pkrel, NoLock);
+ /*
+ * Make sure no outside relations are referred to.
+ */
+ if (length(pstate->p_rtable) != 1)
+ elog(ERROR, "Only relation '%s' can be referenced in CHECK",
+ RelationGetRelationName(rel));
- break;
+ /*
+ * No subplans or aggregates, either...
+ */
+ if (contain_subplans(expr))
+ elog(ERROR, "cannot use subselect in CHECK constraint expression");
+ if (contain_agg_clause(expr))
+ elog(ERROR, "cannot use aggregate function in CHECK constraint expression");
+
+ /*
+ * Might as well try to reduce any constant expressions.
+ */
+ expr = eval_const_expressions(expr);
+
+ /* And fix the opids */
+ fix_opids(expr);
+
+ qual = makeList1(expr);
+
+ /* Make tuple slot to hold tuples */
+ slot = MakeTupleTableSlot();
+ ExecSetSlotDescriptor(slot, RelationGetDescr(rel), false);
+ /* Make an expression context for ExecQual */
+ econtext = MakeExprContext(slot, CurrentMemoryContext);
+
+ /*
+ * Scan through the rows now, checking the expression at each row.
+ */
+ scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
+
+ while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+ if (!ExecQual(qual, econtext, true))
+ {
+ successful = false;
+ break;
+ }
+ ResetExprContext(econtext);
+ }
+
+ heap_endscan(scan);
+
+ FreeExprContext(econtext);
+ pfree(slot);
+
+ if (!successful)
+ elog(ERROR, "AlterTableAddConstraint: rejected due to CHECK constraint %s",
+ constr->name);
+
+ /*
+ * Call AddRelationRawConstraints to do
+ * the real adding -- It duplicates some
+ * of the above, but does not check the
+ * validity of the constraint against
+ * tuples already in the table.
+ */
+ AddRelationRawConstraints(rel, NIL, makeList1(constr));
+}
+
+/*
+ * Add a foreign-key constraint to a single table
+ *
+ * Subroutine for AlterTableAddConstraint. Must already hold exclusive
+ * lock on the rel, and have done appropriate validity/permissions checks
+ * for it.
+ */
+static void
+AlterTableAddForeignKeyConstraint(Relation rel, FkConstraint *fkconstraint)
+{
+ const char *stmtname;
+ Relation pkrel;
+ AclResult aclresult;
+ int16 pkattnum[INDEX_MAX_KEYS];
+ int16 fkattnum[INDEX_MAX_KEYS];
+ Oid pktypoid[INDEX_MAX_KEYS];
+ Oid fktypoid[INDEX_MAX_KEYS];
+ int i;
+ int numfks,
+ numpks;
+ Oid indexOid;
+ Oid constrOid;
+
+ /* cheat a little to discover statement type for error messages */
+ stmtname = fkconstraint->skip_validation ? "CREATE TABLE" : "ALTER TABLE";
+
+ /*
+ * Grab an exclusive lock on the pk table, so that
+ * someone doesn't delete rows out from under us.
+ * (Although a lesser lock would do for that purpose,
+ * we'll need exclusive lock anyway to add triggers to
+ * the pk table; trying to start with a lesser lock
+ * will just create a risk of deadlock.)
+ */
+ pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock);
+
+ /*
+ * Validity and permissions checks
+ *
+ * Note: REFERENCES permissions checks are redundant with CREATE TRIGGER,
+ * but we may as well error out sooner instead of later.
+ */
+ if (pkrel->rd_rel->relkind != RELKIND_RELATION)
+ elog(ERROR, "referenced relation \"%s\" is not a table",
+ RelationGetRelationName(pkrel));
+
+ if (!allowSystemTableMods
+ && IsSystemRelation(pkrel))
+ elog(ERROR, "%s: relation \"%s\" is a system catalog",
+ stmtname, RelationGetRelationName(pkrel));
+
+ aclresult = pg_class_aclcheck(RelationGetRelid(pkrel), GetUserId(),
+ ACL_REFERENCES);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, RelationGetRelationName(pkrel));
+
+ aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
+ ACL_REFERENCES);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, RelationGetRelationName(rel));
+
+ if (isTempNamespace(RelationGetNamespace(pkrel)) &&
+ !isTempNamespace(RelationGetNamespace(rel)))
+ elog(ERROR, "%s: Unable to reference temporary table from permanent table constraint",
+ stmtname);
+
+ /*
+ * Look up the referencing attributes to make sure they
+ * exist, and record their attnums and type OIDs.
+ */
+ for (i = 0; i < INDEX_MAX_KEYS; i++)
+ {
+ pkattnum[i] = fkattnum[i] = 0;
+ pktypoid[i] = fktypoid[i] = InvalidOid;
+ }
+
+ numfks = transformColumnNameList(RelationGetRelid(rel),
+ fkconstraint->fk_attrs,
+ stmtname,
+ fkattnum, fktypoid);
+
+ /*
+ * If the attribute list for the referenced table was omitted,
+ * lookup the definition of the primary key and use it. Otherwise,
+ * validate the supplied attribute list. In either case, discover
+ * the index OID and the attnums and type OIDs of the attributes.
+ */
+ if (fkconstraint->pk_attrs == NIL)
+ {
+ numpks = transformFkeyGetPrimaryKey(pkrel, &indexOid,
+ &fkconstraint->pk_attrs,
+ pkattnum, pktypoid);
+ }
+ else
+ {
+ numpks = transformColumnNameList(RelationGetRelid(pkrel),
+ fkconstraint->pk_attrs,
+ stmtname,
+ pkattnum, pktypoid);
+ /* Look for an index matching the column list */
+ indexOid = transformFkeyCheckAttrs(pkrel, numpks, pkattnum);
+ }
+
+ /* Be sure referencing and referenced column types are comparable */
+ if (numfks != numpks)
+ elog(ERROR, "%s: number of referencing and referenced attributes for foreign key disagree",
+ stmtname);
+
+ for (i = 0; i < numpks; i++)
+ {
+ /*
+ * fktypoid[i] is the foreign key table's i'th element's type
+ * pktypoid[i] is the primary key table's i'th element's type
+ *
+ * We let oper() do our work for us, including elog(ERROR) if the
+ * types don't compare with =
+ */
+ Operator o = oper(makeList1(makeString("=")),
+ fktypoid[i], pktypoid[i], false);
+
+ ReleaseSysCache(o);
+ }
+
+ /*
+ * Check that the constraint is satisfied by existing
+ * rows (we can skip this during table creation).
+ */
+ if (!fkconstraint->skip_validation)
+ validateForeignKeyConstraint(fkconstraint, rel, pkrel);
+
+ /*
+ * Record the FK constraint in pg_constraint.
+ */
+ constrOid = CreateConstraintEntry(fkconstraint->constr_name,
+ RelationGetNamespace(rel),
+ CONSTRAINT_FOREIGN,
+ fkconstraint->deferrable,
+ fkconstraint->initdeferred,
+ RelationGetRelid(rel),
+ fkattnum,
+ numfks,
+ InvalidOid, /* not a domain constraint */
+ RelationGetRelid(pkrel),
+ pkattnum,
+ numpks,
+ fkconstraint->fk_upd_action,
+ fkconstraint->fk_del_action,
+ fkconstraint->fk_matchtype,
+ indexOid,
+ NULL, /* no check constraint */
+ NULL,
+ NULL);
+
+ /*
+ * Create the triggers that will enforce the constraint.
+ */
+ createForeignKeyTriggers(rel, fkconstraint, constrOid);
+
+ /*
+ * Close pk table, but keep lock until we've committed.
+ */
+ heap_close(pkrel, NoLock);
+}
+
+
+/*
+ * transformColumnNameList - transform list of column names
+ *
+ * Lookup each name and return its attnum and type OID
+ */
+static int
+transformColumnNameList(Oid relId, List *colList,
+ const char *stmtname,
+ int16 *attnums, Oid *atttypids)
+{
+ List *l;
+ int attnum;
+
+ attnum = 0;
+ foreach(l, colList)
+ {
+ char *attname = strVal(lfirst(l));
+ HeapTuple atttuple;
+
+ atttuple = SearchSysCacheAttName(relId, attname);
+ if (!HeapTupleIsValid(atttuple))
+ elog(ERROR, "%s: column \"%s\" referenced in foreign key constraint does not exist",
+ stmtname, attname);
+ if (attnum >= INDEX_MAX_KEYS)
+ elog(ERROR, "Can only have %d keys in a foreign key",
+ INDEX_MAX_KEYS);
+ attnums[attnum] = ((Form_pg_attribute) GETSTRUCT(atttuple))->attnum;
+ atttypids[attnum] = ((Form_pg_attribute) GETSTRUCT(atttuple))->atttypid;
+ ReleaseSysCache(atttuple);
+ attnum++;
+ }
+
+ return attnum;
+}
+
+/*
+ * transformFkeyGetPrimaryKey -
+ *
+ * Look up the names, attnums, and types of the primary key attributes
+ * for the pkrel. Used when the column list in the REFERENCES specification
+ * is omitted.
+ */
+static int
+transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
+ List **attnamelist,
+ int16 *attnums, Oid *atttypids)
+{
+ List *indexoidlist,
+ *indexoidscan;
+ HeapTuple indexTuple = NULL;
+ Form_pg_index indexStruct = NULL;
+ int i;
+
+ /*
+ * Get the list of index OIDs for the table from the relcache, and
+ * look up each one in the pg_index syscache until we find one marked
+ * primary key (hopefully there isn't more than one such).
+ */
+ indexoidlist = RelationGetIndexList(pkrel);
+
+ foreach(indexoidscan, indexoidlist)
+ {
+ Oid indexoid = lfirsti(indexoidscan);
+
+ indexTuple = SearchSysCache(INDEXRELID,
+ ObjectIdGetDatum(indexoid),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(indexTuple))
+ elog(ERROR, "transformFkeyGetPrimaryKey: index %u not found",
+ indexoid);
+ indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
+ if (indexStruct->indisprimary)
+ {
+ *indexOid = indexoid;
+ break;
+ }
+ ReleaseSysCache(indexTuple);
+ indexStruct = NULL;
+ }
+
+ freeList(indexoidlist);
+
+ /*
+ * Check that we found it
+ */
+ if (indexStruct == NULL)
+ elog(ERROR, "PRIMARY KEY for referenced table \"%s\" not found",
+ RelationGetRelationName(pkrel));
+
+ /*
+ * Now build the list of PK attributes from the indkey definition
+ */
+ *attnamelist = NIL;
+ for (i = 0; i < INDEX_MAX_KEYS && indexStruct->indkey[i] != 0; i++)
+ {
+ int pkattno = indexStruct->indkey[i];
+
+ attnums[i] = pkattno;
+ atttypids[i] = attnumTypeId(pkrel, pkattno);
+ *attnamelist = lappend(*attnamelist,
+ makeString(pstrdup(NameStr(*attnumAttName(pkrel, pkattno)))));
+ }
+
+ ReleaseSysCache(indexTuple);
+
+ return i;
+}
+
+/*
+ * transformFkeyCheckAttrs -
+ *
+ * Make sure that the attributes of a referenced table belong to a unique
+ * (or primary key) constraint. Return the OID of the index supporting
+ * the constraint.
+ */
+static Oid
+transformFkeyCheckAttrs(Relation pkrel,
+ int numattrs, int16 *attnums)
+{
+ Oid indexoid = InvalidOid;
+ bool found = false;
+ List *indexoidlist,
+ *indexoidscan;
+
+ /*
+ * Get the list of index OIDs for the table from the relcache, and
+ * look up each one in the pg_index syscache, and match unique indexes
+ * to the list of attnums we are given.
+ */
+ indexoidlist = RelationGetIndexList(pkrel);
+
+ foreach(indexoidscan, indexoidlist)
+ {
+ HeapTuple indexTuple;
+ Form_pg_index indexStruct;
+ int i, j;
+
+ indexoid = lfirsti(indexoidscan);
+ indexTuple = SearchSysCache(INDEXRELID,
+ ObjectIdGetDatum(indexoid),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(indexTuple))
+ elog(ERROR, "transformFkeyCheckAttrs: index %u not found",
+ indexoid);
+ indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
+
+ /*
+ * Must be unique, not a functional index, and not a partial index
+ */
+ if (indexStruct->indisunique &&
+ indexStruct->indproc == InvalidOid &&
+ VARSIZE(&indexStruct->indpred) <= VARHDRSZ)
+ {
+ for (i = 0; i < INDEX_MAX_KEYS && indexStruct->indkey[i] != 0; i++)
+ ;
+ if (i == numattrs)
+ {
+ /*
+ * The given attnum list may match the index columns in any
+ * order. Check that each list is a subset of the other.
+ */
+ for (i = 0; i < numattrs; i++)
+ {
+ found = false;
+ for (j = 0; j < numattrs; j++)
+ {
+ if (attnums[i] == indexStruct->indkey[j])
+ {
+ found = true;
+ break;
+ }
+ }
+ if (!found)
+ break;
}
- default:
- elog(ERROR, "ALTER TABLE / ADD CONSTRAINT unable to determine type of constraint passed");
+ if (found)
+ {
+ for (i = 0; i < numattrs; i++)
+ {
+ found = false;
+ for (j = 0; j < numattrs; j++)
+ {
+ if (attnums[j] == indexStruct->indkey[i])
+ {
+ found = true;
+ break;
+ }
+ }
+ if (!found)
+ break;
+ }
+ }
+ }
}
+ ReleaseSysCache(indexTuple);
+ if (found)
+ break;
}
- /* Close rel, but keep lock till commit */
- heap_close(rel, NoLock);
+ if (!found)
+ elog(ERROR, "UNIQUE constraint matching given keys for referenced table \"%s\" not found",
+ RelationGetRelationName(pkrel));
+
+ freeList(indexoidlist);
+
+ return indexoid;
}
/*
pfree(trig.tgargs);
}
-/*
- * Record an FK constraint in pg_constraint.
- */
-static Oid
-createForeignKeyConstraint(Relation rel, Relation pkrel,
- FkConstraint *fkconstraint)
-{
- int16 *fkattr;
- int16 *pkattr;
- int fkcount;
- int pkcount;
- List *l;
- int i;
-
- /* Convert foreign-key attr names to attr number array */
- fkcount = length(fkconstraint->fk_attrs);
- fkattr = (int16 *) palloc(fkcount * sizeof(int16));
- i = 0;
- foreach(l, fkconstraint->fk_attrs)
- {
- char *id = strVal(lfirst(l));
- AttrNumber attno;
-
- attno = get_attnum(RelationGetRelid(rel), id);
- if (attno == InvalidAttrNumber)
- elog(ERROR, "Relation \"%s\" has no column \"%s\"",
- RelationGetRelationName(rel), id);
- fkattr[i++] = attno;
- }
-
- /* The same for the referenced primary key attrs */
- pkcount = length(fkconstraint->pk_attrs);
- pkattr = (int16 *) palloc(pkcount * sizeof(int16));
- i = 0;
- foreach(l, fkconstraint->pk_attrs)
- {
- char *id = strVal(lfirst(l));
- AttrNumber attno;
-
- attno = get_attnum(RelationGetRelid(pkrel), id);
- if (attno == InvalidAttrNumber)
- elog(ERROR, "Relation \"%s\" has no column \"%s\"",
- RelationGetRelationName(pkrel), id);
- pkattr[i++] = attno;
- }
-
- /* Now we can make the pg_constraint entry */
- return CreateConstraintEntry(fkconstraint->constr_name,
- RelationGetNamespace(rel),
- CONSTRAINT_FOREIGN,
- fkconstraint->deferrable,
- fkconstraint->initdeferred,
- RelationGetRelid(rel),
- fkattr,
- fkcount,
- InvalidOid, /* not a domain constraint */
- RelationGetRelid(pkrel),
- pkattr,
- pkcount,
- fkconstraint->fk_upd_action,
- fkconstraint->fk_del_action,
- fkconstraint->fk_matchtype,
- NULL, /* no check constraint */
- NULL,
- NULL);
-}
-
/*
* Create the triggers that implement an FK constraint.
*/
* Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Header: /cvsroot/pgsql/src/backend/parser/analyze.c,v 1.249 2002/09/18 21:35:21 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/parser/analyze.c,v 1.250 2002/09/22 00:37:09 tgl Exp $
*
*-------------------------------------------------------------------------
*/
static void transformForUpdate(Query *qry, List *forUpdate);
static void transformConstraintAttrs(List *constraintList);
static void transformColumnType(ParseState *pstate, ColumnDef *column);
-static void transformFkeyCheckAttrs(FkConstraint *fkconstraint, Oid *pktypoid);
-static void transformFkeyGetPrimaryKey(FkConstraint *fkconstraint, Oid *pktypoid);
static bool relationHasPrimaryKey(Oid relationOid);
-static Oid transformFkeyGetColType(CreateStmtContext *cxt, char *colname);
static void release_pstate_resources(ParseState *pstate);
static FromExpr *makeFromExpr(List *fromlist, Node *quals);
static void
transformFKConstraints(ParseState *pstate, CreateStmtContext *cxt)
{
- List *fkactions = NIL;
- List *fkclist;
-
if (cxt->fkconstraints == NIL)
return;
elog(NOTICE, "%s will create implicit trigger(s) for FOREIGN KEY check(s)",
cxt->stmtType);
- foreach(fkclist, cxt->fkconstraints)
+ /*
+ * For ALTER TABLE ADD CONSTRAINT, nothing to do. For CREATE TABLE,
+ * gin up an ALTER TABLE ADD CONSTRAINT command to execute after
+ * the basic CREATE TABLE is complete.
+ *
+ * Note: the ADD CONSTRAINT command must also execute after any index
+ * creation commands. Thus, this should run after
+ * transformIndexConstraints, so that the CREATE INDEX commands are
+ * already in cxt->alist.
+ */
+ if (strcmp(cxt->stmtType, "CREATE TABLE") == 0)
{
- FkConstraint *fkconstraint = (FkConstraint *) lfirst(fkclist);
- Oid pktypoid[INDEX_MAX_KEYS];
- Oid fktypoid[INDEX_MAX_KEYS];
- int i;
- int attnum;
- List *fkattrs;
-
- for (attnum = 0; attnum < INDEX_MAX_KEYS; attnum++)
- pktypoid[attnum] = fktypoid[attnum] = InvalidOid;
-
- /*
- * Look up the referencing attributes to make sure they exist (or
- * will exist) in this table, and remember their type OIDs.
- */
- attnum = 0;
- foreach(fkattrs, fkconstraint->fk_attrs)
- {
- char *fkattr = strVal(lfirst(fkattrs));
+ AlterTableStmt *alterstmt = makeNode(AlterTableStmt);
+ List *fkclist;
- if (attnum >= INDEX_MAX_KEYS)
- elog(ERROR, "Can only have %d keys in a foreign key",
- INDEX_MAX_KEYS);
- fktypoid[attnum++] = transformFkeyGetColType(cxt, fkattr);
- }
+ alterstmt->subtype = 'c'; /* preprocessed add constraint */
+ alterstmt->relation = cxt->relation;
+ alterstmt->name = NULL;
+ alterstmt->def = (Node *) cxt->fkconstraints;
- /*
- * If the attribute list for the referenced table was omitted,
- * lookup the definition of the primary key.
- */
- if (fkconstraint->pk_attrs == NIL)
+ /* Don't need to scan the table contents in this case */
+ foreach(fkclist, cxt->fkconstraints)
{
- if (strcmp(fkconstraint->pktable->relname, cxt->relation->relname) != 0)
- transformFkeyGetPrimaryKey(fkconstraint, pktypoid);
- else if (cxt->pkey != NULL)
- {
- /* Use the to-be-created primary key */
- List *attr;
+ FkConstraint *fkconstraint = (FkConstraint *) lfirst(fkclist);
- attnum = 0;
- foreach(attr, cxt->pkey->indexParams)
- {
- IndexElem *ielem = lfirst(attr);
- char *iname = ielem->name;
-
- Assert(iname); /* no func index here */
- fkconstraint->pk_attrs = lappend(fkconstraint->pk_attrs,
- makeString(iname));
- if (attnum >= INDEX_MAX_KEYS)
- elog(ERROR, "Can only have %d keys in a foreign key",
- INDEX_MAX_KEYS);
- pktypoid[attnum++] = transformFkeyGetColType(cxt,
- iname);
- }
- }
- else
- {
- /* In ALTER TABLE case, primary key may already exist */
- if (OidIsValid(cxt->relOid))
- transformFkeyGetPrimaryKey(fkconstraint, pktypoid);
- else
- elog(ERROR, "PRIMARY KEY for referenced table \"%s\" not found",
- fkconstraint->pktable->relname);
- }
- }
- else
- {
- /* Validate the specified referenced key list */
- if (strcmp(fkconstraint->pktable->relname, cxt->relation->relname) != 0)
- transformFkeyCheckAttrs(fkconstraint, pktypoid);
- else
- {
- /* Look for a matching new unique/primary constraint */
- List *index;
- bool found = false;
-
- foreach(index, cxt->alist)
- {
- IndexStmt *ind = lfirst(index);
- List *pkattrs;
-
- if (!ind->unique)
- continue;
- if (length(ind->indexParams) !=
- length(fkconstraint->pk_attrs))
- continue;
- attnum = 0;
- foreach(pkattrs, fkconstraint->pk_attrs)
- {
- char *pkattr = strVal(lfirst(pkattrs));
- List *indparms;
-
- found = false;
- foreach(indparms, ind->indexParams)
- {
- IndexElem *indparm = lfirst(indparms);
-
- if (indparm->name &&
- strcmp(indparm->name, pkattr) == 0)
- {
- found = true;
- break;
- }
- }
- if (!found)
- break;
- if (attnum >= INDEX_MAX_KEYS)
- elog(ERROR, "Can only have %d keys in a foreign key",
- INDEX_MAX_KEYS);
- pktypoid[attnum++] = transformFkeyGetColType(cxt,
- pkattr);
- }
- if (found)
- break;
- }
- if (!found)
- {
- /*
- * In ALTER TABLE case, such an index may already
- * exist
- */
- if (OidIsValid(cxt->relOid))
- transformFkeyCheckAttrs(fkconstraint, pktypoid);
- else
- elog(ERROR, "UNIQUE constraint matching given keys for referenced table \"%s\" not found",
- fkconstraint->pktable->relname);
- }
- }
- }
-
- /* Be sure referencing and referenced column types are comparable */
- for (i = 0; i < INDEX_MAX_KEYS && fktypoid[i] != 0; i++)
- {
- /*
- * fktypoid[i] is the foreign key table's i'th element's type
- * pktypoid[i] is the primary key table's i'th element's type
- *
- * We let oper() do our work for us, including elog(ERROR) if the
- * types don't compare with =
- */
- Operator o = oper(makeList1(makeString("=")),
- fktypoid[i], pktypoid[i], false);
-
- ReleaseSysCache(o);
- }
-
- /*
- * For ALTER TABLE ADD CONSTRAINT, we're done. For CREATE TABLE,
- * gin up an ALTER TABLE ADD CONSTRAINT command to execute after
- * the basic CREATE TABLE is complete.
- */
- if (strcmp(cxt->stmtType, "CREATE TABLE") == 0)
- {
- AlterTableStmt *alterstmt = makeNode(AlterTableStmt);
-
- alterstmt->subtype = 'c'; /* preprocessed add constraint */
- alterstmt->relation = cxt->relation;
- alterstmt->name = NULL;
- alterstmt->def = (Node *) makeList1(fkconstraint);
-
- /* Don't need to scan the table contents in this case */
fkconstraint->skip_validation = true;
-
- fkactions = lappend(fkactions, (Node *) alterstmt);
}
- }
- /*
- * Attach completed list of extra actions to cxt->alist. We cannot do
- * this earlier, because we assume above that cxt->alist still holds
- * only IndexStmts.
- */
- cxt->alist = nconc(cxt->alist, fkactions);
+ cxt->alist = lappend(cxt->alist, (Node *) alterstmt);
+ }
}
/*
transformAlterTableStmt(ParseState *pstate, AlterTableStmt *stmt,
List **extras_before, List **extras_after)
{
+ Relation rel;
CreateStmtContext cxt;
Query *qry;
* The only subtypes that currently require parse transformation
* handling are 'A'dd column and Add 'C'onstraint. These largely
* re-use code from CREATE TABLE.
+ *
+ * If we need to do any parse transformation, get exclusive lock on
+ * the relation to make sure it won't change before we execute the
+ * command.
*/
switch (stmt->subtype)
{
case 'A':
+ rel = heap_openrv(stmt->relation, AccessExclusiveLock);
+
cxt.stmtType = "ALTER TABLE";
cxt.relation = stmt->relation;
cxt.inhRelations = NIL;
- cxt.relOid = RangeVarGetRelid(stmt->relation, false);
+ cxt.relOid = RelationGetRelid(rel);
cxt.hasoids = SearchSysCacheExists(ATTNUM,
ObjectIdGetDatum(cxt.relOid),
Int16GetDatum(ObjectIdAttributeNumber),
((ColumnDef *) stmt->def)->constraints = cxt.ckconstraints;
*extras_before = nconc(*extras_before, cxt.blist);
*extras_after = nconc(cxt.alist, *extras_after);
+
+ heap_close(rel, NoLock); /* close rel, keep lock */
break;
case 'C':
+ rel = heap_openrv(stmt->relation, AccessExclusiveLock);
+
cxt.stmtType = "ALTER TABLE";
cxt.relation = stmt->relation;
cxt.inhRelations = NIL;
- cxt.relOid = RangeVarGetRelid(stmt->relation, false);
+ cxt.relOid = RelationGetRelid(rel);
cxt.hasoids = SearchSysCacheExists(ATTNUM,
ObjectIdGetDatum(cxt.relOid),
Int16GetDatum(ObjectIdAttributeNumber),
stmt->def = (Node *) nconc(cxt.ckconstraints, cxt.fkconstraints);
*extras_before = nconc(*extras_before, cxt.blist);
*extras_after = nconc(cxt.alist, *extras_after);
+
+ heap_close(rel, NoLock); /* close rel, keep lock */
break;
case 'c':
}
-/*
- * transformFkeyCheckAttrs -
- *
- * Make sure that the attributes of a referenced table
- * belong to a unique (or primary key) constraint.
- */
-static void
-transformFkeyCheckAttrs(FkConstraint *fkconstraint, Oid *pktypoid)
-{
- Relation pkrel;
- List *indexoidlist,
- *indexoidscan;
- int i;
- bool found = false;
-
- /*
- * Open the referenced table
- */
- pkrel = heap_openrv(fkconstraint->pktable, AccessShareLock);
-
- if (pkrel->rd_rel->relkind != RELKIND_RELATION)
- elog(ERROR, "Referenced relation \"%s\" is not a table",
- fkconstraint->pktable->relname);
-
- /*
- * Get the list of index OIDs for the table from the relcache, and
- * look up each one in the pg_index syscache for each unique one, and
- * then compare the attributes we were given to those defined.
- */
- indexoidlist = RelationGetIndexList(pkrel);
-
- foreach(indexoidscan, indexoidlist)
- {
- Oid indexoid = lfirsti(indexoidscan);
- HeapTuple indexTuple;
- Form_pg_index indexStruct;
-
- found = false;
- indexTuple = SearchSysCache(INDEXRELID,
- ObjectIdGetDatum(indexoid),
- 0, 0, 0);
- if (!HeapTupleIsValid(indexTuple))
- elog(ERROR, "transformFkeyCheckAttrs: index %u not found",
- indexoid);
- indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
-
- if (indexStruct->indisunique)
- {
- for (i = 0; i < INDEX_MAX_KEYS && indexStruct->indkey[i] != 0; i++)
- ;
- if (i == length(fkconstraint->pk_attrs))
- {
- /* go through the fkconstraint->pk_attrs list */
- List *attrl;
- int attnum = 0;
-
- foreach(attrl, fkconstraint->pk_attrs)
- {
- char *attrname = strVal(lfirst(attrl));
-
- found = false;
- for (i = 0; i < INDEX_MAX_KEYS && indexStruct->indkey[i] != 0; i++)
- {
- int pkattno = indexStruct->indkey[i];
-
- if (namestrcmp(attnumAttName(pkrel, pkattno),
- attrname) == 0)
- {
- pktypoid[attnum++] = attnumTypeId(pkrel, pkattno);
- found = true;
- break;
- }
- }
- if (!found)
- break;
- }
- }
- }
- ReleaseSysCache(indexTuple);
- if (found)
- break;
- }
- if (!found)
- elog(ERROR, "UNIQUE constraint matching given keys for referenced table \"%s\" not found",
- fkconstraint->pktable->relname);
-
- freeList(indexoidlist);
- heap_close(pkrel, AccessShareLock);
-}
-
-
-/*
- * transformFkeyGetPrimaryKey -
- *
- * Try to find the primary key attributes of a referenced table if
- * the column list in the REFERENCES specification was omitted.
- */
-static void
-transformFkeyGetPrimaryKey(FkConstraint *fkconstraint, Oid *pktypoid)
-{
- Relation pkrel;
- List *indexoidlist,
- *indexoidscan;
- HeapTuple indexTuple = NULL;
- Form_pg_index indexStruct = NULL;
- int i;
- int attnum = 0;
-
- /*
- * Open the referenced table
- */
- pkrel = heap_openrv(fkconstraint->pktable, AccessShareLock);
-
- if (pkrel->rd_rel->relkind != RELKIND_RELATION)
- elog(ERROR, "Referenced relation \"%s\" is not a table",
- fkconstraint->pktable->relname);
-
- /*
- * Get the list of index OIDs for the table from the relcache, and
- * look up each one in the pg_index syscache until we find one marked
- * primary key (hopefully there isn't more than one such).
- */
- indexoidlist = RelationGetIndexList(pkrel);
-
- foreach(indexoidscan, indexoidlist)
- {
- Oid indexoid = lfirsti(indexoidscan);
-
- indexTuple = SearchSysCache(INDEXRELID,
- ObjectIdGetDatum(indexoid),
- 0, 0, 0);
- if (!HeapTupleIsValid(indexTuple))
- elog(ERROR, "transformFkeyGetPrimaryKey: index %u not found",
- indexoid);
- indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
- if (indexStruct->indisprimary)
- break;
- ReleaseSysCache(indexTuple);
- indexStruct = NULL;
- }
-
- freeList(indexoidlist);
-
- /*
- * Check that we found it
- */
- if (indexStruct == NULL)
- elog(ERROR, "PRIMARY KEY for referenced table \"%s\" not found",
- fkconstraint->pktable->relname);
-
- /*
- * Now build the list of PK attributes from the indkey definition
- * using the attribute names of the PK relation descriptor
- */
- for (i = 0; i < INDEX_MAX_KEYS && indexStruct->indkey[i] != 0; i++)
- {
- int pkattno = indexStruct->indkey[i];
-
- pktypoid[attnum++] = attnumTypeId(pkrel, pkattno);
- fkconstraint->pk_attrs = lappend(fkconstraint->pk_attrs,
- makeString(pstrdup(NameStr(*attnumAttName(pkrel, pkattno)))));
- }
-
- ReleaseSysCache(indexTuple);
-
- heap_close(pkrel, AccessShareLock);
-}
-
/*
* relationHasPrimaryKey -
*
return result;
}
-/*
- * transformFkeyGetColType -
- *
- * Find a referencing column by name, and return its type OID.
- * Error if it can't be found.
- */
-static Oid
-transformFkeyGetColType(CreateStmtContext *cxt, char *colname)
-{
- List *cols;
- List *inher;
- Oid result;
- Form_pg_attribute sysatt;
-
- /* First look for column among the newly-created columns */
- foreach(cols, cxt->columns)
- {
- ColumnDef *col = lfirst(cols);
-
- if (strcmp(col->colname, colname) == 0)
- return typenameTypeId(col->typename);
- }
- /* Perhaps it's a system column name */
- sysatt = SystemAttributeByName(colname, cxt->hasoids);
- if (sysatt)
- return sysatt->atttypid;
- /* Look for column among inherited columns (if CREATE TABLE case) */
- foreach(inher, cxt->inhRelations)
- {
- RangeVar *inh = lfirst(inher);
- Relation rel;
- int count;
-
- Assert(IsA(inh, RangeVar));
- rel = heap_openrv(inh, AccessShareLock);
- if (rel->rd_rel->relkind != RELKIND_RELATION)
- elog(ERROR, "inherited table \"%s\" is not a relation",
- inh->relname);
- for (count = 0; count < rel->rd_att->natts; count++)
- {
- Form_pg_attribute inhattr = rel->rd_att->attrs[count];
- char *inhname = NameStr(inhattr->attname);
-
- if (inhattr->attisdropped)
- continue;
- if (strcmp(inhname, colname) == 0)
- {
- result = inhattr->atttypid;
- heap_close(rel, NoLock);
- return result;
- }
- }
- heap_close(rel, NoLock);
- }
- /* Look for column among existing columns (if ALTER TABLE case) */
- if (OidIsValid(cxt->relOid))
- {
- HeapTuple atttuple;
-
- atttuple = SearchSysCacheAttName(cxt->relOid, colname);
- if (HeapTupleIsValid(atttuple))
- {
- result = ((Form_pg_attribute) GETSTRUCT(atttuple))->atttypid;
- ReleaseSysCache(atttuple);
- return result;
- }
- }
-
- elog(ERROR, "%s: column \"%s\" referenced in foreign key constraint does not exist",
- cxt->stmtType, colname);
- return InvalidOid; /* keep compiler quiet */
-}
-
/*
* Preprocess a list of column constraint clauses
* to attach constraint attributes to their primary constraint nodes
* Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: pg_constraint.h,v 1.3 2002/09/04 20:31:37 momjian Exp $
+ * $Id: pg_constraint.h,v 1.4 2002/09/22 00:37:09 tgl Exp $
*
* NOTES
* the genbki.sh script reads this file and generates .bki
char foreignUpdateType,
char foreignDeleteType,
char foreignMatchType,
+ Oid indexRelId,
Node *conExpr,
const char *conBin,
const char *conSrc);
-- Try (and fail) to add constraint due to invalide destination columns explicitly given
ALTER TABLE tmp3 add constraint tmpconstr foreign key(a) references tmp2(b) match full;
NOTICE: ALTER TABLE will create implicit trigger(s) for FOREIGN KEY check(s)
-ERROR: UNIQUE constraint matching given keys for referenced table "tmp2" not found
+ERROR: ALTER TABLE: column "b" referenced in foreign key constraint does not exist
-- Try (and fail) to add constraint due to invalid data
ALTER TABLE tmp3 add constraint tmpconstr foreign key (a) references tmp2 match full;
NOTICE: ALTER TABLE will create implicit trigger(s) for FOREIGN KEY check(s)
ERROR: ALTER TABLE: column "........pg.dropped.1........" referenced in foreign key constraint does not exist
alter table atacc2 add foreign key (id) references atacc1(a);
NOTICE: ALTER TABLE will create implicit trigger(s) for FOREIGN KEY check(s)
-ERROR: UNIQUE constraint matching given keys for referenced table "atacc1" not found
+ERROR: ALTER TABLE: column "a" referenced in foreign key constraint does not exist
alter table atacc2 add foreign key (id) references atacc1("........pg.dropped.1........");
NOTICE: ALTER TABLE will create implicit trigger(s) for FOREIGN KEY check(s)
-ERROR: UNIQUE constraint matching given keys for referenced table "atacc1" not found
+ERROR: ALTER TABLE: column "........pg.dropped.1........" referenced in foreign key constraint does not exist
drop table atacc2;
create index "testing_idx" on atacc1(a);
ERROR: DefineIndex: attribute "a" not found
ERROR: CREATE TABLE: column "ftest2" referenced in foreign key constraint does not exist
CREATE TABLE FKTABLE_FAIL2 ( ftest1 int, CONSTRAINT fkfail1 FOREIGN KEY (ftest1) REFERENCES PKTABLE(ptest2));
NOTICE: CREATE TABLE will create implicit trigger(s) for FOREIGN KEY check(s)
-ERROR: UNIQUE constraint matching given keys for referenced table "pktable" not found
+ERROR: CREATE TABLE: column "ptest2" referenced in foreign key constraint does not exist
DROP TABLE FKTABLE_FAIL1;
ERROR: table "fktable_fail1" does not exist
DROP TABLE FKTABLE_FAIL2;
DROP VIEW atestv2;
-- this should cascade to drop atestv4
DROP VIEW atestv3 CASCADE;
-NOTICE: Drop cascades to rule _RETURN on view atestv3
NOTICE: Drop cascades to rule _RETURN on view atestv4
NOTICE: Drop cascades to view atestv4
-- this should complain "does not exist"