]> granicus.if.org Git - postgresql/blob - src/backend/commands/creatinh.c
b8f8a56eafd31d9b20b5626d9d117af140073784
[postgresql] / src / backend / commands / creatinh.c
1 /*-------------------------------------------------------------------------
2  *
3  * creatinh.c
4  *        POSTGRES create/destroy relation with inheritance utility code.
5  *
6  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/commands/Attic/creatinh.c,v 1.58 2000/05/30 00:49:43 momjian Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/heapam.h"
19 #include "catalog/catname.h"
20 #include "catalog/indexing.h"
21 #include "catalog/heap.h"
22 #include "catalog/pg_inherits.h"
23 #include "catalog/pg_ipl.h"
24 #include "catalog/pg_type.h"
25 #include "commands/creatinh.h"
26 #include "utils/syscache.h"
27
28 /* ----------------
29  *              forward declarations of the file-local (static) helpers
30  * ----------------
31  */
32
33 static bool checkAttrExists(const char *attributeName,
34                                 const char *attributeType, List *schema);
35 static List *MergeAttributes(List *schema, List *supers, List **supconstr);
36 static void StoreCatalogInheritance(Oid relationId, List *supers);
37
38 /* ----------------------------------------------------------------
39  *              DefineRelation
40  *                              Creates a new relation.
41  * ----------------------------------------------------------------
42  */
43 void
44 DefineRelation(CreateStmt *stmt, char relkind)
45 {
46         char       *relname = palloc(NAMEDATALEN);
47         List       *schema = stmt->tableElts;
48         int                     numberOfAttributes;
49         Oid                     relationId;
50         Relation        rel;
51         List       *inheritList;
52         TupleDesc       descriptor;
53         List       *old_constraints;
54         List       *rawDefaults;
55         List       *listptr;
56         int                     i;
57         AttrNumber      attnum;
58
59         if (strlen(stmt->relname) >= NAMEDATALEN)
60                 elog(ERROR, "the relation name %s is >= %d characters long",
61                          stmt->relname, NAMEDATALEN);
62         StrNCpy(relname, stmt->relname, NAMEDATALEN);
63
64         /* ----------------
65          *      Handle parameters
66          *      XXX parameter handling missing below.
67          * ----------------
68          */
69         inheritList = stmt->inhRelnames;
70
71         /* ----------------
72          *      generate relation schema, including inherited attributes.
73          * ----------------
74          */
75         schema = MergeAttributes(schema, inheritList, &old_constraints);
76
77         numberOfAttributes = length(schema);
78         if (numberOfAttributes <= 0)
79         {
80                 elog(ERROR, "DefineRelation: %s",
81                          "please inherit from a relation or define an attribute");
82         }
83
84         /* ----------------
85          *      create a relation descriptor from the relation schema
86          *      and create the relation.  Note that in this stage only
87          *      inherited (pre-cooked) defaults and constraints will be
88          *      included into the new relation.  (BuildDescForRelation
89          *      takes care of the inherited defaults, but we have to copy
90          *      inherited constraints here.)
91          * ----------------
92          */
93         descriptor = BuildDescForRelation(schema, relname);
94
95         if (old_constraints != NIL)
96         {
97                 ConstrCheck *check = (ConstrCheck *) palloc(length(old_constraints) *
98                                                                                                         sizeof(ConstrCheck));
99                 int                     ncheck = 0;
100
101                 foreach(listptr, old_constraints)
102                 {
103                         Constraint *cdef = (Constraint *) lfirst(listptr);
104
105                         if (cdef->contype != CONSTR_CHECK)
106                                 continue;
107
108                         if (cdef->name != NULL)
109                         {
110                                 for (i = 0; i < ncheck; i++)
111                                 {
112                                         if (strcmp(check[i].ccname, cdef->name) == 0)
113                                                 elog(ERROR, "Duplicate CHECK constraint name: '%s'",
114                                                          cdef->name);
115                                 }
116                                 check[ncheck].ccname = cdef->name;
117                         }
118                         else
119                         {
120                                 check[ncheck].ccname = (char *) palloc(NAMEDATALEN);
121                                 snprintf(check[ncheck].ccname, NAMEDATALEN, "$%d", ncheck + 1);
122                         }
123                         Assert(cdef->raw_expr == NULL && cdef->cooked_expr != NULL);
124                         check[ncheck].ccbin = pstrdup(cdef->cooked_expr);
125                         ncheck++;
126                 }
127                 if (ncheck > 0)
128                 {
129                         if (descriptor->constr == NULL)
130                         {
131                                 descriptor->constr = (TupleConstr *) palloc(sizeof(TupleConstr));
132                                 descriptor->constr->defval = NULL;
133                                 descriptor->constr->num_defval = 0;
134                                 descriptor->constr->has_not_null = false;
135                         }
136                         descriptor->constr->num_check = ncheck;
137                         descriptor->constr->check = check;
138                 }
139         }
140
141         relationId = heap_create_with_catalog(relname, descriptor,
142                                                                                   relkind, stmt->istemp);
143
144         StoreCatalogInheritance(relationId, inheritList);
145
146         /*
147          * Now add any newly specified column default values and CHECK
148          * constraints to the new relation.  These are passed to us in the
149          * form of raw parsetrees; we need to transform them to executable
150          * expression trees before they can be added. The most convenient way
151          * to do that is to apply the parser's transformExpr routine, but
152          * transformExpr doesn't work unless we have a pre-existing relation.
153          * So, the transformation has to be postponed to this final step of
154          * CREATE TABLE.
155          *
156          * First, scan schema to find new column defaults.
157          */
158         rawDefaults = NIL;
159         attnum = 0;
160
161         foreach(listptr, schema)
162         {
163                 ColumnDef  *colDef = lfirst(listptr);
164                 RawColumnDefault *rawEnt;
165
166                 attnum++;
167
168                 if (colDef->raw_default == NULL)
169                         continue;
170                 Assert(colDef->cooked_default == NULL);
171
172                 rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
173                 rawEnt->attnum = attnum;
174                 rawEnt->raw_default = colDef->raw_default;
175                 rawDefaults = lappend(rawDefaults, rawEnt);
176         }
177
178         /* If no raw defaults and no constraints, nothing to do. */
179         if (rawDefaults == NIL && stmt->constraints == NIL)
180                 return;
181
182         /*
183          * We must bump the command counter to make the newly-created relation
184          * tuple visible for opening.
185          */
186         CommandCounterIncrement();
187
188         /*
189          * Open the new relation.
190          */
191         rel = heap_openr(relname, AccessExclusiveLock);
192
193         /*
194          * Parse and add the defaults/constraints.
195          */
196         AddRelationRawConstraints(rel, rawDefaults, stmt->constraints);
197
198         /*
199          * Clean up.  We keep lock on new relation (although it shouldn't be
200          * visible to anyone else anyway, until commit).
201          */
202         heap_close(rel, NoLock);
203 }
204
205 /*
206  * RemoveRelation
207  *              Hands 'name' to heap_drop_with_catalog() to drop that relation.
208  *
209  * The only argument check made here is AssertArg(name), i.e. that the
210  * pointer is non-NULL.
211  *
212  * Note (from the original comment, not verifiable in this file): if the
213  * relation has indices defined on it, the index relations themselves
214  * are destroyed, too.
215  */
216 void
217 RemoveRelation(char *name)
218 {
219         AssertArg(name);
220         heap_drop_with_catalog(name);
221 }
222
223 /*
224  * TruncateRelation --
225  *    Hands 'name' to heap_truncate() to remove all rows of that relation.
226  *
227  * The only argument check made here is AssertArg(name), i.e. that the
228  * pointer is non-NULL.
229  *
230  * Note (from the original comment, not verifiable in this file): rows are
231  * removed, indices are truncated and reconstructed.
232  */
233 void
234 TruncateRelation(char *name)
235 {
236         AssertArg(name);
237         heap_truncate(name);
238 }
239
240 /*
241  * MergeAttributes
242  *              Returns new schema given initial schema and supers.
243  *
244  *
245  * 'schema' is the column/attribute definition for the table. (It's a list
246  *              of ColumnDef's.) It is destructively changed.
247  * 'inheritList' is the list of inherited relations (a list of Value(str)'s).
248  *
249  * Notes:
250  *        The order in which the attributes are inherited is very important.
251  *        Intuitively, the inherited attributes should come first. If a table
252  *        inherits from multiple parents, the order of those attributes are
253  *        according to the order of the parents specified in CREATE TABLE.
254  *
255  *        Here's an example:
256  *
257  *              create table person (name text, age int4, location point);
258  *              create table emp (salary int4, manager text) inherits(person);
259  *              create table student (gpa float8) inherits (person);
260  *              create table stud_emp (percent int4) inherits (emp, student);
261  *
262  *        the order of the attributes of stud_emp is as follow:
263  *
264  *
265  *                                                      person {1:name, 2:age, 3:location}
266  *                                                      /        \
267  *                         {6:gpa}      student   emp {4:salary, 5:manager}
268  *                                                      \        /
269  *                                                 stud_emp {7:percent}
270  */
271 static List *
272 MergeAttributes(List *schema, List *supers, List **supconstr)
273 {
274         List       *entry;
275         List       *inhSchema = NIL;
276         List       *constraints = NIL;
277
278         /*
279          * Validates that there are no duplications. Validity checking of
280          * types occurs later.
281          */
282         foreach(entry, schema)
283         {
284                 ColumnDef  *coldef = lfirst(entry);
285                 List       *rest;
286
287                 foreach(rest, lnext(entry))
288                 {
289
290                         /*
291                          * check for duplicated names within the new relation
292                          */
293                         ColumnDef  *restdef = lfirst(rest);
294
295                         if (!strcmp(coldef->colname, restdef->colname))
296                         {
297                                 elog(ERROR, "CREATE TABLE: attribute \"%s\" duplicated",
298                                          coldef->colname);
299                         }
300                 }
301         }
302         foreach(entry, supers)
303         {
304                 List       *rest;
305
306                 foreach(rest, lnext(entry))
307                 {
308                         if (!strcmp(strVal(lfirst(entry)), strVal(lfirst(rest))))
309                         {
310                                 elog(ERROR, "CREATE TABLE: inherited relation \"%s\" duplicated",
311                                          strVal(lfirst(entry)));
312                         }
313                 }
314         }
315
316         /*
317          * merge the inherited attributes into the schema
318          */
319         foreach(entry, supers)
320         {
321                 char       *name = strVal(lfirst(entry));
322                 Relation        relation;
323                 List       *partialResult = NIL;
324                 AttrNumber      attrno;
325                 TupleDesc       tupleDesc;
326                 TupleConstr *constr;
327
328                 relation = heap_openr(name, AccessShareLock);
329                 tupleDesc = RelationGetDescr(relation);
330                 constr = tupleDesc->constr;
331
332                 if (relation->rd_rel->relkind != RELKIND_RELATION)
333                         elog(ERROR, "CREATE TABLE: inherited relation \"%s\" is not a table", name);
334
335                 for (attrno = relation->rd_rel->relnatts - 1; attrno >= 0; attrno--)
336                 {
337                         Form_pg_attribute attribute = tupleDesc->attrs[attrno];
338                         char       *attributeName;
339                         char       *attributeType;
340                         HeapTuple       tuple;
341                         ColumnDef  *def;
342                         TypeName   *typename;
343
344                         /*
345                          * form name, type and constraints
346                          */
347                         attributeName = NameStr(attribute->attname);
348                         tuple = SearchSysCacheTuple(TYPEOID,
349                                                                    ObjectIdGetDatum(attribute->atttypid),
350                                                                                 0, 0, 0);
351                         Assert(HeapTupleIsValid(tuple));
352                         attributeType = NameStr(((Form_pg_type) GETSTRUCT(tuple))->typname);
353
354                         /*
355                          * check validity
356                          *
357                          */
358                         if (checkAttrExists(attributeName, attributeType, schema))
359                                 elog(ERROR, "CREATE TABLE: attribute \"%s\" already exists in inherited schema",
360                                          attributeName);
361
362                         if (checkAttrExists(attributeName, attributeType, inhSchema))
363
364                                 /*
365                                  * this entry already exists
366                                  */
367                                 continue;
368
369                         /*
370                          * add an entry to the schema
371                          */
372                         def = makeNode(ColumnDef);
373                         typename = makeNode(TypeName);
374                         def->colname = pstrdup(attributeName);
375                         typename->name = pstrdup(attributeType);
376                         typename->typmod = attribute->atttypmod;
377                         def->typename = typename;
378                         def->is_not_null = attribute->attnotnull;
379                         def->raw_default = NULL;
380                         def->cooked_default = NULL;
381                         if (attribute->atthasdef)
382                         {
383                                 AttrDefault *attrdef;
384                                 int                     i;
385
386                                 Assert(constr != NULL);
387
388                                 attrdef = constr->defval;
389                                 for (i = 0; i < constr->num_defval; i++)
390                                 {
391                                         if (attrdef[i].adnum == attrno + 1)
392                                         {
393                                                 def->cooked_default = pstrdup(attrdef[i].adbin);
394                                                 break;
395                                         }
396                                 }
397                                 Assert(def->cooked_default != NULL);
398                         }
399                         partialResult = lcons(def, partialResult);
400                 }
401
402                 if (constr && constr->num_check > 0)
403                 {
404                         ConstrCheck *check = constr->check;
405                         int                     i;
406
407                         for (i = 0; i < constr->num_check; i++)
408                         {
409                                 Constraint *cdef = makeNode(Constraint);
410
411                                 cdef->contype = CONSTR_CHECK;
412                                 if (check[i].ccname[0] == '$')
413                                         cdef->name = NULL;
414                                 else
415                                         cdef->name = pstrdup(check[i].ccname);
416                                 cdef->raw_expr = NULL;
417                                 cdef->cooked_expr = pstrdup(check[i].ccbin);
418                                 constraints = lappend(constraints, cdef);
419                         }
420                 }
421
422                 /*
423                  * Close the parent rel, but keep our AccessShareLock on it until
424                  * xact commit.  That will prevent someone else from deleting or
425                  * ALTERing the parent before the child is committed.
426                  */
427                 heap_close(relation, NoLock);
428
429                 /*
430                  * wants the inherited schema to appear in the order they are
431                  * specified in CREATE TABLE
432                  */
433                 inhSchema = nconc(inhSchema, partialResult);
434         }
435
436         /*
437          * put the inherited schema before our the schema for this table
438          */
439         schema = nconc(inhSchema, schema);
440         *supconstr = constraints;
441         return schema;
442 }
443
444 /*
445  * StoreCatalogInheritance
446  *              Updates the system catalogs with proper inheritance information.
447  */
448 static void
449 StoreCatalogInheritance(Oid relationId, List *supers)
450 {
451         Relation        relation;
452         TupleDesc       desc;
453         int16           seqNumber;
454         List       *entry;
455         List       *idList;
456         HeapTuple       tuple;
457
458         /* ----------------
459          *      sanity checks
460          * ----------------
461          */
462         AssertArg(OidIsValid(relationId));
463
464         if (supers == NIL)
465                 return;
466
467         /* ----------------
468          * Catalog INHERITS information.
469          * ----------------
470          */
471         relation = heap_openr(InheritsRelationName, RowExclusiveLock);
472         desc = RelationGetDescr(relation);
473
474         seqNumber = 1;
475         idList = NIL;
476         foreach(entry, supers)
477         {
478                 Datum           datum[Natts_pg_inherits];
479                 char            nullarr[Natts_pg_inherits];
480
481                 tuple = SearchSysCacheTuple(RELNAME,
482                                                                   PointerGetDatum(strVal(lfirst(entry))),
483                                                                         0, 0, 0);
484                 AssertArg(HeapTupleIsValid(tuple));
485
486                 /*
487                  * build idList for use below
488                  */
489                 idList = lappendi(idList, tuple->t_data->t_oid);
490
491                 datum[0] = ObjectIdGetDatum(relationId);                /* inhrel */
492                 datum[1] = ObjectIdGetDatum(tuple->t_data->t_oid);              /* inhparent */
493                 datum[2] = Int16GetDatum(seqNumber);    /* inhseqno */
494
495                 nullarr[0] = ' ';
496                 nullarr[1] = ' ';
497                 nullarr[2] = ' ';
498
499                 tuple = heap_formtuple(desc, datum, nullarr);
500
501                 heap_insert(relation, tuple);
502
503                 if (RelationGetForm(relation)->relhasindex)
504                 {
505                         Relation        idescs[Num_pg_inherits_indices];
506
507                         CatalogOpenIndices(Num_pg_inherits_indices, Name_pg_inherits_indices, idescs);
508                         CatalogIndexInsert(idescs, Num_pg_inherits_indices, relation, tuple);
509                         CatalogCloseIndices(Num_pg_inherits_indices, idescs);
510                 }
511
512                 heap_freetuple(tuple);
513
514                 seqNumber += 1;
515         }
516
517         heap_close(relation, RowExclusiveLock);
518
519         /* ----------------
520          * Catalog IPL information.
521          *
522          * Algorithm:
523          *      0. list superclasses (by Oid) in order given (see idList).
524          *      1. append after each relationId, its superclasses, recursively.
525          *      3. remove all but last of duplicates.
526          *      4. store result.
527          * ----------------
528          */
529
530         /* ----------------
531          *      1.
532          * ----------------
533          */
534         foreach(entry, idList)
535         {
536                 HeapTuple       tuple;
537                 Oid                     id;
538                 int16           number;
539                 List       *next;
540                 List       *current;
541
542                 id = (Oid) lfirsti(entry);
543                 current = entry;
544                 next = lnext(entry);
545
546                 for (number = 1;; number += 1)
547                 {
548                         tuple = SearchSysCacheTuple(INHRELID,
549                                                                                 ObjectIdGetDatum(id),
550                                                                                 Int16GetDatum(number),
551                                                                                 0, 0);
552
553                         if (!HeapTupleIsValid(tuple))
554                                 break;
555
556                         lnext(current) = lconsi(((Form_pg_inherits)
557                                                                          GETSTRUCT(tuple))->inhparent,
558                                                                         NIL);
559
560                         current = lnext(current);
561                 }
562                 lnext(current) = next;
563         }
564
565         /* ----------------
566          *      2.
567          * ----------------
568          */
569         foreach(entry, idList)
570         {
571                 Oid                     name;
572                 List       *rest;
573                 bool            found = false;
574
575 again:
576                 name = lfirsti(entry);
577                 foreach(rest, lnext(entry))
578                 {
579                         if (name == lfirsti(rest))
580                         {
581                                 found = true;
582                                 break;
583                         }
584                 }
585                 if (found)
586                 {
587
588                         /*
589                          * entry list must be of length >= 2 or else no match
590                          *
591                          * so, remove this entry.
592                          */
593                         lfirst(entry) = lfirst(lnext(entry));
594                         lnext(entry) = lnext(lnext(entry));
595
596                         found = false;
597                         goto again;
598                 }
599         }
600
601         /* ----------------
602          *      3.
603          * ----------------
604          */
605         relation = heap_openr(InheritancePrecidenceListRelationName, RowExclusiveLock);
606         desc = RelationGetDescr(relation);
607
608         seqNumber = 1;
609
610         foreach(entry, idList)
611         {
612                 Datum           datum[Natts_pg_ipl];
613                 char            nullarr[Natts_pg_ipl];
614
615                 datum[0] = ObjectIdGetDatum(relationId);                /* iplrel */
616                 datum[1] = ObjectIdGetDatum(lfirsti(entry));
617                 /* iplinherits */
618                 datum[2] = Int16GetDatum(seqNumber);    /* iplseqno */
619
620                 nullarr[0] = ' ';
621                 nullarr[1] = ' ';
622                 nullarr[2] = ' ';
623
624                 tuple = heap_formtuple(desc, datum, nullarr);
625
626                 heap_insert(relation, tuple);
627                 heap_freetuple(tuple);
628
629                 seqNumber += 1;
630         }
631
632         heap_close(relation, RowExclusiveLock);
633 }
634
635
636
637 /*
638  * returns true if attribute already exists in schema, false otherwise.
639  */
640 static bool
641 checkAttrExists(const char *attributeName, const char *attributeType, List *schema)
642 {
643         List       *s;
644
645         foreach(s, schema)
646         {
647                 ColumnDef  *def = lfirst(s);
648
649                 if (strcmp(attributeName, def->colname) == 0)
650                 {
651
652                         /*
653                          * attribute exists. Make sure the types are the same.
654                          */
655                         if (strcmp(attributeType, def->typename->name) != 0)
656                                 elog(ERROR, "CREATE TABLE: attribute \"%s\" type conflict (%s and %s)",
657                                          attributeName, attributeType, def->typename->name);
658                         return true;
659                 }
660         }
661         return false;
662 }