1 /*-------------------------------------------------------------------------
4 * POSTGRES create/destroy relation with inheritance utility code.
6 * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * $Header: /cvsroot/pgsql/src/backend/commands/Attic/creatinh.c,v 1.58 2000/05/30 00:49:43 momjian Exp $
13 *-------------------------------------------------------------------------
18 #include "access/heapam.h"
19 #include "catalog/catname.h"
20 #include "catalog/indexing.h"
21 #include "catalog/heap.h"
22 #include "catalog/pg_inherits.h"
23 #include "catalog/pg_ipl.h"
24 #include "catalog/pg_type.h"
25 #include "commands/creatinh.h"
26 #include "utils/syscache.h"
/*
 * Prototypes for the helpers that are private to this file; all three are
 * declared static and are defined further down.
 */
33 static bool checkAttrExists(const char *attributeName,
34 const char *attributeType, List *schema);
35 static List *MergeAttributes(List *schema, List *supers, List **supconstr);
36 static void StoreCatalogInheritance(Oid relationId, List *supers);
38 /* ----------------------------------------------------------------
40 * Creates a new relation.
41 * ----------------------------------------------------------------
44 DefineRelation(CreateStmt *stmt, char relkind)
/*
 * Reviewer's outline of the visible steps (this listing omits some of the
 * original lines, so only what can be seen is described):
 *	 1. validate stmt->relname and copy it into a private buffer;
 *	 2. merge stmt->tableElts with the columns of the relations named in
 *		stmt->inhRelnames via MergeAttributes, which also hands back the
 *		inherited constraints through its third argument;
 *	 3. build a tuple descriptor, attach the inherited CHECK constraints
 *		and create the relation with heap_create_with_catalog;
 *	 4. record the parents with StoreCatalogInheritance;
 *	 5. collect raw column defaults and pass them, together with
 *		stmt->constraints, to AddRelationRawConstraints.
 */
46 char *relname = palloc(NAMEDATALEN);
47 List *schema = stmt->tableElts;
48 int numberOfAttributes;
53 List *old_constraints;
/*
 * A name of NAMEDATALEN or more characters is rejected outright, so the
 * copy below always fits in the NAMEDATALEN-byte buffer allocated above.
 */
59 if (strlen(stmt->relname) >= NAMEDATALEN)
60 elog(ERROR, "the relation name %s is >= %d characters long",
61 stmt->relname, NAMEDATALEN);
62 StrNCpy(relname, stmt->relname, NAMEDATALEN);
66 * XXX parameter handling missing below.
69 inheritList = stmt->inhRelnames;
72 * generate relation schema, including inherited attributes.
75 schema = MergeAttributes(schema, inheritList, &old_constraints);
/* A relation must end up with at least one column, own or inherited. */
77 numberOfAttributes = length(schema);
78 if (numberOfAttributes <= 0)
80 elog(ERROR, "DefineRelation: %s",
81 "please inherit from a relation or define an attribute");
85 * create a relation descriptor from the relation schema
86 * and create the relation. Note that in this stage only
87 * inherited (pre-cooked) defaults and constraints will be
88 * included into the new relation. (BuildDescForRelation
89 * takes care of the inherited defaults, but we have to copy
90 * inherited constraints here.)
93 descriptor = BuildDescForRelation(schema, relname);
/*
 * Turn the inherited constraint list into a ConstrCheck array; the array
 * is sized for every list entry even though only CONSTR_CHECK entries are
 * tested for below.
 */
