1 /*-------------------------------------------------------------------------
4 * Commands for creating and altering table structures and settings
6 * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.189 2006/07/02 01:58:36 momjian Exp $
13 *-------------------------------------------------------------------------
17 #include "access/genam.h"
18 #include "access/tuptoaster.h"
19 #include "catalog/catalog.h"
20 #include "catalog/dependency.h"
21 #include "catalog/heap.h"
22 #include "catalog/index.h"
23 #include "catalog/indexing.h"
24 #include "catalog/namespace.h"
25 #include "catalog/pg_constraint.h"
26 #include "catalog/pg_depend.h"
27 #include "catalog/pg_inherits.h"
28 #include "catalog/pg_namespace.h"
29 #include "catalog/pg_opclass.h"
30 #include "catalog/pg_trigger.h"
31 #include "catalog/pg_type.h"
32 #include "commands/cluster.h"
33 #include "commands/defrem.h"
34 #include "commands/tablecmds.h"
35 #include "commands/tablespace.h"
36 #include "commands/trigger.h"
37 #include "commands/typecmds.h"
38 #include "executor/executor.h"
39 #include "lib/stringinfo.h"
40 #include "miscadmin.h"
41 #include "nodes/makefuncs.h"
42 #include "optimizer/clauses.h"
43 #include "optimizer/plancat.h"
44 #include "optimizer/prep.h"
45 #include "parser/analyze.h"
46 #include "parser/gramparse.h"
47 #include "parser/parser.h"
48 #include "parser/parse_clause.h"
49 #include "parser/parse_coerce.h"
50 #include "parser/parse_expr.h"
51 #include "parser/parse_oper.h"
52 #include "parser/parse_relation.h"
53 #include "parser/parse_type.h"
54 #include "rewrite/rewriteHandler.h"
55 #include "storage/smgr.h"
56 #include "utils/acl.h"
57 #include "utils/builtins.h"
58 #include "utils/fmgroids.h"
59 #include "utils/inval.h"
60 #include "utils/lsyscache.h"
61 #include "utils/memutils.h"
62 #include "utils/relcache.h"
63 #include "utils/syscache.h"
67 * ON COMMIT action list
69 typedef struct OnCommitItem
71 Oid relid; /* relid of relation */
72 OnCommitAction oncommit; /* what to do at end of xact */
75 * If this entry was created during the current transaction,
76 * creating_subid is the ID of the creating subxact; if created in a prior
77 * transaction, creating_subid is zero. If deleted during the current
78 * transaction, deleting_subid is the ID of the deleting subxact; if no
79 * deletion request is pending, deleting_subid is zero.
81 SubTransactionId creating_subid;
82 SubTransactionId deleting_subid;
85 static List *on_commits = NIL;
89 * State information for ALTER TABLE
91 * The pending-work queue for an ALTER TABLE is a List of AlteredTableInfo
92 * structs, one for each table modified by the operation (the named table
93 * plus any child tables that are affected). We save lists of subcommands
94 * to apply to this table (possibly modified by parse transformation steps);
95 * these lists will be executed in Phase 2. If a Phase 3 step is needed,
96 * necessary information is stored in the constraints and newvals lists.
98 * Phase 2 is divided into multiple passes; subcommands are executed in
99 * a pass determined by subcommand type.
102 #define AT_PASS_DROP 0 /* DROP (all flavors) */
103 #define AT_PASS_ALTER_TYPE 1 /* ALTER COLUMN TYPE */
104 #define AT_PASS_OLD_INDEX 2 /* re-add existing indexes */
105 #define AT_PASS_OLD_CONSTR 3 /* re-add existing constraints */
106 #define AT_PASS_COL_ATTRS 4 /* set other column attributes */
107 /* We could support a RENAME COLUMN pass here, but it is not currently used */
108 #define AT_PASS_ADD_COL 5 /* ADD COLUMN */
109 #define AT_PASS_ADD_INDEX 6 /* ADD indexes */
110 #define AT_PASS_ADD_CONSTR 7 /* ADD constraints, defaults */
111 #define AT_PASS_MISC 8 /* other stuff */
112 #define AT_NUM_PASSES 9
114 typedef struct AlteredTableInfo
116 /* Information saved before any work commences: */
117 Oid relid; /* Relation to work on */
118 char relkind; /* Its relkind */
119 TupleDesc oldDesc; /* Pre-modification tuple descriptor */
120 /* Information saved by Phase 1 for Phase 2: */
121 List *subcmds[AT_NUM_PASSES]; /* Lists of AlterTableCmd */
122 /* Information saved by Phases 1/2 for Phase 3: */
123 List *constraints; /* List of NewConstraint */
124 List *newvals; /* List of NewColumnValue */
125 Oid newTableSpace; /* new tablespace; 0 means no change */
126 /* Objects to rebuild after completing ALTER TYPE operations */
127 List *changedConstraintOids; /* OIDs of constraints to rebuild */
128 List *changedConstraintDefs; /* string definitions of same */
129 List *changedIndexOids; /* OIDs of indexes to rebuild */
130 List *changedIndexDefs; /* string definitions of same */
133 /* Struct describing one new constraint to check in Phase 3 scan */
134 typedef struct NewConstraint
136 char *name; /* Constraint name, or NULL if none */
137 ConstrType contype; /* CHECK, NOT_NULL, or FOREIGN */
138 AttrNumber attnum; /* only relevant for NOT_NULL */
139 Oid refrelid; /* PK rel, if FOREIGN */
140 Node *qual; /* Check expr or FkConstraint struct */
141 List *qualstate; /* Execution state for CHECK */
145 * Struct describing one new column value that needs to be computed during
146 * Phase 3 copy (this could be either a new column with a non-null default, or
147 * a column that we're changing the type of). Columns without such an entry
148 * are just copied from the old table during ATRewriteTable. Note that the
149 * expr is an expression over *old* table values.
151 typedef struct NewColumnValue
153 AttrNumber attnum; /* which column */
154 Expr *expr; /* expression to compute */
155 ExprState *exprstate; /* execution state */
159 static void truncate_check_rel(Relation rel);
160 static List *MergeAttributes(List *schema, List *supers, bool istemp,
161 List **supOids, List **supconstr, int *supOidCount);
162 static void MergeConstraintsIntoExisting(Relation rel, Relation relation);
163 static void MergeAttributesIntoExisting(Relation rel, Relation relation);
164 static bool change_varattnos_walker(Node *node, const AttrNumber *newattno);
165 static void StoreCatalogInheritance(Oid relationId, List *supers);
166 static void StoreCatalogInheritance1(Oid relationId, Oid parentOid,
167 int16 seqNumber, Relation catalogRelation);
168 static int findAttrByName(const char *attributeName, List *schema);
169 static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass);
170 static bool needs_toast_table(Relation rel);
171 static void AlterIndexNamespaces(Relation classRel, Relation rel,
172 Oid oldNspOid, Oid newNspOid);
173 static void AlterSeqNamespaces(Relation classRel, Relation rel,
174 Oid oldNspOid, Oid newNspOid,
175 const char *newNspName);
176 static int transformColumnNameList(Oid relId, List *colList,
177 int16 *attnums, Oid *atttypids);
178 static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
180 int16 *attnums, Oid *atttypids,
182 static Oid transformFkeyCheckAttrs(Relation pkrel,
183 int numattrs, int16 *attnums,
185 static void validateForeignKeyConstraint(FkConstraint *fkconstraint,
186 Relation rel, Relation pkrel);
187 static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
189 static char *fkMatchTypeToString(char match_type);
190 static void ATController(Relation rel, List *cmds, bool recurse);
191 static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
192 bool recurse, bool recursing);
193 static void ATRewriteCatalogs(List **wqueue);
194 static void ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd);
195 static void ATRewriteTables(List **wqueue);
196 static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap);
197 static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
198 static void ATSimplePermissions(Relation rel, bool allowView);
199 static void ATSimpleRecursion(List **wqueue, Relation rel,
200 AlterTableCmd *cmd, bool recurse);
201 static void ATOneLevelRecursion(List **wqueue, Relation rel,
203 static void find_composite_type_dependencies(Oid typeOid,
204 const char *origTblName);
205 static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
207 static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
209 static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid);
210 static void add_column_support_dependency(Oid relid, int32 attnum,
212 static void ATExecDropNotNull(Relation rel, const char *colName);
213 static void ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
214 const char *colName);
215 static void ATExecColumnDefault(Relation rel, const char *colName,
217 static void ATPrepSetStatistics(Relation rel, const char *colName,
219 static void ATExecSetStatistics(Relation rel, const char *colName,
221 static void ATExecSetStorage(Relation rel, const char *colName,
223 static void ATExecDropColumn(Relation rel, const char *colName,
224 DropBehavior behavior,
225 bool recurse, bool recursing);
226 static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
227 IndexStmt *stmt, bool is_rebuild);
228 static void ATExecAddConstraint(AlteredTableInfo *tab, Relation rel,
229 Node *newConstraint);
230 static void ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
231 FkConstraint *fkconstraint);
232 static void ATPrepDropConstraint(List **wqueue, Relation rel,
233 bool recurse, AlterTableCmd *cmd);
234 static void ATExecDropConstraint(Relation rel, const char *constrName,
235 DropBehavior behavior, bool quiet);
236 static void ATPrepAlterColumnType(List **wqueue,
237 AlteredTableInfo *tab, Relation rel,
238 bool recurse, bool recursing,
240 static void ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
241 const char *colName, TypeName *typename);
242 static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab);
243 static void ATPostAlterTypeParse(char *cmd, List **wqueue);
244 static void change_owner_recurse_to_sequences(Oid relationOid,
246 static void ATExecClusterOn(Relation rel, const char *indexName);
247 static void ATExecDropCluster(Relation rel);
248 static void ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel,
249 char *tablespacename);
250 static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace);
251 static void ATExecEnableDisableTrigger(Relation rel, char *trigname,
252 bool enable, bool skip_system);
253 static void ATExecAddInherits(Relation rel, RangeVar *parent);
254 static void ATExecDropInherits(Relation rel, RangeVar *parent);
255 static void copy_relation_data(Relation rel, SMgrRelation dst);
256 static void update_ri_trigger_args(Oid relid,
260 bool update_relname);
263 /* ----------------------------------------------------------------
265 * Creates a new relation.
267 * If successful, returns the OID of the new relation.
268 * ----------------------------------------------------------------
271 DefineRelation(CreateStmt *stmt, char relkind)
273 char relname[NAMEDATALEN];
275 List *schema = stmt->tableElts;
279 TupleDesc descriptor;
281 List *old_constraints;
290 * Truncate relname to appropriate length (probably a waste of time, as
291 * parser should have done this already).
293 StrNCpy(relname, stmt->relation->relname, NAMEDATALEN);
296 * Check consistency of arguments
298 if (stmt->oncommit != ONCOMMIT_NOOP && !stmt->relation->istemp)
300 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
301 errmsg("ON COMMIT can only be used on temporary tables")));
304 * Look up the namespace in which we are supposed to create the relation.
305 * Check we have permission to create there. Skip check if bootstrapping,
306 * since permissions machinery may not be working yet.
308 namespaceId = RangeVarGetCreationNamespace(stmt->relation);
310 if (!IsBootstrapProcessingMode())
314 aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
316 if (aclresult != ACLCHECK_OK)
317 aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
318 get_namespace_name(namespaceId));
322 * Select tablespace to use. If not specified, use default_tablespace
323 * (which may in turn default to database's default).
325 if (stmt->tablespacename)
327 tablespaceId = get_tablespace_oid(stmt->tablespacename);
328 if (!OidIsValid(tablespaceId))
330 (errcode(ERRCODE_UNDEFINED_OBJECT),
331 errmsg("tablespace \"%s\" does not exist",
332 stmt->tablespacename)));
336 tablespaceId = GetDefaultTablespace();
337 /* note InvalidOid is OK in this case */
340 /* Check permissions except when using database's default */
341 if (OidIsValid(tablespaceId))
345 aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(),
347 if (aclresult != ACLCHECK_OK)
348 aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
349 get_tablespace_name(tablespaceId));
353 * Look up inheritance ancestors and generate relation schema, including
354 * inherited attributes.
356 schema = MergeAttributes(schema, stmt->inhRelations,
357 stmt->relation->istemp,
358 &inheritOids, &old_constraints, &parentOidCount);
361 * Create a relation descriptor from the relation schema and create the
362 * relation. Note that in this stage only inherited (pre-cooked) defaults
363 * and constraints will be included into the new relation.
364 * (BuildDescForRelation takes care of the inherited defaults, but we have
365 * to copy inherited constraints here.)
367 descriptor = BuildDescForRelation(schema);
369 localHasOids = interpretOidsOption(stmt->hasoids);
370 descriptor->tdhasoid = (localHasOids || parentOidCount > 0);
372 if (old_constraints != NIL)
374 ConstrCheck *check = (ConstrCheck *)
375 palloc0(list_length(old_constraints) * sizeof(ConstrCheck));
378 foreach(listptr, old_constraints)
380 Constraint *cdef = (Constraint *) lfirst(listptr);
383 if (cdef->contype != CONSTR_CHECK)
385 Assert(cdef->name != NULL);
386 Assert(cdef->raw_expr == NULL && cdef->cooked_expr != NULL);
389 * In multiple-inheritance situations, it's possible to inherit
390 * the same grandparent constraint through multiple parents.
391 * Hence, discard inherited constraints that match as to both name
392 * and expression. Otherwise, gripe if the names conflict.
394 for (i = 0; i < ncheck; i++)
396 if (strcmp(check[i].ccname, cdef->name) != 0)
398 if (strcmp(check[i].ccbin, cdef->cooked_expr) == 0)
404 (errcode(ERRCODE_DUPLICATE_OBJECT),
405 errmsg("duplicate check constraint name \"%s\"",
410 check[ncheck].ccname = cdef->name;
411 check[ncheck].ccbin = pstrdup(cdef->cooked_expr);
417 if (descriptor->constr == NULL)
419 descriptor->constr = (TupleConstr *) palloc(sizeof(TupleConstr));
420 descriptor->constr->defval = NULL;
421 descriptor->constr->num_defval = 0;
422 descriptor->constr->has_not_null = false;
424 descriptor->constr->num_check = ncheck;
425 descriptor->constr->check = check;
429 relationId = heap_create_with_catalog(relname,
440 allowSystemTableMods);
442 StoreCatalogInheritance(relationId, inheritOids);
445 * We must bump the command counter to make the newly-created relation
446 * tuple visible for opening.
448 CommandCounterIncrement();
451 * Open the new relation and acquire exclusive lock on it. This isn't
452 * really necessary for locking out other backends (since they can't see
453 * the new rel anyway until we commit), but it keeps the lock manager from
454 * complaining about deadlock risks.
456 rel = relation_open(relationId, AccessExclusiveLock);
459 * Now add any newly specified column default values and CHECK constraints
460 * to the new relation. These are passed to us in the form of raw
461 * parsetrees; we need to transform them to executable expression trees
462 * before they can be added. The most convenient way to do that is to
463 * apply the parser's transformExpr routine, but transformExpr doesn't
464 * work unless we have a pre-existing relation. So, the transformation has
465 * to be postponed to this final step of CREATE TABLE.
467 * Another task that's conveniently done at this step is to add dependency
468 * links between columns and supporting relations (such as SERIAL
471 * First, scan schema to find new column defaults.
476 foreach(listptr, schema)
478 ColumnDef *colDef = lfirst(listptr);
482 if (colDef->raw_default != NULL)
484 RawColumnDefault *rawEnt;
486 Assert(colDef->cooked_default == NULL);
488 rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
489 rawEnt->attnum = attnum;
490 rawEnt->raw_default = colDef->raw_default;
491 rawDefaults = lappend(rawDefaults, rawEnt);
494 /* Create dependency for supporting relation for this column */
495 if (colDef->support != NULL)
496 add_column_support_dependency(relationId, attnum, colDef->support);
500 * Parse and add the defaults/constraints, if any.
502 if (rawDefaults || stmt->constraints)
503 AddRelationRawConstraints(rel, rawDefaults, stmt->constraints);
506 * Clean up. We keep lock on new relation (although it shouldn't be
507 * visible to anyone else anyway, until commit).
509 relation_close(rel, NoLock);
516 * Deletes a relation.
519 RemoveRelation(const RangeVar *relation, DropBehavior behavior)
522 ObjectAddress object;
524 relOid = RangeVarGetRelid(relation, false);
526 object.classId = RelationRelationId;
527 object.objectId = relOid;
528 object.objectSubId = 0;
530 performDeletion(&object, behavior);
535 * Executes a TRUNCATE command.
537 * This is a multi-relation truncate. We first open and grab exclusive
538 * lock on all relations involved, checking permissions and otherwise
539 * verifying that the relation is OK for truncation. In CASCADE mode,
540 * relations having FK references to the targeted relations are automatically
541 * added to the group; in RESTRICT mode, we check that all FK references are
542 * internal to the group that's being truncated. Finally all the relations
543 * are truncated and reindexed.
546 ExecuteTruncate(TruncateStmt *stmt)
553 * Open, exclusive-lock, and check all the explicitly-specified relations
555 foreach(cell, stmt->relations)
557 RangeVar *rv = lfirst(cell);
560 rel = heap_openrv(rv, AccessExclusiveLock);
561 truncate_check_rel(rel);
562 rels = lappend(rels, rel);
563 relids = lappend_oid(relids, RelationGetRelid(rel));
567 * In CASCADE mode, suck in all referencing relations as well. This
568 * requires multiple iterations to find indirectly-dependent relations.
569 * At each phase, we need to exclusive-lock new rels before looking
570 * for their dependencies, else we might miss something. Also, we
571 * check each rel as soon as we open it, to avoid a faux pas such as
572 * holding lock for a long time on a rel we have no permissions for.
574 if (stmt->behavior == DROP_CASCADE)
580 newrelids = heap_truncate_find_FKs(relids);
581 if (newrelids == NIL)
582 break; /* nothing else to add */
584 foreach(cell, newrelids)
586 Oid relid = lfirst_oid(cell);
589 rel = heap_open(relid, AccessExclusiveLock);
591 (errmsg("truncate cascades to table \"%s\"",
592 RelationGetRelationName(rel))));
593 truncate_check_rel(rel);
594 rels = lappend(rels, rel);
595 relids = lappend_oid(relids, relid);
601 * Check foreign key references. In CASCADE mode, this should be
602 * unnecessary since we just pulled in all the references; but as
603 * a cross-check, do it anyway if in an Assert-enabled build.
605 #ifdef USE_ASSERT_CHECKING
606 heap_truncate_check_FKs(rels, false);
608 if (stmt->behavior == DROP_RESTRICT)
609 heap_truncate_check_FKs(rels, false);
613 * OK, truncate each table.
617 Relation rel = (Relation) lfirst(cell);
622 * Create a new empty storage file for the relation, and assign it as
623 * the relfilenode value. The old storage file is scheduled for
624 * deletion at commit.
626 setNewRelfilenode(rel);
628 heap_relid = RelationGetRelid(rel);
629 toast_relid = rel->rd_rel->reltoastrelid;
631 heap_close(rel, NoLock);
634 * The same for the toast table, if any.
636 if (OidIsValid(toast_relid))
638 rel = relation_open(toast_relid, AccessExclusiveLock);
639 setNewRelfilenode(rel);
640 heap_close(rel, NoLock);
644 * Reconstruct the indexes to match, and we're done.
646 reindex_relation(heap_relid, true);
651 * Check that a given rel is safe to truncate. Subroutine for ExecuteTruncate
654 truncate_check_rel(Relation rel)
656 /* Only allow truncate on regular tables */
657 if (rel->rd_rel->relkind != RELKIND_RELATION)
659 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
660 errmsg("\"%s\" is not a table",
661 RelationGetRelationName(rel))));
663 /* Permissions checks */
664 if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
665 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
666 RelationGetRelationName(rel));
668 if (!allowSystemTableMods && IsSystemRelation(rel))
670 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
671 errmsg("permission denied: \"%s\" is a system catalog",
672 RelationGetRelationName(rel))));
675 * We can never allow truncation of shared or nailed-in-cache
676 * relations, because we can't support changing their relfilenode
679 if (rel->rd_rel->relisshared || rel->rd_isnailed)
681 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
682 errmsg("cannot truncate system relation \"%s\"",
683 RelationGetRelationName(rel))));
686 * Don't allow truncate on temp tables of other backends ... their
687 * local buffer manager is not going to cope.
689 if (isOtherTempNamespace(RelationGetNamespace(rel)))
691 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
692 errmsg("cannot truncate temporary tables of other sessions")));
697 * Returns new schema given initial schema and superclasses.
700 * 'schema' is the column/attribute definition for the table. (It's a list
701 * of ColumnDef's.) It is destructively changed.
702 * 'supers' is a list of names (as RangeVar nodes) of parent relations.
703 * 'istemp' is TRUE if we are creating a temp relation.
706 * 'supOids' receives a list of the OIDs of the parent relations.
707 * 'supconstr' receives a list of constraints belonging to the parents,
708 * updated as necessary to be valid for the child.
709 * 'supOidCount' is set to the number of parents that have OID columns.
712 * Completed schema list.
715 * The order in which the attributes are inherited is very important.
716 * Intuitively, the inherited attributes should come first. If a table
717 * inherits from multiple parents, the order of those attributes is
718 * according to the order of the parents specified in CREATE TABLE.
722 * create table person (name text, age int4, location point);
723 * create table emp (salary int4, manager text) inherits(person);
724 * create table student (gpa float8) inherits (person);
725 * create table stud_emp (percent int4) inherits (emp, student);
727 * The order of the attributes of stud_emp is:
729 * person {1:name, 2:age, 3:location}
731 * {6:gpa} student emp {4:salary, 5:manager}
733 * stud_emp {7:percent}
735 * If the same attribute name appears multiple times, then it appears
736 * in the result table in the proper location for its first appearance.
738 * Constraints (including NOT NULL constraints) for the child table
739 * are the union of all relevant constraints, from both the child schema
742 * The default value for a child column is defined as:
743 * (1) If the child schema specifies a default, that value is used.
744 * (2) If neither the child nor any parent specifies a default, then
745 * the column will not have a default.
746 * (3) If conflicting defaults are inherited from different parents
747 * (and not overridden by the child), an error is raised.
748 * (4) Otherwise the inherited default is used.
749 * Rule (3) is new in Postgres 7.1; in earlier releases you got a
750 * rather arbitrary choice of which parent default to use.
754 MergeAttributes(List *schema, List *supers, bool istemp,
755 List **supOids, List **supconstr, int *supOidCount)
758 List *inhSchema = NIL;
759 List *parentOids = NIL;
760 List *constraints = NIL;
761 int parentsWithOids = 0;
762 bool have_bogus_defaults = false;
763 char *bogus_marker = "Bogus!"; /* marks conflicting defaults */
767 * Check for and reject tables with too many columns. We perform this
768 * check relatively early for two reasons: (a) we don't run the risk of
769 * overflowing an AttrNumber in subsequent code (b) an O(n^2) algorithm is
770 * okay if we're processing <= 1600 columns, but could take minutes to
771 * execute if the user attempts to create a table with hundreds of
772 * thousands of columns.
774 * Note that we also need to check that we do not exceed this figure
775 * after including columns from inherited relations.
777 if (list_length(schema) > MaxHeapAttributeNumber)
779 (errcode(ERRCODE_TOO_MANY_COLUMNS),
780 errmsg("tables can have at most %d columns",
781 MaxHeapAttributeNumber)));
784 * Check for duplicate names in the explicit list of attributes.
786 * Although we might consider merging such entries in the same way that we
787 * handle name conflicts for inherited attributes, it seems to make more
788 * sense to assume such conflicts are errors.
790 foreach(entry, schema)
792 ColumnDef *coldef = lfirst(entry);
795 for_each_cell(rest, lnext(entry))
797 ColumnDef *restdef = lfirst(rest);
799 if (strcmp(coldef->colname, restdef->colname) == 0)
801 (errcode(ERRCODE_DUPLICATE_COLUMN),
802 errmsg("column \"%s\" duplicated",
808 * Scan the parents left-to-right, and merge their attributes to form a
809 * list of inherited attributes (inhSchema). Also check to see if we need
810 * to inherit an OID column.
813 foreach(entry, supers)
815 RangeVar *parent = (RangeVar *) lfirst(entry);
819 AttrNumber *newattno;
820 AttrNumber parent_attno;
822 relation = heap_openrv(parent, AccessShareLock);
824 if (relation->rd_rel->relkind != RELKIND_RELATION)
826 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
827 errmsg("inherited relation \"%s\" is not a table",
829 /* Permanent rels cannot inherit from temporary ones */
830 if (!istemp && isTempNamespace(RelationGetNamespace(relation)))
832 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
833 errmsg("cannot inherit from temporary relation \"%s\"",
837 * We should have an UNDER permission flag for this, but for now,
838 * demand that creator of a child table own the parent.
840 if (!pg_class_ownercheck(RelationGetRelid(relation), GetUserId()))
841 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
842 RelationGetRelationName(relation));
845 * Reject duplications in the list of parents.
847 if (list_member_oid(parentOids, RelationGetRelid(relation)))
849 (errcode(ERRCODE_DUPLICATE_TABLE),
850 errmsg("inherited relation \"%s\" duplicated",
853 parentOids = lappend_oid(parentOids, RelationGetRelid(relation));
855 if (relation->rd_rel->relhasoids)
858 tupleDesc = RelationGetDescr(relation);
859 constr = tupleDesc->constr;
862 * newattno[] will contain the child-table attribute numbers for the
863 * attributes of this parent table. (They are not the same for
864 * parents after the first one, nor if we have dropped columns.)
866 newattno = (AttrNumber *)
867 palloc(tupleDesc->natts * sizeof(AttrNumber));
869 for (parent_attno = 1; parent_attno <= tupleDesc->natts;
872 Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1];
873 char *attributeName = NameStr(attribute->attname);
878 * Ignore dropped columns in the parent.
880 if (attribute->attisdropped)
883 * change_varattnos_of_a_node asserts that this is greater
884 * than zero, so if anything tries to use it, we should find
887 newattno[parent_attno - 1] = 0;
892 * Does it conflict with some previously inherited column?
894 exist_attno = findAttrByName(attributeName, inhSchema);
898 * Yes, try to merge the two column definitions. They must
899 * have the same type and typmod.
902 (errmsg("merging multiple inherited definitions of column \"%s\"",
904 def = (ColumnDef *) list_nth(inhSchema, exist_attno - 1);
905 if (typenameTypeId(NULL, def->typename) != attribute->atttypid ||
906 def->typename->typmod != attribute->atttypmod)
908 (errcode(ERRCODE_DATATYPE_MISMATCH),
909 errmsg("inherited column \"%s\" has a type conflict",
911 errdetail("%s versus %s",
912 TypeNameToString(def->typename),
913 format_type_be(attribute->atttypid))));
915 /* Merge of NOT NULL constraints = OR 'em together */
916 def->is_not_null |= attribute->attnotnull;
917 /* Default and other constraints are handled below */
918 newattno[parent_attno - 1] = exist_attno;
923 * No, create a new inherited column
925 def = makeNode(ColumnDef);
926 def->colname = pstrdup(attributeName);
927 def->typename = makeTypeNameFromOid(attribute->atttypid,
928 attribute->atttypmod);
930 def->is_local = false;
931 def->is_not_null = attribute->attnotnull;
932 def->raw_default = NULL;
933 def->cooked_default = NULL;
934 def->constraints = NIL;
936 inhSchema = lappend(inhSchema, def);
937 newattno[parent_attno - 1] = ++child_attno;
941 * Copy default if any
943 if (attribute->atthasdef)
945 char *this_default = NULL;
946 AttrDefault *attrdef;
949 /* Find default in constraint structure */
950 Assert(constr != NULL);
951 attrdef = constr->defval;
952 for (i = 0; i < constr->num_defval; i++)
954 if (attrdef[i].adnum == parent_attno)
956 this_default = attrdef[i].adbin;
960 Assert(this_default != NULL);
963 * If default expr could contain any vars, we'd need to fix
964 * 'em, but it can't; so default is ready to apply to child.
966 * If we already had a default from some prior parent, check
967 * to see if they are the same. If so, no problem; if not,
968 * mark the column as having a bogus default. Below, we will
969 * complain if the bogus default isn't overridden by the child
972 Assert(def->raw_default == NULL);
973 if (def->cooked_default == NULL)
974 def->cooked_default = pstrdup(this_default);
975 else if (strcmp(def->cooked_default, this_default) != 0)
977 def->cooked_default = bogus_marker;
978 have_bogus_defaults = true;
984 * Now copy the constraints of this parent, adjusting attnos using the
985 * completed newattno[] map
987 if (constr && constr->num_check > 0)
989 ConstrCheck *check = constr->check;
992 for (i = 0; i < constr->num_check; i++)
994 Constraint *cdef = makeNode(Constraint);
997 cdef->contype = CONSTR_CHECK;
998 cdef->name = pstrdup(check[i].ccname);
999 cdef->raw_expr = NULL;
1000 /* adjust varattnos of ccbin here */
1001 expr = stringToNode(check[i].ccbin);
1002 change_varattnos_of_a_node(expr, newattno);
1003 cdef->cooked_expr = nodeToString(expr);
1004 constraints = lappend(constraints, cdef);
1011 * Close the parent rel, but keep our AccessShareLock on it until xact
1012 * commit. That will prevent someone else from deleting or ALTERing
1013 * the parent before the child is committed.
1015 heap_close(relation, NoLock);
1019 * If we had no inherited attributes, the result schema is just the
1020 * explicitly declared columns. Otherwise, we need to merge the declared
1021 * columns into the inherited schema list.
1023 if (inhSchema != NIL)
1025 foreach(entry, schema)
1027 ColumnDef *newdef = lfirst(entry);
1028 char *attributeName = newdef->colname;
1032 * Does it conflict with some previously inherited column?
1034 exist_attno = findAttrByName(attributeName, inhSchema);
1035 if (exist_attno > 0)
1040 * Yes, try to merge the two column definitions. They must
1041 * have the same type and typmod.
1044 (errmsg("merging column \"%s\" with inherited definition",
1046 def = (ColumnDef *) list_nth(inhSchema, exist_attno - 1);
1047 if (typenameTypeId(NULL, def->typename) != typenameTypeId(NULL, newdef->typename) ||
1048 def->typename->typmod != newdef->typename->typmod)
1050 (errcode(ERRCODE_DATATYPE_MISMATCH),
1051 errmsg("column \"%s\" has a type conflict",
1053 errdetail("%s versus %s",
1054 TypeNameToString(def->typename),
1055 TypeNameToString(newdef->typename))));
1056 /* Mark the column as locally defined */
1057 def->is_local = true;
1058 /* Merge of NOT NULL constraints = OR 'em together */
1059 def->is_not_null |= newdef->is_not_null;
1060 /* If new def has a default, override previous default */
1061 if (newdef->raw_default != NULL)
1063 def->raw_default = newdef->raw_default;
1064 def->cooked_default = newdef->cooked_default;
1070 * No, attach new column to result schema
1072 inhSchema = lappend(inhSchema, newdef);
1079 * Check that we haven't exceeded the legal # of columns after merging
1080 * in inherited columns.
1082 if (list_length(schema) > MaxHeapAttributeNumber)
1084 (errcode(ERRCODE_TOO_MANY_COLUMNS),
1085 errmsg("tables can have at most %d columns",
1086 MaxHeapAttributeNumber)));
1090 * If we found any conflicting parent default values, check to make sure
1091 * they were overridden by the child.
1093 if (have_bogus_defaults)
1095 foreach(entry, schema)
1097 ColumnDef *def = lfirst(entry);
1099 if (def->cooked_default == bogus_marker)
1101 (errcode(ERRCODE_INVALID_COLUMN_DEFINITION),
1102 errmsg("column \"%s\" inherits conflicting default values",
1104 errhint("To resolve the conflict, specify a default explicitly.")));
1108 *supOids = parentOids;
1109 *supconstr = constraints;
1110 *supOidCount = parentsWithOids;
1115 * Varattnos of pg_constraint.conbin must be rewritten when subclasses inherit
1116 * constraints from parent classes, since the inherited attributes could
1117 * be given different column numbers in multiple-inheritance cases.
1119 * Note that the passed node tree is modified in place!
1121 * This function is used elsewhere such as in analyze.c
1126 change_varattnos_of_a_node(Node *node, const AttrNumber *newattno)
1128 change_varattnos_walker(node, newattno);
1131 /* Generate a map for change_varattnos_of_a_node from two tupledesc's. */
1134 varattnos_map(TupleDesc old, TupleDesc new)
1137 AttrNumber *attmap = palloc0(sizeof(AttrNumber)*old->natts);
1138 for (i=1; i <= old->natts; i++) {
1139 if (old->attrs[i-1]->attisdropped) {
1143 for (j=1; j<= new->natts; j++)
1144 if (!strcmp(NameStr(old->attrs[i-1]->attname), NameStr(new->attrs[j-1]->attname)))
1150 /* Generate a map for change_varattnos_of_a_node from a tupledesc and a list of
1154 varattnos_map_schema(TupleDesc old, List *schema)
1157 AttrNumber *attmap = palloc0(sizeof(AttrNumber)*old->natts);
1158 for (i=1; i <= old->natts; i++) {
1159 if (old->attrs[i-1]->attisdropped) {
1163 attmap[i-1] = findAttrByName(NameStr(old->attrs[i-1]->attname), schema);
1169 change_varattnos_walker(Node *node, const AttrNumber *newattno)
1175 Var *var = (Var *) node;
1177 if (var->varlevelsup == 0 && var->varno == 1 &&
1181 * ??? the following may be a problem when the node is multiply
1182 * referenced though stringToNode() doesn't create such a node
1185 Assert(newattno[var->varattno - 1] > 0);
1186 var->varattno = newattno[var->varattno - 1];
1190 return expression_tree_walker(node, change_varattnos_walker,
1195 * StoreCatalogInheritance
1196 * Updates the system catalogs with proper inheritance information.
1198 * supers is a list of the OIDs of the new relation's direct ancestors.
1201 StoreCatalogInheritance(Oid relationId, List *supers)
1210 AssertArg(OidIsValid(relationId));
1216 * Store INHERITS information in pg_inherits using direct ancestors only.
1217 * Also enter dependencies on the direct ancestors, and make sure they are
1218 * marked with relhassubclass = true.
1220 * (Once upon a time, both direct and indirect ancestors were found here
1221 * and then entered into pg_ipl. Since that catalog doesn't exist
1222 * anymore, there's no need to look for indirect ancestors.)
1224 relation = heap_open(InheritsRelationId, RowExclusiveLock);
1227 foreach(entry, supers)
1229 StoreCatalogInheritance1(relationId, lfirst_oid(entry), seqNumber, relation);
1233 heap_close(relation, RowExclusiveLock);
1237 StoreCatalogInheritance1(Oid relationId, Oid parentOid, int16 seqNumber, Relation relation)
1239 Datum datum[Natts_pg_inherits];
1240 char nullarr[Natts_pg_inherits];
1241 ObjectAddress childobject,
1244 TupleDesc desc = RelationGetDescr(relation);
1246 datum[0] = ObjectIdGetDatum(relationId); /* inhrel */
1247 datum[1] = ObjectIdGetDatum(parentOid); /* inhparent */
1248 datum[2] = Int16GetDatum(seqNumber); /* inhseqno */
1254 tuple = heap_formtuple(desc, datum, nullarr);
1256 simple_heap_insert(relation, tuple);
1258 CatalogUpdateIndexes(relation, tuple);
1260 heap_freetuple(tuple);
1263 * Store a dependency too
1265 parentobject.classId = RelationRelationId;
1266 parentobject.objectId = parentOid;
1267 parentobject.objectSubId = 0;
1268 childobject.classId = RelationRelationId;
1269 childobject.objectId = relationId;
1270 childobject.objectSubId = 0;
1272 recordDependencyOn(&childobject, &parentobject, DEPENDENCY_NORMAL);
1275 * Mark the parent as having subclasses.
1277 setRelhassubclassInRelation(parentOid, true);
1282 * Look for an existing schema entry with the given name.
1284 * Returns the index (starting with 1) if attribute already exists in schema,
1288 findAttrByName(const char *attributeName, List *schema)
1295 ColumnDef *def = lfirst(s);
1297 if (strcmp(attributeName, def->colname) == 0)
1306 * Update a relation's pg_class.relhassubclass entry to the given value
1309 setRelhassubclassInRelation(Oid relationId, bool relhassubclass)
1311 Relation relationRelation;
1313 Form_pg_class classtuple;
1316 * Fetch a modifiable copy of the tuple, modify it, update pg_class.
1318 * If the tuple already has the right relhassubclass setting, we don't
1319 * need to update it, but we still need to issue an SI inval message.
1321 relationRelation = heap_open(RelationRelationId, RowExclusiveLock);
1322 tuple = SearchSysCacheCopy(RELOID,
1323 ObjectIdGetDatum(relationId),
1325 if (!HeapTupleIsValid(tuple))
1326 elog(ERROR, "cache lookup failed for relation %u", relationId);
1327 classtuple = (Form_pg_class) GETSTRUCT(tuple);
1329 if (classtuple->relhassubclass != relhassubclass)
1331 classtuple->relhassubclass = relhassubclass;
1332 simple_heap_update(relationRelation, &tuple->t_self, tuple);
1334 /* keep the catalog indexes up to date */
1335 CatalogUpdateIndexes(relationRelation, tuple);
1339 /* no need to change tuple, but force relcache rebuild anyway */
1340 CacheInvalidateRelcacheByTuple(tuple);
1343 heap_freetuple(tuple);
1344 heap_close(relationRelation, RowExclusiveLock);
1349 * renameatt - changes the name of a attribute in a relation
1351 * Attname attribute is changed in attribute catalog.
1352 * No record of the previous attname is kept (correct?).
1354 * get proper relrelation from relation catalog (if not arg)
1355 * scan attribute catalog
1356 * for name conflict (within rel)
1357 * for original attribute (if not arg)
1358 * modify attname in attribute tuple
1359 * insert modified attribute in attribute catalog
1360 * delete original attribute from attribute catalog
1363 renameatt(Oid myrelid,
1364 const char *oldattname,
1365 const char *newattname,
1369 Relation targetrelation;
1370 Relation attrelation;
1372 Form_pg_attribute attform;
1375 ListCell *indexoidscan;
1378 * Grab an exclusive lock on the target table, which we will NOT release
1379 * until end of transaction.
1381 targetrelation = relation_open(myrelid, AccessExclusiveLock);
1384 * permissions checking. this would normally be done in utility.c, but
1385 * this particular routine is recursive.
1387 * normally, only the owner of a class can change its schema.
1389 if (!pg_class_ownercheck(myrelid, GetUserId()))
1390 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
1391 RelationGetRelationName(targetrelation));
1392 if (!allowSystemTableMods && IsSystemRelation(targetrelation))
1394 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1395 errmsg("permission denied: \"%s\" is a system catalog",
1396 RelationGetRelationName(targetrelation))));
1399 * if the 'recurse' flag is set then we are supposed to rename this
1400 * attribute in all classes that inherit from 'relname' (as well as in
1403 * any permissions or problems with duplicate attributes will cause the
1404 * whole transaction to abort, which is what we want -- all or nothing.
1411 /* this routine is actually in the planner */
1412 children = find_all_inheritors(myrelid);
1415 * find_all_inheritors does the recursive search of the inheritance
1416 * hierarchy, so all we have to do is process all of the relids in the
1417 * list that it returns.
1419 foreach(child, children)
1421 Oid childrelid = lfirst_oid(child);
1423 if (childrelid == myrelid)
1425 /* note we need not recurse again */
1426 renameatt(childrelid, oldattname, newattname, false, true);
1432 * If we are told not to recurse, there had better not be any child
1433 * tables; else the rename would put them out of step.
1436 find_inheritance_children(myrelid) != NIL)
1438 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
1439 errmsg("inherited column \"%s\" must be renamed in child tables too",
1443 attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
1445 atttup = SearchSysCacheCopyAttName(myrelid, oldattname);
1446 if (!HeapTupleIsValid(atttup))
1448 (errcode(ERRCODE_UNDEFINED_COLUMN),
1449 errmsg("column \"%s\" does not exist",
1451 attform = (Form_pg_attribute) GETSTRUCT(atttup);
1453 attnum = attform->attnum;
1456 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1457 errmsg("cannot rename system column \"%s\"",
1461 * if the attribute is inherited, forbid the renaming, unless we are
1462 * already inside a recursive rename.
1464 if (attform->attinhcount > 0 && !recursing)
1466 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
1467 errmsg("cannot rename inherited column \"%s\"",
1470 /* should not already exist */
1471 /* this test is deliberately not attisdropped-aware */
1472 if (SearchSysCacheExists(ATTNAME,
1473 ObjectIdGetDatum(myrelid),
1474 PointerGetDatum(newattname),
1477 (errcode(ERRCODE_DUPLICATE_COLUMN),
1478 errmsg("column \"%s\" of relation \"%s\" already exists",
1479 newattname, RelationGetRelationName(targetrelation))));
1481 namestrcpy(&(attform->attname), newattname);
1483 simple_heap_update(attrelation, &atttup->t_self, atttup);
1485 /* keep system catalog indexes current */
1486 CatalogUpdateIndexes(attrelation, atttup);
1488 heap_freetuple(atttup);
1491 * Update column names of indexes that refer to the column being renamed.
1493 indexoidlist = RelationGetIndexList(targetrelation);
1495 foreach(indexoidscan, indexoidlist)
1497 Oid indexoid = lfirst_oid(indexoidscan);
1499 Form_pg_index indexform;
1503 * Scan through index columns to see if there's any simple index
1504 * entries for this attribute. We ignore expressional entries.
1506 indextup = SearchSysCache(INDEXRELID,
1507 ObjectIdGetDatum(indexoid),
1509 if (!HeapTupleIsValid(indextup))
1510 elog(ERROR, "cache lookup failed for index %u", indexoid);
1511 indexform = (Form_pg_index) GETSTRUCT(indextup);
1513 for (i = 0; i < indexform->indnatts; i++)
1515 if (attnum != indexform->indkey.values[i])
1519 * Found one, rename it.
1521 atttup = SearchSysCacheCopy(ATTNUM,
1522 ObjectIdGetDatum(indexoid),
1523 Int16GetDatum(i + 1),
1525 if (!HeapTupleIsValid(atttup))
1526 continue; /* should we raise an error? */
1529 * Update the (copied) attribute tuple.
1531 namestrcpy(&(((Form_pg_attribute) GETSTRUCT(atttup))->attname),
1534 simple_heap_update(attrelation, &atttup->t_self, atttup);
1536 /* keep system catalog indexes current */
1537 CatalogUpdateIndexes(attrelation, atttup);
1539 heap_freetuple(atttup);
1542 ReleaseSysCache(indextup);
1545 list_free(indexoidlist);
1547 heap_close(attrelation, RowExclusiveLock);
1550 * Update att name in any RI triggers associated with the relation.
1552 if (targetrelation->rd_rel->reltriggers > 0)
1554 /* update tgargs column reference where att is primary key */
1555 update_ri_trigger_args(RelationGetRelid(targetrelation),
1556 oldattname, newattname,
1558 /* update tgargs column reference where att is foreign key */
1559 update_ri_trigger_args(RelationGetRelid(targetrelation),
1560 oldattname, newattname,
1564 relation_close(targetrelation, NoLock); /* close rel but keep lock */
1568 * renamerel - change the name of a relation
1570 * XXX - When renaming sequences, we don't bother to modify the
1571 * sequence name that is stored within the sequence itself
1572 * (this would cause problems with MVCC). In the future,
1573 * the sequence name should probably be removed from the
1574 * sequence, AFAIK there's no need for it to be there.
1577 renamerel(Oid myrelid, const char *newrelname)
1579 Relation targetrelation;
1580 Relation relrelation; /* for RELATION relation */
1585 bool relhastriggers;
1588 * Grab an exclusive lock on the target table or index, which we will NOT
1589 * release until end of transaction.
1591 targetrelation = relation_open(myrelid, AccessExclusiveLock);
1593 oldrelname = pstrdup(RelationGetRelationName(targetrelation));
1594 namespaceId = RelationGetNamespace(targetrelation);
1596 if (!allowSystemTableMods && IsSystemRelation(targetrelation))
1598 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1599 errmsg("permission denied: \"%s\" is a system catalog",
1600 RelationGetRelationName(targetrelation))));
1602 relkind = targetrelation->rd_rel->relkind;
1603 relhastriggers = (targetrelation->rd_rel->reltriggers > 0);
1606 * Find relation's pg_class tuple, and make sure newrelname isn't in use.
1608 relrelation = heap_open(RelationRelationId, RowExclusiveLock);
1610 reltup = SearchSysCacheCopy(RELOID,
1611 PointerGetDatum(myrelid),
1613 if (!HeapTupleIsValid(reltup)) /* shouldn't happen */
1614 elog(ERROR, "cache lookup failed for relation %u", myrelid);
1616 if (get_relname_relid(newrelname, namespaceId) != InvalidOid)
1618 (errcode(ERRCODE_DUPLICATE_TABLE),
1619 errmsg("relation \"%s\" already exists",
1623 * Update pg_class tuple with new relname. (Scribbling on reltup is OK
1624 * because it's a copy...)
1626 namestrcpy(&(((Form_pg_class) GETSTRUCT(reltup))->relname), newrelname);
1628 simple_heap_update(relrelation, &reltup->t_self, reltup);
1630 /* keep the system catalog indexes current */
1631 CatalogUpdateIndexes(relrelation, reltup);
1633 heap_freetuple(reltup);
1634 heap_close(relrelation, RowExclusiveLock);
1637 * Also rename the associated type, if any.
1639 if (relkind != RELKIND_INDEX)
1640 TypeRename(oldrelname, namespaceId, newrelname);
1643 * Update rel name in any RI triggers associated with the relation.
1647 /* update tgargs where relname is primary key */
1648 update_ri_trigger_args(myrelid,
1652 /* update tgargs where relname is foreign key */
1653 update_ri_trigger_args(myrelid,
1660 * Close rel, but keep exclusive lock!
1662 relation_close(targetrelation, NoLock);
1666 * Scan pg_trigger for RI triggers that are on the specified relation
1667 * (if fk_scan is false) or have it as the tgconstrrel (if fk_scan
1668 * is true). Update RI trigger args fields matching oldname to contain
1669 * newname instead. If update_relname is true, examine the relname
1670 * fields; otherwise examine the attname fields.
1673 update_ri_trigger_args(Oid relid,
1674 const char *oldname,
1675 const char *newname,
1677 bool update_relname)
1680 ScanKeyData skey[1];
1681 SysScanDesc trigscan;
1683 Datum values[Natts_pg_trigger];
1684 char nulls[Natts_pg_trigger];
1685 char replaces[Natts_pg_trigger];
1687 tgrel = heap_open(TriggerRelationId, RowExclusiveLock);
1690 ScanKeyInit(&skey[0],
1691 Anum_pg_trigger_tgconstrrelid,
1692 BTEqualStrategyNumber, F_OIDEQ,
1693 ObjectIdGetDatum(relid));
1694 trigscan = systable_beginscan(tgrel, TriggerConstrRelidIndexId,
1700 ScanKeyInit(&skey[0],
1701 Anum_pg_trigger_tgrelid,
1702 BTEqualStrategyNumber, F_OIDEQ,
1703 ObjectIdGetDatum(relid));
1704 trigscan = systable_beginscan(tgrel, TriggerRelidNameIndexId,
1709 while ((tuple = systable_getnext(trigscan)) != NULL)
1711 Form_pg_trigger pg_trigger = (Form_pg_trigger) GETSTRUCT(tuple);
1721 const char *arga[RI_MAX_ARGUMENTS];
1724 tg_type = RI_FKey_trigger_type(pg_trigger->tgfoid);
1725 if (tg_type == RI_TRIGGER_NONE)
1727 /* Not an RI trigger, forget it */
1732 * It is an RI trigger, so parse the tgargs bytea.
1734 * NB: we assume the field will never be compressed or moved out of
1735 * line; so does trigger.c ...
1737 tgnargs = pg_trigger->tgnargs;
1739 DatumGetPointer(fastgetattr(tuple,
1740 Anum_pg_trigger_tgargs,
1741 tgrel->rd_att, &isnull));
1742 if (isnull || tgnargs < RI_FIRST_ATTNAME_ARGNO ||
1743 tgnargs > RI_MAX_ARGUMENTS)
1745 /* This probably shouldn't happen, but ignore busted triggers */
1748 argp = (const char *) VARDATA(val);
1749 for (i = 0; i < tgnargs; i++)
1752 argp += strlen(argp) + 1;
1756 * Figure out which item(s) to look at. If the trigger is primary-key
1757 * type and attached to my rel, I should look at the PK fields; if it
1758 * is foreign-key type and attached to my rel, I should look at the FK
1759 * fields. But the opposite rule holds when examining triggers found
1760 * by tgconstrrel search.
1762 examine_pk = (tg_type == RI_TRIGGER_PK) == (!fk_scan);
1767 /* Change the relname if needed */
1768 i = examine_pk ? RI_PK_RELNAME_ARGNO : RI_FK_RELNAME_ARGNO;
1769 if (strcmp(arga[i], oldname) == 0)
1777 /* Change attname(s) if needed */
1778 i = examine_pk ? RI_FIRST_ATTNAME_ARGNO + RI_KEYPAIR_PK_IDX :
1779 RI_FIRST_ATTNAME_ARGNO + RI_KEYPAIR_FK_IDX;
1780 for (; i < tgnargs; i += 2)
1782 if (strcmp(arga[i], oldname) == 0)
1792 /* Don't need to update this tuple */
1797 * Construct modified tgargs bytea.
1800 for (i = 0; i < tgnargs; i++)
1801 newlen += strlen(arga[i]) + 1;
1802 newtgargs = (bytea *) palloc(newlen);
1803 VARATT_SIZEP(newtgargs) = newlen;
1805 for (i = 0; i < tgnargs; i++)
1807 strcpy(((char *) newtgargs) + newlen, arga[i]);
1808 newlen += strlen(arga[i]) + 1;
1812 * Build modified tuple.
1814 for (i = 0; i < Natts_pg_trigger; i++)
1816 values[i] = (Datum) 0;
1820 values[Anum_pg_trigger_tgargs - 1] = PointerGetDatum(newtgargs);
1821 replaces[Anum_pg_trigger_tgargs - 1] = 'r';
1823 tuple = heap_modifytuple(tuple, RelationGetDescr(tgrel), values, nulls, replaces);
1826 * Update pg_trigger and its indexes
1828 simple_heap_update(tgrel, &tuple->t_self, tuple);
1830 CatalogUpdateIndexes(tgrel, tuple);
1833 * Invalidate trigger's relation's relcache entry so that other
1834 * backends (and this one too!) are sent SI message to make them
1835 * rebuild relcache entries. (Ideally this should happen
1838 * We can skip this for triggers on relid itself, since that relcache
1839 * flush will happen anyway due to the table or column rename. We
1840 * just need to catch the far ends of RI relationships.
1842 pg_trigger = (Form_pg_trigger) GETSTRUCT(tuple);
1843 if (pg_trigger->tgrelid != relid)
1844 CacheInvalidateRelcacheByRelid(pg_trigger->tgrelid);
1846 /* free up our scratch memory */
1848 heap_freetuple(tuple);
1851 systable_endscan(trigscan);
1853 heap_close(tgrel, RowExclusiveLock);
1856 * Increment cmd counter to make updates visible; this is needed in case
1857 * the same tuple has to be updated again by next pass (can happen in case
1858 * of a self-referential FK relationship).
1860 CommandCounterIncrement();
1865 * Execute ALTER TABLE, which can be a list of subcommands
1867 * ALTER TABLE is performed in three phases:
1868 * 1. Examine subcommands and perform pre-transformation checking.
1869 * 2. Update system catalogs.
1870 * 3. Scan table(s) to check new constraints, and optionally recopy
1871 * the data into new table(s).
1872 * Phase 3 is not performed unless one or more of the subcommands requires
1873 * it. The intention of this design is to allow multiple independent
1874 * updates of the table schema to be performed with only one pass over the
1877 * ATPrepCmd performs phase 1. A "work queue" entry is created for
1878 * each table to be affected (there may be multiple affected tables if the
1879 * commands traverse a table inheritance hierarchy). Also we do preliminary
1880 * validation of the subcommands, including parse transformation of those
1881 * expressions that need to be evaluated with respect to the old table
1884 * ATRewriteCatalogs performs phase 2 for each affected table (note that
1885 * phases 2 and 3 do no explicit recursion, since phase 1 already did it).
1886 * Certain subcommands need to be performed before others to avoid
1887 * unnecessary conflicts; for example, DROP COLUMN should come before
1888 * ADD COLUMN. Therefore phase 1 divides the subcommands into multiple
1889 * lists, one for each logical "pass" of phase 2.
1891 * ATRewriteTables performs phase 3 for those tables that need it.
1893 * Thanks to the magic of MVCC, an error anywhere along the way rolls back
1894 * the whole operation; we don't have to do anything special to clean up.
1897 AlterTable(AlterTableStmt *stmt)
1899 ATController(relation_openrv(stmt->relation, AccessExclusiveLock),
1901 interpretInhOption(stmt->relation->inhOpt));
1905 * AlterTableInternal
1907 * ALTER TABLE with target specified by OID
1910 AlterTableInternal(Oid relid, List *cmds, bool recurse)
1912 ATController(relation_open(relid, AccessExclusiveLock),
1918 ATController(Relation rel, List *cmds, bool recurse)
1923 /* Phase 1: preliminary examination of commands, create work queue */
1926 AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
1928 ATPrepCmd(&wqueue, rel, cmd, recurse, false);
1931 /* Close the relation, but keep lock until commit */
1932 relation_close(rel, NoLock);
1934 /* Phase 2: update system catalogs */
1935 ATRewriteCatalogs(&wqueue);
1937 /* Phase 3: scan/rewrite tables as needed */
1938 ATRewriteTables(&wqueue);
1944 * Traffic cop for ALTER TABLE Phase 1 operations, including simple
1945 * recursion and permission checks.
1947 * Caller must have acquired AccessExclusiveLock on relation already.
1948 * This lock should be held until commit.
1951 ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
1952 bool recurse, bool recursing)
1954 AlteredTableInfo *tab;
1957 /* Find or create work queue entry for this table */
1958 tab = ATGetQueueEntry(wqueue, rel);
1961 * Copy the original subcommand for each table. This avoids conflicts
1962 * when different child tables need to make different parse
1963 * transformations (for example, the same column may have different column
1964 * numbers in different children).
1966 cmd = copyObject(cmd);
1969 * Do permissions checking, recursion to child tables if needed, and any
1970 * additional phase-1 processing needed.
1972 switch (cmd->subtype)
1974 case AT_AddColumn: /* ADD COLUMN */
1975 ATSimplePermissions(rel, false);
1976 /* Performs own recursion */
1977 ATPrepAddColumn(wqueue, rel, recurse, cmd);
1978 pass = AT_PASS_ADD_COL;
1980 case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */
1983 * We allow defaults on views so that INSERT into a view can have
1984 * default-ish behavior. This works because the rewriter
1985 * substitutes default values into INSERTs before it expands
1988 ATSimplePermissions(rel, true);
1989 ATSimpleRecursion(wqueue, rel, cmd, recurse);
1990 /* No command-specific prep needed */
1991 pass = AT_PASS_ADD_CONSTR;
1993 case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */
1994 ATSimplePermissions(rel, false);
1995 ATSimpleRecursion(wqueue, rel, cmd, recurse);
1996 /* No command-specific prep needed */
1997 pass = AT_PASS_DROP;
1999 case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */
2000 ATSimplePermissions(rel, false);
2001 ATSimpleRecursion(wqueue, rel, cmd, recurse);
2002 /* No command-specific prep needed */
2003 pass = AT_PASS_ADD_CONSTR;
2005 case AT_SetStatistics: /* ALTER COLUMN STATISTICS */
2006 ATSimpleRecursion(wqueue, rel, cmd, recurse);
2007 /* Performs own permission checks */
2008 ATPrepSetStatistics(rel, cmd->name, cmd->def);
2009 pass = AT_PASS_COL_ATTRS;
2011 case AT_SetStorage: /* ALTER COLUMN STORAGE */
2012 ATSimplePermissions(rel, false);
2013 ATSimpleRecursion(wqueue, rel, cmd, recurse);
2014 /* No command-specific prep needed */
2015 pass = AT_PASS_COL_ATTRS;
2017 case AT_DropColumn: /* DROP COLUMN */
2018 ATSimplePermissions(rel, false);
2019 /* Recursion occurs during execution phase */
2020 /* No command-specific prep needed except saving recurse flag */
2022 cmd->subtype = AT_DropColumnRecurse;
2023 pass = AT_PASS_DROP;
2025 case AT_AddIndex: /* ADD INDEX */
2026 ATSimplePermissions(rel, false);
2027 /* This command never recurses */
2028 /* No command-specific prep needed */
2029 pass = AT_PASS_ADD_INDEX;
2031 case AT_AddConstraint: /* ADD CONSTRAINT */
2032 ATSimplePermissions(rel, false);
2035 * Currently we recurse only for CHECK constraints, never for
2036 * foreign-key constraints. UNIQUE/PKEY constraints won't be seen
2039 if (IsA(cmd->def, Constraint))
2040 ATSimpleRecursion(wqueue, rel, cmd, recurse);
2041 /* No command-specific prep needed */
2042 pass = AT_PASS_ADD_CONSTR;
2044 case AT_DropConstraint: /* DROP CONSTRAINT */
2045 ATSimplePermissions(rel, false);
2046 /* Performs own recursion */
2047 ATPrepDropConstraint(wqueue, rel, recurse, cmd);
2048 pass = AT_PASS_DROP;
2050 case AT_DropConstraintQuietly: /* DROP CONSTRAINT for child */
2051 ATSimplePermissions(rel, false);
2052 ATSimpleRecursion(wqueue, rel, cmd, recurse);
2053 /* No command-specific prep needed */
2054 pass = AT_PASS_DROP;
2056 case AT_AlterColumnType: /* ALTER COLUMN TYPE */
2057 ATSimplePermissions(rel, false);
2058 /* Performs own recursion */
2059 ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd);
2060 pass = AT_PASS_ALTER_TYPE;
2062 case AT_ToastTable: /* CREATE TOAST TABLE */
2063 ATSimplePermissions(rel, false);
2064 /* This command never recurses */
2065 /* No command-specific prep needed */
2066 pass = AT_PASS_MISC;
2068 case AT_ChangeOwner: /* ALTER OWNER */
2069 /* This command never recurses */
2070 /* No command-specific prep needed */
2071 pass = AT_PASS_MISC;
2073 case AT_ClusterOn: /* CLUSTER ON */
2074 case AT_DropCluster: /* SET WITHOUT CLUSTER */
2075 ATSimplePermissions(rel, false);
2076 /* These commands never recurse */
2077 /* No command-specific prep needed */
2078 pass = AT_PASS_MISC;
2080 case AT_DropOids: /* SET WITHOUT OIDS */
2081 ATSimplePermissions(rel, false);
2082 /* Performs own recursion */
2083 if (rel->rd_rel->relhasoids)
2085 AlterTableCmd *dropCmd = makeNode(AlterTableCmd);
2087 dropCmd->subtype = AT_DropColumn;
2088 dropCmd->name = pstrdup("oid");
2089 dropCmd->behavior = cmd->behavior;
2090 ATPrepCmd(wqueue, rel, dropCmd, recurse, false);
2092 pass = AT_PASS_DROP;
2094 case AT_SetTableSpace: /* SET TABLESPACE */
2095 /* This command never recurses */
2096 ATPrepSetTableSpace(tab, rel, cmd->name);
2097 pass = AT_PASS_MISC; /* doesn't actually matter */
2099 case AT_EnableTrig: /* ENABLE TRIGGER variants */
2100 case AT_EnableTrigAll:
2101 case AT_EnableTrigUser:
2102 case AT_DisableTrig: /* DISABLE TRIGGER variants */
2103 case AT_DisableTrigAll:
2104 case AT_DisableTrigUser:
2105 case AT_AddInherits:
2106 case AT_DropInherits:
2107 ATSimplePermissions(rel, false);
2108 /* These commands never recurse */
2109 /* No command-specific prep needed */
2110 pass = AT_PASS_MISC;
2113 elog(ERROR, "unrecognized alter table type: %d",
2114 (int) cmd->subtype);
2115 pass = 0; /* keep compiler quiet */
2119 /* Add the subcommand to the appropriate list for phase 2 */
2120 tab->subcmds[pass] = lappend(tab->subcmds[pass], cmd);
2126 * Traffic cop for ALTER TABLE Phase 2 operations. Subcommands are
2127 * dispatched in a "safe" execution order (designed to avoid unnecessary
2131 ATRewriteCatalogs(List **wqueue)
2137 * We process all the tables "in parallel", one pass at a time. This is
2138 * needed because we may have to propagate work from one table to another
2139 * (specifically, ALTER TYPE on a foreign key's PK has to dispatch the
2140 * re-adding of the foreign key constraint to the other table). Work can
2141 * only be propagated into later passes, however.
2143 for (pass = 0; pass < AT_NUM_PASSES; pass++)
2145 /* Go through each table that needs to be processed */
2146 foreach(ltab, *wqueue)
2148 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2149 List *subcmds = tab->subcmds[pass];
2157 * Exclusive lock was obtained by phase 1, needn't get it again
2159 rel = relation_open(tab->relid, NoLock);
2161 foreach(lcmd, subcmds)
2162 ATExecCmd(tab, rel, (AlterTableCmd *) lfirst(lcmd));
2165 * After the ALTER TYPE pass, do cleanup work (this is not done in
2166 * ATExecAlterColumnType since it should be done only once if
2167 * multiple columns of a table are altered).
2169 if (pass == AT_PASS_ALTER_TYPE)
2170 ATPostAlterTypeCleanup(wqueue, tab);
2172 relation_close(rel, NoLock);
2177 * Do an implicit CREATE TOAST TABLE if we executed any subcommands that
2178 * might have added a column or changed column storage.
2180 foreach(ltab, *wqueue)
2182 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2184 if (tab->relkind == RELKIND_RELATION &&
2185 (tab->subcmds[AT_PASS_ADD_COL] ||
2186 tab->subcmds[AT_PASS_ALTER_TYPE] ||
2187 tab->subcmds[AT_PASS_COL_ATTRS]))
2188 AlterTableCreateToastTable(tab->relid, true);
2193 * ATExecCmd: dispatch a subcommand to appropriate execution routine
2196 ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd)
2198 switch (cmd->subtype)
2200 case AT_AddColumn: /* ADD COLUMN */
2201 ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def);
2203 case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */
2204 ATExecColumnDefault(rel, cmd->name, cmd->def);
2206 case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */
2207 ATExecDropNotNull(rel, cmd->name);
2209 case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */
2210 ATExecSetNotNull(tab, rel, cmd->name);
2212 case AT_SetStatistics: /* ALTER COLUMN STATISTICS */
2213 ATExecSetStatistics(rel, cmd->name, cmd->def);
2215 case AT_SetStorage: /* ALTER COLUMN STORAGE */
2216 ATExecSetStorage(rel, cmd->name, cmd->def);
2218 case AT_DropColumn: /* DROP COLUMN */
2219 ATExecDropColumn(rel, cmd->name, cmd->behavior, false, false);
2221 case AT_DropColumnRecurse: /* DROP COLUMN with recursion */
2222 ATExecDropColumn(rel, cmd->name, cmd->behavior, true, false);
2224 case AT_AddIndex: /* ADD INDEX */
2225 ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, false);
2227 case AT_ReAddIndex: /* ADD INDEX */
2228 ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true);
2230 case AT_AddConstraint: /* ADD CONSTRAINT */
2231 ATExecAddConstraint(tab, rel, cmd->def);
2233 case AT_DropConstraint: /* DROP CONSTRAINT */
2234 ATExecDropConstraint(rel, cmd->name, cmd->behavior, false);
2236 case AT_DropConstraintQuietly: /* DROP CONSTRAINT for child */
2237 ATExecDropConstraint(rel, cmd->name, cmd->behavior, true);
2239 case AT_AlterColumnType: /* ALTER COLUMN TYPE */
2240 ATExecAlterColumnType(tab, rel, cmd->name, (TypeName *) cmd->def);
2242 case AT_ToastTable: /* CREATE TOAST TABLE */
2243 AlterTableCreateToastTable(RelationGetRelid(rel), false);
2245 case AT_ChangeOwner: /* ALTER OWNER */
2246 ATExecChangeOwner(RelationGetRelid(rel),
2247 get_roleid_checked(cmd->name),
2250 case AT_ClusterOn: /* CLUSTER ON */
2251 ATExecClusterOn(rel, cmd->name);
2253 case AT_DropCluster: /* SET WITHOUT CLUSTER */
2254 ATExecDropCluster(rel);
2256 case AT_DropOids: /* SET WITHOUT OIDS */
2259 * Nothing to do here; we'll have generated a DropColumn
2260 * subcommand to do the real work
2263 case AT_SetTableSpace: /* SET TABLESPACE */
2266 * Nothing to do here; Phase 3 does the work
2269 case AT_EnableTrig: /* ENABLE TRIGGER name */
2270 ATExecEnableDisableTrigger(rel, cmd->name, true, false);
2272 case AT_DisableTrig: /* DISABLE TRIGGER name */
2273 ATExecEnableDisableTrigger(rel, cmd->name, false, false);
2275 case AT_EnableTrigAll: /* ENABLE TRIGGER ALL */
2276 ATExecEnableDisableTrigger(rel, NULL, true, false);
2278 case AT_DisableTrigAll: /* DISABLE TRIGGER ALL */
2279 ATExecEnableDisableTrigger(rel, NULL, false, false);
2281 case AT_EnableTrigUser: /* ENABLE TRIGGER USER */
2282 ATExecEnableDisableTrigger(rel, NULL, true, true);
2284 case AT_DisableTrigUser: /* DISABLE TRIGGER USER */
2285 ATExecEnableDisableTrigger(rel, NULL, false, true);
2287 case AT_DropInherits:
2288 ATExecDropInherits(rel, cmd->parent);
2290 case AT_AddInherits:
2291 ATExecAddInherits(rel, cmd->parent);
2294 elog(ERROR, "unrecognized alter table type: %d",
2295 (int) cmd->subtype);
2300 * Bump the command counter to ensure the next subcommand in the sequence
2301 * can see the changes so far
2303 CommandCounterIncrement();
/*
 * Reviewer notes for ATRewriteTables (phase 3 driver of ALTER TABLE).
 *
 * wqueue points at the list of AlteredTableInfo work-queue entries.  The
 * list is walked twice.
 *
 * First walk, per entry: when the entry carries replacement column values
 * (tab->newvals is not NIL) the table is physically rewritten.  A scratch
 * heap named "pg_temp_<relid>" is created with make_new_heap(), filled by
 * ATRewriteTable(), exchanged with the original through
 * swap_relation_files(), and the leftover scratch relation is removed with
 * performDeletion(); reindex_relation() then rebuilds the indexes.  Shared
 * or nailed relations and temp tables of other sessions are rejected first.
 * NOTE(review): the scan-only ATRewriteTable(tab, InvalidOid) call and the
 * ATExecSetTableSpace() call further down presumably belong to the
 * no-rewrite branch; the branch keyword is not visible in this view.
 *
 * Second walk, per entry: every constraint whose contype is CONSTR_FOREIGN
 * is checked with validateForeignKeyConstraint().
 */
2307 * ATRewriteTables: ALTER TABLE phase 3
2310 ATRewriteTables(List **wqueue)
2314 /* Go through each table that needs to be checked or rewritten */
2315 foreach(ltab, *wqueue)
2317 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2320 * We only need to rewrite the table if at least one column needs to
2323 if (tab->newvals != NIL)
2325 /* Build a temporary relation and copy data */
2327 char NewHeapName[NAMEDATALEN];
2330 ObjectAddress object;
2332 OldHeap = heap_open(tab->relid, NoLock);
2335 * We can never allow rewriting of shared or nailed-in-cache
2336 * relations, because we can't support changing their relfilenode
2339 if (OldHeap->rd_rel->relisshared || OldHeap->rd_isnailed)
2341 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2342 errmsg("cannot rewrite system relation \"%s\"",
2343 RelationGetRelationName(OldHeap))));
2346 * Don't allow rewrite on temp tables of other backends ... their
2347 * local buffer manager is not going to cope.
2349 if (isOtherTempNamespace(RelationGetNamespace(OldHeap)))
2351 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2352 errmsg("cannot rewrite temporary tables of other sessions")));
2355 * Select destination tablespace (same as original unless user
2356 * requested a change)
2358 if (tab->newTableSpace)
2359 NewTableSpace = tab->newTableSpace;
2361 NewTableSpace = OldHeap->rd_rel->reltablespace;
2363 heap_close(OldHeap, NoLock);
2366 * Create the new heap, using a temporary name in the same
2367 * namespace as the existing table. NOTE: there is some risk of
2368 * collision with user relnames. Working around this seems more
2369 * trouble than it's worth; in particular, we can't create the new
2370 * heap in a different namespace from the old, or we will have
2371 * problems with the TEMP status of temp tables.
2373 snprintf(NewHeapName, sizeof(NewHeapName),
2374 "pg_temp_%u", tab->relid);
2376 OIDNewHeap = make_new_heap(tab->relid, NewHeapName, NewTableSpace);
2379 * Copy the heap data into the new table with the desired
2380 * modifications, and test the current data within the table
2381 * against new constraints generated by ALTER TABLE commands.
2383 ATRewriteTable(tab, OIDNewHeap);
2385 /* Swap the physical files of the old and new heaps. */
2386 swap_relation_files(tab->relid, OIDNewHeap);
2388 CommandCounterIncrement();
2390 /* Destroy new heap with old filenode */
2391 object.classId = RelationRelationId;
2392 object.objectId = OIDNewHeap;
2393 object.objectSubId = 0;
2396 * The new relation is local to our transaction and we know
2397 * nothing depends on it, so DROP_RESTRICT should be OK.
2399 performDeletion(&object, DROP_RESTRICT);
2400 /* performDeletion does CommandCounterIncrement at end */
2403 * Rebuild each index on the relation (but not the toast table,
2404 * which is all-new anyway). We do not need
2405 * CommandCounterIncrement() because reindex_relation does it.
2407 reindex_relation(tab->relid, false);
2412 * Test the current data within the table against new constraints
2413 * generated by ALTER TABLE commands, but don't rebuild data.
2415 if (tab->constraints != NIL)
2416 ATRewriteTable(tab, InvalidOid);
2419 * If we had SET TABLESPACE but no reason to reconstruct tuples,
2420 * just do a block-by-block copy.
2422 if (tab->newTableSpace)
2423 ATExecSetTableSpace(tab->relid, tab->newTableSpace);
2428 * Foreign key constraints are checked in a final pass, since (a) it's
2429 * generally best to examine each one separately, and (b) it's at least
2430 * theoretically possible that we have changed both relations of the
2431 * foreign key, and we'd better have finished both rewrites before we try
2432 * to read the tables.
2434 foreach(ltab, *wqueue)
2436 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2437 Relation rel = NULL;
2440 foreach(lcon, tab->constraints)
2442 NewConstraint *con = lfirst(lcon);
2444 if (con->contype == CONSTR_FOREIGN)
2446 FkConstraint *fkconstraint = (FkConstraint *) con->qual;
2451 /* Long since locked, no need for another */
2452 rel = heap_open(tab->relid, NoLock);
2455 refrel = heap_open(con->refrelid, RowShareLock);
2457 validateForeignKeyConstraint(fkconstraint, rel, refrel);
2459 heap_close(refrel, NoLock);
2464 heap_close(rel, NoLock);
/*
 * Reviewer notes for ATRewriteTable.
 *
 * tab         work-queue entry; supplies relid, the saved pre-ALTER tuple
 *             descriptor (oldDesc), the constraints to test and the
 *             replacement column expressions (newvals).
 * OIDNewHeap  heap to receive rewritten tuples, or InvalidOid for a
 *             check-only scan.
 *
 * Flow visible in this block: open the existing relation (and the new heap
 * when OIDNewHeap is valid); check for composite-type users via
 * find_composite_type_dependencies(); build an executor state and prepare
 * expression states for the constraints and for each new column value;
 * then scan the old heap and, for each tuple, deform it with the OLD
 * descriptor, force dropped columns to NULL, evaluate the replacement
 * expressions against the old tuple, form a tuple with the NEW descriptor
 * (carrying the OID over when the new descriptor has OIDs), test the
 * constraints against the new tuple, and insert it into the new heap.
 * NOTE(review): the conditions guarding the dependency check, the scan as
 * a whole and the insert are on lines not visible in this view.
 */
2469 * ATRewriteTable: scan or rewrite one table
2471 * OIDNewHeap is InvalidOid if we don't need to rewrite
2474 ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap)
2478 TupleDesc oldTupDesc;
2479 TupleDesc newTupDesc;
2480 bool needscan = false;
2486 * Open the relation(s). We have surely already locked the existing
2489 oldrel = heap_open(tab->relid, NoLock);
2490 oldTupDesc = tab->oldDesc;
2491 newTupDesc = RelationGetDescr(oldrel); /* includes all mods */
2493 if (OidIsValid(OIDNewHeap))
2494 newrel = heap_open(OIDNewHeap, AccessExclusiveLock);
2499 * If we need to rewrite the table, the operation has to be propagated to
2500 * tables that use this table's rowtype as a column type.
2502 * (Eventually this will probably become true for scans as well, but at
2503 * the moment a composite type does not enforce any constraints, so it's
2504 * not necessary/appropriate to enforce them just during ALTER.)
2507 find_composite_type_dependencies(oldrel->rd_rel->reltype,
2508 RelationGetRelationName(oldrel));
2511 * Generate the constraint and default execution states
2514 estate = CreateExecutorState();
2516 /* Build the needed expression execution states */
2517 foreach(l, tab->constraints)
2519 NewConstraint *con = lfirst(l);
2521 switch (con->contype)
2525 con->qualstate = (List *)
2526 ExecPrepareExpr((Expr *) con->qual, estate);
2528 case CONSTR_FOREIGN:
2529 /* Nothing to do here */
2531 case CONSTR_NOTNULL:
2535 elog(ERROR, "unrecognized constraint type: %d",
2536 (int) con->contype);
2540 foreach(l, tab->newvals)
2542 NewColumnValue *ex = lfirst(l);
2546 ex->exprstate = ExecPrepareExpr((Expr *) ex->expr, estate);
2551 ExprContext *econtext;
2554 TupleTableSlot *oldslot;
2555 TupleTableSlot *newslot;
2558 MemoryContext oldCxt;
2559 List *dropped_attrs = NIL;
2562 econtext = GetPerTupleExprContext(estate);
2565 * Make tuple slots for old and new tuples. Note that even when the
2566 * tuples are the same, the tupDescs might not be (consider ADD COLUMN
2567 * without a default).
2569 oldslot = MakeSingleTupleTableSlot(oldTupDesc);
2570 newslot = MakeSingleTupleTableSlot(newTupDesc);
2572 /* Preallocate values/isnull arrays */
/* Sized for the wider of the two descriptors; one pair of arrays serves both deform and form. */
2573 i = Max(newTupDesc->natts, oldTupDesc->natts);
2574 values = (Datum *) palloc(i * sizeof(Datum));
2575 isnull = (bool *) palloc(i * sizeof(bool));
2576 memset(values, 0, i * sizeof(Datum));
2577 memset(isnull, true, i * sizeof(bool));
2580 * Any attributes that are dropped according to the new tuple
2581 * descriptor can be set to NULL. We precompute the list of dropped
2582 * attributes to avoid needing to do so in the per-tuple loop.
2584 for (i = 0; i < newTupDesc->natts; i++)
2586 if (newTupDesc->attrs[i]->attisdropped)
2587 dropped_attrs = lappend_int(dropped_attrs, i);
2591 * Scan through the rows, generating a new row if needed and then
2592 * checking all the constraints.
2594 scan = heap_beginscan(oldrel, SnapshotNow, 0, NULL);
2597 * Switch to per-tuple memory context and reset it for each tuple
2598 * produced, so we don't leak memory.
2600 oldCxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2602 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2606 Oid tupOid = InvalidOid;
2608 /* Extract data from old tuple */
2609 heap_deform_tuple(tuple, oldTupDesc, values, isnull);
2610 if (oldTupDesc->tdhasoid)
2611 tupOid = HeapTupleGetOid(tuple);
2613 /* Set dropped attributes to null in new tuple */
2614 foreach(lc, dropped_attrs)
2615 isnull[lfirst_int(lc)] = true;
2618 * Process supplied expressions to replace selected columns.
2619 * Expression inputs come from the old tuple.
2621 ExecStoreTuple(tuple, oldslot, InvalidBuffer, false);
2622 econtext->ecxt_scantuple = oldslot;
2624 foreach(l, tab->newvals)
2626 NewColumnValue *ex = lfirst(l);
/* attnum is 1-based, hence the -1 when indexing values[] and isnull[]. */
2628 values[ex->attnum - 1] = ExecEvalExpr(ex->exprstate,
2630 &isnull[ex->attnum - 1],
2635 * Form the new tuple. Note that we don't explicitly pfree it,
2636 * since the per-tuple memory context will be reset shortly.
2638 tuple = heap_form_tuple(newTupDesc, values, isnull);
2640 /* Preserve OID, if any */
2641 if (newTupDesc->tdhasoid)
2642 HeapTupleSetOid(tuple, tupOid);
2645 /* Now check any constraints on the possibly-changed tuple */
2646 ExecStoreTuple(tuple, newslot, InvalidBuffer, false);
2647 econtext->ecxt_scantuple = newslot;
2649 foreach(l, tab->constraints)
2651 NewConstraint *con = lfirst(l);
2653 switch (con->contype)
2656 if (!ExecQual(con->qualstate, econtext, true))
2658 (errcode(ERRCODE_CHECK_VIOLATION),
2659 errmsg("check constraint \"%s\" is violated by some row",
2662 case CONSTR_NOTNULL:
2667 d = heap_getattr(tuple, con->attnum, newTupDesc,
2671 (errcode(ERRCODE_NOT_NULL_VIOLATION),
2672 errmsg("column \"%s\" contains null values",
2673 get_attname(tab->relid,
2677 case CONSTR_FOREIGN:
2678 /* Nothing to do here */
2681 elog(ERROR, "unrecognized constraint type: %d",
2682 (int) con->contype);
2686 /* Write the tuple out to the new relation */
2688 simple_heap_insert(newrel, tuple);
2690 ResetExprContext(econtext);
2692 CHECK_FOR_INTERRUPTS();
2695 MemoryContextSwitchTo(oldCxt);
2698 ExecDropSingleTupleTableSlot(oldslot);
2699 ExecDropSingleTupleTableSlot(newslot);
2702 FreeExecutorState(estate);
2704 heap_close(oldrel, NoLock);
2706 heap_close(newrel, NoLock);
2710 * ATGetQueueEntry: find or create an entry in the ALTER TABLE work queue
2712 static AlteredTableInfo *
2713 ATGetQueueEntry(List **wqueue, Relation rel)
2715 Oid relid = RelationGetRelid(rel);
2716 AlteredTableInfo *tab;
2719 foreach(ltab, *wqueue)
2721 tab = (AlteredTableInfo *) lfirst(ltab);
2722 if (tab->relid == relid)
2727 * Not there, so add it. Note that we make a copy of the relation's
2728 * existing descriptor before anything interesting can happen to it.
2730 tab = (AlteredTableInfo *) palloc0(sizeof(AlteredTableInfo));
2732 tab->relkind = rel->rd_rel->relkind;
2733 tab->oldDesc = CreateTupleDescCopy(RelationGetDescr(rel));
2735 *wqueue = lappend(*wqueue, tab);
2741 * ATSimplePermissions
2743 * - Ensure that it is a relation (or possibly a view)
2744 * - Ensure this user is the owner
2745 * - Ensure that it is not a system table
2748 ATSimplePermissions(Relation rel, bool allowView)
2750 if (rel->rd_rel->relkind != RELKIND_RELATION)
2754 if (rel->rd_rel->relkind != RELKIND_VIEW)
2756 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2757 errmsg("\"%s\" is not a table or view",
2758 RelationGetRelationName(rel))));
2762 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2763 errmsg("\"%s\" is not a table",
2764 RelationGetRelationName(rel))));
2767 /* Permissions checks */
2768 if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
2769 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
2770 RelationGetRelationName(rel));
2772 if (!allowSystemTableMods && IsSystemRelation(rel))
2774 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2775 errmsg("permission denied: \"%s\" is a system catalog",
2776 RelationGetRelationName(rel))));
2782 * Simple table recursion sufficient for most ALTER TABLE operations.
2783 * All direct and indirect children are processed in an unspecified order.
2784 * Note that if a child inherits from the original table via multiple
2785 * inheritance paths, it will be visited just once.
2788 ATSimpleRecursion(List **wqueue, Relation rel,
2789 AlterTableCmd *cmd, bool recurse)
2792 * Propagate to children if desired. Non-table relations never have
2793 * children, so no need to search in that case.
2795 if (recurse && rel->rd_rel->relkind == RELKIND_RELATION)
2797 Oid relid = RelationGetRelid(rel);
2801 /* this routine is actually in the planner */
2802 children = find_all_inheritors(relid);
2805 * find_all_inheritors does the recursive search of the inheritance
2806 * hierarchy, so all we have to do is process all of the relids in the
2807 * list that it returns.
2809 foreach(child, children)
2811 Oid childrelid = lfirst_oid(child);
2814 if (childrelid == relid)
2816 childrel = relation_open(childrelid, AccessExclusiveLock);
2817 ATPrepCmd(wqueue, childrel, cmd, false, true);
2818 relation_close(childrel, NoLock);
2824 * ATOneLevelRecursion
2826 * Here, we visit only direct inheritance children. It is expected that
2827 * the command's prep routine will recurse again to find indirect children.
2828 * When using this technique, a multiply-inheriting child will be visited
2832 ATOneLevelRecursion(List **wqueue, Relation rel,
2835 Oid relid = RelationGetRelid(rel);
2839 /* this routine is actually in the planner */
2840 children = find_inheritance_children(relid);
2842 foreach(child, children)
2844 Oid childrelid = lfirst_oid(child);
2847 childrel = relation_open(childrelid, AccessExclusiveLock);
2848 ATPrepCmd(wqueue, childrel, cmd, true, true);
2849 relation_close(childrel, NoLock);
2855 * find_composite_type_dependencies
2857 * Check to see if a table's rowtype is being used as a column in some
2858 * other table (possibly nested several levels deep in composite types!).
2859 * Eventually, we'd like to propagate the check or rewrite operation
2860 * into other such tables, but for now, just error out if we find any.
2862 * We assume that functions and views depending on the type are not reasons
2863 * to reject the ALTER. (How safe is this really?)
2866 find_composite_type_dependencies(Oid typeOid, const char *origTblName)
2870 SysScanDesc depScan;
2874 * We scan pg_depend to find those things that depend on the rowtype. (We
2875 * assume we can ignore refobjsubid for a rowtype.)
2877 depRel = heap_open(DependRelationId, AccessShareLock);
2879 ScanKeyInit(&key[0],
2880 Anum_pg_depend_refclassid,
2881 BTEqualStrategyNumber, F_OIDEQ,
2882 ObjectIdGetDatum(TypeRelationId));
2883 ScanKeyInit(&key[1],
2884 Anum_pg_depend_refobjid,
2885 BTEqualStrategyNumber, F_OIDEQ,
2886 ObjectIdGetDatum(typeOid));
2888 depScan = systable_beginscan(depRel, DependReferenceIndexId, true,
2889 SnapshotNow, 2, key);
2891 while (HeapTupleIsValid(depTup = systable_getnext(depScan)))
2893 Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup);
2895 Form_pg_attribute att;
2897 /* Ignore dependees that aren't user columns of relations */
2898 /* (we assume system columns are never of rowtypes) */
2899 if (pg_depend->classid != RelationRelationId ||
2900 pg_depend->objsubid <= 0)
2903 rel = relation_open(pg_depend->objid, AccessShareLock);
2904 att = rel->rd_att->attrs[pg_depend->objsubid - 1];
2906 if (rel->rd_rel->relkind == RELKIND_RELATION)
2909 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2910 errmsg("cannot alter table \"%s\" because column \"%s\".\"%s\" uses its rowtype",
2912 RelationGetRelationName(rel),
2913 NameStr(att->attname))));
2915 else if (OidIsValid(rel->rd_rel->reltype))
2918 * A view or composite type itself isn't a problem, but we must
2919 * recursively check for indirect dependencies via its rowtype.
2921 find_composite_type_dependencies(rel->rd_rel->reltype,
2925 relation_close(rel, AccessShareLock);
2928 systable_endscan(depScan);
2930 relation_close(depRel, AccessShareLock);
2935 * ALTER TABLE ADD COLUMN
2937 * Adds an additional attribute to a relation making the assumption that
2938 * CHECK, NOT NULL, and FOREIGN KEY constraints will be removed from the
2939 * AT_AddColumn AlterTableCmd by analyze.c and added as independent
2943 ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
2947 * Recurse to add the column to child classes, if requested.
2949 * We must recurse one level at a time, so that multiply-inheriting
2950 * children are visited the right number of times and end up with the
2951 * right attinhcount.
2955 AlterTableCmd *childCmd = copyObject(cmd);
2956 ColumnDef *colDefChild = (ColumnDef *) childCmd->def;
2958 /* Child should see column as singly inherited */
2959 colDefChild->inhcount = 1;
2960 colDefChild->is_local = false;
2961 /* and don't make a support dependency on the child */
2962 colDefChild->support = NULL;
2964 ATOneLevelRecursion(wqueue, rel, childCmd);
2969 * If we are told not to recurse, there had better not be any child
2970 * tables; else the addition would put them out of step.
2972 if (find_inheritance_children(RelationGetRelid(rel)) != NIL)
2974 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
2975 errmsg("column must be added to child tables too")));
/*
 * Reviewer notes for ATExecAddColumn (execution step of ADD COLUMN).
 *
 * tab  work-queue entry of the table; receives a NewColumnValue when the
 *      existing rows need the new column filled in during phase 3.
 * rel  relation being altered.
 * The column definition is read through colDef throughout this block.
 *
 * Steps visible here:
 *  1. For a recursion child (colDef->inhcount > 0) that already has a
 *     column of this name, require identical type OID and typmod, bump the
 *     existing attinhcount, report the merge and close pg_attribute.
 *     NOTE(review): presumably the function returns at that point; the
 *     statement is on a line not visible in this view.
 *  2. Otherwise reject duplicate column names and tables that would exceed
 *     MaxHeapAttributeNumber, resolve the data type, build and insert the
 *     pg_attribute row, and store the new relnatts in pg_class.
 *  3. Store a raw DEFAULT (if given) through AddRelationRawConstraints().
 *  4. Queue a phase-3 value: the column default, or an explicit NULL that
 *     is coerced to the domain when the type has domain constraints.
 *  5. Record dependencies on the data type and on colDef->support if set.
 * NOTE(review): "i" is assigned on a line not visible here; it is used as
 * the new attribute number below -- confirm against the full file.
 */
2980 ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
2983 Oid myrelid = RelationGetRelid(rel);
2987 HeapTuple attributeTuple;
2988 Form_pg_attribute attribute;
2989 FormData_pg_attribute attributeD;
2993 HeapTuple typeTuple;
2998 attrdesc = heap_open(AttributeRelationId, RowExclusiveLock);
3001 * Are we adding the column to a recursion child? If so, check whether to
3002 * merge with an existing definition for the column.
3004 if (colDef->inhcount > 0)
3008 /* Does child already have a column by this name? */
3009 tuple = SearchSysCacheCopyAttName(myrelid, colDef->colname);
3010 if (HeapTupleIsValid(tuple))
3012 Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple);
3014 /* Okay if child matches by type */
3015 if (typenameTypeId(NULL, colDef->typename) != childatt->atttypid ||
3016 colDef->typename->typmod != childatt->atttypmod)
3018 (errcode(ERRCODE_DATATYPE_MISMATCH),
3019 errmsg("child table \"%s\" has different type for column \"%s\"",
3020 RelationGetRelationName(rel), colDef->colname)));
3022 /* Bump the existing child att's inhcount */
3023 childatt->attinhcount++;
3024 simple_heap_update(attrdesc, &tuple->t_self, tuple);
3025 CatalogUpdateIndexes(attrdesc, tuple);
3027 heap_freetuple(tuple);
3029 /* Inform the user about the merge */
3031 (errmsg("merging definition of column \"%s\" for child \"%s\"",
3032 colDef->colname, RelationGetRelationName(rel))));
3034 heap_close(attrdesc, RowExclusiveLock);
3039 pgclass = heap_open(RelationRelationId, RowExclusiveLock);
3041 reltup = SearchSysCacheCopy(RELOID,
3042 ObjectIdGetDatum(myrelid),
3044 if (!HeapTupleIsValid(reltup))
3045 elog(ERROR, "cache lookup failed for relation %u", myrelid);
3048 * this test is deliberately not attisdropped-aware, since if one tries to
3049 * add a column matching a dropped column name, it's gonna fail anyway.
3051 if (SearchSysCacheExists(ATTNAME,
3052 ObjectIdGetDatum(myrelid),
3053 PointerGetDatum(colDef->colname),
3056 (errcode(ERRCODE_DUPLICATE_COLUMN),
3057 errmsg("column \"%s\" of relation \"%s\" already exists",
3058 colDef->colname, RelationGetRelationName(rel))));
/* relnatts is the current column count; the table grows by exactly one. */
3060 minattnum = ((Form_pg_class) GETSTRUCT(reltup))->relnatts;
3061 maxatts = minattnum + 1;
3062 if (maxatts > MaxHeapAttributeNumber)
3064 (errcode(ERRCODE_TOO_MANY_COLUMNS),
3065 errmsg("tables can have at most %d columns",
3066 MaxHeapAttributeNumber)));
3069 typeTuple = typenameType(NULL, colDef->typename);
3070 tform = (Form_pg_type) GETSTRUCT(typeTuple);
3071 typeOid = HeapTupleGetOid(typeTuple);
3073 /* make sure datatype is legal for a column */
3074 CheckAttributeType(colDef->colname, typeOid);
3076 attributeTuple = heap_addheader(Natts_pg_attribute,
3078 ATTRIBUTE_TUPLE_SIZE,
3079 (void *) &attributeD);
3081 attribute = (Form_pg_attribute) GETSTRUCT(attributeTuple);
/* Physical properties (length, by-value, storage, alignment) are copied from the pg_type row. */
3083 attribute->attrelid = myrelid;
3084 namestrcpy(&(attribute->attname), colDef->colname);
3085 attribute->atttypid = typeOid;
3086 attribute->attstattarget = -1;
3087 attribute->attlen = tform->typlen;
3088 attribute->attcacheoff = -1;
3089 attribute->atttypmod = colDef->typename->typmod;
3090 attribute->attnum = i;
3091 attribute->attbyval = tform->typbyval;
3092 attribute->attndims = list_length(colDef->typename->arrayBounds);
3093 attribute->attstorage = tform->typstorage;
3094 attribute->attalign = tform->typalign;
3095 attribute->attnotnull = colDef->is_not_null;
3096 attribute->atthasdef = false;
3097 attribute->attisdropped = false;
3098 attribute->attislocal = colDef->is_local;
3099 attribute->attinhcount = colDef->inhcount;
3101 ReleaseSysCache(typeTuple);
3103 simple_heap_insert(attrdesc, attributeTuple);
3105 /* Update indexes on pg_attribute */
3106 CatalogUpdateIndexes(attrdesc, attributeTuple);
3108 heap_close(attrdesc, RowExclusiveLock);
3111 * Update number of attributes in pg_class tuple
3113 ((Form_pg_class) GETSTRUCT(reltup))->relnatts = maxatts;
3115 simple_heap_update(pgclass, &reltup->t_self, reltup);
3117 /* keep catalog indexes current */
3118 CatalogUpdateIndexes(pgclass, reltup);
3120 heap_freetuple(reltup);
3122 heap_close(pgclass, RowExclusiveLock);
3124 /* Make the attribute's catalog entry visible */
3125 CommandCounterIncrement();
3128 * Store the DEFAULT, if any, in the catalogs
3130 if (colDef->raw_default)
3132 RawColumnDefault *rawEnt;
3134 rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
3135 rawEnt->attnum = attribute->attnum;
3136 rawEnt->raw_default = copyObject(colDef->raw_default);
3139 * This function is intended for CREATE TABLE, so it processes a
3140 * _list_ of defaults, but we just do one.
3142 AddRelationRawConstraints(rel, list_make1(rawEnt), NIL);
3144 /* Make the additional catalog changes visible */
3145 CommandCounterIncrement();
3149 * Tell Phase 3 to fill in the default expression, if there is one.
3151 * If there is no default, Phase 3 doesn't have to do anything, because
3152 * that effectively means that the default is NULL. The heap tuple access
3153 * routines always check for attnum > # of attributes in tuple, and return
3154 * NULL if so, so without any modification of the tuple data we will get
3155 * the effect of NULL values in the new column.
3157 * An exception occurs when the new column is of a domain type: the domain
3158 * might have a NOT NULL constraint, or a check constraint that indirectly
3159 * rejects nulls. If there are any domain constraints then we construct
3160 * an explicit NULL default value that will be passed through
3161 * CoerceToDomain processing. (This is a tad inefficient, since it causes
3162 * rewriting the table which we really don't have to do, but the present
3163 * design of domain processing doesn't offer any simple way of checking
3164 * the constraints more directly.)
3166 * Note: we use build_column_default, and not just the cooked default
3167 * returned by AddRelationRawConstraints, so that the right thing happens
3168 * when a datatype's default applies.
3170 defval = (Expr *) build_column_default(rel, attribute->attnum);
3172 if (!defval && GetDomainConstraints(typeOid) != NIL)
3174 Oid basetype = getBaseType(typeOid);
3176 defval = (Expr *) makeNullConst(basetype);
3177 defval = (Expr *) coerce_to_target_type(NULL,
3181 colDef->typename->typmod,
3182 COERCION_ASSIGNMENT,
3183 COERCE_IMPLICIT_CAST);
3184 if (defval == NULL) /* should not happen */
3185 elog(ERROR, "failed to coerce base type to domain");
3190 NewColumnValue *newval;
3192 newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue));
3193 newval->attnum = attribute->attnum;
3194 newval->expr = defval;
3196 tab->newvals = lappend(tab->newvals, newval);
3200 * Add needed dependency entries for the new column.
3202 add_column_datatype_dependency(myrelid, i, attribute->atttypid);
3203 if (colDef->support != NULL)
3204 add_column_support_dependency(myrelid, i, colDef->support);
3208 * Install a column's dependency on its datatype.
3211 add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid)
3213 ObjectAddress myself,
3216 myself.classId = RelationRelationId;
3217 myself.objectId = relid;
3218 myself.objectSubId = attnum;
3219 referenced.classId = TypeRelationId;
3220 referenced.objectId = typid;
3221 referenced.objectSubId = 0;
3222 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
3226 * Install a dependency for a column's supporting relation (serial sequence).
3229 add_column_support_dependency(Oid relid, int32 attnum, RangeVar *support)
3231 ObjectAddress colobject,
3234 colobject.classId = RelationRelationId;
3235 colobject.objectId = relid;
3236 colobject.objectSubId = attnum;
3237 suppobject.classId = RelationRelationId;
3238 suppobject.objectId = RangeVarGetRelid(support, false);
3239 suppobject.objectSubId = 0;
3240 recordDependencyOn(&suppobject, &colobject, DEPENDENCY_INTERNAL);
3244 * ALTER TABLE ALTER COLUMN DROP NOT NULL
3247 ATExecDropNotNull(Relation rel, const char *colName)
3253 ListCell *indexoidscan;
3256 * lookup the attribute
3258 attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
3260 tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3262 if (!HeapTupleIsValid(tuple))
3264 (errcode(ERRCODE_UNDEFINED_COLUMN),
3265 errmsg("column \"%s\" of relation \"%s\" does not exist",
3266 colName, RelationGetRelationName(rel))));
3268 attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum;
3270 /* Prevent them from altering a system attribute */
3273 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3274 errmsg("cannot alter system column \"%s\"",
3278 * Check that the attribute is not in a primary key
3281 /* Loop over all indexes on the relation */
3282 indexoidlist = RelationGetIndexList(rel);
3284 foreach(indexoidscan, indexoidlist)
3286 Oid indexoid = lfirst_oid(indexoidscan);
3287 HeapTuple indexTuple;
3288 Form_pg_index indexStruct;
3291 indexTuple = SearchSysCache(INDEXRELID,
3292 ObjectIdGetDatum(indexoid),
3294 if (!HeapTupleIsValid(indexTuple))
3295 elog(ERROR, "cache lookup failed for index %u", indexoid);
3296 indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
3298 /* If the index is not a primary key, skip the check */
3299 if (indexStruct->indisprimary)
3302 * Loop over each attribute in the primary key and see if it
3303 * matches the to-be-altered attribute
3305 for (i = 0; i < indexStruct->indnatts; i++)
3307 if (indexStruct->indkey.values[i] == attnum)
3309 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3310 errmsg("column \"%s\" is in a primary key",
3315 ReleaseSysCache(indexTuple);
3318 list_free(indexoidlist);
3321 * Okay, actually perform the catalog change ... if needed
3323 if (((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull)
3325 ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = FALSE;
3327 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3329 /* keep the system catalog indexes current */
3330 CatalogUpdateIndexes(attr_rel, tuple);
3333 heap_close(attr_rel, RowExclusiveLock);
3337 * ALTER TABLE ALTER COLUMN SET NOT NULL
3340 ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
3341 const char *colName)
3346 NewConstraint *newcon;
3349 * lookup the attribute
3351 attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
3353 tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3355 if (!HeapTupleIsValid(tuple))
3357 (errcode(ERRCODE_UNDEFINED_COLUMN),
3358 errmsg("column \"%s\" of relation \"%s\" does not exist",
3359 colName, RelationGetRelationName(rel))));
3361 attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum;
3363 /* Prevent them from altering a system attribute */
3366 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3367 errmsg("cannot alter system column \"%s\"",
3371 * Okay, actually perform the catalog change ... if needed
3373 if (!((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull)
3375 ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = TRUE;
3377 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3379 /* keep the system catalog indexes current */
3380 CatalogUpdateIndexes(attr_rel, tuple);
3382 /* Tell Phase 3 to test the constraint */
3383 newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
3384 newcon->contype = CONSTR_NOTNULL;
3385 newcon->attnum = attnum;
3386 newcon->name = "NOT NULL";
3388 tab->constraints = lappend(tab->constraints, newcon);
3391 heap_close(attr_rel, RowExclusiveLock);
3395 * ALTER TABLE ALTER COLUMN SET/DROP DEFAULT
3398 ATExecColumnDefault(Relation rel, const char *colName,
3404 * get the number of the attribute
3406 attnum = get_attnum(RelationGetRelid(rel), colName);
3407 if (attnum == InvalidAttrNumber)
3409 (errcode(ERRCODE_UNDEFINED_COLUMN),
3410 errmsg("column \"%s\" of relation \"%s\" does not exist",
3411 colName, RelationGetRelationName(rel))));
3413 /* Prevent them from altering a system attribute */
3416 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3417 errmsg("cannot alter system column \"%s\"",
3421 * Remove any old default for the column. We use RESTRICT here for
3422 * safety, but at present we do not expect anything to depend on the
3425 RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, false);
3430 RawColumnDefault *rawEnt;
3432 rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
3433 rawEnt->attnum = attnum;
3434 rawEnt->raw_default = newDefault;
3437 * This function is intended for CREATE TABLE, so it processes a
3438 * _list_ of defaults, but we just do one.
3440 AddRelationRawConstraints(rel, list_make1(rawEnt), NIL);
3445 * ALTER TABLE ALTER COLUMN SET STATISTICS
3448 ATPrepSetStatistics(Relation rel, const char *colName, Node *flagValue)
3451 * We do our own permission checking because (a) we want to allow SET
3452 * STATISTICS on indexes (for expressional index columns), and (b) we want
3453 * to allow SET STATISTICS on system catalogs without requiring
3454 * allowSystemTableMods to be turned on.
3456 if (rel->rd_rel->relkind != RELKIND_RELATION &&
3457 rel->rd_rel->relkind != RELKIND_INDEX)
3459 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
3460 errmsg("\"%s\" is not a table or index",
3461 RelationGetRelationName(rel))));
3463 /* Permissions checks */
3464 if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
3465 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
3466 RelationGetRelationName(rel));
3470 ATExecSetStatistics(Relation rel, const char *colName, Node *newValue)
3473 Relation attrelation;
3475 Form_pg_attribute attrtuple;
3477 Assert(IsA(newValue, Integer));
3478 newtarget = intVal(newValue);
3481 * Limit target to a sane range
3486 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3487 errmsg("statistics target %d is too low",
3490 else if (newtarget > 1000)
3494 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3495 errmsg("lowering statistics target to %d",
3499 attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
3501 tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3503 if (!HeapTupleIsValid(tuple))
3505 (errcode(ERRCODE_UNDEFINED_COLUMN),
3506 errmsg("column \"%s\" of relation \"%s\" does not exist",
3507 colName, RelationGetRelationName(rel))));
3508 attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
3510 if (attrtuple->attnum <= 0)
3512 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3513 errmsg("cannot alter system column \"%s\"",
3516 attrtuple->attstattarget = newtarget;
3518 simple_heap_update(attrelation, &tuple->t_self, tuple);
3520 /* keep system catalog indexes current */
3521 CatalogUpdateIndexes(attrelation, tuple);
3523 heap_freetuple(tuple);
3525 heap_close(attrelation, RowExclusiveLock);
/*
 * ATExecSetStorage: change the attstorage field in the pg_attribute row of
 * column colName of rel.  newValue must be a String node naming the storage
 * mode; the name is matched case-insensitively against "plain", "external",
 * "extended" and "main".
 *
 * NOTE(review): lines appear to be missing from this listing (braces, some
 * declarations, error-call openers and the per-branch newstorage
 * assignments); the comments added here cover only the visible statements.
 */
3529 * ALTER TABLE ALTER COLUMN SET STORAGE
3532 ATExecSetStorage(Relation rel, const char *colName, Node *newValue)
3536 Relation attrelation;
3538 Form_pg_attribute attrtuple;
/* Translate the mode name; a name matching none of the four is rejected */
3540 Assert(IsA(newValue, String));
3541 storagemode = strVal(newValue);
3543 if (pg_strcasecmp(storagemode, "plain") == 0)
3545 else if (pg_strcasecmp(storagemode, "external") == 0)
3547 else if (pg_strcasecmp(storagemode, "extended") == 0)
3549 else if (pg_strcasecmp(storagemode, "main") == 0)
3554 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3555 errmsg("invalid storage type \"%s\"",
3557 newstorage = 0; /* keep compiler quiet */
/* Open pg_attribute and fetch an editable copy of the column's row */
3560 attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
3562 tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3564 if (!HeapTupleIsValid(tuple))
3566 (errcode(ERRCODE_UNDEFINED_COLUMN),
3567 errmsg("column \"%s\" of relation \"%s\" does not exist",
3568 colName, RelationGetRelationName(rel))));
3569 attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
/* attnum <= 0 is treated as a system column, which may not be altered */
3571 if (attrtuple->attnum <= 0)
3573 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3574 errmsg("cannot alter system column \"%s\"",
3578 * safety check: do not allow toasted storage modes unless column datatype
/* 'p' is always accepted; any other code requires TypeIsToastable() to hold */
3581 if (newstorage == 'p' || TypeIsToastable(attrtuple->atttypid))
3582 attrtuple->attstorage = newstorage;
3585 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3586 errmsg("column data type %s can only have storage PLAIN",
3587 format_type_be(attrtuple->atttypid))));
/* Write the edited copy back to pg_attribute */
3589 simple_heap_update(attrelation, &tuple->t_self, tuple);
3591 /* keep system catalog indexes current */
3592 CatalogUpdateIndexes(attrelation, tuple);
/* The tuple was a copy, so it is freed rather than released */
3594 heap_freetuple(tuple);
3596 heap_close(attrelation, RowExclusiveLock);
/*
 * ATExecDropColumn: drop column colName from rel, first visiting each direct
 * inheritance child to either recurse into it or adjust its attinhcount.
 *
 * behavior  - passed unchanged to performDeletion() and to recursive calls.
 * recurse   - per the comment further down, false means "drop ONLY in this
 *             table"; the test on this flag is not visible in this listing.
 * recursing - when false, a column with attinhcount > 0 is rejected as
 *             inherited; the self-recursive call below passes true.
 *
 * NOTE(review): lines appear to be missing from this listing (braces, some
 * declarations, error-call openers, else branches); the comments added here
 * cover only the visible statements.
 */
3601 * ALTER TABLE DROP COLUMN
3603 * DROP COLUMN cannot use the normal ALTER TABLE recursion mechanism,
3604 * because we have to decide at runtime whether to recurse or not depending
3605 * on whether attinhcount goes to zero or not. (We can't check this in a
3606 * static pre-pass because it won't handle multiple inheritance situations
3607 * correctly.) Since DROP COLUMN doesn't need to create any work queue
3608 * entries for Phase 3, it's okay to recurse internally in this routine
3609 * without considering the work queue.
3612 ATExecDropColumn(Relation rel, const char *colName,
3613 DropBehavior behavior,
3614 bool recurse, bool recursing)
3617 Form_pg_attribute targetatt;
3620 ObjectAddress object;
3622 /* At top level, permission check was done in ATPrepCmd, else do it */
3624 ATSimplePermissions(rel, false);
3627 * get the number of the attribute
3629 tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
3630 if (!HeapTupleIsValid(tuple))
3632 (errcode(ERRCODE_UNDEFINED_COLUMN),
3633 errmsg("column \"%s\" of relation \"%s\" does not exist",
3634 colName, RelationGetRelationName(rel))));
3635 targetatt = (Form_pg_attribute) GETSTRUCT(tuple);
/* Remember attnum now; it is still needed after the cache entry is released */
3637 attnum = targetatt->attnum;
3639 /* Can't drop a system attribute, except OID */
3640 if (attnum <= 0 && attnum != ObjectIdAttributeNumber)
3642 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3643 errmsg("cannot drop system column \"%s\"",
3646 /* Don't drop inherited columns */
3647 if (targetatt->attinhcount > 0 && !recursing)
3649 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3650 errmsg("cannot drop inherited column \"%s\"",
/* This lookup was not a copy, so it is released rather than freed */
3653 ReleaseSysCache(tuple);
3656 * Propagate to children as appropriate. Unlike most other ALTER
3657 * routines, we have to do this one level of recursion at a time; we can't
3658 * use find_all_inheritors to do it in one pass.
3660 children = find_inheritance_children(RelationGetRelid(rel));
3667 attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
3668 foreach(child, children)
3670 Oid childrelid = lfirst_oid(child);
3672 Form_pg_attribute childatt;
/* Each child is opened with AccessExclusiveLock before its column is touched */
3674 childrel = heap_open(childrelid, AccessExclusiveLock);
/* Editable copy of the child's pg_attribute row for the same column name */
3676 tuple = SearchSysCacheCopyAttName(childrelid, colName);
3677 if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
3678 elog(ERROR, "cache lookup failed for attribute \"%s\" of relation %u",
3679 colName, childrelid);
3680 childatt = (Form_pg_attribute) GETSTRUCT(tuple);
3682 if (childatt->attinhcount <= 0) /* shouldn't happen */
3683 elog(ERROR, "relation %u has non-inherited attribute \"%s\"",
3684 childrelid, colName);
3689 * If the child column has other definition sources, just
3690 * decrement its inheritance count; if not, recurse to delete
/* Sole remaining source is this parent and there is no local definition */
3693 if (childatt->attinhcount == 1 && !childatt->attislocal)
3695 /* Time to delete this child column, too */
3696 ATExecDropColumn(childrel, colName, behavior, true, true);
3700 /* Child column must survive my deletion */
3701 childatt->attinhcount--;
3703 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3705 /* keep the system catalog indexes current */
3706 CatalogUpdateIndexes(attr_rel, tuple);
3708 /* Make update visible */
3709 CommandCounterIncrement();
3715 * If we were told to drop ONLY in this table (no recursion),
3716 * we need to mark the inheritors' attribute as locally
3717 * defined rather than inherited.
3719 childatt->attinhcount--;
3720 childatt->attislocal = true;
3722 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3724 /* keep the system catalog indexes current */
3725 CatalogUpdateIndexes(attr_rel, tuple);
3727 /* Make update visible */
3728 CommandCounterIncrement();
3731 heap_freetuple(tuple);
/* Closed with NoLock, unlike pg_attribute below */
3733 heap_close(childrel, NoLock);
3735 heap_close(attr_rel, RowExclusiveLock);
3739 * Perform the actual column deletion
/* The column is addressed as (pg_class, relation OID, attnum) */
3741 object.classId = RelationRelationId;
3742 object.objectId = RelationGetRelid(rel);
3743 object.objectSubId = attnum;
3745 performDeletion(&object, behavior);
3748 * If we dropped the OID column, must adjust pg_class.relhasoids
3750 if (attnum == ObjectIdAttributeNumber)
3753 Form_pg_class tuple_class;
3755 class_rel = heap_open(RelationRelationId, RowExclusiveLock);
/* Editable copy of this relation's pg_class row */
3757 tuple = SearchSysCacheCopy(RELOID,
3758 ObjectIdGetDatum(RelationGetRelid(rel)),
3760 if (!HeapTupleIsValid(tuple))
3761 elog(ERROR, "cache lookup failed for relation %u",
3762 RelationGetRelid(rel));
3763 tuple_class = (Form_pg_class) GETSTRUCT(tuple);
3765 tuple_class->relhasoids = false;
3766 simple_heap_update(class_rel, &tuple->t_self, tuple);
3768 /* Keep the catalog indexes up to date */
3769 CatalogUpdateIndexes(class_rel, tuple);
3771 heap_close(class_rel, RowExclusiveLock);
/*
 * ATExecAddIndex: create the index described by stmt by calling
 * DefineIndex().  is_rebuild turns off the schema rights check; a non-empty
 * tab->newvals makes the index build be skipped here.
 *
 * NOTE(review): lines appear to be missing from this listing (braces,
 * declarations and several DefineIndex arguments); the comments added here
 * cover only the visible statements.
 */
3776 * ALTER TABLE ADD INDEX
3778 * There is no such command in the grammar, but the parser converts UNIQUE
3779 * and PRIMARY KEY constraints into AT_AddIndex subcommands. This lets us
3780 * schedule creation of the index at the appropriate time during ALTER.
3783 ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
3784 IndexStmt *stmt, bool is_rebuild)
3790 Assert(IsA(stmt, IndexStmt));
3792 /* suppress schema rights check when rebuilding existing index */
3793 check_rights = !is_rebuild;
3794 /* skip index build if phase 3 will have to rewrite table anyway */
3795 skip_build = (tab->newvals != NIL);
3796 /* suppress notices when rebuilding existing index */
/* Only part of the DefineIndex argument list is visible in this listing */
3799 DefineIndex(stmt->relation, /* relation */
3800 stmt->idxname, /* index name */
3801 InvalidOid, /* no predefined OID */
3802 stmt->accessMethod, /* am name */
3804 stmt->indexParams, /* parameters */
3805 (Expr *) stmt->whereClause,
3810 true, /* is_alter_table */
/*
 * ATExecAddConstraint: add the constraint described by newConstraint to rel,
 * dispatching on the node tag.  Constraint nodes are cooked with
 * AddRelationRawConstraints() and each result is appended to
 * tab->constraints; T_FkConstraint nodes get a checked or generated name and
 * are handed to ATAddForeignKeyConstraint().  Any other node tag, or an
 * unexpected contype, raises an error.
 *
 * NOTE(review): lines appear to be missing from this listing (braces, case
 * labels, declarations, error-call openers); the comments added here cover
 * only the visible statements.
 */
3817 * ALTER TABLE ADD CONSTRAINT
3820 ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, Node *newConstraint)
3822 switch (nodeTag(newConstraint))
3826 Constraint *constr = (Constraint *) newConstraint;
3829 * Currently, we only expect to see CONSTR_CHECK nodes
3830 * arriving here (see the preprocessing done in
3831 * parser/analyze.c). Use a switch anyway to make it easier
3832 * to add more code later.
3834 switch (constr->contype)
3842 * Call AddRelationRawConstraints to do the work.
3843 * It returns a list of cooked constraints.
3845 newcons = AddRelationRawConstraints(rel, NIL,
3846 list_make1(constr));
3847 /* Add each constraint to Phase 3's queue */
3848 foreach(lcon, newcons)
3850 CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon);
3851 NewConstraint *newcon;
/* palloc0 leaves every field not assigned below zeroed */
3853 newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
3854 newcon->name = ccon->name;
3855 newcon->contype = ccon->contype;
3856 newcon->attnum = ccon->attnum;
3857 /* ExecQual wants implicit-AND format */
3858 newcon->qual = (Node *)
3859 make_ands_implicit((Expr *) ccon->expr);
3861 tab->constraints = lappend(tab->constraints,
3867 elog(ERROR, "unrecognized constraint type: %d",
3868 (int) constr->contype);
3872 case T_FkConstraint:
3874 FkConstraint *fkconstraint = (FkConstraint *) newConstraint;
3877 * Assign or validate constraint name
/* A user-supplied name must not already be in use on this relation */
3879 if (fkconstraint->constr_name)
3881 if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
3882 RelationGetRelid(rel),
3883 RelationGetNamespace(rel),
3884 fkconstraint->constr_name))
3886 (errcode(ERRCODE_DUPLICATE_OBJECT),
3887 errmsg("constraint \"%s\" for relation \"%s\" already exists",
3888 fkconstraint->constr_name,
3889 RelationGetRelationName(rel))));
/* Otherwise a name is generated from the relation name and first FK column */
3892 fkconstraint->constr_name =
3893 ChooseConstraintName(RelationGetRelationName(rel),
3894 strVal(linitial(fkconstraint->fk_attrs)),
3896 RelationGetNamespace(rel),
3899 ATAddForeignKeyConstraint(tab, rel, fkconstraint);
3904 elog(ERROR, "unrecognized node type: %d",
3905 (int) nodeTag(newConstraint));
/*
 * ATAddForeignKeyConstraint: validate and install one foreign-key constraint
 * from rel (the referencing table) to fkconstraint->pktable.
 *
 * Visible steps: open the referenced table with AccessExclusiveLock, run
 * relkind / ACL / system-catalog / temp-vs-permanent checks, resolve both
 * column lists to attnums and type OIDs, find the supporting index, look up
 * an "=" operator per column pair, optionally queue a validation entry on
 * tab->constraints, record the constraint with CreateConstraintEntry(), and
 * create the enforcing triggers.
 *
 * NOTE(review): lines appear to be missing from this listing (braces, some
 * declarations, error-call openers, several call arguments); the comments
 * added here cover only the visible statements.
 */
3910 * Add a foreign-key constraint to a single table
3912 * Subroutine for ATExecAddConstraint. Must already hold exclusive
3913 * lock on the rel, and have done appropriate validity/permissions checks
3917 ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
3918 FkConstraint *fkconstraint)
3921 AclResult aclresult;
/* Per-column work arrays, each sized to INDEX_MAX_KEYS */
3922 int16 pkattnum[INDEX_MAX_KEYS];
3923 int16 fkattnum[INDEX_MAX_KEYS];
3924 Oid pktypoid[INDEX_MAX_KEYS];
3925 Oid fktypoid[INDEX_MAX_KEYS];
3926 Oid opclasses[INDEX_MAX_KEYS];
3934 * Grab an exclusive lock on the pk table, so that someone doesn't delete
3935 * rows out from under us. (Although a lesser lock would do for that
3936 * purpose, we'll need exclusive lock anyway to add triggers to the pk
3937 * table; trying to start with a lesser lock will just create a risk of
3940 pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock);
3943 * Validity and permissions checks
3945 * Note: REFERENCES permissions checks are redundant with CREATE TRIGGER,
3946 * but we may as well error out sooner instead of later.
3948 if (pkrel->rd_rel->relkind != RELKIND_RELATION)
3950 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
3951 errmsg("referenced relation \"%s\" is not a table",
3952 RelationGetRelationName(pkrel))));
/* ACL check on the referenced table (the privilege argument is not visible) */
3954 aclresult = pg_class_aclcheck(RelationGetRelid(pkrel), GetUserId(),
3956 if (aclresult != ACLCHECK_OK)
3957 aclcheck_error(aclresult, ACL_KIND_CLASS,
3958 RelationGetRelationName(pkrel));
3960 if (!allowSystemTableMods && IsSystemRelation(pkrel))
3962 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
3963 errmsg("permission denied: \"%s\" is a system catalog",
3964 RelationGetRelationName(pkrel))));
/* Same ACL check, this time on the referencing table */
3966 aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
3968 if (aclresult != ACLCHECK_OK)
3969 aclcheck_error(aclresult, ACL_KIND_CLASS,
3970 RelationGetRelationName(rel));
3973 * Disallow reference from permanent table to temp table or vice versa.
3974 * (The ban on perm->temp is for fairly obvious reasons. The ban on
3975 * temp->perm is because other backends might need to run the RI triggers
3976 * on the perm table, but they can't reliably see tuples the owning
3977 * backend has created in the temp table, because non-shared buffers are
3978 * used for temp tables.)
3980 if (isTempNamespace(RelationGetNamespace(pkrel)))
3982 if (!isTempNamespace(RelationGetNamespace(rel)))
3984 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3985 errmsg("cannot reference temporary table from permanent table constraint")));
3989 if (isTempNamespace(RelationGetNamespace(rel)))
3991 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3992 errmsg("cannot reference permanent table from temporary table constraint")));
3996 * Look up the referencing attributes to make sure they exist, and record
3997 * their attnums and type OIDs.
/* Zero all work arrays before they are filled in */
3999 MemSet(pkattnum, 0, sizeof(pkattnum));
4000 MemSet(fkattnum, 0, sizeof(fkattnum));
4001 MemSet(pktypoid, 0, sizeof(pktypoid));
4002 MemSet(fktypoid, 0, sizeof(fktypoid));
4003 MemSet(opclasses, 0, sizeof(opclasses));
4005 numfks = transformColumnNameList(RelationGetRelid(rel),
4006 fkconstraint->fk_attrs,
4007 fkattnum, fktypoid);
4010 * If the attribute list for the referenced table was omitted, lookup the
4011 * definition of the primary key and use it. Otherwise, validate the
4012 * supplied attribute list. In either case, discover the index OID and
4013 * index opclasses, and the attnums and type OIDs of the attributes.
4015 if (fkconstraint->pk_attrs == NIL)
4017 numpks = transformFkeyGetPrimaryKey(pkrel, &indexOid,
4018 &fkconstraint->pk_attrs,
4024 numpks = transformColumnNameList(RelationGetRelid(pkrel),
4025 fkconstraint->pk_attrs,
4026 pkattnum, pktypoid);
4027 /* Look for an index matching the column list */
4028 indexOid = transformFkeyCheckAttrs(pkrel, numpks, pkattnum,
4032 /* Be sure referencing and referenced column types are comparable */
4033 if (numfks != numpks)
4035 (errcode(ERRCODE_INVALID_FOREIGN_KEY),
4036 errmsg("number of referencing and referenced columns for foreign key disagree")));
4038 for (i = 0; i < numpks; i++)
/* pktypoid[i] is the referenced column's type, fktypoid[i] the referencing one's */
4041 * pktypoid[i] is the primary key table's i'th key's type fktypoid[i]
4042 * is the foreign key table's i'th key's type
4044 * Note that we look for an operator with the PK type on the left;
4045 * when the types are different this is critical because the PK index
4046 * will need operators with the indexkey on the left. (Ordinarily both
4047 * commutator operators will exist if either does, but we won't get
4048 * the right answer from the test below on opclass membership unless
4049 * we select the proper operator.)
4051 Operator o = oper(NULL, list_make1(makeString("=")),
4052 pktypoid[i], fktypoid[i],
4057 (errcode(ERRCODE_UNDEFINED_FUNCTION),
4058 errmsg("foreign key constraint \"%s\" "
4059 "cannot be implemented",
4060 fkconstraint->constr_name),
4061 errdetail("Key columns \"%s\" and \"%s\" "
4062 "are of incompatible types: %s and %s.",
4063 strVal(list_nth(fkconstraint->fk_attrs, i)),
4064 strVal(list_nth(fkconstraint->pk_attrs, i)),
4065 format_type_be(fktypoid[i]),
4066 format_type_be(pktypoid[i]))));
4069 * Check that the found operator is compatible with the PK index, and
4070 * generate a warning if not, since otherwise costly seqscans will be
4071 * incurred to check FK validity.
4073 if (!op_in_opclass(oprid(o), opclasses[i]))
4075 (errmsg("foreign key constraint \"%s\" "
4076 "will require costly sequential scans",
4077 fkconstraint->constr_name),
4078 errdetail("Key columns \"%s\" and \"%s\" "
4079 "are of different types: %s and %s.",
4080 strVal(list_nth(fkconstraint->fk_attrs, i)),
4081 strVal(list_nth(fkconstraint->pk_attrs, i)),
4082 format_type_be(fktypoid[i]),
4083 format_type_be(pktypoid[i]))));
4089 * Tell Phase 3 to check that the constraint is satisfied by existing rows
4090 * (we can skip this during table creation).
4092 if (!fkconstraint->skip_validation)
4094 NewConstraint *newcon;
/* The FkConstraint node itself is stored in qual for the later check */
4096 newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
4097 newcon->name = fkconstraint->constr_name;
4098 newcon->contype = CONSTR_FOREIGN;
4099 newcon->refrelid = RelationGetRelid(pkrel);
4100 newcon->qual = (Node *) fkconstraint;
4102 tab->constraints = lappend(tab->constraints, newcon);
/* Only part of the CreateConstraintEntry argument list is visible below */
4106 * Record the FK constraint in pg_constraint.
4108 constrOid = CreateConstraintEntry(fkconstraint->constr_name,
4109 RelationGetNamespace(rel),
4111 fkconstraint->deferrable,
4112 fkconstraint->initdeferred,
4113 RelationGetRelid(rel),
4116 InvalidOid, /* not a domain
4118 RelationGetRelid(pkrel),
4121 fkconstraint->fk_upd_action,
4122 fkconstraint->fk_del_action,
4123 fkconstraint->fk_matchtype,
4125 NULL, /* no check constraint */
4130 * Create the triggers that will enforce the constraint.
4132 createForeignKeyTriggers(rel, fkconstraint, constrOid);
4135 * Close pk table, but keep lock until we've committed.
4137 heap_close(pkrel, NoLock);
/*
 * transformColumnNameList: resolve each column name in colList against
 * relation relId, storing the attribute number in attnums[] and the type OID
 * in atttypids[].  An unknown column raises ERRCODE_UNDEFINED_COLUMN; a list
 * longer than INDEX_MAX_KEYS raises ERRCODE_TOO_MANY_COLUMNS.
 *
 * NOTE(review): lines appear to be missing from this listing (braces,
 * declarations, the loop header, the counter increment and the return); the
 * comments added here cover only the visible statements.
 */
4142 * transformColumnNameList - transform list of column names
4144 * Lookup each name and return its attnum and type OID
4147 transformColumnNameList(Oid relId, List *colList,
4148 int16 *attnums, Oid *atttypids)
4156 char *attname = strVal(lfirst(l));
4159 atttuple = SearchSysCacheAttName(relId, attname);
4160 if (!HeapTupleIsValid(atttuple))
4162 (errcode(ERRCODE_UNDEFINED_COLUMN),
4163 errmsg("column \"%s\" referenced in foreign key constraint does not exist",
/* Here attnum is the output slot index, not a pg_attribute attnum */
4165 if (attnum >= INDEX_MAX_KEYS)
4167 (errcode(ERRCODE_TOO_MANY_COLUMNS),
4168 errmsg("cannot have more than %d keys in a foreign key",
4170 attnums[attnum] = ((Form_pg_attribute) GETSTRUCT(atttuple))->attnum;
4171 atttypids[attnum] = ((Form_pg_attribute) GETSTRUCT(atttuple))->atttypid;
4172 ReleaseSysCache(atttuple);
/*
 * transformFkeyGetPrimaryKey: locate the index of pkrel flagged indisprimary
 * and report its OID, key attnums, key type OIDs, opclasses and column
 * names.  Raises ERRCODE_UNDEFINED_OBJECT when no such index exists.
 *
 * NOTE(review): lines appear to be missing from this listing (braces, some
 * declarations and parameters, the loop exit and the return); the comments
 * added here cover only the visible statements.
 */
4180 * transformFkeyGetPrimaryKey -
4182 * Look up the names, attnums, and types of the primary key attributes
4183 * for the pkrel. Also return the index OID and index opclasses of the
4184 * index supporting the primary key.
4186 * All parameters except pkrel are output parameters. Also, the function
4187 * return value is the number of attributes in the primary key.
4189 * Used when the column list in the REFERENCES specification is omitted.
4192 transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
4194 int16 *attnums, Oid *atttypids,
4198 ListCell *indexoidscan;
4199 HeapTuple indexTuple = NULL;
4200 Form_pg_index indexStruct = NULL;
4201 Datum indclassDatum;
4203 oidvector *indclass;
4207 * Get the list of index OIDs for the table from the relcache, and look up
4208 * each one in the pg_index syscache until we find one marked primary key
4209 * (hopefully there isn't more than one such).
/* InvalidOid doubles as the "not found" marker tested after the loop */
4211 *indexOid = InvalidOid;
4213 indexoidlist = RelationGetIndexList(pkrel);
4215 foreach(indexoidscan, indexoidlist)
4217 Oid indexoid = lfirst_oid(indexoidscan);
4219 indexTuple = SearchSysCache(INDEXRELID,
4220 ObjectIdGetDatum(indexoid),
4222 if (!HeapTupleIsValid(indexTuple))
4223 elog(ERROR, "cache lookup failed for index %u", indexoid);
4224 indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
/*
 * NOTE(review): indexTuple is read again after the loop, so presumably a
 * match leaves the loop before the release below; that exit statement is
 * not visible in this listing.
 */
4225 if (indexStruct->indisprimary)
4227 *indexOid = indexoid;
4230 ReleaseSysCache(indexTuple);
4233 list_free(indexoidlist);
4236 * Check that we found it
4238 if (!OidIsValid(*indexOid))
4240 (errcode(ERRCODE_UNDEFINED_OBJECT),
4241 errmsg("there is no primary key for referenced table \"%s\"",
4242 RelationGetRelationName(pkrel))));
4244 /* Must get indclass the hard way */
4245 indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple,
4246 Anum_pg_index_indclass, &isnull);
4248 indclass = (oidvector *) DatumGetPointer(indclassDatum);
4251 * Now build the list of PK attributes from the indkey definition (we
4252 * assume a primary key cannot have expressional elements)
4255 for (i = 0; i < indexStruct->indnatts; i++)
4257 int pkattno = indexStruct->indkey.values[i];
/* Outputs are filled in index-column order; names are copied with pstrdup */
4259 attnums[i] = pkattno;
4260 atttypids[i] = attnumTypeId(pkrel, pkattno);
4261 opclasses[i] = indclass->values[i];
4262 *attnamelist = lappend(*attnamelist,
4263 makeString(pstrdup(NameStr(*attnumAttName(pkrel, pkattno)))));
4266 ReleaseSysCache(indexTuple);
/*
 * transformFkeyCheckAttrs: search pkrel's indexes for a unique,
 * non-partial, expression-free index whose key columns are exactly the
 * numattrs attribute numbers in attnums[] (in any order), and fill
 * opclasses[] so that opclasses[j] corresponds to attnums[j].  Raises
 * ERRCODE_INVALID_FOREIGN_KEY when no index qualifies.
 *
 * NOTE(review): lines appear to be missing from this listing (braces, some
 * declarations, the found-flag handling, loop exits and the return); the
 * comments added here cover only the visible statements.
 */
4272 * transformFkeyCheckAttrs -
4274 * Make sure that the attributes of a referenced table belong to a unique
4275 * (or primary key) constraint. Return the OID of the index supporting
4276 * the constraint, as well as the opclasses associated with the index
4280 transformFkeyCheckAttrs(Relation pkrel,
4281 int numattrs, int16 *attnums,
4282 Oid *opclasses) /* output parameter */
4284 Oid indexoid = InvalidOid;
4287 ListCell *indexoidscan;
4290 * Get the list of index OIDs for the table from the relcache, and look up
4291 * each one in the pg_index syscache, and match unique indexes to the list
4292 * of attnums we are given.
4294 indexoidlist = RelationGetIndexList(pkrel);
4296 foreach(indexoidscan, indexoidlist)
4298 HeapTuple indexTuple;
4299 Form_pg_index indexStruct;
4303 indexoid = lfirst_oid(indexoidscan);
4304 indexTuple = SearchSysCache(INDEXRELID,
4305 ObjectIdGetDatum(indexoid),
4307 if (!HeapTupleIsValid(indexTuple))
4308 elog(ERROR, "cache lookup failed for index %u", indexoid);
4309 indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
4312 * Must have the right number of columns; must be unique and not a
4313 * partial index; forget it if there are any expressions, too
/* A null indpred means not partial; a null indexprs means no expressions */
4315 if (indexStruct->indnatts == numattrs &&
4316 indexStruct->indisunique &&
4317 heap_attisnull(indexTuple, Anum_pg_index_indpred) &&
4318 heap_attisnull(indexTuple, Anum_pg_index_indexprs))
4320 /* Must get indclass the hard way */
4321 Datum indclassDatum;
4323 oidvector *indclass;
4325 indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple,
4326 Anum_pg_index_indclass, &isnull);
4328 indclass = (oidvector *) DatumGetPointer(indclassDatum);
4331 * The given attnum list may match the index columns in any order.
4332 * Check that each list is a subset of the other.
/* First pass: look for each requested attnum among the index columns */
4334 for (i = 0; i < numattrs; i++)
4337 for (j = 0; j < numattrs; j++)
4339 if (attnums[i] == indexStruct->indkey.values[j])
/* Second pass: the reverse direction, recording opclasses in caller order */
4350 for (i = 0; i < numattrs; i++)
4353 for (j = 0; j < numattrs; j++)
4355 if (attnums[j] == indexStruct->indkey.values[i])
4357 opclasses[j] = indclass->values[i];
4367 ReleaseSysCache(indexTuple);
4374 (errcode(ERRCODE_INVALID_FOREIGN_KEY),
4375 errmsg("there is no unique constraint matching given keys for referenced table \"%s\"",
4376 RelationGetRelationName(pkrel))));
4378 list_free(indexoidlist);
/*
 * validateForeignKeyConstraint: check that the rows already in rel satisfy
 * fkconstraint.  RI_Initial_Check() is tried first; otherwise a Trigger
 * struct is built by hand and RI_FKey_check_ins() is called for each tuple
 * returned by a heap scan of rel.
 *
 * NOTE(review): lines appear to be missing from this listing (braces,
 * declarations, remaining parameters, the statements that set and advance
 * count, and scan cleanup); the comments added here cover only the visible
 * statements.
 */
4384 * Scan the existing rows in a table to verify they meet a proposed FK
4387 * Caller must have opened and locked both relations.
4390 validateForeignKeyConstraint(FkConstraint *fkconstraint,
4401 * See if we can do it with a single LEFT JOIN query. A FALSE result
4402 * indicates we must proceed with the fire-the-trigger method.
4404 if (RI_Initial_Check(fkconstraint, rel, pkrel))
4408 * Scan through each tuple, calling RI_FKey_check_ins (insert trigger) as
4409 * if that tuple had just been inserted. If any of those fail, it should
4410 * ereport(ERROR) and that's that.
/* Hand-built trigger descriptor; fields not set below stay zero from MemSet */
4412 MemSet(&trig, 0, sizeof(trig));
4413 trig.tgoid = InvalidOid;
4414 trig.tgname = fkconstraint->constr_name;
4415 trig.tgenabled = TRUE;
4416 trig.tgisconstraint = TRUE;
4417 trig.tgconstrrelid = RelationGetRelid(pkrel);
4418 trig.tgdeferrable = FALSE;
4419 trig.tginitdeferred = FALSE;
/* Room for four fixed arguments plus one per referencing and referenced column */
4421 trig.tgargs = (char **) palloc(sizeof(char *) *
4422 (4 + list_length(fkconstraint->fk_attrs)
4423 + list_length(fkconstraint->pk_attrs)));
/* Fixed arguments: constraint name, both relation names, match type */
4425 trig.tgargs[0] = trig.tgname;
4426 trig.tgargs[1] = RelationGetRelationName(rel);
4427 trig.tgargs[2] = RelationGetRelationName(pkrel);
4428 trig.tgargs[3] = fkMatchTypeToString(fkconstraint->fk_matchtype);
4430 foreach(list, fkconstraint->fk_attrs)
4432 char *fk_at = strVal(lfirst(list));
4434 trig.tgargs[count] = fk_at;
4438 foreach(list, fkconstraint->pk_attrs)
4440 char *pk_at = strVal(lfirst(list));
4442 trig.tgargs[count] = pk_at;
4445 trig.tgnargs = count - 1;
4447 scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
4449 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
4451 FunctionCallInfoData fcinfo;
4452 TriggerData trigdata;
4455 * Make a call to the trigger function
4457 * No parameters are passed, but we do set a context
4459 MemSet(&fcinfo, 0, sizeof(fcinfo));
4462 * We assume RI_FKey_check_ins won't look at flinfo...
/* Present the scanned tuple as the triggering row of a row-level INSERT */
4464 trigdata.type = T_TriggerData;
4465 trigdata.tg_event = TRIGGER_EVENT_INSERT | TRIGGER_EVENT_ROW;
4466 trigdata.tg_relation = rel;
4467 trigdata.tg_trigtuple = tuple;
4468 trigdata.tg_newtuple = NULL;
4469 trigdata.tg_trigger = &trig;
4470 trigdata.tg_trigtuplebuf = scan->rs_cbuf;
4471 trigdata.tg_newtuplebuf = InvalidBuffer;
4473 fcinfo.context = (Node *) &trigdata;
4475 RI_FKey_check_ins(&fcinfo);
/*
 * CreateFKCheckTrigger: build a CreateTrigStmt for an AFTER, row-level
 * constraint trigger on myRel that calls RI_FKey_check_ins (action 'i') or
 * RI_FKey_check_upd (action 'u'), create it, store the new trigger's OID in
 * trigobj->objectId and record an internal dependency of trigobj on
 * constrobj.
 *
 * NOTE(review): lines appear to be missing from this listing (braces, the
 * last parameter, some declarations, the insert/update test and the
 * error-call opener); the comments added here cover only the visible
 * statements.
 */
4484 CreateFKCheckTrigger(RangeVar *myRel, FkConstraint *fkconstraint,
4485 ObjectAddress *constrobj, ObjectAddress *trigobj,
4488 CreateTrigStmt *fk_trigger;
4492 fk_trigger = makeNode(CreateTrigStmt);
4493 fk_trigger->trigname = fkconstraint->constr_name;
4494 fk_trigger->relation = myRel;
4495 fk_trigger->before = false;
4496 fk_trigger->row = true;
4498 /* Either ON INSERT or ON UPDATE */
4501 fk_trigger->funcname = SystemFuncName("RI_FKey_check_ins");
4502 fk_trigger->actions[0] = 'i';
4506 fk_trigger->funcname = SystemFuncName("RI_FKey_check_upd");
4507 fk_trigger->actions[0] = 'u';
4509 fk_trigger->actions[1] = '\0';
/* Deferrability is copied from the constraint for the check triggers */
4511 fk_trigger->isconstraint = true;
4512 fk_trigger->deferrable = fkconstraint->deferrable;
4513 fk_trigger->initdeferred = fkconstraint->initdeferred;
4514 fk_trigger->constrrel = fkconstraint->pktable;
/* Arguments: constraint name, both table names, match type, then column pairs */
4516 fk_trigger->args = NIL;
4517 fk_trigger->args = lappend(fk_trigger->args,
4518 makeString(fkconstraint->constr_name));
4519 fk_trigger->args = lappend(fk_trigger->args,
4520 makeString(myRel->relname));
4521 fk_trigger->args = lappend(fk_trigger->args,
4522 makeString(fkconstraint->pktable->relname));
4523 fk_trigger->args = lappend(fk_trigger->args,
4524 makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
/* Both column lists are walked in lockstep below, so their lengths must match */
4525 if (list_length(fkconstraint->fk_attrs) != list_length(fkconstraint->pk_attrs))
4527 (errcode(ERRCODE_INVALID_FOREIGN_KEY),
4528 errmsg("number of referencing and referenced columns for foreign key disagree")));
4530 forboth(fk_attr, fkconstraint->fk_attrs,
4531 pk_attr, fkconstraint->pk_attrs)
4533 fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
4534 fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
4537 trigobj->objectId = CreateTrigger(fk_trigger, true);
4539 /* Register dependency from trigger to constraint */
4540 recordDependencyOn(trigobj, constrobj, DEPENDENCY_INTERNAL);
4542 /* Make changes-so-far visible */
4543 CommandCounterIncrement();
/*
 * createForeignKeyTriggers: create the triggers for the foreign key
 * fkconstraint on rel.  Two check triggers are made on the referencing
 * table via CreateFKCheckTrigger(); then one ON DELETE and one ON UPDATE
 * trigger are made on the referenced table, with the RI_FKey_* function
 * chosen from fk_del_action / fk_upd_action.  Every trigger gets an internal
 * dependency on the constraint.
 *
 * NOTE(review): lines appear to be missing from this listing (braces, the
 * last parameter, some declarations, break statements and default labels);
 * the comments added here cover only the visible statements.
 */
4547 * Create the triggers that implement an FK constraint.
4550 createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
4554 CreateTrigStmt *fk_trigger;
4557 ObjectAddress trigobj,
4561 * Reconstruct a RangeVar for my relation (not passed in, unfortunately).
4563 myRel = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
4564 pstrdup(RelationGetRelationName(rel)));
4567 * Preset objectAddress fields
/* trigobj.objectId is filled in separately for each trigger created */
4569 constrobj.classId = ConstraintRelationId;
4570 constrobj.objectId = constrOid;
4571 constrobj.objectSubId = 0;
4572 trigobj.classId = TriggerRelationId;
4573 trigobj.objectSubId = 0;
4575 /* Make changes-so-far visible */
4576 CommandCounterIncrement();
4579 * Build and execute a CREATE CONSTRAINT TRIGGER statement for the CHECK
4580 * action for both INSERTs and UPDATEs on the referencing table.
4582 CreateFKCheckTrigger(myRel, fkconstraint, &constrobj, &trigobj, true);
4583 CreateFKCheckTrigger(myRel, fkconstraint, &constrobj, &trigobj, false);
4586 * Build and execute a CREATE CONSTRAINT TRIGGER statement for the ON
4587 * DELETE action on the referenced table.
4589 fk_trigger = makeNode(CreateTrigStmt);
4590 fk_trigger->trigname = fkconstraint->constr_name;
4591 fk_trigger->relation = fkconstraint->pktable;
4592 fk_trigger->before = false;
4593 fk_trigger->row = true;
4594 fk_trigger->actions[0] = 'd';
4595 fk_trigger->actions[1] = '\0';
4597 fk_trigger->isconstraint = true;
4598 fk_trigger->constrrel = myRel;
/* Only NO ACTION copies the constraint's deferrability; the rest force false */
4599 switch (fkconstraint->fk_del_action)
4601 case FKCONSTR_ACTION_NOACTION:
4602 fk_trigger->deferrable = fkconstraint->deferrable;
4603 fk_trigger->initdeferred = fkconstraint->initdeferred;
4604 fk_trigger->funcname = SystemFuncName("RI_FKey_noaction_del");
4606 case FKCONSTR_ACTION_RESTRICT:
4607 fk_trigger->deferrable = false;
4608 fk_trigger->initdeferred = false;
4609 fk_trigger->funcname = SystemFuncName("RI_FKey_restrict_del");
4611 case FKCONSTR_ACTION_CASCADE:
4612 fk_trigger->deferrable = false;
4613 fk_trigger->initdeferred = false;
4614 fk_trigger->funcname = SystemFuncName("RI_FKey_cascade_del");
4616 case FKCONSTR_ACTION_SETNULL:
4617 fk_trigger->deferrable = false;
4618 fk_trigger->initdeferred = false;
4619 fk_trigger->funcname = SystemFuncName("RI_FKey_setnull_del");
4621 case FKCONSTR_ACTION_SETDEFAULT:
4622 fk_trigger->deferrable = false;
4623 fk_trigger->initdeferred = false;
4624 fk_trigger->funcname = SystemFuncName("RI_FKey_setdefault_del");
4627 elog(ERROR, "unrecognized FK action type: %d",
4628 (int) fkconstraint->fk_del_action);
/* Same argument layout as the check triggers */
4632 fk_trigger->args = NIL;
4633 fk_trigger->args = lappend(fk_trigger->args,
4634 makeString(fkconstraint->constr_name));
4635 fk_trigger->args = lappend(fk_trigger->args,
4636 makeString(myRel->relname));
4637 fk_trigger->args = lappend(fk_trigger->args,
4638 makeString(fkconstraint->pktable->relname));
4639 fk_trigger->args = lappend(fk_trigger->args,
4640 makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
4641 forboth(fk_attr, fkconstraint->fk_attrs,
4642 pk_attr, fkconstraint->pk_attrs)
4644 fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
4645 fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
4648 trigobj.objectId = CreateTrigger(fk_trigger, true);
4650 /* Register dependency from trigger to constraint */
4651 recordDependencyOn(&trigobj, &constrobj, DEPENDENCY_INTERNAL);
4653 /* Make changes-so-far visible */
4654 CommandCounterIncrement();
4657 * Build and execute a CREATE CONSTRAINT TRIGGER statement for the ON
4658 * UPDATE action on the referenced table.
4660 fk_trigger = makeNode(CreateTrigStmt);
4661 fk_trigger->trigname = fkconstraint->constr_name;
4662 fk_trigger->relation = fkconstraint->pktable;
4663 fk_trigger->before = false;
4664 fk_trigger->row = true;
4665 fk_trigger->actions[0] = 'u';
4666 fk_trigger->actions[1] = '\0';
4667 fk_trigger->isconstraint = true;
4668 fk_trigger->constrrel = myRel;
/* Mirrors the ON DELETE switch, using the *_upd functions */
4669 switch (fkconstraint->fk_upd_action)
4671 case FKCONSTR_ACTION_NOACTION:
4672 fk_trigger->deferrable = fkconstraint->deferrable;
4673 fk_trigger->initdeferred = fkconstraint->initdeferred;
4674 fk_trigger->funcname = SystemFuncName("RI_FKey_noaction_upd");
4676 case FKCONSTR_ACTION_RESTRICT:
4677 fk_trigger->deferrable = false;
4678 fk_trigger->initdeferred = false;
4679 fk_trigger->funcname = SystemFuncName("RI_FKey_restrict_upd");
4681 case FKCONSTR_ACTION_CASCADE:
4682 fk_trigger->deferrable = false;
4683 fk_trigger->initdeferred = false;
4684 fk_trigger->funcname = SystemFuncName("RI_FKey_cascade_upd");
4686 case FKCONSTR_ACTION_SETNULL:
4687 fk_trigger->deferrable = false;
4688 fk_trigger->initdeferred = false;
4689 fk_trigger->funcname = SystemFuncName("RI_FKey_setnull_upd");
4691 case FKCONSTR_ACTION_SETDEFAULT:
4692 fk_trigger->deferrable = false;
4693 fk_trigger->initdeferred = false;
4694 fk_trigger->funcname = SystemFuncName("RI_FKey_setdefault_upd");
4697 elog(ERROR, "unrecognized FK action type: %d",
4698 (int) fkconstraint->fk_upd_action);
4702 fk_trigger->args = NIL;
4703 fk_trigger->args = lappend(fk_trigger->args,
4704 makeString(fkconstraint->constr_name));
4705 fk_trigger->args = lappend(fk_trigger->args,
4706 makeString(myRel->relname));
4707 fk_trigger->args = lappend(fk_trigger->args,
4708 makeString(fkconstraint->pktable->relname));
4709 fk_trigger->args = lappend(fk_trigger->args,
4710 makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
4711 forboth(fk_attr, fkconstraint->fk_attrs,
4712 pk_attr, fkconstraint->pk_attrs)
4714 fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
4715 fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
4718 trigobj.objectId = CreateTrigger(fk_trigger, true);
4720 /* Register dependency from trigger to constraint */
4721 recordDependencyOn(&trigobj, &constrobj, DEPENDENCY_INTERNAL);
/*
 * fkMatchTypeToString: map an FKCONSTR_MATCH_* code to a pstrdup'd copy of
 * "FULL", "PARTIAL" or "UNSPECIFIED"; any other code raises an error.
 *
 * NOTE(review): lines appear to be missing from this listing (return type,
 * braces, the switch header and default label); the comments added here
 * cover only the visible statements.
 */
4725 * fkMatchTypeToString -
4726 * convert FKCONSTR_MATCH_xxx code to string to use in trigger args
4729 fkMatchTypeToString(char match_type)
4733 case FKCONSTR_MATCH_FULL:
4734 return pstrdup("FULL");
4735 case FKCONSTR_MATCH_PARTIAL:
4736 return pstrdup("PARTIAL");
4737 case FKCONSTR_MATCH_UNSPECIFIED:
4738 return pstrdup("UNSPECIFIED");
4740 elog(ERROR, "unrecognized match type: %d",
4743 return NULL; /* can't get here */
/*
 * ATPrepDropConstraint: preparation step for DROP CONSTRAINT.  It copies
 * cmd, changes the copy's subtype to AT_DropConstraintQuietly, and passes
 * the copy to ATSimpleRecursion() together with wqueue, rel and recurse.
 *
 * NOTE(review): lines appear to be missing from this listing (braces and any
 * condition guarding the recursion); the comments added here cover only the
 * visible statements.
 */
4747 * ALTER TABLE DROP CONSTRAINT
4750 ATPrepDropConstraint(List **wqueue, Relation rel,
4751 bool recurse, AlterTableCmd *cmd)
4754 * We don't want errors or noise from child tables, so we have to pass
4755 * down a modified command.
/* Work on a copy so the caller's command node keeps its original subtype */
4759 AlterTableCmd *childCmd = copyObject(cmd);
4761 childCmd->subtype = AT_DropConstraintQuietly;
4762 ATSimpleRecursion(wqueue, rel, childCmd, recurse);
/*
 * ATExecDropConstraint: remove the constraints named constrName from rel via
 * RemoveRelConstraints() and report on the number removed: zero is an
 * ERRCODE_UNDEFINED_OBJECT complaint, more than one produces a message that
 * multiple constraints were dropped.
 *
 * NOTE(review): lines appear to be missing from this listing (braces, the
 * declaration of deleted, the test on quiet and the report-call openers);
 * presumably quiet suppresses these reports - confirm against the full
 * function.
 */
4767 ATExecDropConstraint(Relation rel, const char *constrName,
4768 DropBehavior behavior, bool quiet)
4772 deleted = RemoveRelConstraints(rel, constrName, behavior);
4776 /* If zero constraints deleted, complain */
4779 (errcode(ERRCODE_UNDEFINED_OBJECT),
4780 errmsg("constraint \"%s\" does not exist",
4782 /* Otherwise if more than one constraint deleted, notify */
4783 else if (deleted > 1)
4785 (errmsg("multiple constraints named \"%s\" were dropped",
/*
 * ATPrepAlterColumnType
 *
 * Preparation step for ALTER TABLE ... ALTER COLUMN ... TYPE.  It validates
 * the column named by cmd->name, resolves the requested type from cmd->def,
 * builds the expression that converts existing values to that type, and
 * appends the result to tab->newvals.
 *
 * wqueue and recurse are handed on to ATSimpleRecursion.  When recursing is
 * true the "inherited column" and "child tables" checks are skipped.
 *
 * Raises an error for an unknown, system or inherited column, for a
 * transform expression that returns a set or contains a subquery or an
 * aggregate, and when no cast to the target type can be found.
 */
4794 ATPrepAlterColumnType(List **wqueue,
4795 AlteredTableInfo *tab, Relation rel,
4796 bool recurse, bool recursing,
4799 char *colName = cmd->name;
4800 TypeName *typename = (TypeName *) cmd->def;
4802 Form_pg_attribute attTup;
4806 NewColumnValue *newval;
/* Private parse state, used only to analyze the conversion expression. */
4807 ParseState *pstate = make_parsestate(NULL);
4809 /* lookup the attribute so we can check inheritance status */
4810 tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
4811 if (!HeapTupleIsValid(tuple))
4813 (errcode(ERRCODE_UNDEFINED_COLUMN),
4814 errmsg("column \"%s\" of relation \"%s\" does not exist",
4815 colName, RelationGetRelationName(rel))));
/* attTup points into the syscache entry; it is valid until ReleaseSysCache. */
4816 attTup = (Form_pg_attribute) GETSTRUCT(tuple);
4817 attnum = attTup->attnum;
4819 /* Can't alter a system attribute */
4822 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4823 errmsg("cannot alter system column \"%s\"",
4826 /* Don't alter inherited columns */
4827 if (attTup->attinhcount > 0 && !recursing)
4829 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
4830 errmsg("cannot alter inherited column \"%s\"",
4833 /* Look up the target type */
4834 targettype = typenameTypeId(NULL, typename);
4836 /* make sure datatype is legal for a column */
4837 CheckAttributeType(colName, targettype);
4840 * Set up an expression to transform the old data value to the new type.
4841 * If a USING option was given, transform and use that expression, else
4842 * just take the old value and try to coerce it. We do this first so that
4843 * type incompatibility can be detected before we waste effort, and
4844 * because we need the expression to be parsed against the original table
4851 /* Expression must be able to access vars of old table */
4852 rte = addRangeTableEntryForRelation(pstate,
4857 addRTEtoQuery(pstate, rte, false, true, true);
/* cmd->transform holds the not-yet-analyzed USING expression. */
4859 transform = transformExpr(pstate, cmd->transform);
4861 /* It can't return a set */
4862 if (expression_returns_set(transform))
4864 (errcode(ERRCODE_DATATYPE_MISMATCH),
4865 errmsg("transform expression must not return a set")));
4867 /* No subplans or aggregates, either... */
4868 if (pstate->p_hasSubLinks)
4870 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4871 errmsg("cannot use subquery in transform expression")));
4872 if (pstate->p_hasAggs)
4874 (errcode(ERRCODE_GROUPING_ERROR),
4875 errmsg("cannot use aggregate function in transform expression")));
/*
 * No USING expression: start from a Var that refers to the column itself
 * (range-table index 1, current type and typmod).
 */
4879 transform = (Node *) makeVar(1, attnum,
4880 attTup->atttypid, attTup->atttypmod,
/*
 * Either way, coerce the expression to the target type under assignment
 * rules.  A NULL result means no cast path exists.
 */
4884 transform = coerce_to_target_type(pstate,
4885 transform, exprType(transform),
4886 targettype, typename->typmod,
4887 COERCION_ASSIGNMENT,
4888 COERCE_IMPLICIT_CAST);
4889 if (transform == NULL)
4891 (errcode(ERRCODE_DATATYPE_MISMATCH),
4892 errmsg("column \"%s\" cannot be cast to type \"%s\"",
4893 colName, TypeNameToString(typename))));
4896 * Add a work queue item to make ATRewriteTable update the column
/* palloc0 zero-fills the entry, so only attnum and expr are set explicitly. */
4899 newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue));
4900 newval->attnum = attnum;
4901 newval->expr = (Expr *) transform;
4903 tab->newvals = lappend(tab->newvals, newval);
/* attTup must not be dereferenced past this point. */
4905 ReleaseSysCache(tuple);
4908 * The recursion case is handled by ATSimpleRecursion. However, if we are
4909 * told not to recurse, there had better not be any child tables; else the
4910 * alter would put them out of step.
4913 ATSimpleRecursion(wqueue, rel, cmd, recurse);
4914 else if (!recursing &&
4915 find_inheritance_children(RelationGetRelid(rel)) != NIL)
4917 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
4918 errmsg("type of inherited column \"%s\" must be changed in child tables too",
/*
 * ATExecAlterColumnType
 *
 * Execution step of ALTER COLUMN TYPE for a single column: rewrites the
 * column's pg_attribute row to describe the new type and fixes up the
 * catalog state that refers to the old one.
 *
 * In order, the code
 *	 - fetches a modifiable copy of the pg_attribute row for colName;
 *	 - refuses a second ALTER TYPE of the same column by comparing the row
 *	   against tab->oldDesc;
 *	 - checks that an existing default can be coerced to the new type;
 *	 - scans pg_depend for objects that depend on the column, remembering
 *	   index and constraint definitions in tab->changedIndexOids/Defs and
 *	   tab->changedConstraintOids/Defs, and erroring out for rules/views
 *	   and for object kinds that are not expected;
 *	 - deletes the column's own dependency row on its old type;
 *	 - updates the type-related pg_attribute fields, records the dependency
 *	   on the new type, drops the column's statistics and re-stores the
 *	   default.
 *
 * The dependent objects themselves are neither dropped nor recreated here.
 */
4923 ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
4924 const char *colName, TypeName *typename)
4927 Form_pg_attribute attTup;
4929 HeapTuple typeTuple;
4933 Relation attrelation;
/* pg_attribute stays open until the updated row has been written below. */
4939 attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
4941 /* Look up the target column */
4942 heapTup = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
4943 if (!HeapTupleIsValid(heapTup)) /* shouldn't happen */
4945 (errcode(ERRCODE_UNDEFINED_COLUMN),
4946 errmsg("column \"%s\" of relation \"%s\" does not exist",
4947 colName, RelationGetRelationName(rel))));
4948 attTup = (Form_pg_attribute) GETSTRUCT(heapTup);
4949 attnum = attTup->attnum;
4951 /* Check for multiple ALTER TYPE on same column --- can't cope */
/* attnum counts from 1, the descriptor's attrs array from 0. */
4952 if (attTup->atttypid != tab->oldDesc->attrs[attnum - 1]->atttypid ||
4953 attTup->atttypmod != tab->oldDesc->attrs[attnum - 1]->atttypmod)
4955 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4956 errmsg("cannot alter type of column \"%s\" twice",
4959 /* Look up the target type (should not fail, since prep found it) */
4960 typeTuple = typenameType(NULL, typename);
/* tform points into typeTuple and is read until ReleaseSysCache(typeTuple). */
4961 tform = (Form_pg_type) GETSTRUCT(typeTuple);
4962 targettype = HeapTupleGetOid(typeTuple);
4965 * If there is a default expression for the column, get it and ensure we
4966 * can coerce it to the new datatype. (We must do this before changing
4967 * the column type, because build_column_default itself will try to
4968 * coerce, and will not issue the error message we want if it fails.)
4970 * We remove any implicit coercion steps at the top level of the old
4971 * default expression; this has been agreed to satisfy the principle of
4972 * least surprise. (The conversion to the new column type should act like
4973 * it started from what the user sees as the stored expression, and the
4974 * implicit coercions aren't going to be shown.)
4976 if (attTup->atthasdef)
4978 defaultexpr = build_column_default(rel, attnum);
4979 Assert(defaultexpr);
4980 defaultexpr = strip_implicit_coercions(defaultexpr);
4981 defaultexpr = coerce_to_target_type(NULL, /* no UNKNOWN params */
4982 defaultexpr, exprType(defaultexpr),
4983 targettype, typename->typmod,
4984 COERCION_ASSIGNMENT,
4985 COERCE_IMPLICIT_CAST);
4986 if (defaultexpr == NULL)
4988 (errcode(ERRCODE_DATATYPE_MISMATCH),
4989 errmsg("default for column \"%s\" cannot be cast to type \"%s\"",
4990 colName, TypeNameToString(typename))));
4996 * Find everything that depends on the column (constraints, indexes, etc),
4997 * and record enough information to let us recreate the objects.
4999 * The actual recreation does not happen here, but only after we have
5000 * performed all the individual ALTER TYPE operations. We have to save
5001 * the info before executing ALTER TYPE, though, else the deparser will
5004 * There could be multiple entries for the same object, so we must check
5005 * to ensure we process each one only once. Note: we assume that an index
5006 * that implements a constraint will not show a direct dependency on the
/* Row-exclusive lock: a pg_depend row is deleted further down. */
5009 depRel = heap_open(DependRelationId, RowExclusiveLock);
/* Scan keys: pg_depend rows whose referenced object is column attnum of rel. */
5011 ScanKeyInit(&key[0],
5012 Anum_pg_depend_refclassid,
5013 BTEqualStrategyNumber, F_OIDEQ,
5014 ObjectIdGetDatum(RelationRelationId));
5015 ScanKeyInit(&key[1],
5016 Anum_pg_depend_refobjid,
5017 BTEqualStrategyNumber, F_OIDEQ,
5018 ObjectIdGetDatum(RelationGetRelid(rel)));
5019 ScanKeyInit(&key[2],
5020 Anum_pg_depend_refobjsubid,
5021 BTEqualStrategyNumber, F_INT4EQ,
5022 Int32GetDatum((int32) attnum));
5024 scan = systable_beginscan(depRel, DependReferenceIndexId, true,
5025 SnapshotNow, 3, key);
5027 while (HeapTupleIsValid(depTup = systable_getnext(scan)))
5029 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(depTup);
5030 ObjectAddress foundObject;
5032 /* We don't expect any PIN dependencies on columns */
5033 if (foundDep->deptype == DEPENDENCY_PIN)
5034 elog(ERROR, "cannot alter type of a pinned column");
/* foundObject identifies the depending object, not the column. */
5036 foundObject.classId = foundDep->classid;
5037 foundObject.objectId = foundDep->objid;
5038 foundObject.objectSubId = foundDep->objsubid;
5040 switch (getObjectClass(&foundObject))
5044 char relKind = get_rel_relkind(foundObject.objectId);
5046 if (relKind == RELKIND_INDEX)
5048 Assert(foundObject.objectSubId == 0);
/* Record each index once; OID and definition text go into parallel lists. */
5049 if (!list_member_oid(tab->changedIndexOids, foundObject.objectId))
5051 tab->changedIndexOids = lappend_oid(tab->changedIndexOids,
5052 foundObject.objectId);
5053 tab->changedIndexDefs = lappend(tab->changedIndexDefs,
5054 pg_get_indexdef_string(foundObject.objectId));
5057 else if (relKind == RELKIND_SEQUENCE)
5060 * This must be a SERIAL column's sequence. We need
5061 * not do anything to it.
5063 Assert(foundObject.objectSubId == 0);
5067 /* Not expecting any other direct dependencies... */
5068 elog(ERROR, "unexpected object depending on column: %s",
5069 getObjectDescription(&foundObject));
5074 case OCLASS_CONSTRAINT:
5075 Assert(foundObject.objectSubId == 0);
5076 if (!list_member_oid(tab->changedConstraintOids,
5077 foundObject.objectId))
5079 char *defstring = pg_get_constraintdef_string(foundObject.objectId);
5082 * Put NORMAL dependencies at the front of the list and
5083 * AUTO dependencies at the back. This makes sure that
5084 * foreign-key constraints depending on this column will
5085 * be dropped before unique or primary-key constraints of
5086 * the column; which we must have because the FK
5087 * constraints depend on the indexes belonging to the
5088 * unique constraints.
5090 if (foundDep->deptype == DEPENDENCY_NORMAL)
5092 tab->changedConstraintOids =
5093 lcons_oid(foundObject.objectId,
5094 tab->changedConstraintOids);
5095 tab->changedConstraintDefs =
5097 tab->changedConstraintDefs);
5101 tab->changedConstraintOids =
5102 lappend_oid(tab->changedConstraintOids,
5103 foundObject.objectId);
5104 tab->changedConstraintDefs =
5105 lappend(tab->changedConstraintDefs,
5111 case OCLASS_REWRITE:
5112 /* XXX someday see if we can cope with revising views */
5114 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5115 errmsg("cannot alter type of a column used by a view or rule"),
5116 errdetail("%s depends on column \"%s\"",
5117 getObjectDescription(&foundObject),
5121 case OCLASS_DEFAULT:
5124 * Ignore the column's default expression, since we will fix
5127 Assert(defaultexpr);
5133 case OCLASS_CONVERSION:
5134 case OCLASS_LANGUAGE:
5135 case OCLASS_OPERATOR:
5136 case OCLASS_OPCLASS:
5137 case OCLASS_TRIGGER:
5141 * We don't expect any of these sorts of objects to depend on
5144 elog(ERROR, "unexpected object depending on column: %s",
5145 getObjectDescription(&foundObject));
5149 elog(ERROR, "unrecognized object class: %u",
5150 foundObject.classId);
5154 systable_endscan(scan);
5157 * Now scan for dependencies of this column on other things. The only
5158 * thing we should find is the dependency on the column datatype, which we
/* Same key values as above, now matched against the depender columns. */
5161 ScanKeyInit(&key[0],
5162 Anum_pg_depend_classid,
5163 BTEqualStrategyNumber, F_OIDEQ,
5164 ObjectIdGetDatum(RelationRelationId));
5165 ScanKeyInit(&key[1],
5166 Anum_pg_depend_objid,
5167 BTEqualStrategyNumber, F_OIDEQ,
5168 ObjectIdGetDatum(RelationGetRelid(rel)));
5169 ScanKeyInit(&key[2],
5170 Anum_pg_depend_objsubid,
5171 BTEqualStrategyNumber, F_INT4EQ,
5172 Int32GetDatum((int32) attnum));
5174 scan = systable_beginscan(depRel, DependDependerIndexId, true,
5175 SnapshotNow, 3, key);
5177 while (HeapTupleIsValid(depTup = systable_getnext(scan)))
5179 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(depTup);
5181 if (foundDep->deptype != DEPENDENCY_NORMAL)
5182 elog(ERROR, "found unexpected dependency type '%c'",
/* attTup still carries the old type OID at this point. */
5184 if (foundDep->refclassid != TypeRelationId ||
5185 foundDep->refobjid != attTup->atttypid)
5186 elog(ERROR, "found unexpected dependency for column");
/* Drop the link to the old type; the new one is recorded further down. */
5188 simple_heap_delete(depRel, &depTup->t_self);
5191 systable_endscan(scan);
5193 heap_close(depRel, RowExclusiveLock);
5196 * Here we go --- change the recorded column type. (Note heapTup is a
5197 * copy of the syscache entry, so okay to scribble on.)
5199 attTup->atttypid = targettype;
5200 attTup->atttypmod = typename->typmod;
5201 attTup->attndims = list_length(typename->arrayBounds);
/* Physical properties are taken over from the new type's pg_type row. */
5202 attTup->attlen = tform->typlen;
5203 attTup->attbyval = tform->typbyval;
5204 attTup->attalign = tform->typalign;
5205 attTup->attstorage = tform->typstorage;
/* tform is not used after this release. */
5207 ReleaseSysCache(typeTuple);
5209 simple_heap_update(attrelation, &heapTup->t_self, heapTup);
5211 /* keep system catalog indexes current */
5212 CatalogUpdateIndexes(attrelation, heapTup);
5214 heap_close(attrelation, RowExclusiveLock);
5216 /* Install dependency on new datatype */
5217 add_column_datatype_dependency(RelationGetRelid(rel), attnum, targettype);
5220 * Drop any pg_statistic entry for the column, since it's now wrong type
5222 RemoveStatistics(RelationGetRelid(rel), attnum);
5225 * Update the default, if present, by brute force --- remove and re-add
5226 * the default. Probably unsafe to take shortcuts, since the new version
5227 * may well have additional dependencies. (It's okay to do this now,
5228 * rather than after other ALTER TYPE commands, since the default won't
5229 * depend on other column types.)
5233 /* Must make new row visible since it will be updated again */
5234 CommandCounterIncrement();
5237 * We use RESTRICT here for safety, but at present we do not expect
5238 * anything to depend on the default.
5240 RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, true);
/* defaultexpr is the expression already coerced to the new type above. */
5242 StoreAttrDefault(rel, attnum, nodeToString(defaultexpr));
/* Free the private copy obtained from SearchSysCacheCopyAttName. */
5246 heap_freetuple(heapTup);
5250 * Cleanup after we've finished all the ALTER TYPE operations for a
5251 * particular relation. We have to drop and recreate all the indexes
5252 * and constraints that depend on the altered columns.
5255 ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab)
5261 * Re-parse the index and constraint definitions, and attach them to the
5262 * appropriate work queue entries. We do this before dropping because in
5263 * the case of a FOREIGN KEY constraint, we might not yet have exclusive
5264 * lock on the table the constraint is attached to, and we need to get
5265 * that before dropping. It's safe because the parser won't actually look
5266 * at the catalogs to detect the existing entry.
5268 foreach(l, tab->changedIndexDefs)
5269 ATPostAlterTypeParse((char *) lfirst(l), wqueue);
5270 foreach(l, tab->changedConstraintDefs)
5271 ATPostAlterTypeParse((char *) lfirst(l), wqueue);
5274 * Now we can drop the existing constraints and indexes --- constraints
5275 * first, since some of them might depend on the indexes. In fact, we
5276 * have to delete FOREIGN KEY constraints before UNIQUE constraints,
5277 * but we already ordered the constraint list to ensure that would happen.
5278 * It should be okay to use DROP_RESTRICT here, since nothing else should
5279 * be depending on these objects.
5281 foreach(l, tab->changedConstraintOids)
5283 obj.classId = ConstraintRelationId;
5284 obj.objectId = lfirst_oid(l);
5285 obj.objectSubId = 0;
5286 performDeletion(&obj, DROP_RESTRICT);
5289 foreach(l, tab->changedIndexOids)
5291 obj.classId = RelationRelationId;
5292 obj.objectId = lfirst_oid(l);
5293 obj.objectSubId = 0;
5294 performDeletion(&obj, DROP_RESTRICT);
5298 * The objects will get recreated during subsequent passes over the work
5304 ATPostAlterTypeParse(char *cmd, List **wqueue)
5306 List *raw_parsetree_list;
5307 List *querytree_list;
5308 ListCell *list_item;
5311 * We expect that we only have to do raw parsing and parse analysis, not
5312 * any rule rewriting, since these will all be utility statements.
5314 raw_parsetree_list = raw_parser(cmd);
5315 querytree_list = NIL;
5316 foreach(list_item, raw_parsetree_list)
5318 Node *parsetree = (Node *) lfirst(list_item);
5320 querytree_list = list_concat(querytree_list,
5321 parse_analyze(parsetree, cmd, NULL, 0));
5325 * Attach each generated command to the proper place in the work queue.
5326 * Note this could result in creation of entirely new work-queue entries.
5328 foreach(list_item, querytree_list)
5330 Query *query = (Query *) lfirst(list_item);
5332 AlteredTableInfo *tab;
5334 Assert(IsA(query, Query));
5335 Assert(query->commandType == CMD_UTILITY);
5336 switch (nodeTag(query->utilityStmt))
5340 IndexStmt *stmt = (IndexStmt *) query->utilityStmt;
5341 AlterTableCmd *newcmd;
5343 rel = relation_openrv(stmt->relation, AccessExclusiveLock);
5344 tab = ATGetQueueEntry(wqueue, rel);
5345 newcmd = makeNode(AlterTableCmd);
5346 newcmd->subtype = AT_ReAddIndex;
5347 newcmd->def = (Node *) stmt;
5348 tab->subcmds[AT_PASS_OLD_INDEX] =
5349 lappend(tab->subcmds[AT_PASS_OLD_INDEX], newcmd);
5350 relation_close(rel, NoLock);
5353 case T_AlterTableStmt:
5355 AlterTableStmt *stmt = (AlterTableStmt *) query->utilityStmt;
5358 rel = relation_openrv(stmt->relation, AccessExclusiveLock);
5359 tab = ATGetQueueEntry(wqueue, rel);
5360 foreach(lcmd, stmt->cmds)
5362 AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
5364 switch (cmd->subtype)
5367 cmd->subtype = AT_ReAddIndex;
5368 tab->subcmds[AT_PASS_OLD_INDEX] =
5369 lappend(tab->subcmds[AT_PASS_OLD_INDEX], cmd);
5371 case AT_AddConstraint:
5372 tab->subcmds[AT_PASS_OLD_CONSTR] =
5373 lappend(tab->subcmds[AT_PASS_OLD_CONSTR], cmd);
5376 elog(ERROR, "unexpected statement type: %d",
5377 (int) cmd->subtype);
5380 relation_close(rel, NoLock);
5384 elog(ERROR, "unexpected statement type: %d",
5385 (int) nodeTag(query->utilityStmt));
/*
 * ATExecChangeOwner
 *
 * Change the owner of the relation identified by relationOid to newOwnerId.
 * The pg_class row, the owner dependency and (when non-null) the relation's
 * ACL are updated.  For tables and toast tables the change is propagated to
 * their indexes; for tables also to the toast table and to dependent
 * sequences.
 *
 * Nothing is modified when the relation is already owned by newOwnerId.
 * The lock taken on the target relation is not released here (the relation
 * is closed with NoLock).
 */
5394 * recursing is true if we are recursing from a table to its indexes or
5395 * toast table. We don't allow the ownership of those things to be
5396 * changed separately from the parent table. Also, we can skip permission
5397 * checks (this is necessary not just an optimization, else we'd fail to
5398 * handle toast tables properly).
5401 ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing)
5403 Relation target_rel;
5406 Form_pg_class tuple_class;
5409 * Get exclusive lock till end of transaction on the target table. Use
5410 * relation_open so that we can work on indexes and sequences.
5412 target_rel = relation_open(relationOid, AccessExclusiveLock);
5414 /* Get its pg_class tuple, too */
5415 class_rel = heap_open(RelationRelationId, RowExclusiveLock);
/* Syscache reference (not a copy); handed back with ReleaseSysCache at the end. */
5417 tuple = SearchSysCache(RELOID,
5418 ObjectIdGetDatum(relationOid),
5420 if (!HeapTupleIsValid(tuple))
5421 elog(ERROR, "cache lookup failed for relation %u", relationOid);
5422 tuple_class = (Form_pg_class) GETSTRUCT(tuple);
5424 /* Can we change the ownership of this tuple? */
5425 switch (tuple_class->relkind)
5427 case RELKIND_RELATION:
5429 case RELKIND_SEQUENCE:
5430 /* ok to change owner */
5436 * Because ALTER INDEX OWNER used to be allowed, and in fact
5437 * is generated by old versions of pg_dump, we give a warning
5438 * and do nothing rather than erroring out. Also, to avoid
5439 * unnecessary chatter while restoring those old dumps, say
5440 * nothing at all if the command would be a no-op anyway.
5442 if (tuple_class->relowner != newOwnerId)
5444 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
5445 errmsg("cannot change owner of index \"%s\"",
5446 NameStr(tuple_class->relname)),
5447 errhint("Change the ownership of the index's table, instead.")));
5448 /* quick hack to exit via the no-op path */
5449 newOwnerId = tuple_class->relowner;
5452 case RELKIND_TOASTVALUE:
5458 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
5459 errmsg("\"%s\" is not a table, view, or sequence",
5460 NameStr(tuple_class->relname))));
5464 * If the new owner is the same as the existing owner, consider the
5465 * command to have succeeded. This is for dump restoration purposes.
5467 if (tuple_class->relowner != newOwnerId)
/* Per-column arrays handed to heap_modifytuple below. */
5469 Datum repl_val[Natts_pg_class];
5470 char repl_null[Natts_pg_class];
5471 char repl_repl[Natts_pg_class];
5477 /* skip permission checks when recursing to index or toast table */
5480 /* Superusers can always do it */
5483 Oid namespaceOid = tuple_class->relnamespace;
5484 AclResult aclresult;
5486 /* Otherwise, must be owner of the existing object */
5487 if (!pg_class_ownercheck(relationOid, GetUserId()))
5488 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
5489 RelationGetRelationName(target_rel));
5491 /* Must be able to become new owner */
5492 check_is_member_of_role(GetUserId(), newOwnerId);
5494 /* New owner must have CREATE privilege on namespace */
5495 aclresult = pg_namespace_aclcheck(namespaceOid, newOwnerId,
5497 if (aclresult != ACLCHECK_OK)
5498 aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
5499 get_namespace_name(namespaceOid));
/*
 * NOTE(review): presumably ' ' means "not null" / "leave unchanged" and 'r'
 * means "replace with repl_val" -- confirm against heap_modifytuple.
 */
5503 memset(repl_null, ' ', sizeof(repl_null));
5504 memset(repl_repl, ' ', sizeof(repl_repl));
5506 repl_repl[Anum_pg_class_relowner - 1] = 'r';
5507 repl_val[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(newOwnerId);
5510 * Determine the modified ACL for the new owner. This is only
5511 * necessary when the ACL is non-null.
5513 aclDatum = SysCacheGetAttr(RELOID, tuple,
5514 Anum_pg_class_relacl,
/* aclnewowner is given the current ACL plus the old and the new owner. */
5518 newAcl = aclnewowner(DatumGetAclP(aclDatum),
5519 tuple_class->relowner, newOwnerId);
5520 repl_repl[Anum_pg_class_relacl - 1] = 'r';
5521 repl_val[Anum_pg_class_relacl - 1] = PointerGetDatum(newAcl);
/* Build the replacement row, store it, and update pg_class's indexes. */
5524 newtuple = heap_modifytuple(tuple, RelationGetDescr(class_rel), repl_val, repl_null, repl_repl);
5526 simple_heap_update(class_rel, &newtuple->t_self, newtuple);
5527 CatalogUpdateIndexes(class_rel, newtuple);
5529 heap_freetuple(newtuple);
5531 /* Update owner dependency reference */
5532 changeDependencyOnOwner(RelationRelationId, relationOid, newOwnerId);
5535 * Also change the ownership of the table's rowtype, if it has one
5537 if (tuple_class->relkind != RELKIND_INDEX)
5538 AlterTypeOwnerInternal(tuple_class->reltype, newOwnerId);
5541 * If we are operating on a table, also change the ownership of any
5542 * indexes and sequences that belong to the table, as well as the
5543 * table's toast table (if it has one)
5545 if (tuple_class->relkind == RELKIND_RELATION ||
5546 tuple_class->relkind == RELKIND_TOASTVALUE)
5548 List *index_oid_list;
5551 /* Find all the indexes belonging to this relation */
5552 index_oid_list = RelationGetIndexList(target_rel);
5554 /* For each index, recursively change its ownership */
/* recursing = true: the index is handled as part of its table. */
5555 foreach(i, index_oid_list)
5556 ATExecChangeOwner(lfirst_oid(i), newOwnerId, true);
5558 list_free(index_oid_list);
5561 if (tuple_class->relkind == RELKIND_RELATION)
5563 /* If it has a toast table, recurse to change its ownership */
5564 if (tuple_class->reltoastrelid != InvalidOid)
5565 ATExecChangeOwner(tuple_class->reltoastrelid, newOwnerId,
5568 /* If it has dependent sequences, recurse to change them too */
5569 change_owner_recurse_to_sequences(relationOid, newOwnerId);
/* tuple_class points into the syscache entry and is invalid after this. */
5573 ReleaseSysCache(tuple);
5574 heap_close(class_rel, RowExclusiveLock);
5575 relation_close(target_rel, NoLock);
5579 * change_owner_recurse_to_sequences
5581 * Helper function for ATExecChangeOwner. Examines pg_depend searching
5582 * for sequences that are dependent on serial columns, and changes their
5586 change_owner_recurse_to_sequences(Oid relationOid, Oid newOwnerId)
5594 * SERIAL sequences are those having an internal dependency on one of the
5595 * table's columns (we don't care *which* column, exactly).
5597 depRel = heap_open(DependRelationId, AccessShareLock);
5599 ScanKeyInit(&key[0],
5600 Anum_pg_depend_refclassid,
5601 BTEqualStrategyNumber, F_OIDEQ,
5602 ObjectIdGetDatum(RelationRelationId));
5603 ScanKeyInit(&key[1],
5604 Anum_pg_depend_refobjid,
5605 BTEqualStrategyNumber, F_OIDEQ,
5606 ObjectIdGetDatum(relationOid));
5607 /* we leave refobjsubid unspecified */
5609 scan = systable_beginscan(depRel, DependReferenceIndexId, true,
5610 SnapshotNow, 2, key);
5612 while (HeapTupleIsValid(tup = systable_getnext(scan)))
5614 Form_pg_depend depForm = (Form_pg_depend) GETSTRUCT(tup);
5617 /* skip dependencies other than internal dependencies on columns */
5618 if (depForm->refobjsubid == 0 ||
5619 depForm->classid != RelationRelationId ||
5620 depForm->objsubid != 0 ||
5621 depForm->deptype != DEPENDENCY_INTERNAL)
5624 /* Use relation_open just in case it's an index */
5625 seqRel = relation_open(depForm->objid, AccessExclusiveLock);
5627 /* skip non-sequence relations */
5628 if (RelationGetForm(seqRel)->relkind != RELKIND_SEQUENCE)
5630 /* No need to keep the lock */
5631 relation_close(seqRel, AccessExclusiveLock);
5635 /* We don't need to close the sequence while we alter it. */
5636 ATExecChangeOwner(depForm->objid, newOwnerId, false);
5638 /* Now we can close it. Keep the lock till end of transaction. */
5639 relation_close(seqRel, NoLock);
5642 systable_endscan(scan);
5644 relation_close(depRel, AccessShareLock);
5648 * ALTER TABLE CLUSTER ON
5650 * The only thing we have to do is to change the indisclustered bits.
5653 ATExecClusterOn(Relation rel, const char *indexName)
5657 indexOid = get_relname_relid(indexName, rel->rd_rel->relnamespace);
5659 if (!OidIsValid(indexOid))
5661 (errcode(ERRCODE_UNDEFINED_OBJECT),
5662 errmsg("index \"%s\" for table \"%s\" does not exist",
5663 indexName, RelationGetRelationName(rel))));
5665 /* Check index is valid to cluster on */
5666 check_index_is_clusterable(rel, indexOid, false);
5668 /* And do the work */
5669 mark_index_clustered(rel, indexOid);
5673 * ALTER TABLE SET WITHOUT CLUSTER
5675 * We have to find any indexes on the table that have indisclustered bit
5676 * set and turn it off.
5679 ATExecDropCluster(Relation rel)
5681 mark_index_clustered(rel, InvalidOid);
5685 * ALTER TABLE SET TABLESPACE
5688 ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename)
5691 AclResult aclresult;
5694 * We do our own permission checking because we want to allow this on
5697 if (rel->rd_rel->relkind != RELKIND_RELATION &&
5698 rel->rd_rel->relkind != RELKIND_INDEX)
5700 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
5701 errmsg("\"%s\" is not a table or index",
5702 RelationGetRelationName(rel))));
5704 /* Permissions checks */
5705 if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
5706 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
5707 RelationGetRelationName(rel));
5709 if (!allowSystemTableMods && IsSystemRelation(rel))
5711 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
5712 errmsg("permission denied: \"%s\" is a system catalog",
5713 RelationGetRelationName(rel))));
5715 /* Check that the tablespace exists */
5716 tablespaceId = get_tablespace_oid(tablespacename);
5717 if (!OidIsValid(tablespaceId))
5719 (errcode(ERRCODE_UNDEFINED_OBJECT),
5720 errmsg("tablespace \"%s\" does not exist", tablespacename)));
5722 /* Check its permissions */
5723 aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(), ACL_CREATE);
5724 if (aclresult != ACLCHECK_OK)
5725 aclcheck_error(aclresult, ACL_KIND_TABLESPACE, tablespacename);
5727 /* Save info for Phase 3 to do the real work */
5728 if (OidIsValid(tab->newTableSpace))
5730 (errcode(ERRCODE_SYNTAX_ERROR),
5731 errmsg("cannot have multiple SET TABLESPACE subcommands")));
5732 tab->newTableSpace = tablespaceId;
5736 * Execute ALTER TABLE SET TABLESPACE for cases where there is no tuple
5737 * rewriting to be done, so we just want to copy the data as fast as possible.
5740 ATExecSetTableSpace(Oid tableOid, Oid newTableSpace)
5746 RelFileNode newrnode;
5747 SMgrRelation dstrel;
5750 Form_pg_class rd_rel;
5752 rel = relation_open(tableOid, NoLock);
5755 * We can never allow moving of shared or nailed-in-cache relations,
5756 * because we can't support changing their reltablespace values.
5758 if (rel->rd_rel->relisshared || rel->rd_isnailed)
5760 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5761 errmsg("cannot move system relation \"%s\"",
5762 RelationGetRelationName(rel))));
5765 * Don't allow moving temp tables of other backends ... their local buffer
5766 * manager is not going to cope.
5768 if (isOtherTempNamespace(RelationGetNamespace(rel)))
5770 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5771 errmsg("cannot move temporary tables of other sessions")));
5774 * No work if no change in tablespace.
5776 oldTableSpace = rel->rd_rel->reltablespace;
5777 if (newTableSpace == oldTableSpace ||
5778 (newTableSpace == MyDatabaseTableSpace && oldTableSpace == 0))
5780 relation_close(rel, NoLock);
5784 reltoastrelid = rel->rd_rel->reltoastrelid;
5785 reltoastidxid = rel->rd_rel->reltoastidxid;
5787 /* Get a modifiable copy of the relation's pg_class row */
5788 pg_class = heap_open(RelationRelationId, RowExclusiveLock);
5790 tuple = SearchSysCacheCopy(RELOID,
5791 ObjectIdGetDatum(tableOid),
5793 if (!HeapTupleIsValid(tuple))
5794 elog(ERROR, "cache lookup failed for relation %u", tableOid);
5795 rd_rel = (Form_pg_class) GETSTRUCT(tuple);
5797 /* create another storage file. Is it a little ugly ? */
5798 /* NOTE: any conflict in relfilenode value will be caught here */
5799 newrnode = rel->rd_node;
5800 newrnode.spcNode = newTableSpace;
5802 dstrel = smgropen(newrnode);
5803 smgrcreate(dstrel, rel->rd_istemp, false);
5805 /* copy relation data to the new physical file */
5806 copy_relation_data(rel, dstrel);
5808 /* schedule unlinking old physical file */
5809 RelationOpenSmgr(rel);
5810 smgrscheduleunlink(rel->rd_smgr, rel->rd_istemp);
5813 * Now drop smgr references. The source was already dropped by
5814 * smgrscheduleunlink.
5818 /* update the pg_class row */
5819 rd_rel->reltablespace = (newTableSpace == MyDatabaseTableSpace) ? InvalidOid : newTableSpace;
5820 simple_heap_update(pg_class, &tuple->t_self, tuple);
5821 CatalogUpdateIndexes(pg_class, tuple);
5823 heap_freetuple(tuple);
5825 heap_close(pg_class, RowExclusiveLock);
5827 relation_close(rel, NoLock);
5829 /* Make sure the reltablespace change is visible */
5830 CommandCounterIncrement();
5832 /* Move associated toast relation and/or index, too */
5833 if (OidIsValid(reltoastrelid))
5834 ATExecSetTableSpace(reltoastrelid, newTableSpace);
5835 if (OidIsValid(reltoastidxid))
5836 ATExecSetTableSpace(reltoastidxid, newTableSpace);
5840 * Copy data, block by block
5843 copy_relation_data(Relation rel, SMgrRelation dst)
5847 BlockNumber nblocks;
5850 Page page = (Page) buf;
5853 * Since we copy the file directly without looking at the shared buffers,
5854 * we'd better first flush out any pages of the source relation that are
5855 * in shared buffers. We assume no new changes will be made while we are
5856 * holding exclusive lock on the rel.
5858 FlushRelationBuffers(rel);
5861 * We need to log the copied data in WAL iff WAL archiving is enabled AND
5862 * it's not a temp rel.
5864 use_wal = XLogArchivingActive() && !rel->rd_istemp;
5866 nblocks = RelationGetNumberOfBlocks(rel);
5867 /* RelationGetNumberOfBlocks will certainly have opened rd_smgr */
5870 for (blkno = 0; blkno < nblocks; blkno++)
5872 smgrread(src, blkno, buf);
5877 xl_heap_newpage xlrec;
5879 XLogRecData rdata[2];
5881 /* NO ELOG(ERROR) from here till newpage op is logged */
5882 START_CRIT_SECTION();
5884 xlrec.node = dst->smgr_rnode;
5885 xlrec.blkno = blkno;
5887 rdata[0].data = (char *) &xlrec;
5888 rdata[0].len = SizeOfHeapNewpage;
5889 rdata[0].buffer = InvalidBuffer;
5890 rdata[0].next = &(rdata[1]);
5892 rdata[1].data = (char *) page;
5893 rdata[1].len = BLCKSZ;
5894 rdata[1].buffer = InvalidBuffer;
5895 rdata[1].next = NULL;
5897 recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
5899 PageSetLSN(page, recptr);
5900 PageSetTLI(page, ThisTimeLineID);
5906 * Now write the page. We say isTemp = true even if it's not a temp
5907 * rel, because there's no need for smgr to schedule an fsync for this
5908 * write; we'll do it ourselves below.
5910 smgrwrite(dst, blkno, buf, true);
5914 * If the rel isn't temp, we must fsync it down to disk before it's safe
5915 * to commit the transaction. (For a temp rel we don't care since the rel
5916 * will be uninteresting after a crash anyway.)
5918 * It's obvious that we must do this when not WAL-logging the copy. It's
5919 * less obvious that we have to do it even if we did WAL-log the copied
5920 * pages. The reason is that since we're copying outside shared buffers, a
5921 * CHECKPOINT occurring during the copy has no way to flush the previously
5922 * written data to disk (indeed it won't know the new rel even exists). A
5923 * crash later on would replay WAL from the checkpoint, therefore it
5924 * wouldn't replay our earlier WAL entries. If we do not fsync those pages
5925 * here, they might still not be on disk when the crash occurs.
5927 if (!rel->rd_istemp)
5932 * ALTER TABLE ENABLE/DISABLE TRIGGER
5934 * We just pass this off to trigger.c.
5937 ATExecEnableDisableTrigger(Relation rel, char *trigname,
5938 bool enable, bool skip_system)
5940 EnableDisableTrigger(rel, trigname, enable, skip_system);
5944 decompile_conbin(HeapTuple contuple, TupleDesc tupledesc)
5946 Form_pg_constraint con = (Form_pg_constraint)(GETSTRUCT(contuple));
5950 d = fastgetattr(contuple, Anum_pg_constraint_conbin, tupledesc, &isnull);
5952 elog(ERROR, "conbin is null for constraint \"%s\"", NameStr(con->conname));
5953 d = DirectFunctionCall2(pg_get_expr, d, ObjectIdGetDatum(con->conrelid));
5954 return DatumGetCString(DirectFunctionCall1(textout,d));
5958 /* ALTER TABLE INHERIT */
5960 /* Add a parent to the child's parents. This verifies that all the columns and
5961 * check constraints of the parent appear in the child and that they have the
5962 * same data type and expressions.
5966 ATExecAddInherits(Relation rel, RangeVar *parent)
5972 HeapTuple inheritsTuple;
5977 /* XXX is this enough locking? */
5978 relation = heap_openrv(parent, AccessShareLock);
5981 * Must be owner of both parent and child -- child is taken care of by
5982 * ATSimplePermissions call in ATPrepCmd
5984 ATSimplePermissions(relation, false);
5986 /* Permanent rels cannot inherit from temporary ones */
5987 if (!isTempNamespace(RelationGetNamespace(rel)) &&
5988 isTempNamespace(RelationGetNamespace(relation)))
5990 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
5991 errmsg("cannot inherit from temporary relation \"%s\"",
5994 /* If parent has OIDs then all children must have OIDs */
5995 if (relation->rd_rel->relhasoids && !rel->rd_rel->relhasoids)
5997 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
5998 errmsg("table \"%s\" without OIDs cannot inherit from table \"%s\" with OIDs",
5999 RelationGetRelationName(rel), parent->relname)));
6002 * Reject duplications in the list of parents. We scan through the list of
6003 * parents in pg_inherit and keep track of the first open inhseqno slot
6004 * found to use for the new parent.
6006 catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
6008 Anum_pg_inherits_inhrelid,
6009 BTEqualStrategyNumber, F_OIDEQ,
6010 ObjectIdGetDatum(RelationGetRelid(rel)));
6011 scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId,
6012 true, SnapshotNow, 1, &key);
6013 inhseqno = 0; /* inhseqno sequences are supposed to start at
6015 while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
6017 Form_pg_inherits inh = (Form_pg_inherits) GETSTRUCT(inheritsTuple);
6019 if (inh->inhparent == RelationGetRelid(relation))
6021 (errcode(ERRCODE_DUPLICATE_TABLE),
6022 errmsg("inherited relation \"%s\" duplicated",
6024 if (inh->inhseqno == inhseqno + 1)
6025 inhseqno = inh->inhseqno;
6027 systable_endscan(scan);
6028 heap_close(catalogRelation, RowExclusiveLock);
6031 * If the new parent is found in our list of inheritors we have a circular
6035 /* this routine is actually in the planner */
6036 children = find_all_inheritors(RelationGetRelid(rel));
6038 if (list_member_oid(children, RelationGetRelid(relation)))
6040 (errcode(ERRCODE_DUPLICATE_TABLE),
6041 errmsg("circular inheritance structure found, \"%s\" is already a child of \"%s\"",
6042 parent->relname, RelationGetRelationName(rel))));
6045 /* Match up the columns and bump attinhcount and attislocal */
6046 MergeAttributesIntoExisting(rel, relation);
6048 /* Match up the constraints and make sure they're present in child */
6049 MergeConstraintsIntoExisting(rel, relation);
6052 * Use this refactored part of StoreCatalogInheritance which CREATE TABLE
6053 * uses to add the pg_inherit line
6055 catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
6056 StoreCatalogInheritance1(RelationGetRelid(rel), RelationGetRelid(relation),
6057 inhseqno + 1, catalogRelation);
6058 heap_close(catalogRelation, RowExclusiveLock);
6060 heap_close(relation, AccessShareLock);
6064 * Check columns in child table match up with columns in parent
6066 * Called by ATExecAddInherits
6068 * Currently all columns must be found in child. Missing columns are an error.
6069 * One day we might consider creating new columns like CREATE TABLE does.
6071 * The data type must match perfectly, if the parent column is NOT NULL then
6072 * the child table must be as well. Defaults are ignored however.
6077 MergeAttributesIntoExisting(Relation rel, Relation relation)
6080 AttrNumber parent_attno,
6082 TupleDesc tupleDesc;
6083 TupleConstr *constr;
6086 child_attno = RelationGetNumberOfAttributes(rel);
6088 tupleDesc = RelationGetDescr(relation);
6089 constr = tupleDesc->constr;
6091 for (parent_attno = 1; parent_attno <= tupleDesc->natts;
6094 Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1];
6095 char *attributeName = NameStr(attribute->attname);
6097 /* Ignore dropped columns in the parent. */
6098 if (attribute->attisdropped)
6101 /* Does it conflict with an existing column? */
6102 attrdesc = heap_open(AttributeRelationId, RowExclusiveLock);
6104 tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), attributeName);
6105 if (HeapTupleIsValid(tuple))
6108 * Yes, try to merge the two column definitions. They must have
6109 * the same type and typmod.
6111 Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple);
6113 if (attribute->atttypid != childatt->atttypid ||
6114 attribute->atttypmod != childatt->atttypmod ||
6115 (attribute->attnotnull && !childatt->attnotnull))
6117 (errcode(ERRCODE_DATATYPE_MISMATCH),
6118 errmsg("child table \"%s\" has different type for column \"%s\"",
6119 RelationGetRelationName(rel), NameStr(attribute->attname))));
6121 childatt->attinhcount++;
6122 simple_heap_update(attrdesc, &tuple->t_self, tuple);
6123 /* XXX strength reduce open indexes to outside loop? */
6124 CatalogUpdateIndexes(attrdesc, tuple);
6125 heap_freetuple(tuple);
6128 * We don't touch default at all since we're not making any other
6129 * DDL changes to the child
6135 * No, create a new inherited column
6137 * Creating inherited columns in this case seems to be unpopular.
6138 * In the common use case of partitioned tables it's a foot-gun.
6141 (errcode(ERRCODE_DATATYPE_MISMATCH),
6142 errmsg("child table missing column \"%s\"",
6143 NameStr(attribute->attname))));
6145 heap_close(attrdesc, RowExclusiveLock);
6151 * Check constraints in child table match up with constraints in parent
6153 * Called by ATExecAddInherits
6155 * Currently all constraints in parent must be present in the child. One day we
6156 * may consider adding new constraints like CREATE TABLE does. We may also want
6157 * to allow an optional flag on parent table constraints indicating they are
6158 * intended to ONLY apply to the master table, not to the children. That would
6159 * make it possible to ensure no records are mistakenly inserted into the
6160 * master in partitioned tables rather than the appropriate child.
6162 * XXX this is O(n^2) which may be issue with tables with hundreds of
6163 * constraints. As long as tables have more like 10 constraints it shouldn't be
6164 * an issue though. Even 100 constraints ought not be the end of the world.
6168 MergeConstraintsIntoExisting(Relation rel, Relation relation)
6170 Relation catalogRelation;
6171 TupleDesc tupleDesc;
6174 HeapTuple constraintTuple;
6178 /* First gather up the child's constraint definitions */
6179 catalogRelation = heap_open(ConstraintRelationId, AccessShareLock);
6180 tupleDesc = RelationGetDescr(catalogRelation);
6183 Anum_pg_constraint_conrelid,
6184 BTEqualStrategyNumber,
6186 ObjectIdGetDatum(RelationGetRelid(rel)));
6187 scan = systable_beginscan(catalogRelation, ConstraintRelidIndexId,
6188 true, SnapshotNow, 1, &key);
6191 while (HeapTupleIsValid(constraintTuple = systable_getnext(scan)))
6193 Form_pg_constraint con = (Form_pg_constraint) (GETSTRUCT(constraintTuple));
6195 if (con->contype != CONSTRAINT_CHECK)
6197 /* XXX Do I need the copytuple here? */
6198 constraints = lappend(constraints, heap_copytuple(constraintTuple));
6200 systable_endscan(scan);
6202 /* Then loop through the parent's constraints looking for them in the list */
6204 Anum_pg_constraint_conrelid,
6205 BTEqualStrategyNumber,
6207 ObjectIdGetDatum(RelationGetRelid(relation)));
6208 scan = systable_beginscan(catalogRelation, ConstraintRelidIndexId, true,
6209 SnapshotNow, 1, &key);
6210 while (HeapTupleIsValid(constraintTuple = systable_getnext(scan)))
6213 Form_pg_constraint parent_con = (Form_pg_constraint) (GETSTRUCT(constraintTuple));
6214 Form_pg_constraint child_con = NULL;
6215 HeapTuple child_contuple = NULL;
6217 if (parent_con->contype != CONSTRAINT_CHECK)
6220 foreach(elem, constraints)
6222 child_contuple = lfirst(elem);
6223 child_con = (Form_pg_constraint) (GETSTRUCT(child_contuple));
6224 if (!strcmp(NameStr(parent_con->conname),
6225 NameStr(child_con->conname)))
6234 (errcode(ERRCODE_DATATYPE_MISMATCH),
6235 errmsg("child table missing constraint matching parent table constraint \"%s\"",
6236 NameStr(parent_con->conname))));
6238 if (parent_con->condeferrable != child_con->condeferrable ||
6239 parent_con->condeferred != child_con->condeferred ||
6240 parent_con->contypid != child_con->contypid ||
6241 strcmp(decompile_conbin(constraintTuple, tupleDesc),
6242 decompile_conbin(child_contuple, tupleDesc)))
6244 (errcode(ERRCODE_DATATYPE_MISMATCH),
6245 errmsg("constraint definition for CHECK constraint \"%s\" doesn't match",
6246 NameStr(parent_con->conname))));
6249 * TODO: add conislocal,coninhcount to constraints. This is where we
6250 * would have to bump them just like attributes
6253 systable_endscan(scan);
6254 heap_close(catalogRelation, AccessShareLock);
6257 /* ALTER TABLE NO INHERIT */
6259 /* Drop a parent from the child's parents. This just adjusts the attinhcount
6260 * and attislocal of the columns and removes the pg_inherit and pg_depend
6263 * If attinhcount goes to 0 then attislocal gets set to true. If it goes back up
6264 * attislocal stays 0 which means if a child is ever removed from a parent then
6265 * its columns will never be automatically dropped which may surprise. But at
6266 * least we'll never surprise by dropping columns someone isn't expecting to be
6267 * dropped which would actually mean data loss.
6271 ATExecDropInherits(Relation rel, RangeVar *parent)
6275 Relation catalogRelation;
6278 HeapTuple inheritsTuple,
6286 * Get the OID of parent -- if no schema is specified use the regular
6287 * search path and only drop the one table that's found. We could try to
6288 * be clever and look at each parent and see if it matches but that would
6289 * be inconsistent with other operations I think.
6295 dropparent = RangeVarGetRelid(parent, false);
6297 /* Search through the direct parents of rel looking for dropparent oid */
6299 catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
6301 Anum_pg_inherits_inhrelid,
6302 BTEqualStrategyNumber, F_OIDEQ,
6303 ObjectIdGetDatum(RelationGetRelid(rel)));
6304 scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId, true, SnapshotNow, 1, key);
6305 while (!found && HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
6307 inhparent = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhparent;
6308 if (inhparent == dropparent)
6310 simple_heap_delete(catalogRelation, &inheritsTuple->t_self);
6314 systable_endscan(scan);
6315 heap_close(catalogRelation, RowExclusiveLock);
6320 if (parent->schemaname)
6322 (errcode(ERRCODE_UNDEFINED_TABLE),
6323 errmsg("relation \"%s.%s\" is not a parent of relation \"%s\"",
6324 parent->schemaname, parent->relname, RelationGetRelationName(rel))));
6327 (errcode(ERRCODE_UNDEFINED_TABLE),
6328 errmsg("relation \"%s\" is not a parent of relation \"%s\"",
6329 parent->relname, RelationGetRelationName(rel))));
6332 /* Search through columns looking for matching columns from parent table */
6334 catalogRelation = heap_open(AttributeRelationId, RowExclusiveLock);
6336 Anum_pg_attribute_attrelid,
6337 BTEqualStrategyNumber,
6339 ObjectIdGetDatum(RelationGetRelid(rel)));
6340 scan = systable_beginscan(catalogRelation, AttributeRelidNumIndexId,
6341 true, SnapshotNow, 1, key);
6342 while (HeapTupleIsValid(attributeTuple = systable_getnext(scan)))
6344 Form_pg_attribute att = ((Form_pg_attribute) GETSTRUCT(attributeTuple));
6347 * Not an inherited column at all (do NOT use islocal for this
6348 * test--it can be true for inherited columns)
6350 if (att->attinhcount == 0)
6352 if (att->attisdropped)
6355 if (SearchSysCacheExistsAttName(dropparent, NameStr(att->attname)))
6357 /* Decrement inhcount and possibly set islocal to 1 */
6358 HeapTuple copyTuple = heap_copytuple(attributeTuple);
6359 Form_pg_attribute copy_att = ((Form_pg_attribute) GETSTRUCT(copyTuple));
6361 copy_att->attinhcount--;
6362 if (copy_att->attinhcount == 0)
6363 copy_att->attislocal = true;
6365 simple_heap_update(catalogRelation, ©Tuple->t_self, copyTuple);
6368 * XXX "Avoid using it for multiple tuples, since opening the
6369 * indexes and building the index info structures is moderately
6370 * expensive." Perhaps this can be moved outside the loop or else
6371 * at least the CatalogOpenIndexes/CatalogCloseIndexes moved
6372 * outside the loop but when I try that it seg faults?!
6374 CatalogUpdateIndexes(catalogRelation, copyTuple);
6375 heap_freetuple(copyTuple);
6378 systable_endscan(scan);
6379 heap_close(catalogRelation, RowExclusiveLock);
6383 * Drop the dependency
6385 * There's no convenient way to do this, so go trawling through pg_depend
6388 catalogRelation = heap_open(DependRelationId, RowExclusiveLock);
6390 ScanKeyInit(&key[0],
6391 Anum_pg_depend_classid,
6392 BTEqualStrategyNumber, F_OIDEQ,
6393 RelationRelationId);
6394 ScanKeyInit(&key[1],
6395 Anum_pg_depend_objid,
6396 BTEqualStrategyNumber, F_OIDEQ,
6397 ObjectIdGetDatum(RelationGetRelid(rel)));
6399 scan = systable_beginscan(catalogRelation, DependDependerIndexId, true,
6400 SnapshotNow, 2, key);
6402 while (HeapTupleIsValid(depTuple = systable_getnext(scan)))
6404 Form_pg_depend dep = (Form_pg_depend) GETSTRUCT(depTuple);
6406 if (dep->refclassid == RelationRelationId &&
6407 dep->refobjid == dropparent &&
6408 dep->deptype == DEPENDENCY_NORMAL)
6411 * Only delete a single dependency -- there shouldn't be more but
6414 simple_heap_delete(catalogRelation, &depTuple->t_self);
6419 systable_endscan(scan);
6421 heap_close(catalogRelation, RowExclusiveLock);
/*
 * AlterTableCreateToastTable(relOid, silent)
 *
 * From the lines that are visible: opens relOid with AccessExclusiveLock,
 * rejects shared relations when IsUnderPostmaster, creates a three-column
 * toast relation plus a unique two-column btree index on it, stores the new
 * toast OID in the parent pg_class row (reltoastrelid), records a
 * DEPENDENCY_INTERNAL dependency from the toast relation to the parent, and
 * finishes with CommandCounterIncrement().
 *
 * NOTE(review): this listing has lost every short line (braces, comment
 * delimiters, local declarations, the tests on "silent", and most argument
 * lines of TupleDescInitEntry / heap_create_with_catalog / index_create).
 * Those values cannot be recovered from what is visible here, so the code
 * below is left exactly as found; restore it from the upstream file.
 * Presumably "silent" selects the quiet heap_close-and-return path in place
 * of the two OBJECT_NOT_IN_PREREQUISITE_STATE errors -- TODO confirm.
 */
