*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/commands/typecmds.c,v 1.25 2002/12/15 16:17:43 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/commands/typecmds.c,v 1.26 2003/01/04 00:46:08 tgl Exp $
*
* DESCRIPTION
* The "DefineFoo" routines take the parse tree and pick out the
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_constraint.h"
+#include "catalog/pg_depend.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "commands/tablecmds.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
+
+/* result structure for get_rels_with_domain() */
+typedef struct
+{
+ Relation rel; /* opened and locked relation */
+ int natts; /* number of attributes of interest */
+ int *atts; /* attribute numbers */
+ /* atts[] is of allocated length RelationGetNumberOfAttributes(rel) */
+} RelToCheck;
+
+
static Oid findTypeIOFunction(List *procname, Oid typeOid, bool isOutput);
-static List *get_rels_with_domain(Oid domainOid);
-static void domainPermissionCheck(HeapTuple tup, TypeName *typename);
+static List *get_rels_with_domain(Oid domainOid, LOCKMODE lockmode);
+static void domainOwnerCheck(HeapTuple tup, TypeName *typename);
static char *domainAddConstraint(Oid domainOid, Oid domainNamespace,
Oid baseTypeOid,
int typMod, Constraint *constr,
int *counter, char *domainName);
-typedef struct
-{
- Oid relOid;
- int natts;
- int *atts;
-} relToCheck;
/*
* DefineType
TypeNameToString(typename));
/* Doesn't return if user isn't allowed to alter the domain */
- domainPermissionCheck(tup, typename);
+ domainOwnerCheck(tup, typename);
/* Setup new tuple */
MemSet(new_record, (Datum) 0, sizeof(new_record));
{
TypeName *typename;
Oid domainoid;
+ Relation typrel;
HeapTuple tup;
- Relation rel;
- Datum new_record[Natts_pg_type];
- char new_record_nulls[Natts_pg_type];
- char new_record_repl[Natts_pg_type];
- HeapTuple newtuple;
Form_pg_type typTup;
/* Make a TypeName so we can use standard type lookup machinery */
typename->arrayBounds = NIL;
/* Lock the type table */
- rel = heap_openr(TypeRelationName, RowExclusiveLock);
+ typrel = heap_openr(TypeRelationName, RowExclusiveLock);
- /* Use LookupTypeName here so that shell types can be removed. */
+ /* Use LookupTypeName here so that shell types can be found (why?). */
domainoid = LookupTypeName(typename);
if (!OidIsValid(domainoid))
elog(ERROR, "Type \"%s\" does not exist",
tup = SearchSysCacheCopy(TYPEOID,
ObjectIdGetDatum(domainoid),
0, 0, 0);
-
if (!HeapTupleIsValid(tup))
elog(ERROR, "AlterDomain: type \"%s\" does not exist",
TypeNameToString(typename));
+ typTup = (Form_pg_type) GETSTRUCT(tup);
/* Doesn't return if user isn't allowed to alter the domain */
- domainPermissionCheck(tup, typename);
+ domainOwnerCheck(tup, typename);
- typTup = (Form_pg_type) GETSTRUCT(tup);
-
- /* Is the domain already set to the destination constraint? */
+ /* Is the domain already set to the desired constraint? */
if (typTup->typnotnull == notNull)
- elog(ERROR, "AlterDomain: %s is already set to %s",
+ {
+ elog(NOTICE, "AlterDomain: %s is already set to %s",
TypeNameToString(typename),
notNull ? "NOT NULL" : "NULL");
+ heap_close(typrel, RowExclusiveLock);
+ return;
+ }
- /* Adding a NOT NULL constraint requires checking current domains */
+ /* Adding a NOT NULL constraint requires checking existing columns */
if (notNull)
{
List *rels;
List *rt;
/* Fetch relation list with attributes based on this domain */
- rels = get_rels_with_domain(domainoid);
+ /* ShareLock is sufficient to prevent concurrent data changes */
+
+ rels = get_rels_with_domain(domainoid, ShareLock);
foreach (rt, rels)
{
- Relation typrel;
- HeapTuple tuple;
+ RelToCheck *rtc = (RelToCheck *) lfirst(rt);
+ Relation testrel = rtc->rel;
+ TupleDesc tupdesc = RelationGetDescr(testrel);
HeapScanDesc scan;
- TupleDesc tupdesc;
- relToCheck *rtc = (relToCheck *) lfirst(rt);
-
- /* Lock relation */
- typrel = heap_open(rtc->relOid, ExclusiveLock);
-
- tupdesc = RelationGetDescr(typrel);
+ HeapTuple tuple;
- /* Fetch tuples sequentially */
- scan = heap_beginscan(typrel, SnapshotNow, 0, NULL);
+ /* Scan all tuples in this relation */
+ scan = heap_beginscan(testrel, SnapshotNow, 0, NULL);
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
int i;
- /* Test attributes */
+ /* Test attributes that are of the domain */
for (i = 0; i < rtc->natts; i++)
{
+ int attnum = rtc->atts[i];
Datum d;
bool isNull;
- d = heap_getattr(tuple, rtc->atts[i], tupdesc, &isNull);
+ d = heap_getattr(tuple, attnum, tupdesc, &isNull);
if (isNull)
- elog(ERROR, "ALTER DOMAIN: Relation \"%s\" Attribute \"%s\" "
- "contains NULL values",
- RelationGetRelationName(typrel),
- NameStr(*attnumAttName(typrel, rtc->atts[i])));
+ elog(ERROR, "ALTER DOMAIN: Relation \"%s\" attribute \"%s\" contains NULL values",
+ RelationGetRelationName(testrel),
+ NameStr(tupdesc->attrs[attnum - 1]->attname));
}
}
-
heap_endscan(scan);
- /* Release lock */
- heap_close(typrel, NoLock);
+ /* Close each rel after processing, but keep lock */
+ heap_close(testrel, NoLock);
}
}
+ /*
+ * Okay to update pg_type row. We can scribble on typTup because it's
+ * a copy.
+ */
+ typTup->typnotnull = notNull;
- /* Setup new tuple */
- MemSet(new_record, (Datum) 0, sizeof(new_record));
- MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
- MemSet(new_record_repl, ' ', sizeof(new_record_repl));
-
- new_record[Anum_pg_type_typnotnull - 1] = BoolGetDatum(notNull);
- new_record_repl[Anum_pg_type_typnotnull - 1] = 'r';
-
- /* Build the new tuple */
- newtuple = heap_modifytuple(tup, rel,
- new_record, new_record_nulls, new_record_repl);
-
- simple_heap_update(rel, &tup->t_self, newtuple);
+ simple_heap_update(typrel, &tup->t_self, tup);
- CatalogUpdateIndexes(rel, newtuple);
+ CatalogUpdateIndexes(typrel, tup);
/* Clean up */
- heap_close(rel, NoLock);
- heap_freetuple(newtuple);
+ heap_freetuple(tup);
+ heap_close(typrel, RowExclusiveLock);
}
/*
TypeNameToString(typename));
/* Doesn't return if user isn't allowed to alter the domain */
- domainPermissionCheck(tup, typename);
+ domainOwnerCheck(tup, typename);
/* Grab an appropriate lock on the pg_constraint relation */
conrel = heap_openr(ConstraintRelationName, RowExclusiveLock);
{
TypeName *typename;
Oid domainoid;
+ Relation typrel;
HeapTuple tup;
- Relation rel;
+ Form_pg_type typTup;
List *rels;
List *rt;
- Form_pg_type typTup;
EState *estate;
ExprContext *econtext;
char *ccbin;
typename->arrayBounds = NIL;
/* Lock the type table */
- rel = heap_openr(TypeRelationName, RowExclusiveLock);
+ typrel = heap_openr(TypeRelationName, RowExclusiveLock);
- /* Use LookupTypeName here so that shell types can be found. */
+ /* Use LookupTypeName here so that shell types can be found (why?). */
domainoid = LookupTypeName(typename);
if (!OidIsValid(domainoid))
elog(ERROR, "Type \"%s\" does not exist",
tup = SearchSysCacheCopy(TYPEOID,
ObjectIdGetDatum(domainoid),
0, 0, 0);
-
if (!HeapTupleIsValid(tup))
elog(ERROR, "AlterDomain: type \"%s\" does not exist",
TypeNameToString(typename));
-
typTup = (Form_pg_type) GETSTRUCT(tup);
/* Doesn't return if user isn't allowed to alter the domain */
- domainPermissionCheck(tup, typename);
+ domainOwnerCheck(tup, typename);
/* Check for unsupported constraint types */
if (IsA(newConstraint, FkConstraint))
/* build execution state for expr */
exprstate = ExecPrepareExpr(expr, estate);
- rels = get_rels_with_domain(domainoid);
+ /* Fetch relation list with attributes based on this domain */
+ /* ShareLock is sufficient to prevent concurrent data changes */
+
+ rels = get_rels_with_domain(domainoid, ShareLock);
foreach (rt, rels)
{
- relToCheck *rtc = (relToCheck *) lfirst(rt);
- Relation testrel;
- TupleDesc tupdesc;
- HeapTuple tuple;
+ RelToCheck *rtc = (RelToCheck *) lfirst(rt);
+ Relation testrel = rtc->rel;
+ TupleDesc tupdesc = RelationGetDescr(testrel);
HeapScanDesc scan;
+ HeapTuple tuple;
- /* Lock relation against changes */
- testrel = heap_open(rtc->relOid, ShareLock);
-
- tupdesc = RelationGetDescr(testrel);
-
- /* Scan through table */
+ /* Scan all tuples in this relation */
scan = heap_beginscan(testrel, SnapshotNow, 0, NULL);
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
int i;
- /* Loop through each attribute of the tuple with a domain */
+ /* Test attributes that are of the domain */
for (i = 0; i < rtc->natts; i++)
{
+ int attnum = rtc->atts[i];
Datum d;
bool isNull;
Datum conResult;
- d = heap_getattr(tuple, rtc->atts[i], tupdesc, &isNull);
+ d = heap_getattr(tuple, attnum, tupdesc, &isNull);
econtext->domainValue_datum = d;
econtext->domainValue_isNull = isNull;
&isNull, NULL);
if (!isNull && !DatumGetBool(conResult))
- elog(ERROR, "AlterDomainAddConstraint: Domain %s constraint %s failed",
- NameStr(typTup->typname), constr->name);
+ elog(ERROR, "ALTER DOMAIN: Relation \"%s\" attribute \"%s\" contains values that fail the new constraint",
+ RelationGetRelationName(testrel),
+ NameStr(tupdesc->attrs[attnum - 1]->attname));
}
ResetExprContext(econtext);
}
-
heap_endscan(scan);
/* Hold relation lock till commit (XXX bad for concurrency) */
FreeExecutorState(estate);
/* Clean up */
- heap_close(rel, NoLock);
+ heap_close(typrel, RowExclusiveLock);
}
/*
* get_rels_with_domain
*
* Fetch all relations / attributes which are using the domain
- * while maintaining a RowExclusiveLock on the pg_attribute
- * entries.
+ *
+ * The result is a list of RelToCheck structs, one for each distinct
+ * relation, each containing one or more attribute numbers that are of
+ * the domain type. We have opened each rel and acquired the specified lock
+ * type on it.
+ *
+ * XXX this is completely broken because there is no way to lock the domain
+ * to prevent columns from being added or dropped while our command runs.
+ * We can partially protect against column drops by locking relations as we
+ * come across them, but there is still a race condition (the window between
+ * seeing a pg_depend entry and acquiring lock on the relation it references).
+ * Also, holding locks on all these relations simultaneously creates a non-
+ * trivial risk of deadlock. We can minimize but not eliminate the deadlock
+ * risk by using the weakest suitable lock (ShareLock for most callers).
+ *
+ * XXX to support domains over domains, we'd need to make this smarter,
+ * or make its callers smarter, so that we could find columns of derived
+ * domains. Arrays of domains would be a problem too.
*
* Generally used for retrieving a list of tests when adding
* new constraints to a domain.
*/
-List *
-get_rels_with_domain(Oid domainOid)
+static List *
+get_rels_with_domain(Oid domainOid, LOCKMODE lockmode)
{
- Relation classRel;
- HeapTuple classTup;
- Relation attRel;
- HeapScanDesc classScan;
- List *rels = NIL;
+ List *result = NIL;
+ Relation depRel;
+ ScanKeyData key[2];
+ SysScanDesc depScan;
+ HeapTuple depTup;
/*
- * We need to lock the domain rows for the length of the transaction,
- * but once all of the tables and the appropriate attributes are
- * found we can relese the relation lock.
+ * We scan pg_depend to find those things that depend on the domain.
+ * (We assume we can ignore refobjsubid for a domain.)
*/
- classRel = relation_openr(RelationRelationName, ExclusiveLock);
- attRel = relation_openr(AttributeRelationName, RowExclusiveLock);
+ depRel = relation_openr(DependRelationName, AccessShareLock);
- classScan = heap_beginscan(classRel, SnapshotNow, 0, NULL);
+ ScanKeyEntryInitialize(&key[0], 0x0,
+ Anum_pg_depend_refclassid, F_OIDEQ,
+ ObjectIdGetDatum(RelOid_pg_type));
+ ScanKeyEntryInitialize(&key[1], 0x0,
+ Anum_pg_depend_refobjid, F_OIDEQ,
+ ObjectIdGetDatum(domainOid));
+
+ depScan = systable_beginscan(depRel, DependReferenceIndex, true,
+ SnapshotNow, 2, key);
- /* Scan through pg_class for tables */
- while ((classTup = heap_getnext(classScan, ForwardScanDirection)) != NULL)
+ while (HeapTupleIsValid(depTup = systable_getnext(depScan)))
{
- relToCheck *rtc = NULL;
- int nkeys = 0;
- HeapTuple attTup;
- HeapScanDesc attScan;
- ScanKeyData attKey[2];
- Form_pg_class pg_class;
+ Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup);
+ RelToCheck *rtc = NULL;
+ List *rellist;
+ Form_pg_attribute pg_att;
+ int ptr;
+
+ /* Ignore dependees that aren't user columns of tables */
+ /* (we assume system columns are never of domain types) */
+ if (pg_depend->classid != RelOid_pg_class ||
+ pg_depend->objsubid <= 0)
+ continue;
+
+ /* See if we already have an entry for this relation */
+ foreach(rellist, result)
+ {
+ RelToCheck *rt = (RelToCheck *) lfirst(rellist);
- /* Get our pg_class struct */
- pg_class = (Form_pg_class) GETSTRUCT(classTup);
+ if (RelationGetRelid(rt->rel) == pg_depend->objid)
+ {
+ rtc = rt;
+ break;
+ }
+ }
- /* Fetch attributes from pg_attribute for the relation of the type domainOid */
- ScanKeyEntryInitialize(&attKey[nkeys++], 0, Anum_pg_attribute_attrelid,
- F_OIDEQ, ObjectIdGetDatum(HeapTupleGetOid(classTup)));
+ if (rtc == NULL)
+ {
+ /* First attribute found for this relation */
+ Relation rel;
+
+ /* Acquire requested lock on relation */
+ rel = heap_open(pg_depend->objid, lockmode);
+
+ /* Build the RelToCheck entry with enough space for all atts */
+ rtc = (RelToCheck *) palloc(sizeof(RelToCheck));
+ rtc->rel = rel;
+ rtc->natts = 0;
+ rtc->atts = (int *) palloc(sizeof(int) * RelationGetNumberOfAttributes(rel));
+ result = lcons(rtc, result);
+ }
- ScanKeyEntryInitialize(&attKey[nkeys++], 0, Anum_pg_attribute_atttypid,
- F_OIDEQ, ObjectIdGetDatum(domainOid));
+ /*
+ * Confirm column has not been dropped, and is of the expected type.
+ * This defends against an ALTER DROP COLUMN occuring just before
+ * we acquired lock ... but if the whole table were dropped, we'd
+ * still have a problem.
+ */
+ if (pg_depend->objsubid > RelationGetNumberOfAttributes(rtc->rel))
+ continue;
+ pg_att = rtc->rel->rd_att->attrs[pg_depend->objsubid - 1];
+ if (pg_att->attisdropped || pg_att->atttypid != domainOid)
+ continue;
- /* Setup to scan pg_attribute */
- attScan = heap_beginscan(attRel, SnapshotNow, nkeys, attKey);
+ /*
+ * Okay, add column to result. We store the columns in column-number
+ * order; this is just a hack to improve predictability of regression
+ * test output ...
+ */
+ Assert(rtc->natts < RelationGetNumberOfAttributes(rtc->rel));
- /* Scan through pg_attribute for attributes based on the domain */
- while ((attTup = heap_getnext(attScan, ForwardScanDirection)) != NULL)
+ ptr = rtc->natts++;
+ while (ptr > 0 && rtc->atts[ptr-1] > pg_depend->objsubid)
{
- if (rtc == NULL)
- {
- /* First one found for this rel */
- rtc = (relToCheck *)palloc(sizeof(relToCheck));
- rtc->atts = (int *)palloc(sizeof(int) * pg_class->relnatts);
- rtc->relOid = HeapTupleGetOid(classTup);
- rtc->natts = 0;
- rels = lcons((void *)rtc, rels);
- }
-
- /* Now add the attribute */
- rtc->atts[rtc->natts++] = ((Form_pg_attribute) GETSTRUCT(attTup))->attnum;
+ rtc->atts[ptr] = rtc->atts[ptr-1];
+ ptr--;
}
-
- heap_endscan(attScan);
+ rtc->atts[ptr] = pg_depend->objsubid;
}
- heap_endscan(classScan);
+ systable_endscan(depScan);
- /* Release pg_class, hold pg_attribute for further processing */
- relation_close(classRel, ExclusiveLock);
- relation_close(attRel, NoLock);
+ relation_close(depRel, AccessShareLock);
- return rels;
+ return result;
}
/*
- * domainPermissionCheck
+ * domainOwnerCheck
*
* Throw an error if the current user doesn't have permission to modify
* the domain in an ALTER DOMAIN statement, or if the type isn't actually
* a domain.
*/
-void
-domainPermissionCheck(HeapTuple tup, TypeName *typename)
+static void
+domainOwnerCheck(HeapTuple tup, TypeName *typename)
{
Form_pg_type typTup = (Form_pg_type) GETSTRUCT(tup);
- /* Permission check: must own type or its namespace */
- if (!pg_type_ownercheck(HeapTupleGetOid(tup), GetUserId()) &&
- !pg_namespace_ownercheck(typTup->typnamespace,
- GetUserId()))
- aclcheck_error(ACLCHECK_NOT_OWNER, TypeNameToString(typename));
-
/* Check that this is actually a domain */
if (typTup->typtype != 'd')
elog(ERROR, "%s is not a domain",
TypeNameToString(typename));
-}
+ /* Permission check: must own type */
+ if (!pg_type_ownercheck(HeapTupleGetOid(tup), GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, TypeNameToString(typename));
+}
/*
* domainAddConstraint - code shared between CREATE and ALTER DOMAIN