From: Tom Lane Date: Sat, 4 Jan 2003 00:46:08 +0000 (+0000) Subject: Partial code review for ALTER DOMAIN patch. Incorporates Rod Taylor's X-Git-Tag: REL7_4_BETA1~1297 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=17194f4112ad7399d5929ac928068eae7cf3c84a;p=postgresql Partial code review for ALTER DOMAIN patch. Incorporates Rod Taylor's patches of 9-Dec (permissions fix) and 13-Dec (performance) as well as a partial fix for locking issues: concurrent DROP COLUMN should not create trouble anymore. But concurrent DROP TABLE is still a risk, and there is no protection at all against creating a column of a domain while we are altering the domain. --- diff --git a/src/backend/commands/typecmds.c b/src/backend/commands/typecmds.c index c088aaac99..33f2fa7c2b 100644 --- a/src/backend/commands/typecmds.c +++ b/src/backend/commands/typecmds.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/commands/typecmds.c,v 1.25 2002/12/15 16:17:43 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/commands/typecmds.c,v 1.26 2003/01/04 00:46:08 tgl Exp $ * * DESCRIPTION * The "DefineFoo" routines take the parse tree and pick out the @@ -39,6 +39,7 @@ #include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/pg_constraint.h" +#include "catalog/pg_depend.h" #include "catalog/pg_type.h" #include "commands/defrem.h" #include "commands/tablecmds.h" @@ -59,20 +60,25 @@ #include "utils/lsyscache.h" #include "utils/syscache.h" + +/* result structure for get_rels_with_domain() */ +typedef struct +{ + Relation rel; /* opened and locked relation */ + int natts; /* number of attributes of interest */ + int *atts; /* attribute numbers */ + /* atts[] is of allocated length RelationGetNumberOfAttributes(rel) */ +} RelToCheck; + + static Oid findTypeIOFunction(List *procname, Oid typeOid, bool isOutput); -static List *get_rels_with_domain(Oid domainOid); -static void domainPermissionCheck(HeapTuple tup, TypeName *typename); +static List *get_rels_with_domain(Oid domainOid, LOCKMODE lockmode); +static void domainOwnerCheck(HeapTuple tup, TypeName *typename); static char *domainAddConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid, int typMod, Constraint *constr, int *counter, char *domainName); -typedef struct -{ - Oid relOid; - int natts; - int *atts; -} relToCheck; /* * DefineType @@ -942,7 +948,7 @@ AlterDomainDefault(List *names, Node *defaultRaw) TypeNameToString(typename)); /* Doesn't return if user isn't allowed to alter the domain */ - domainPermissionCheck(tup, typename); + domainOwnerCheck(tup, typename); /* Setup new tuple */ MemSet(new_record, (Datum) 0, sizeof(new_record)); @@ -1029,12 +1035,8 @@ AlterDomainNotNull(List *names, bool notNull) { TypeName *typename; Oid domainoid; + Relation typrel; HeapTuple tup; - Relation rel; - Datum new_record[Natts_pg_type]; - char new_record_nulls[Natts_pg_type]; - char new_record_repl[Natts_pg_type]; - HeapTuple newtuple; Form_pg_type typTup; /* Make a TypeName so we can use standard type lookup machinery */ @@ -1044,9 +1046,9 @@ AlterDomainNotNull(List *names, bool notNull) typename->arrayBounds = NIL; /* Lock the type table */ - rel = heap_openr(TypeRelationName, RowExclusiveLock); + typrel = heap_openr(TypeRelationName, RowExclusiveLock); - /* Use LookupTypeName here so that shell types can be removed. */ + /* Use LookupTypeName here so that shell types can be found (why?). */ domainoid = LookupTypeName(typename); if (!OidIsValid(domainoid)) elog(ERROR, "Type \"%s\" does not exist", @@ -1055,93 +1057,84 @@ AlterDomainNotNull(List *names, bool notNull) tup = SearchSysCacheCopy(TYPEOID, ObjectIdGetDatum(domainoid), 0, 0, 0); - if (!HeapTupleIsValid(tup)) elog(ERROR, "AlterDomain: type \"%s\" does not exist", TypeNameToString(typename)); + typTup = (Form_pg_type) GETSTRUCT(tup); /* Doesn't return if user isn't allowed to alter the domain */ - domainPermissionCheck(tup, typename); + domainOwnerCheck(tup, typename); - typTup = (Form_pg_type) GETSTRUCT(tup); - - /* Is the domain already set to the destination constraint? */ + /* Is the domain already set to the desired constraint? */ if (typTup->typnotnull == notNull) - elog(ERROR, "AlterDomain: %s is already set to %s", + { + elog(NOTICE, "AlterDomain: %s is already set to %s", TypeNameToString(typename), notNull ? "NOT NULL" : "NULL"); + heap_close(typrel, RowExclusiveLock); + return; + } - /* Adding a NOT NULL constraint requires checking current domains */ + /* Adding a NOT NULL constraint requires checking existing columns */ if (notNull) { List *rels; List *rt; /* Fetch relation list with attributes based on this domain */ - rels = get_rels_with_domain(domainoid); + /* ShareLock is sufficient to prevent concurrent data changes */ + + rels = get_rels_with_domain(domainoid, ShareLock); foreach (rt, rels) { - Relation typrel; - HeapTuple tuple; + RelToCheck *rtc = (RelToCheck *) lfirst(rt); + Relation testrel = rtc->rel; + TupleDesc tupdesc = RelationGetDescr(testrel); HeapScanDesc scan; - TupleDesc tupdesc; - relToCheck *rtc = (relToCheck *) lfirst(rt); - - /* Lock relation */ - typrel = heap_open(rtc->relOid, ExclusiveLock); - - tupdesc = RelationGetDescr(typrel); + HeapTuple tuple; - /* Fetch tuples sequentially */ - scan = heap_beginscan(typrel, SnapshotNow, 0, NULL); + /* Scan all tuples in this relation */ + scan = heap_beginscan(testrel, SnapshotNow, 0, NULL); while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { int i; - /* Test attributes */ + /* Test attributes that are of the domain */ for (i = 0; i < rtc->natts; i++) { + int attnum = rtc->atts[i]; Datum d; bool isNull; - d = heap_getattr(tuple, rtc->atts[i], tupdesc, &isNull); + d = heap_getattr(tuple, attnum, tupdesc, &isNull); if (isNull) - elog(ERROR, "ALTER DOMAIN: Relation \"%s\" Attribute \"%s\" " - "contains NULL values", - RelationGetRelationName(typrel), - NameStr(*attnumAttName(typrel, rtc->atts[i]))); + elog(ERROR, "ALTER DOMAIN: Relation \"%s\" attribute \"%s\" contains NULL values", + RelationGetRelationName(testrel), + NameStr(tupdesc->attrs[attnum - 1]->attname)); } } - heap_endscan(scan); - /* Release lock */ - heap_close(typrel, NoLock); + /* Close each rel after processing, but keep lock */ + heap_close(testrel, NoLock); } } + /* + * Okay to update pg_type row. We can scribble on typTup because it's + * a copy. + */ + typTup->typnotnull = notNull; - /* Setup new tuple */ - MemSet(new_record, (Datum) 0, sizeof(new_record)); - MemSet(new_record_nulls, ' ', sizeof(new_record_nulls)); - MemSet(new_record_repl, ' ', sizeof(new_record_repl)); - - new_record[Anum_pg_type_typnotnull - 1] = BoolGetDatum(notNull); - new_record_repl[Anum_pg_type_typnotnull - 1] = 'r'; - - /* Build the new tuple */ - newtuple = heap_modifytuple(tup, rel, - new_record, new_record_nulls, new_record_repl); - - simple_heap_update(rel, &tup->t_self, newtuple); + simple_heap_update(typrel, &tup->t_self, tup); - CatalogUpdateIndexes(rel, newtuple); + CatalogUpdateIndexes(typrel, tup); /* Clean up */ - heap_close(rel, NoLock); - heap_freetuple(newtuple); + heap_freetuple(tup); + heap_close(typrel, RowExclusiveLock); } /* @@ -1186,7 +1179,7 @@ AlterDomainDropConstraint(List *names, const char *constrName, DropBehavior beha TypeNameToString(typename)); /* Doesn't return if user isn't allowed to alter the domain */ - domainPermissionCheck(tup, typename); + domainOwnerCheck(tup, typename); /* Grab an appropriate lock on the pg_constraint relation */ conrel = heap_openr(ConstraintRelationName, RowExclusiveLock); @@ -1236,11 +1229,11 @@ AlterDomainAddConstraint(List *names, Node *newConstraint) { TypeName *typename; Oid domainoid; + Relation typrel; HeapTuple tup; - Relation rel; + Form_pg_type typTup; List *rels; List *rt; - Form_pg_type typTup; EState *estate; ExprContext *econtext; char *ccbin; @@ -1256,9 +1249,9 @@ AlterDomainAddConstraint(List *names, Node *newConstraint) typename->arrayBounds = NIL; /* Lock the type table */ - rel = heap_openr(TypeRelationName, RowExclusiveLock); + typrel = heap_openr(TypeRelationName, RowExclusiveLock); - /* Use LookupTypeName here so that shell types can be found. */ + /* Use LookupTypeName here so that shell types can be found (why?). */ domainoid = LookupTypeName(typename); if (!OidIsValid(domainoid)) elog(ERROR, "Type \"%s\" does not exist", @@ -1267,15 +1260,13 @@ AlterDomainAddConstraint(List *names, Node *newConstraint) tup = SearchSysCacheCopy(TYPEOID, ObjectIdGetDatum(domainoid), 0, 0, 0); - if (!HeapTupleIsValid(tup)) elog(ERROR, "AlterDomain: type \"%s\" does not exist", TypeNameToString(typename)); - typTup = (Form_pg_type) GETSTRUCT(tup); /* Doesn't return if user isn't allowed to alter the domain */ - domainPermissionCheck(tup, typename); + domainOwnerCheck(tup, typename); /* Check for unsupported constraint types */ if (IsA(newConstraint, FkConstraint)) @@ -1346,35 +1337,34 @@ AlterDomainAddConstraint(List *names, Node *newConstraint) /* build execution state for expr */ exprstate = ExecPrepareExpr(expr, estate); - rels = get_rels_with_domain(domainoid); + /* Fetch relation list with attributes based on this domain */ + /* ShareLock is sufficient to prevent concurrent data changes */ + + rels = get_rels_with_domain(domainoid, ShareLock); foreach (rt, rels) { - relToCheck *rtc = (relToCheck *) lfirst(rt); - Relation testrel; - TupleDesc tupdesc; - HeapTuple tuple; + RelToCheck *rtc = (RelToCheck *) lfirst(rt); + Relation testrel = rtc->rel; + TupleDesc tupdesc = RelationGetDescr(testrel); HeapScanDesc scan; + HeapTuple tuple; - /* Lock relation against changes */ - testrel = heap_open(rtc->relOid, ShareLock); - - tupdesc = RelationGetDescr(testrel); - - /* Scan through table */ + /* Scan all tuples in this relation */ scan = heap_beginscan(testrel, SnapshotNow, 0, NULL); while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { int i; - /* Loop through each attribute of the tuple with a domain */ + /* Test attributes that are of the domain */ for (i = 0; i < rtc->natts; i++) { + int attnum = rtc->atts[i]; Datum d; bool isNull; Datum conResult; - d = heap_getattr(tuple, rtc->atts[i], tupdesc, &isNull); + d = heap_getattr(tuple, attnum, tupdesc, &isNull); econtext->domainValue_datum = d; econtext->domainValue_isNull = isNull; @@ -1384,13 +1374,13 @@ AlterDomainAddConstraint(List *names, Node *newConstraint) &isNull, NULL); if (!isNull && !DatumGetBool(conResult)) - elog(ERROR, "AlterDomainAddConstraint: Domain %s constraint %s failed", - NameStr(typTup->typname), constr->name); + elog(ERROR, "ALTER DOMAIN: Relation \"%s\" attribute \"%s\" contains values that fail the new constraint", + RelationGetRelationName(testrel), + NameStr(tupdesc->attrs[attnum - 1]->attname)); } ResetExprContext(econtext); } - heap_endscan(scan); /* Hold relation lock till commit (XXX bad for concurrency) */ @@ -1400,114 +1390,158 @@ AlterDomainAddConstraint(List *names, Node *newConstraint) FreeExecutorState(estate); /* Clean up */ - heap_close(rel, NoLock); + heap_close(typrel, RowExclusiveLock); } /* * get_rels_with_domain * * Fetch all relations / attributes which are using the domain - * while maintaining a RowExclusiveLock on the pg_attribute - * entries. + * + * The result is a list of RelToCheck structs, one for each distinct + * relation, each containing one or more attribute numbers that are of + * the domain type. We have opened each rel and acquired the specified lock + * type on it. + * + * XXX this is completely broken because there is no way to lock the domain + * to prevent columns from being added or dropped while our command runs. + * We can partially protect against column drops by locking relations as we + * come across them, but there is still a race condition (the window between + * seeing a pg_depend entry and acquiring lock on the relation it references). + * Also, holding locks on all these relations simultaneously creates a non- + * trivial risk of deadlock. We can minimize but not eliminate the deadlock + * risk by using the weakest suitable lock (ShareLock for most callers). + * + * XXX to support domains over domains, we'd need to make this smarter, + * or make its callers smarter, so that we could find columns of derived + * domains. Arrays of domains would be a problem too. * * Generally used for retrieving a list of tests when adding * new constraints to a domain. */ -List * -get_rels_with_domain(Oid domainOid) +static List * +get_rels_with_domain(Oid domainOid, LOCKMODE lockmode) { - Relation classRel; - HeapTuple classTup; - Relation attRel; - HeapScanDesc classScan; - List *rels = NIL; + List *result = NIL; + Relation depRel; + ScanKeyData key[2]; + SysScanDesc depScan; + HeapTuple depTup; /* - * We need to lock the domain rows for the length of the transaction, - * but once all of the tables and the appropriate attributes are - * found we can relese the relation lock. + * We scan pg_depend to find those things that depend on the domain. + * (We assume we can ignore refobjsubid for a domain.) */ - classRel = relation_openr(RelationRelationName, ExclusiveLock); - attRel = relation_openr(AttributeRelationName, RowExclusiveLock); + depRel = relation_openr(DependRelationName, AccessShareLock); - classScan = heap_beginscan(classRel, SnapshotNow, 0, NULL); + ScanKeyEntryInitialize(&key[0], 0x0, + Anum_pg_depend_refclassid, F_OIDEQ, + ObjectIdGetDatum(RelOid_pg_type)); + ScanKeyEntryInitialize(&key[1], 0x0, + Anum_pg_depend_refobjid, F_OIDEQ, + ObjectIdGetDatum(domainOid)); + + depScan = systable_beginscan(depRel, DependReferenceIndex, true, + SnapshotNow, 2, key); - /* Scan through pg_class for tables */ - while ((classTup = heap_getnext(classScan, ForwardScanDirection)) != NULL) + while (HeapTupleIsValid(depTup = systable_getnext(depScan))) { - relToCheck *rtc = NULL; - int nkeys = 0; - HeapTuple attTup; - HeapScanDesc attScan; - ScanKeyData attKey[2]; - Form_pg_class pg_class; + Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup); + RelToCheck *rtc = NULL; + List *rellist; + Form_pg_attribute pg_att; + int ptr; + + /* Ignore dependees that aren't user columns of tables */ + /* (we assume system columns are never of domain types) */ + if (pg_depend->classid != RelOid_pg_class || + pg_depend->objsubid <= 0) + continue; + + /* See if we already have an entry for this relation */ + foreach(rellist, result) + { + RelToCheck *rt = (RelToCheck *) lfirst(rellist); - /* Get our pg_class struct */ - pg_class = (Form_pg_class) GETSTRUCT(classTup); + if (RelationGetRelid(rt->rel) == pg_depend->objid) + { + rtc = rt; + break; + } + } - /* Fetch attributes from pg_attribute for the relation of the type domainOid */ - ScanKeyEntryInitialize(&attKey[nkeys++], 0, Anum_pg_attribute_attrelid, - F_OIDEQ, ObjectIdGetDatum(HeapTupleGetOid(classTup))); + if (rtc == NULL) + { + /* First attribute found for this relation */ + Relation rel; + + /* Acquire requested lock on relation */ + rel = heap_open(pg_depend->objid, lockmode); + + /* Build the RelToCheck entry with enough space for all atts */ + rtc = (RelToCheck *) palloc(sizeof(RelToCheck)); + rtc->rel = rel; + rtc->natts = 0; + rtc->atts = (int *) palloc(sizeof(int) * RelationGetNumberOfAttributes(rel)); + result = lcons(rtc, result); + } - ScanKeyEntryInitialize(&attKey[nkeys++], 0, Anum_pg_attribute_atttypid, - F_OIDEQ, ObjectIdGetDatum(domainOid)); + /* + * Confirm column has not been dropped, and is of the expected type. + * This defends against an ALTER DROP COLUMN occuring just before + * we acquired lock ... but if the whole table were dropped, we'd + * still have a problem. + */ + if (pg_depend->objsubid > RelationGetNumberOfAttributes(rtc->rel)) + continue; + pg_att = rtc->rel->rd_att->attrs[pg_depend->objsubid - 1]; + if (pg_att->attisdropped || pg_att->atttypid != domainOid) + continue; - /* Setup to scan pg_attribute */ - attScan = heap_beginscan(attRel, SnapshotNow, nkeys, attKey); + /* + * Okay, add column to result. We store the columns in column-number + * order; this is just a hack to improve predictability of regression + * test output ... + */ + Assert(rtc->natts < RelationGetNumberOfAttributes(rtc->rel)); - /* Scan through pg_attribute for attributes based on the domain */ - while ((attTup = heap_getnext(attScan, ForwardScanDirection)) != NULL) + ptr = rtc->natts++; + while (ptr > 0 && rtc->atts[ptr-1] > pg_depend->objsubid) { - if (rtc == NULL) - { - /* First one found for this rel */ - rtc = (relToCheck *)palloc(sizeof(relToCheck)); - rtc->atts = (int *)palloc(sizeof(int) * pg_class->relnatts); - rtc->relOid = HeapTupleGetOid(classTup); - rtc->natts = 0; - rels = lcons((void *)rtc, rels); - } - - /* Now add the attribute */ - rtc->atts[rtc->natts++] = ((Form_pg_attribute) GETSTRUCT(attTup))->attnum; + rtc->atts[ptr] = rtc->atts[ptr-1]; + ptr--; } - - heap_endscan(attScan); + rtc->atts[ptr] = pg_depend->objsubid; } - heap_endscan(classScan); + systable_endscan(depScan); - /* Release pg_class, hold pg_attribute for further processing */ - relation_close(classRel, ExclusiveLock); - relation_close(attRel, NoLock); + relation_close(depRel, AccessShareLock); - return rels; + return result; } /* - * domainPermissionCheck + * domainOwnerCheck * * Throw an error if the current user doesn't have permission to modify * the domain in an ALTER DOMAIN statement, or if the type isn't actually * a domain. */ -void -domainPermissionCheck(HeapTuple tup, TypeName *typename) +static void +domainOwnerCheck(HeapTuple tup, TypeName *typename) { Form_pg_type typTup = (Form_pg_type) GETSTRUCT(tup); - /* Permission check: must own type or its namespace */ - if (!pg_type_ownercheck(HeapTupleGetOid(tup), GetUserId()) && - !pg_namespace_ownercheck(typTup->typnamespace, - GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, TypeNameToString(typename)); - /* Check that this is actually a domain */ if (typTup->typtype != 'd') elog(ERROR, "%s is not a domain", TypeNameToString(typename)); -} + /* Permission check: must own type */ + if (!pg_type_ownercheck(HeapTupleGetOid(tup), GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, TypeNameToString(typename)); +} /* * domainAddConstraint - code shared between CREATE and ALTER DOMAIN diff --git a/src/test/regress/expected/domain.out b/src/test/regress/expected/domain.out index cbbb902379..e48120a68d 100644 --- a/src/test/regress/expected/domain.out +++ b/src/test/regress/expected/domain.out @@ -201,19 +201,19 @@ create table domnotnull ); insert into domnotnull default values; alter domain dnotnulltest set not null; -- fails -ERROR: ALTER DOMAIN: Relation "domnotnull" Attribute "col1" contains NULL values +ERROR: ALTER DOMAIN: Relation "domnotnull" attribute "col1" contains NULL values update domnotnull set col1 = 5; alter domain dnotnulltest set not null; -- fails -ERROR: ALTER DOMAIN: Relation "domnotnull" Attribute "col2" contains NULL values +ERROR: ALTER DOMAIN: Relation "domnotnull" attribute "col2" contains NULL values update domnotnull set col2 = 6; alter domain dnotnulltest set not null; alter domain dnotnulltest set not null; -- fails -ERROR: AlterDomain: dnotnulltest is already set to NOT NULL +NOTICE: AlterDomain: dnotnulltest is already set to NOT NULL update domnotnull set col1 = null; -- fails ERROR: Domain dnotnulltest does not allow NULL values alter domain dnotnulltest drop not null; alter domain dnotnulltest drop not null; -- fails -ERROR: AlterDomain: dnotnulltest is already set to NULL +NOTICE: AlterDomain: dnotnulltest is already set to NULL update domnotnull set col1 = null; drop domain dnotnulltest cascade; NOTICE: Drop cascades to table domnotnull column col2 @@ -253,7 +253,7 @@ create table domcontest (col1 con); insert into domcontest values (1); insert into domcontest values (2); alter domain con add constraint t check (VALUE < 1); -- fails -ERROR: AlterDomainAddConstraint: Domain con constraint t failed +ERROR: ALTER DOMAIN: Relation "domcontest" attribute "col1" contains values that fail the new constraint alter domain con add constraint t check (VALUE < 34); alter domain con add check (VALUE > 0); insert into domcontest values (-5); -- fails