6426 * ALTER TABLE CREATE TOAST TABLE
6428 * Note: this is also invoked from outside this module; in such cases we
6429 * expect the caller to have verified that the relation is a table and we
6430 * have all the right permissions. Callers expect this function
6431 * to end with CommandCounterIncrement if it makes any changes.
6434 AlterTableCreateToastTable(Oid relOid, bool silent)
6439 bool shared_relation;
6443 char toast_relname[NAMEDATALEN];
6444 char toast_idxname[NAMEDATALEN];
6445 IndexInfo *indexInfo;
6446 Oid classObjectId[2];
6447 ObjectAddress baseobject,
6451 * Grab an exclusive lock on the target table, which we will NOT release
6452 * until end of transaction. (This is probably redundant in all present
6455 rel = heap_open(relOid, AccessExclusiveLock);
6458 * Toast table is shared if and only if its parent is.
6460 * We cannot allow toasting a shared relation after initdb (because
6461 * there's no way to mark it toasted in other databases' pg_class).
6462 * Unfortunately we can't distinguish initdb from a manually started
6463 * standalone backend (toasting happens after the bootstrap phase, so
6464 * checking IsBootstrapProcessingMode() won't work). However, we can at
6465 * least prevent this mistake under normal multi-user operation.
6467 shared_relation = rel->rd_rel->relisshared;
6468 if (shared_relation && IsUnderPostmaster)
6470 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6471 errmsg("shared tables cannot be toasted after initdb")));
6474 * Is it already toasted?
6476 if (rel->rd_rel->reltoastrelid != InvalidOid)
6480 heap_close(rel, NoLock);
6485 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6486 errmsg("table \"%s\" already has a TOAST table",
6487 RelationGetRelationName(rel))));
6491 * Check to see whether the table actually needs a TOAST table.
6493 if (!needs_toast_table(rel))
6497 heap_close(rel, NoLock);
6502 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6503 errmsg("table \"%s\" does not need a TOAST table",
6504 RelationGetRelationName(rel))));
6508 * Create the toast table and its index
/* Both names are derived from the parent OID: pg_toast_<oid> and pg_toast_<oid>_index */
6510 snprintf(toast_relname, sizeof(toast_relname),
6511 "pg_toast_%u", relOid);
6512 snprintf(toast_idxname, sizeof(toast_idxname),
6513 "pg_toast_%u_index", relOid);
6515 /* this is pretty painful... need a tuple descriptor */
6516 tupdesc = CreateTemplateTupleDesc(3, false);
6517 TupleDescInitEntry(tupdesc, (AttrNumber) 1,
6521 TupleDescInitEntry(tupdesc, (AttrNumber) 2,
6525 TupleDescInitEntry(tupdesc, (AttrNumber) 3,
6531 * Ensure that the toast table doesn't itself get toasted, or we'll be
6532 * toast :-(. This is essential for chunk_data because type bytea is
6533 * toastable; hit the other two just to be sure.
6535 tupdesc->attrs[0]->attstorage = 'p';
6536 tupdesc->attrs[1]->attstorage = 'p';
6537 tupdesc->attrs[2]->attstorage = 'p';
6540 * Note: the toast relation is placed in the regular pg_toast namespace
6541 * even if its master relation is a temp table. There cannot be any
6542 * naming collision, and the toast rel will be destroyed when its master
6543 * is, so there's no need to handle the toast rel as temp.
6545 toast_relid = heap_create_with_catalog(toast_relname,
6547 rel->rd_rel->reltablespace,
6549 rel->rd_rel->relowner,
6558 /* make the toast relation visible, else index creation will fail */
6559 CommandCounterIncrement();
6562 * Create unique index on chunk_id, chunk_seq.
6564 * NOTE: the normal TOAST access routines could actually function with a
6565 * single-column index on chunk_id only. However, the slice access
6566 * routines use both columns for faster access to an individual chunk. In
6567 * addition, we want it to be unique as a check against the possibility of
6568 * duplicate TOAST chunk OIDs. The index might also be a little more
6569 * efficient this way, since btree isn't all that happy with large numbers
6573 indexInfo = makeNode(IndexInfo);
6574 indexInfo->ii_NumIndexAttrs = 2;
6575 indexInfo->ii_KeyAttrNumbers[0] = 1;
6576 indexInfo->ii_KeyAttrNumbers[1] = 2;
6577 indexInfo->ii_Expressions = NIL;
6578 indexInfo->ii_ExpressionsState = NIL;
6579 indexInfo->ii_Predicate = NIL;
6580 indexInfo->ii_PredicateState = NIL;
6581 indexInfo->ii_Unique = true;
6583 classObjectId[0] = OID_BTREE_OPS_OID;
6584 classObjectId[1] = INT4_BTREE_OPS_OID;
6586 toast_idxid = index_create(toast_relid, toast_idxname, InvalidOid,
6589 rel->rd_rel->reltablespace,
6591 true, true, false, true, false);
6594 * Store the toast table's OID in the parent relation's pg_class row
6596 class_rel = heap_open(RelationRelationId, RowExclusiveLock);
6598 reltup = SearchSysCacheCopy(RELOID,
6599 ObjectIdGetDatum(relOid),
6601 if (!HeapTupleIsValid(reltup))
6602 elog(ERROR, "cache lookup failed for relation %u", relOid);
6604 ((Form_pg_class) GETSTRUCT(reltup))->reltoastrelid = toast_relid;
6606 simple_heap_update(class_rel, &reltup->t_self, reltup);
6608 /* Keep catalog indexes current */
6609 CatalogUpdateIndexes(class_rel, reltup);
6611 heap_freetuple(reltup);
6613 heap_close(class_rel, RowExclusiveLock);
6616 * Register dependency from the toast table to the master, so that the
6617 * toast table will be deleted if the master is.
/* Both addresses name whole pg_class objects (objectSubId 0) */
6619 baseobject.classId = RelationRelationId;
6620 baseobject.objectId = relOid;
6621 baseobject.objectSubId = 0;
6622 toastobject.classId = RelationRelationId;
6623 toastobject.objectId = toast_relid;
6624 toastobject.objectSubId = 0;
6626 recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL);
6629 * Clean up and make changes visible
/* NoLock: the AccessExclusiveLock taken at heap_open is kept, per the note near the top */
6631 heap_close(rel, NoLock);
6633 CommandCounterIncrement();
6637 * Check to see whether the table needs a TOAST table. It does only if
6638 * (1) there are any toastable attributes, and (2) the maximum length
6639 * of a tuple could exceed TOAST_TUPLE_THRESHOLD. (We don't want to
6640 * create a toast table for something like "f1 varchar(20)".)
6643 needs_toast_table(Relation rel)
6645 int32 data_length = 0;
6646 bool maxlength_unknown = false;
6647 bool has_toastable_attrs = false;
6649 Form_pg_attribute *att;
6653 tupdesc = rel->rd_att;
6654 att = tupdesc->attrs;
6656 for (i = 0; i < tupdesc->natts; i++)
6658 if (att[i]->attisdropped)
6660 data_length = att_align(data_length, att[i]->attalign);
6661 if (att[i]->attlen > 0)
6663 /* Fixed-length types are never toastable */
6664 data_length += att[i]->attlen;
6668 int32 maxlen = type_maximum_size(att[i]->atttypid,
6672 maxlength_unknown = true;
6674 data_length += maxlen;
6675 if (att[i]->attstorage != 'p')
6676 has_toastable_attrs = true;
6679 if (!has_toastable_attrs)
6680 return false; /* nothing to toast? */
6681 if (maxlength_unknown)
6682 return true; /* any unlimited-length attrs? */
6683 tuple_length = MAXALIGN(offsetof(HeapTupleHeaderData, t_bits) +
6684 BITMAPLEN(tupdesc->natts)) +
6685 MAXALIGN(data_length);
6686 return (tuple_length > TOAST_TUPLE_THRESHOLD);
6691 * Execute ALTER TABLE SET SCHEMA
6693 * Note: caller must have checked ownership of the relation already
6696 AlterTableNamespace(RangeVar *relation, const char *newschema)
6704 rel = heap_openrv(relation, AccessExclusiveLock);
6706 /* heap_openrv allows TOAST, but we don't want to */
6707 if (rel->rd_rel->relkind == RELKIND_TOASTVALUE)
6709 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
6710 errmsg("\"%s\" is a TOAST relation",
6711 RelationGetRelationName(rel))));
6713 relid = RelationGetRelid(rel);
6714 oldNspOid = RelationGetNamespace(rel);
6716 /* get schema OID and check its permissions */
6717 nspOid = LookupCreationNamespace(newschema);
6719 if (oldNspOid == nspOid)
6721 (errcode(ERRCODE_DUPLICATE_TABLE),
6722 errmsg("relation \"%s\" is already in schema \"%s\"",
6723 RelationGetRelationName(rel),
6726 /* disallow renaming into or out of temp schemas */
6727 if (isAnyTempNamespace(nspOid) || isAnyTempNamespace(oldNspOid))
6729 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
6730 errmsg("cannot move objects into or out of temporary schemas")));
6732 /* same for TOAST schema */
6733 if (nspOid == PG_TOAST_NAMESPACE || oldNspOid == PG_TOAST_NAMESPACE)
6735 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
6736 errmsg("cannot move objects into or out of TOAST schema")));
6738 /* OK, modify the pg_class row and pg_depend entry */
6739 classRel = heap_open(RelationRelationId, RowExclusiveLock);
6741 AlterRelationNamespaceInternal(classRel, relid, oldNspOid, nspOid, true);
6743 /* Fix the table's rowtype too */
6744 AlterTypeNamespaceInternal(rel->rd_rel->reltype, nspOid, false);
6746 /* Fix other dependent stuff */
6747 if (rel->rd_rel->relkind == RELKIND_RELATION)
6749 AlterIndexNamespaces(classRel, rel, oldNspOid, nspOid);
6750 AlterSeqNamespaces(classRel, rel, oldNspOid, nspOid, newschema);
6751 AlterConstraintNamespaces(relid, oldNspOid, nspOid, false);
6754 heap_close(classRel, RowExclusiveLock);
6756 /* close rel, but keep lock until commit */
6757 relation_close(rel, NoLock);
6761 * The guts of relocating a relation to another namespace: fix the pg_class
6762 * entry, and the pg_depend entry if any. Caller must already have
6763 * opened and write-locked pg_class.
6766 AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
6767 Oid oldNspOid, Oid newNspOid,
6768 bool hasDependEntry)
6771 Form_pg_class classForm;
6773 classTup = SearchSysCacheCopy(RELOID,
6774 ObjectIdGetDatum(relOid),
6776 if (!HeapTupleIsValid(classTup))
6777 elog(ERROR, "cache lookup failed for relation %u", relOid);
6778 classForm = (Form_pg_class) GETSTRUCT(classTup);
6780 Assert(classForm->relnamespace == oldNspOid);
6782 /* check for duplicate name (more friendly than unique-index failure) */
6783 if (get_relname_relid(NameStr(classForm->relname),
6784 newNspOid) != InvalidOid)
6786 (errcode(ERRCODE_DUPLICATE_TABLE),
6787 errmsg("relation \"%s\" already exists in schema \"%s\"",
6788 NameStr(classForm->relname),
6789 get_namespace_name(newNspOid))));
6791 /* classTup is a copy, so OK to scribble on */
6792 classForm->relnamespace = newNspOid;
6794 simple_heap_update(classRel, &classTup->t_self, classTup);
6795 CatalogUpdateIndexes(classRel, classTup);
6797 /* Update dependency on schema if caller said so */
6798 if (hasDependEntry &&
6799 changeDependencyFor(RelationRelationId, relOid,
6800 NamespaceRelationId, oldNspOid, newNspOid) != 1)
6801 elog(ERROR, "failed to change schema dependency for relation \"%s\"",
6802 NameStr(classForm->relname));
6804 heap_freetuple(classTup);
6808 * Move all indexes for the specified relation to another namespace.
6810 * Note: we assume adequate permission checking was done by the caller,
6811 * and that the caller has a suitable lock on the owning relation.
6814 AlterIndexNamespaces(Relation classRel, Relation rel,
6815 Oid oldNspOid, Oid newNspOid)
6820 indexList = RelationGetIndexList(rel);
6822 foreach(l, indexList)
6824 Oid indexOid = lfirst_oid(l);
6827 * Note: currently, the index will not have its own dependency on the
6828 * namespace, so we don't need to do changeDependencyFor(). There's no
6829 * rowtype in pg_type, either.
6831 AlterRelationNamespaceInternal(classRel, indexOid,
6832 oldNspOid, newNspOid,
6836 list_free(indexList);
6840 * Move all SERIAL-column sequences of the specified relation to another
6843 * Note: we assume adequate permission checking was done by the caller,
6844 * and that the caller has a suitable lock on the owning relation.
6847 AlterSeqNamespaces(Relation classRel, Relation rel,
6848 Oid oldNspOid, Oid newNspOid, const char *newNspName)
6856 * SERIAL sequences are those having an internal dependency on one of the
6857 * table's columns (we don't care *which* column, exactly).
6859 depRel = heap_open(DependRelationId, AccessShareLock);
6861 ScanKeyInit(&key[0],
6862 Anum_pg_depend_refclassid,
6863 BTEqualStrategyNumber, F_OIDEQ,
6864 ObjectIdGetDatum(RelationRelationId));
6865 ScanKeyInit(&key[1],
6866 Anum_pg_depend_refobjid,
6867 BTEqualStrategyNumber, F_OIDEQ,
6868 ObjectIdGetDatum(RelationGetRelid(rel)));
6869 /* we leave refobjsubid unspecified */
6871 scan = systable_beginscan(depRel, DependReferenceIndexId, true,
6872 SnapshotNow, 2, key);
6874 while (HeapTupleIsValid(tup = systable_getnext(scan)))
6876 Form_pg_depend depForm = (Form_pg_depend) GETSTRUCT(tup);
6879 /* skip dependencies other than internal dependencies on columns */
6880 if (depForm->refobjsubid == 0 ||
6881 depForm->classid != RelationRelationId ||
6882 depForm->objsubid != 0 ||
6883 depForm->deptype != DEPENDENCY_INTERNAL)
6886 /* Use relation_open just in case it's an index */
6887 seqRel = relation_open(depForm->objid, AccessExclusiveLock);
6889 /* skip non-sequence relations */
6890 if (RelationGetForm(seqRel)->relkind != RELKIND_SEQUENCE)
6892 /* No need to keep the lock */
6893 relation_close(seqRel, AccessExclusiveLock);
6897 /* Fix the pg_class and pg_depend entries */
6898 AlterRelationNamespaceInternal(classRel, depForm->objid,
6899 oldNspOid, newNspOid,
6903 * Sequences have entries in pg_type. We need to be careful to move
6904 * them to the new namespace, too.
6906 AlterTypeNamespaceInternal(RelationGetForm(seqRel)->reltype,
6909 /* Now we can close it. Keep the lock till end of transaction. */
6910 relation_close(seqRel, NoLock);
6913 systable_endscan(scan);
6915 relation_close(depRel, AccessShareLock);
/*
 * This code supports
 *	CREATE TEMP TABLE ... ON COMMIT { DROP | PRESERVE ROWS | DELETE ROWS }
 *
 * Because we only support this for TEMP tables, it's sufficient to remember
 * the state in a backend-local data structure.
 */
6928 * Register a newly-created relation's ON COMMIT action.
6931 register_on_commit_action(Oid relid, OnCommitAction action)
6934 MemoryContext oldcxt;
6937 * We needn't bother registering the relation unless there is an ON COMMIT
6938 * action we need to take.
6940 if (action == ONCOMMIT_NOOP || action == ONCOMMIT_PRESERVE_ROWS)
6943 oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
6945 oc = (OnCommitItem *) palloc(sizeof(OnCommitItem));
6947 oc->oncommit = action;
6948 oc->creating_subid = GetCurrentSubTransactionId();
6949 oc->deleting_subid = InvalidSubTransactionId;
6951 on_commits = lcons(oc, on_commits);
6953 MemoryContextSwitchTo(oldcxt);
6957 * Unregister any ON COMMIT action when a relation is deleted.
6959 * Actually, we only mark the OnCommitItem entry as to be deleted after commit.
6962 remove_on_commit_action(Oid relid)
6966 foreach(l, on_commits)
6968 OnCommitItem *oc = (OnCommitItem *) lfirst(l);
6970 if (oc->relid == relid)
6972 oc->deleting_subid = GetCurrentSubTransactionId();
6979 * Perform ON COMMIT actions.
6981 * This is invoked just before actually committing, since it's possible
6982 * to encounter errors.
6985 PreCommit_on_commit_actions(void)
6988 List *oids_to_truncate = NIL;
6990 foreach(l, on_commits)
6992 OnCommitItem *oc = (OnCommitItem *) lfirst(l);
6994 /* Ignore entry if already dropped in this xact */
6995 if (oc->deleting_subid != InvalidSubTransactionId)
6998 switch (oc->oncommit)
7001 case ONCOMMIT_PRESERVE_ROWS:
7002 /* Do nothing (there shouldn't be such entries, actually) */
7004 case ONCOMMIT_DELETE_ROWS:
7005 oids_to_truncate = lappend_oid(oids_to_truncate, oc->relid);
7009 ObjectAddress object;
7011 object.classId = RelationRelationId;
7012 object.objectId = oc->relid;
7013 object.objectSubId = 0;
7014 performDeletion(&object, DROP_CASCADE);
7017 * Note that table deletion will call
7018 * remove_on_commit_action, so the entry should get marked
7021 Assert(oc->deleting_subid != InvalidSubTransactionId);
7026 if (oids_to_truncate != NIL)
7028 heap_truncate(oids_to_truncate);
7029 CommandCounterIncrement(); /* XXX needed? */
7034 * Post-commit or post-abort cleanup for ON COMMIT management.
7036 * All we do here is remove no-longer-needed OnCommitItem entries.
7038 * During commit, remove entries that were deleted during this transaction;
7039 * during abort, remove those created during this transaction.
7042 AtEOXact_on_commit_actions(bool isCommit)
7045 ListCell *prev_item;
7048 cur_item = list_head(on_commits);
7050 while (cur_item != NULL)
7052 OnCommitItem *oc = (OnCommitItem *) lfirst(cur_item);
7054 if (isCommit ? oc->deleting_subid != InvalidSubTransactionId :
7055 oc->creating_subid != InvalidSubTransactionId)
7057 /* cur_item must be removed */
7058 on_commits = list_delete_cell(on_commits, cur_item, prev_item);
7061 cur_item = lnext(prev_item);
7063 cur_item = list_head(on_commits);
7067 /* cur_item must be preserved */
7068 oc->creating_subid = InvalidSubTransactionId;
7069 oc->deleting_subid = InvalidSubTransactionId;
7070 prev_item = cur_item;
7071 cur_item = lnext(prev_item);
7077 * Post-subcommit or post-subabort cleanup for ON COMMIT management.
7079 * During subabort, we can immediately remove entries created during this
7080 * subtransaction. During subcommit, just relabel entries marked during
7081 * this subtransaction as being the parent's responsibility.
7084 AtEOSubXact_on_commit_actions(bool isCommit, SubTransactionId mySubid,
7085 SubTransactionId parentSubid)
7088 ListCell *prev_item;
7091 cur_item = list_head(on_commits);
7093 while (cur_item != NULL)
7095 OnCommitItem *oc = (OnCommitItem *) lfirst(cur_item);
7097 if (!isCommit && oc->creating_subid == mySubid)
7099 /* cur_item must be removed */
7100 on_commits = list_delete_cell(on_commits, cur_item, prev_item);
7103 cur_item = lnext(prev_item);
7105 cur_item = list_head(on_commits);
7109 /* cur_item must be preserved */
7110 if (oc->creating_subid == mySubid)
7111 oc->creating_subid = parentSubid;
7112 if (oc->deleting_subid == mySubid)
7113 oc->deleting_subid = isCommit ? parentSubid : InvalidSubTransactionId;
7114 prev_item = cur_item;
7115 cur_item = lnext(prev_item);