95 if (old_constraints != NIL)
97 ConstrCheck *check = (ConstrCheck *) palloc(length(old_constraints) *
101 foreach(listptr, old_constraints)
103 Constraint *cdef = (Constraint *) lfirst(listptr);
/*
 * NOTE(review): the statement controlled by this test is not visible in
 * this listing; presumably non-CHECK entries are skipped -- confirm.
 */
105 if (cdef->contype != CONSTR_CHECK)
/*
 * Named constraint: the name must not collide with any constraint already
 * collected in check[0..ncheck-1]; the name pointer is shared, not copied.
 */
108 if (cdef->name != NULL)
110 for (i = 0; i < ncheck; i++)
112 if (strcmp(check[i].ccname, cdef->name) == 0)
113 elog(ERROR, "Duplicate CHECK constraint name: '%s'",
116 check[ncheck].ccname = cdef->name;
/* Unnamed constraint: synthesize "$<n>" from its 1-based position. */
120 check[ncheck].ccname = (char *) palloc(NAMEDATALEN);
121 snprintf(check[ncheck].ccname, NAMEDATALEN, "$%d", ncheck + 1);
/* Inherited constraints must arrive already cooked, never as raw trees. */
123 Assert(cdef->raw_expr == NULL && cdef->cooked_expr != NULL);
124 check[ncheck].ccbin = pstrdup(cdef->cooked_expr);
/*
 * Make sure the descriptor has a TupleConstr to hang the array on; a
 * freshly allocated one starts with no defaults and no NOT NULL flag.
 */
129 if (descriptor->constr == NULL)
131 descriptor->constr = (TupleConstr *) palloc(sizeof(TupleConstr));
132 descriptor->constr->defval = NULL;
133 descriptor->constr->num_defval = 0;
134 descriptor->constr->has_not_null = false;
136 descriptor->constr->num_check = ncheck;
137 descriptor->constr->check = check;
/*
 * Create the relation from the finished descriptor, then record its
 * parents using the OID that heap_create_with_catalog returned.
 */
141 relationId = heap_create_with_catalog(relname, descriptor,
142 relkind, stmt->istemp);
144 StoreCatalogInheritance(relationId, inheritList);
147 * Now add any newly specified column default values and CHECK
148 * constraints to the new relation. These are passed to us in the
149 * form of raw parsetrees; we need to transform them to executable
150 * expression trees before they can be added. The most convenient way
151 * to do that is to apply the parser's transformExpr routine, but
152 * transformExpr doesn't work unless we have a pre-existing relation.
153 * So, the transformation has to be postponed to this final step of
156 * First, scan schema to find new column defaults.
/*
 * Each column that carries a raw default gets a RawColumnDefault entry
 * recording the value of attnum and the raw expression.
 * NOTE(review): the lines that initialise and advance attnum and
 * rawDefaults are not visible in this listing -- confirm they exist.
 */
161 foreach(listptr, schema)
163 ColumnDef *colDef = lfirst(listptr);
164 RawColumnDefault *rawEnt;
168 if (colDef->raw_default == NULL)
/* A column with a raw default must not also carry a cooked one. */
170 Assert(colDef->cooked_default == NULL);
172 rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
173 rawEnt->attnum = attnum;
174 rawEnt->raw_default = colDef->raw_default;
175 rawDefaults = lappend(rawDefaults, rawEnt);
/*
 * NOTE(review): the statement controlled by the test below is not visible
 * here; going by the comment it presumably returns early -- confirm.
 */
178 /* If no raw defaults and no constraints, nothing to do. */
179 if (rawDefaults == NIL && stmt->constraints == NIL)
183 * We must bump the command counter to make the newly-created relation
184 * tuple visible for opening.
186 CommandCounterIncrement();
189 * Open the new relation.
191 rel = heap_openr(relname, AccessExclusiveLock);
194 * Parse and add the defaults/constraints.
196 AddRelationRawConstraints(rel, rawDefaults, stmt->constraints);
199 * Clean up. We keep lock on new relation (although it shouldn't be
200 * visible to anyone else anyway, until commit).
202 heap_close(rel, NoLock);
207 * Deletes an existing relation.
210 * BadArg if name is invalid.
213 * If the relation has indices defined on it, then the index relations
214 * themselves will be destroyed, too.
217 RemoveRelation(char *name)
/*
 * The only visible statement hands `name` unchanged to
 * heap_drop_with_catalog().
 * NOTE(review): other lines of the body may be missing from this listing.
 */
220 heap_drop_with_catalog(name);
224 * TruncateRelation --
225 * Removes all the rows from a relation
228 * BadArg if name is invalid
231 * Rows are removed, indices are truncated and reconstructed.
234 TruncateRelation(char *name)
/*
 * NOTE(review): the body of this function is not visible in this listing.
 * According to the header comment it removes all rows from the relation
 * called `name` -- confirm against the full source.
 */
242 * Returns new schema given initial schema and supers.
245 * 'schema' is the column/attribute definition for the table. (It's a list
246 * of ColumnDef's.) It is destructively changed.
247 * 'supers' is the list of inherited relations (a list of Value(str)'s).
250 * The order in which the attributes are inherited is very important.
251 * Intuitively, the inherited attributes should come first. If a table
252 * inherits from multiple parents, the order of those attributes
253 * follows the order of the parents specified in CREATE TABLE.
257 * create table person (name text, age int4, location point);
258 * create table emp (salary int4, manager text) inherits(person);
259 * create table student (gpa float8) inherits (person);
260 * create table stud_emp (percent int4) inherits (emp, student);
262 * the order of the attributes of stud_emp is as follows:
265 * person {1:name, 2:age, 3:location}
267 * {6:gpa} student emp {4:salary, 5:manager}
269 * stud_emp {7:percent}
272 MergeAttributes(List *schema, List *supers, List **supconstr)
/*
 * Reviewer's outline of the visible steps:
 *	 - reject duplicate column names in `schema` and duplicate relation
 *	   names in `supers`;
 *	 - for every parent, turn each of its attributes into a ColumnDef and
 *	   collect the parent's CHECK constraints as cooked Constraint nodes;
 *	 - place the inherited columns in front of the explicitly given ones
 *	   and store the collected constraints in *supconstr.
 * NOTE(review): the return statement is not visible in this listing; the
 * header comment says the new schema is returned.
 */
275 List *inhSchema = NIL;
276 List *constraints = NIL;
279 * Validates that there are no duplications. Validity checking of
280 * types occurs later.
282 foreach(entry, schema)
284 ColumnDef *coldef = lfirst(entry);
287 foreach(rest, lnext(entry))
291 * check for duplicated names within the new relation
293 ColumnDef *restdef = lfirst(rest);
295 if (!strcmp(coldef->colname, restdef->colname))
297 elog(ERROR, "CREATE TABLE: attribute \"%s\" duplicated",
/*
 * Same pairwise check for the parent list: naming one relation twice in
 * `supers` is an error.
 */
302 foreach(entry, supers)
306 foreach(rest, lnext(entry))
308 if (!strcmp(strVal(lfirst(entry)), strVal(lfirst(rest))))
310 elog(ERROR, "CREATE TABLE: inherited relation \"%s\" duplicated",
311 strVal(lfirst(entry)));
317 * merge the inherited attributes into the schema
319 foreach(entry, supers)
321 char *name = strVal(lfirst(entry));
323 List *partialResult = NIL;
/*
 * Open the parent under AccessShareLock and refuse anything whose relkind
 * is not RELKIND_RELATION.
 */
328 relation = heap_openr(name, AccessShareLock);
329 tupleDesc = RelationGetDescr(relation);
330 constr = tupleDesc->constr;
332 if (relation->rd_rel->relkind != RELKIND_RELATION)
333 elog(ERROR, "CREATE TABLE: inherited relation \"%s\" is not a table", name);
/*
 * The parent's attributes are visited from last to first and each new
 * ColumnDef is pushed onto the head of partialResult with lcons, so the
 * finished list is in ascending attribute order.
 */
335 for (attrno = relation->rd_rel->relnatts - 1; attrno >= 0; attrno--)
337 Form_pg_attribute attribute = tupleDesc->attrs[attrno];
345 * form name, type and constraints
347 attributeName = NameStr(attribute->attname);
/* The type is carried by name, looked up from atttypid via TYPEOID. */
348 tuple = SearchSysCacheTuple(TYPEOID,
349 ObjectIdGetDatum(attribute->atttypid),
351 Assert(HeapTupleIsValid(tuple));
352 attributeType = NameStr(((Form_pg_type) GETSTRUCT(tuple))->typname);
/*
 * An inherited column may not share its name with an explicitly declared
 * one; checkAttrExists raises its own error first when the two type names
 * differ.
 */
358 if (checkAttrExists(attributeName, attributeType, schema))
359 elog(ERROR, "CREATE TABLE: attribute \"%s\" already exists in inherited schema",
/*
 * NOTE(review): the statement controlled by the test below is not visible
 * in this listing; presumably a column already inherited from an earlier
 * parent is skipped -- confirm.
 */
362 if (checkAttrExists(attributeName, attributeType, inhSchema))
365 * this entry already exists
370 * add an entry to the schema
372 def = makeNode(ColumnDef);
373 typename = makeNode(TypeName);
374 def->colname = pstrdup(attributeName);
375 typename->name = pstrdup(attributeType);
376 typename->typmod = attribute->atttypmod;
377 def->typename = typename;
378 def->is_not_null = attribute->attnotnull;
379 def->raw_default = NULL;
380 def->cooked_default = NULL;
/*
 * Copy the parent's cooked default: find the defval entry whose adnum
 * equals this column's 1-based attribute number (attrno is 0-based).
 */
381 if (attribute->atthasdef)
383 AttrDefault *attrdef;
386 Assert(constr != NULL);
388 attrdef = constr->defval;
389 for (i = 0; i < constr->num_defval; i++)
391 if (attrdef[i].adnum == attrno + 1)
393 def->cooked_default = pstrdup(attrdef[i].adbin);
397 Assert(def->cooked_default != NULL);
399 partialResult = lcons(def, partialResult);
/*
 * Carry the parent's CHECK constraints over as already-cooked Constraint
 * nodes (raw_expr NULL, cooked_expr copied from ccbin).
 */
402 if (constr && constr->num_check > 0)
404 ConstrCheck *check = constr->check;
407 for (i = 0; i < constr->num_check; i++)
409 Constraint *cdef = makeNode(Constraint);
411 cdef->contype = CONSTR_CHECK;
/*
 * A leading '$' marks the generated "$<n>" names that DefineRelation
 * assigns to unnamed constraints.
 * NOTE(review): the statement controlled by this test is not visible in
 * this listing -- confirm what happens to such names.
 */
412 if (check[i].ccname[0] == '$')
415 cdef->name = pstrdup(check[i].ccname);
416 cdef->raw_expr = NULL;
417 cdef->cooked_expr = pstrdup(check[i].ccbin);
418 constraints = lappend(constraints, cdef);
423 * Close the parent rel, but keep our AccessShareLock on it until
424 * xact commit. That will prevent someone else from deleting or
425 * ALTERing the parent before the child is committed.
427 heap_close(relation, NoLock);
430 * wants the inherited schema to appear in the order they are
431 * specified in CREATE TABLE
433 inhSchema = nconc(inhSchema, partialResult);
437 * put the inherited schema before our the schema for this table
439 schema = nconc(inhSchema, schema);
440 *supconstr = constraints;
445 * StoreCatalogInheritance
446 * Updates the system catalogs with proper inheritance information.
449 StoreCatalogInheritance(Oid relationId, List *supers)
/*
 * Reviewer's outline of the visible steps:
 *	 1. insert one row per entry of `supers` into the relation opened as
 *		InheritsRelationName, remembering each parent OID in idList;
 *	 2. expand idList in place with the parents of each listed relation;
 *	 3. drop repeated OIDs from idList;
 *	 4. insert one row per remaining OID into the relation opened as
 *		InheritancePrecidenceListRelationName.
 * NOTE(review): local declarations, the initialisation and stepping of
 * seqNumber, and the filling of nullarr are not visible in this listing.
 */
462 AssertArg(OidIsValid(relationId));
468 * Catalog INHERITS information.
471 relation = heap_openr(InheritsRelationName, RowExclusiveLock);
472 desc = RelationGetDescr(relation);
476 foreach(entry, supers)
478 Datum datum[Natts_pg_inherits];
479 char nullarr[Natts_pg_inherits];
/*
 * Look the parent up by name in the RELNAME syscache; the OID of the
 * tuple found is what gets stored as the parent OID below.
 */
481 tuple = SearchSysCacheTuple(RELNAME,
482 PointerGetDatum(strVal(lfirst(entry))),
484 AssertArg(HeapTupleIsValid(tuple));
487 * build idList for use below
489 idList = lappendi(idList, tuple->t_data->t_oid);
491 datum[0] = ObjectIdGetDatum(relationId); /* inhrel */
492 datum[1] = ObjectIdGetDatum(tuple->t_data->t_oid); /* inhparent */
493 datum[2] = Int16GetDatum(seqNumber); /* inhseqno */
/* From here on `tuple` is the newly formed row, not the syscache entry. */
499 tuple = heap_formtuple(desc, datum, nullarr);
501 heap_insert(relation, tuple);
/* Keep the catalog's indexes in step with the heap insert, if it has any. */
503 if (RelationGetForm(relation)->relhasindex)
505 Relation idescs[Num_pg_inherits_indices];
507 CatalogOpenIndices(Num_pg_inherits_indices, Name_pg_inherits_indices, idescs);
508 CatalogIndexInsert(idescs, Num_pg_inherits_indices, relation, tuple);
509 CatalogCloseIndices(Num_pg_inherits_indices, idescs);
512 heap_freetuple(tuple);
517 heap_close(relation, RowExclusiveLock);
520 * Catalog IPL information.
523 * 0. list superclasses (by Oid) in order given (see idList).
524 * 1. append after each relationId, its superclasses, recursively.
525 * 3. remove all but last of duplicates.
/*
 * Expansion pass: for each OID in idList, probe INHRELID with sequence
 * numbers 1, 2, ... and splice every parent found into the list right
 * after the current cell.  Spliced-in cells are reached later by the
 * same foreach, which is what makes the expansion recursive.
 * NOTE(review): the setup of `current`/`next` and the statement that
 * leaves the inner loop on a failed lookup are not visible here.
 */
534 foreach(entry, idList)
542 id = (Oid) lfirsti(entry);
546 for (number = 1;; number += 1)
548 tuple = SearchSysCacheTuple(INHRELID,
549 ObjectIdGetDatum(id),
550 Int16GetDatum(number),
553 if (!HeapTupleIsValid(tuple))
556 lnext(current) = lconsi(((Form_pg_inherits)
557 GETSTRUCT(tuple))->inhparent,
560 current = lnext(current);
/* Re-attach the remainder of the original list after the spliced cells. */
562 lnext(current) = next;
/*
 * Duplicate pass: each OID is compared with every OID after it in the
 * list.
 */
569 foreach(entry, idList)
576 name = lfirsti(entry);
577 foreach(rest, lnext(entry))
579 if (name == lfirsti(rest))
589 * entry list must be of length >= 2 or else no match
591 * so, remove this entry.
/*
 * Removal without a back pointer: copy the following cell's value into
 * this cell, then unlink the following cell.
 */
593 lfirst(entry) = lfirst(lnext(entry));
594 lnext(entry) = lnext(lnext(entry));
/*
 * Store the final list, one row per OID.  No index maintenance is visible
 * for this catalog, unlike the first insert loop above.
 */
605 relation = heap_openr(InheritancePrecidenceListRelationName, RowExclusiveLock);
606 desc = RelationGetDescr(relation);
610 foreach(entry, idList)
612 Datum datum[Natts_pg_ipl];
613 char nullarr[Natts_pg_ipl];
615 datum[0] = ObjectIdGetDatum(relationId); /* iplrel */
616 datum[1] = ObjectIdGetDatum(lfirsti(entry));
618 datum[2] = Int16GetDatum(seqNumber); /* iplseqno */
624 tuple = heap_formtuple(desc, datum, nullarr);
626 heap_insert(relation, tuple);
627 heap_freetuple(tuple);
632 heap_close(relation, RowExclusiveLock);
638 * returns true if attribute already exists in schema, false otherwise.
641 checkAttrExists(const char *attributeName, const char *attributeType, List *schema)
/*
 * Compares attributeName against the colname of the ColumnDef entries of
 * `schema`.  When a name matches but the type names differ, an ERROR is
 * raised instead of returning.
 * NOTE(review): the loop header and the return statements are not visible
 * in this listing; the header comment says the result is true when the
 * attribute exists and false otherwise.
 */
647 ColumnDef *def = lfirst(s);
649 if (strcmp(attributeName, def->colname) == 0)
653 * attribute exists. Make sure the types are the same.
/* Types are compared by name only, as plain strings. */
655 if (strcmp(attributeType, def->typename->name) != 0)
656 elog(ERROR, "CREATE TABLE: attribute \"%s\" type conflict (%s and %s)",
657 attributeName, attributeType, def->typename->name);