1 /*-------------------------------------------------------------------------
4 * Commands for creating and altering table structures and settings
6 * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.180 2006/03/05 15:58:24 momjian Exp $
13 *-------------------------------------------------------------------------
17 #include "access/genam.h"
18 #include "access/tuptoaster.h"
19 #include "catalog/catalog.h"
20 #include "catalog/dependency.h"
21 #include "catalog/heap.h"
22 #include "catalog/index.h"
23 #include "catalog/indexing.h"
24 #include "catalog/namespace.h"
25 #include "catalog/pg_constraint.h"
26 #include "catalog/pg_depend.h"
27 #include "catalog/pg_inherits.h"
28 #include "catalog/pg_namespace.h"
29 #include "catalog/pg_opclass.h"
30 #include "catalog/pg_trigger.h"
31 #include "catalog/pg_type.h"
32 #include "commands/cluster.h"
33 #include "commands/defrem.h"
34 #include "commands/tablecmds.h"
35 #include "commands/tablespace.h"
36 #include "commands/trigger.h"
37 #include "commands/typecmds.h"
38 #include "executor/executor.h"
39 #include "lib/stringinfo.h"
40 #include "miscadmin.h"
41 #include "nodes/makefuncs.h"
42 #include "optimizer/clauses.h"
43 #include "optimizer/plancat.h"
44 #include "optimizer/prep.h"
45 #include "parser/analyze.h"
46 #include "parser/gramparse.h"
47 #include "parser/parser.h"
48 #include "parser/parse_clause.h"
49 #include "parser/parse_coerce.h"
50 #include "parser/parse_expr.h"
51 #include "parser/parse_oper.h"
52 #include "parser/parse_relation.h"
53 #include "parser/parse_type.h"
54 #include "rewrite/rewriteHandler.h"
55 #include "storage/smgr.h"
56 #include "utils/acl.h"
57 #include "utils/builtins.h"
58 #include "utils/fmgroids.h"
59 #include "utils/inval.h"
60 #include "utils/lsyscache.h"
61 #include "utils/memutils.h"
62 #include "utils/relcache.h"
63 #include "utils/syscache.h"
67 * ON COMMIT action list
/*
 * OnCommitItem: one entry of the ON COMMIT action list.  Each entry names a
 * relation (relid), the action to apply at end of transaction (oncommit),
 * and the subtransactions that created the entry and requested its deletion
 * (creating_subid is zero for entries from a prior transaction;
 * deleting_subid is zero when no deletion is pending).  The entries are
 * collected in the file-local list on_commits, which is initialized to NIL.
 */
69 typedef struct OnCommitItem
71 Oid relid; /* relid of relation */
72 OnCommitAction oncommit; /* what to do at end of xact */
75 * If this entry was created during the current transaction,
76 * creating_subid is the ID of the creating subxact; if created in a prior
77 * transaction, creating_subid is zero. If deleted during the current
78 * transaction, deleting_subid is the ID of the deleting subxact; if no
79 * deletion request is pending, deleting_subid is zero.
81 SubTransactionId creating_subid;
82 SubTransactionId deleting_subid;
85 static List *on_commits = NIL;
89 * State information for ALTER TABLE
91 * The pending-work queue for an ALTER TABLE is a List of AlteredTableInfo
92 * structs, one for each table modified by the operation (the named table
93 * plus any child tables that are affected). We save lists of subcommands
94 * to apply to this table (possibly modified by parse transformation steps);
95 * these lists will be executed in Phase 2. If a Phase 3 step is needed,
96 * necessary information is stored in the constraints and newvals lists.
98 * Phase 2 is divided into multiple passes; subcommands are executed in
99 * a pass determined by subcommand type.
/*
 * Pass numbers for Phase 2 of ALTER TABLE.  Each value is an index into the
 * subcmds[] array of AlteredTableInfo, and AT_NUM_PASSES is the length of
 * that array, so it must stay one greater than the highest pass number.
 * NOTE(review): the passes are presumably executed in ascending numeric
 * order (drops first, miscellaneous last) -- confirm against the Phase 2
 * driver, which is not part of this section.
 */
102 #define AT_PASS_DROP 0 /* DROP (all flavors) */
103 #define AT_PASS_ALTER_TYPE 1 /* ALTER COLUMN TYPE */
104 #define AT_PASS_OLD_INDEX 2 /* re-add existing indexes */
105 #define AT_PASS_OLD_CONSTR 3 /* re-add existing constraints */
106 #define AT_PASS_COL_ATTRS 4 /* set other column attributes */
107 /* We could support a RENAME COLUMN pass here, but not currently used */
108 #define AT_PASS_ADD_COL 5 /* ADD COLUMN */
109 #define AT_PASS_ADD_INDEX 6 /* ADD indexes */
110 #define AT_PASS_ADD_CONSTR 7 /* ADD constraints, defaults */
111 #define AT_PASS_MISC 8 /* other stuff */
112 #define AT_NUM_PASSES 9
/*
 * AlteredTableInfo: work-queue entry for one table touched by ALTER TABLE.
 * It records the identity of the table (relid, relkind, old tuple
 * descriptor), one list of subcommands per pass, the data needed by Phase 3
 * (new constraints, new column values, target tablespace), and the
 * constraints and indexes that must be rebuilt after ALTER TYPE, each kept
 * as a pair of parallel lists (OIDs and string definitions).
 */
114 typedef struct AlteredTableInfo
116 /* Information saved before any work commences: */
117 Oid relid; /* Relation to work on */
118 char relkind; /* Its relkind */
119 TupleDesc oldDesc; /* Pre-modification tuple descriptor */
120 /* Information saved by Phase 1 for Phase 2: */
121 List *subcmds[AT_NUM_PASSES]; /* Lists of AlterTableCmd */
122 /* Information saved by Phases 1/2 for Phase 3: */
123 List *constraints; /* List of NewConstraint */
124 List *newvals; /* List of NewColumnValue */
125 Oid newTableSpace; /* new tablespace; 0 means no change */
126 /* Objects to rebuild after completing ALTER TYPE operations */
127 List *changedConstraintOids; /* OIDs of constraints to rebuild */
128 List *changedConstraintDefs; /* string definitions of same */
129 List *changedIndexOids; /* OIDs of indexes to rebuild */
130 List *changedIndexDefs; /* string definitions of same */
133 /* Struct describing one new constraint to check in Phase 3 scan */
/*
 * Which fields are meaningful depends on contype: attnum is used only for
 * NOT_NULL entries, refrelid only for FOREIGN entries, and qual holds either
 * a check expression or an FkConstraint struct.  qualstate carries the
 * execution state used when evaluating a CHECK expression.
 */
134 typedef struct NewConstraint
136 char *name; /* Constraint name, or NULL if none */
137 ConstrType contype; /* CHECK, NOT_NULL, or FOREIGN */
138 AttrNumber attnum; /* only relevant for NOT_NULL */
139 Oid refrelid; /* PK rel, if FOREIGN */
140 Node *qual; /* Check expr or FkConstraint struct */
141 List *qualstate; /* Execution state for CHECK */
145 * Struct describing one new column value that needs to be computed during
146 * Phase 3 copy (this could be either a new column with a non-null default, or
147 * a column that we're changing the type of). Columns without such an entry
148 * are just copied from the old table during ATRewriteTable. Note that the
149 * expr is an expression over *old* table values.
/*
 * NewColumnValue pairs a target column number (attnum) with the expression
 * that produces its new value (expr) and the execution state for that
 * expression (exprstate).
 */
151 typedef struct NewColumnValue
153 AttrNumber attnum; /* which column */
154 Expr *expr; /* expression to compute */
155 ExprState *exprstate; /* execution state */
/*
 * Forward declarations of the file-local (static) routines.  The list
 * covers, roughly in this order: TRUNCATE and inheritance helpers used by
 * CREATE TABLE, namespace-change helpers, foreign-key helpers, and the
 * ALTER TABLE machinery (ATController plus the ATPrep, ATExec, ATRewrite
 * and ATPost routines).
 */
159 static void truncate_check_rel(Relation rel);
160 static List *MergeAttributes(List *schema, List *supers, bool istemp,
161 List **supOids, List **supconstr, int *supOidCount);
162 static bool change_varattnos_of_a_node(Node *node, const AttrNumber *newattno);
163 static void StoreCatalogInheritance(Oid relationId, List *supers);
164 static int findAttrByName(const char *attributeName, List *schema);
165 static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass);
166 static bool needs_toast_table(Relation rel);
167 static void AlterIndexNamespaces(Relation classRel, Relation rel,
168 Oid oldNspOid, Oid newNspOid);
169 static void AlterSeqNamespaces(Relation classRel, Relation rel,
170 Oid oldNspOid, Oid newNspOid,
171 const char *newNspName);
172 static int transformColumnNameList(Oid relId, List *colList,
173 int16 *attnums, Oid *atttypids);
174 static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
176 int16 *attnums, Oid *atttypids,
178 static Oid transformFkeyCheckAttrs(Relation pkrel,
179 int numattrs, int16 *attnums,
181 static void validateForeignKeyConstraint(FkConstraint *fkconstraint,
182 Relation rel, Relation pkrel);
183 static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
185 static char *fkMatchTypeToString(char match_type);
186 static void ATController(Relation rel, List *cmds, bool recurse);
187 static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
188 bool recurse, bool recursing);
189 static void ATRewriteCatalogs(List **wqueue);
190 static void ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd);
191 static void ATRewriteTables(List **wqueue);
192 static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap);
193 static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
194 static void ATSimplePermissions(Relation rel, bool allowView);
195 static void ATSimpleRecursion(List **wqueue, Relation rel,
196 AlterTableCmd *cmd, bool recurse);
197 static void ATOneLevelRecursion(List **wqueue, Relation rel,
199 static void find_composite_type_dependencies(Oid typeOid,
200 const char *origTblName);
201 static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
203 static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
205 static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid);
206 static void add_column_support_dependency(Oid relid, int32 attnum,
208 static void ATExecDropNotNull(Relation rel, const char *colName);
209 static void ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
210 const char *colName);
211 static void ATExecColumnDefault(Relation rel, const char *colName,
213 static void ATPrepSetStatistics(Relation rel, const char *colName,
215 static void ATExecSetStatistics(Relation rel, const char *colName,
217 static void ATExecSetStorage(Relation rel, const char *colName,
219 static void ATExecDropColumn(Relation rel, const char *colName,
220 DropBehavior behavior,
221 bool recurse, bool recursing);
222 static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
223 IndexStmt *stmt, bool is_rebuild);
224 static void ATExecAddConstraint(AlteredTableInfo *tab, Relation rel,
225 Node *newConstraint);
226 static void ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
227 FkConstraint *fkconstraint);
228 static void ATPrepDropConstraint(List **wqueue, Relation rel,
229 bool recurse, AlterTableCmd *cmd);
230 static void ATExecDropConstraint(Relation rel, const char *constrName,
231 DropBehavior behavior, bool quiet);
232 static void ATPrepAlterColumnType(List **wqueue,
233 AlteredTableInfo *tab, Relation rel,
234 bool recurse, bool recursing,
236 static void ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
237 const char *colName, TypeName *typename);
238 static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab);
239 static void ATPostAlterTypeParse(char *cmd, List **wqueue);
240 static void change_owner_recurse_to_sequences(Oid relationOid,
242 static void ATExecClusterOn(Relation rel, const char *indexName);
243 static void ATExecDropCluster(Relation rel);
244 static void ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel,
245 char *tablespacename);
246 static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace);
247 static void ATExecEnableDisableTrigger(Relation rel, char *trigname,
248 bool enable, bool skip_system);
249 static void copy_relation_data(Relation rel, SMgrRelation dst);
250 static void update_ri_trigger_args(Oid relid,
254 bool update_relname);
257 /* ----------------------------------------------------------------
259 * Creates a new relation.
261 * If successful, returns the OID of the new relation.
262 * ----------------------------------------------------------------
/*
 * DefineRelation: create the relation described by a CreateStmt.
 *
 * stmt     parsed CREATE TABLE statement; this routine reads its relation,
 *          tableElts, inhRelations, constraints, hasoids, oncommit and
 *          tablespacename fields.
 * relkind  kind of relation to create.  NOTE(review): presumably forwarded
 *          to heap_create_with_catalog(); the argument list of that call is
 *          not shown in full here -- confirm.
 *
 * Order of work:
 *   1. Copy the relation name into a NAMEDATALEN-sized buffer.
 *   2. Reject ON COMMIT for a relation that is not temporary.
 *   3. Resolve the creation namespace and check the ACL on it, except in
 *      bootstrap processing mode.
 *   4. Resolve the tablespace: a named tablespace must exist, otherwise
 *      GetDefaultTablespace() is used.  The tablespace ACL is checked only
 *      when the resulting OID is valid.
 *   5. Merge declared columns with inherited ones via MergeAttributes().
 *   6. Build the tuple descriptor; tdhasoid is set when OIDs were requested
 *      locally or at least one parent has OIDs.
 *   7. Copy inherited CHECK constraints into the descriptor.  An entry whose
 *      name and expression both match an earlier one is dropped; a name
 *      match with a different expression is reported as a duplicate check
 *      constraint name.
 *   8. Create the relation, store its inheritance links, bump the command
 *      counter, and open the relation with AccessExclusiveLock.
 *   9. Collect raw column defaults from the schema, register dependencies
 *      on supporting relations, and hand raw defaults plus stmt->constraints
 *      to AddRelationRawConstraints() when there is anything to add.
 *  10. Close the relation with NoLock, which keeps the lock.
 */
265 DefineRelation(CreateStmt *stmt, char relkind)
267 char relname[NAMEDATALEN];
269 List *schema = stmt->tableElts;
273 TupleDesc descriptor;
275 List *old_constraints;
284 * Truncate relname to appropriate length (probably a waste of time, as
285 * parser should have done this already).
287 StrNCpy(relname, stmt->relation->relname, NAMEDATALEN);
290 * Check consistency of arguments
292 if (stmt->oncommit != ONCOMMIT_NOOP && !stmt->relation->istemp)
294 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
295 errmsg("ON COMMIT can only be used on temporary tables")));
298 * Look up the namespace in which we are supposed to create the relation.
299 * Check we have permission to create there. Skip check if bootstrapping,
300 * since permissions machinery may not be working yet.
302 namespaceId = RangeVarGetCreationNamespace(stmt->relation);
304 if (!IsBootstrapProcessingMode())
308 aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
310 if (aclresult != ACLCHECK_OK)
311 aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
312 get_namespace_name(namespaceId));
316 * Select tablespace to use. If not specified, use default_tablespace
317 * (which may in turn default to database's default).
319 if (stmt->tablespacename)
321 tablespaceId = get_tablespace_oid(stmt->tablespacename);
322 if (!OidIsValid(tablespaceId))
324 (errcode(ERRCODE_UNDEFINED_OBJECT),
325 errmsg("tablespace \"%s\" does not exist",
326 stmt->tablespacename)));
330 tablespaceId = GetDefaultTablespace();
331 /* note InvalidOid is OK in this case */
334 /* Check permissions except when using database's default */
335 if (OidIsValid(tablespaceId))
339 aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(),
341 if (aclresult != ACLCHECK_OK)
342 aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
343 get_tablespace_name(tablespaceId));
347 * Look up inheritance ancestors and generate relation schema, including
348 * inherited attributes.
350 schema = MergeAttributes(schema, stmt->inhRelations,
351 stmt->relation->istemp,
352 &inheritOids, &old_constraints, &parentOidCount);
355 * Create a relation descriptor from the relation schema and create the
356 * relation. Note that in this stage only inherited (pre-cooked) defaults
357 * and constraints will be included into the new relation.
358 * (BuildDescForRelation takes care of the inherited defaults, but we have
359 * to copy inherited constraints here.)
361 descriptor = BuildDescForRelation(schema);
363 localHasOids = interpretOidsOption(stmt->hasoids);
364 descriptor->tdhasoid = (localHasOids || parentOidCount > 0);
366 if (old_constraints != NIL)
368 ConstrCheck *check = (ConstrCheck *)
369 palloc0(list_length(old_constraints) * sizeof(ConstrCheck));
372 foreach(listptr, old_constraints)
374 Constraint *cdef = (Constraint *) lfirst(listptr);
377 if (cdef->contype != CONSTR_CHECK)
379 Assert(cdef->name != NULL);
380 Assert(cdef->raw_expr == NULL && cdef->cooked_expr != NULL);
383 * In multiple-inheritance situations, it's possible to inherit
384 * the same grandparent constraint through multiple parents.
385 * Hence, discard inherited constraints that match as to both name
386 * and expression. Otherwise, gripe if the names conflict.
388 for (i = 0; i < ncheck; i++)
390 if (strcmp(check[i].ccname, cdef->name) != 0)
392 if (strcmp(check[i].ccbin, cdef->cooked_expr) == 0)
398 (errcode(ERRCODE_DUPLICATE_OBJECT),
399 errmsg("duplicate check constraint name \"%s\"",
404 check[ncheck].ccname = cdef->name;
405 check[ncheck].ccbin = pstrdup(cdef->cooked_expr);
411 if (descriptor->constr == NULL)
413 descriptor->constr = (TupleConstr *) palloc(sizeof(TupleConstr));
414 descriptor->constr->defval = NULL;
415 descriptor->constr->num_defval = 0;
416 descriptor->constr->has_not_null = false;
418 descriptor->constr->num_check = ncheck;
419 descriptor->constr->check = check;
423 relationId = heap_create_with_catalog(relname,
434 allowSystemTableMods);
436 StoreCatalogInheritance(relationId, inheritOids);
439 * We must bump the command counter to make the newly-created relation
440 * tuple visible for opening.
442 CommandCounterIncrement();
445 * Open the new relation and acquire exclusive lock on it. This isn't
446 * really necessary for locking out other backends (since they can't see
447 * the new rel anyway until we commit), but it keeps the lock manager from
448 * complaining about deadlock risks.
450 rel = relation_open(relationId, AccessExclusiveLock);
453 * Now add any newly specified column default values and CHECK constraints
454 * to the new relation. These are passed to us in the form of raw
455 * parsetrees; we need to transform them to executable expression trees
456 * before they can be added. The most convenient way to do that is to
457 * apply the parser's transformExpr routine, but transformExpr doesn't
458 * work unless we have a pre-existing relation. So, the transformation has
459 * to be postponed to this final step of CREATE TABLE.
461 * Another task that's conveniently done at this step is to add dependency
462 * links between columns and supporting relations (such as SERIAL
465 * First, scan schema to find new column defaults.
470 foreach(listptr, schema)
472 ColumnDef *colDef = lfirst(listptr);
476 if (colDef->raw_default != NULL)
478 RawColumnDefault *rawEnt;
480 Assert(colDef->cooked_default == NULL);
482 rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
483 rawEnt->attnum = attnum;
484 rawEnt->raw_default = colDef->raw_default;
485 rawDefaults = lappend(rawDefaults, rawEnt);
488 /* Create dependency for supporting relation for this column */
489 if (colDef->support != NULL)
490 add_column_support_dependency(relationId, attnum, colDef->support);
494 * Parse and add the defaults/constraints, if any.
496 if (rawDefaults || stmt->constraints)
497 AddRelationRawConstraints(rel, rawDefaults, stmt->constraints);
500 * Clean up. We keep lock on new relation (although it shouldn't be
501 * visible to anyone else anyway, until commit).
503 relation_close(rel, NoLock);
510 * Deletes a relation.
/*
 * RemoveRelation: delete the relation named by a RangeVar.
 *
 * relation  name of the relation to remove; it is resolved to an OID with
 *           RangeVarGetRelid(relation, false).
 * behavior  drop behavior, passed unchanged to performDeletion().
 *
 * The deletion target is described by an ObjectAddress whose class is
 * RelationRelationId and whose objectSubId is 0, and the actual removal is
 * delegated to performDeletion().
 */
513 RemoveRelation(const RangeVar *relation, DropBehavior behavior)
516 ObjectAddress object;
518 relOid = RangeVarGetRelid(relation, false);
520 object.classId = RelationRelationId;
521 object.objectId = relOid;
522 object.objectSubId = 0;
524 performDeletion(&object, behavior);
529 * Executes a TRUNCATE command.
531 * This is a multi-relation truncate. We first open and grab exclusive
532 * lock on all relations involved, checking permissions and otherwise
533 * verifying that the relation is OK for truncation. In CASCADE mode,
534 * relations having FK references to the targeted relations are automatically
535 * added to the group; in RESTRICT mode, we check that all FK references are
536 * internal to the group that's being truncated. Finally all the relations
537 * are truncated and reindexed.
/*
 * ExecuteTruncate: run TRUNCATE over every relation named in stmt.
 *
 * stmt->relations lists the explicitly named targets; stmt->behavior selects
 * CASCADE (DROP_CASCADE) or RESTRICT (DROP_RESTRICT) treatment of
 * foreign-key references.
 *
 * Every relation is opened with AccessExclusiveLock and validated with
 * truncate_check_rel() before it is appended to the work lists (rels holds
 * the open relations, relids the matching OIDs).  In CASCADE mode,
 * heap_truncate_find_FKs() is asked repeatedly for further referencing
 * relations until it returns NIL.  Relations are closed with NoLock, so the
 * locks are not given up at close time.
 *
 * Per target table: the heap gets a new relfilenode, then the toast table
 * (when reltoastrelid is valid) gets one as well, and finally
 * reindex_relation() is called for the heap.
 */
540 ExecuteTruncate(TruncateStmt *stmt)
547 * Open, exclusive-lock, and check all the explicitly-specified relations
549 foreach(cell, stmt->relations)
551 RangeVar *rv = lfirst(cell);
554 rel = heap_openrv(rv, AccessExclusiveLock);
555 truncate_check_rel(rel);
556 rels = lappend(rels, rel);
557 relids = lappend_oid(relids, RelationGetRelid(rel));
561 * In CASCADE mode, suck in all referencing relations as well. This
562 * requires multiple iterations to find indirectly-dependent relations.
563 * At each phase, we need to exclusive-lock new rels before looking
564 * for their dependencies, else we might miss something. Also, we
565 * check each rel as soon as we open it, to avoid a faux pas such as
566 * holding lock for a long time on a rel we have no permissions for.
568 if (stmt->behavior == DROP_CASCADE)
574 newrelids = heap_truncate_find_FKs(relids);
575 if (newrelids == NIL)
576 break; /* nothing else to add */
578 foreach(cell, newrelids)
580 Oid relid = lfirst_oid(cell);
583 rel = heap_open(relid, AccessExclusiveLock);
585 (errmsg("truncate cascades to table \"%s\"",
586 RelationGetRelationName(rel))));
587 truncate_check_rel(rel);
588 rels = lappend(rels, rel);
589 relids = lappend_oid(relids, relid);
595 * Check foreign key references. In CASCADE mode, this should be
596 * unnecessary since we just pulled in all the references; but as
597 * a cross-check, do it anyway if in an Assert-enabled build.
599 #ifdef USE_ASSERT_CHECKING
600 heap_truncate_check_FKs(rels, false);
602 if (stmt->behavior == DROP_RESTRICT)
603 heap_truncate_check_FKs(rels, false);
607 * OK, truncate each table.
611 Relation rel = (Relation) lfirst(cell);
616 * Create a new empty storage file for the relation, and assign it as
617 * the relfilenode value. The old storage file is scheduled for
618 * deletion at commit.
620 setNewRelfilenode(rel);
622 heap_relid = RelationGetRelid(rel);
623 toast_relid = rel->rd_rel->reltoastrelid;
625 heap_close(rel, NoLock);
628 * The same for the toast table, if any.
630 if (OidIsValid(toast_relid))
632 rel = relation_open(toast_relid, AccessExclusiveLock);
633 setNewRelfilenode(rel);
634 heap_close(rel, NoLock);
638 * Reconstruct the indexes to match, and we're done.
640 reindex_relation(heap_relid, true);
645 * Check that a given rel is safe to truncate. Subroutine for ExecuteTruncate
/*
 * truncate_check_rel: report an error unless rel may be truncated.
 *
 * rel is an already-opened relation.  The checks run in this order:
 *   - rel must be a plain table (relkind RELKIND_RELATION);
 *   - pg_class_ownercheck() must succeed for GetUserId();
 *   - rel must not be a system relation unless allowSystemTableMods is set;
 *   - rel must be neither shared nor nailed in cache;
 *   - rel must not live in the temp namespace of another backend.
 */
648 truncate_check_rel(Relation rel)
650 /* Only allow truncate on regular tables */
651 if (rel->rd_rel->relkind != RELKIND_RELATION)
653 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
654 errmsg("\"%s\" is not a table",
655 RelationGetRelationName(rel))));
657 /* Permissions checks */
658 if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
659 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
660 RelationGetRelationName(rel));
662 if (!allowSystemTableMods && IsSystemRelation(rel))
664 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
665 errmsg("permission denied: \"%s\" is a system catalog",
666 RelationGetRelationName(rel))));
669 * We can never allow truncation of shared or nailed-in-cache
670 * relations, because we can't support changing their relfilenode
673 if (rel->rd_rel->relisshared || rel->rd_isnailed)
675 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
676 errmsg("cannot truncate system relation \"%s\"",
677 RelationGetRelationName(rel))));
680 * Don't allow truncate on temp tables of other backends ... their
681 * local buffer manager is not going to cope.
683 if (isOtherTempNamespace(RelationGetNamespace(rel)))
685 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
686 errmsg("cannot truncate temporary tables of other sessions")));
691 * Returns new schema given initial schema and superclasses.
694 * 'schema' is the column/attribute definition for the table. (It's a list
695 * of ColumnDef's.) It is destructively changed.
696 * 'supers' is a list of names (as RangeVar nodes) of parent relations.
697 * 'istemp' is TRUE if we are creating a temp relation.
700 * 'supOids' receives a list of the OIDs of the parent relations.
701 * 'supconstr' receives a list of constraints belonging to the parents,
702 * updated as necessary to be valid for the child.
703 * 'supOidCount' is set to the number of parents that have OID columns.
706 * Completed schema list.
709 * The order in which the attributes are inherited is very important.
710 * Intuitively, the inherited attributes should come first. If a table
711 * inherits from multiple parents, the order of those attributes
712 * follows the order of the parents specified in CREATE TABLE.
716 * create table person (name text, age int4, location point);
717 * create table emp (salary int4, manager text) inherits(person);
718 * create table student (gpa float8) inherits (person);
719 * create table stud_emp (percent int4) inherits (emp, student);
721 * The order of the attributes of stud_emp is:
723 *              person {1:name, 2:age, 3:location}
724 *              /    \
725 * {6:gpa} student   emp {4:salary, 5:manager}
726 *              \    /
727 *             stud_emp {7:percent}
729 * If the same attribute name appears multiple times, then it appears
730 * in the result table in the proper location for its first appearance.
732 * Constraints (including NOT NULL constraints) for the child table
733 * are the union of all relevant constraints, from both the child schema
736 * The default value for a child column is defined as:
737 * (1) If the child schema specifies a default, that value is used.
738 * (2) If neither the child nor any parent specifies a default, then
739 * the column will not have a default.
740 * (3) If conflicting defaults are inherited from different parents
741 * (and not overridden by the child), an error is raised.
742 * (4) Otherwise the inherited default is used.
743 * Rule (3) is new in Postgres 7.1; in earlier releases you got a
744 * rather arbitrary choice of which parent default to use.
/*
 * Implementation notes for MergeAttributes:
 *
 * - Each parent is opened with AccessShareLock and closed with NoLock, so
 *   the lock outlives the close.
 * - A parent must be a plain table (RELKIND_RELATION), must pass
 *   pg_class_ownercheck() for GetUserId(), must not be in a temp namespace
 *   unless istemp is set, and must not appear twice in the parent list.
 * - newattno[] is allocated once per parent with one slot per parent
 *   attribute.  Slots of dropped parent columns are set to 0.  The map is
 *   passed to change_varattnos_of_a_node() to renumber the inherited CHECK
 *   expressions of that parent.
 * - Conflicting inherited defaults are flagged by storing the bogus_marker
 *   pointer in cooked_default.  The final check compares that pointer by
 *   identity (==), not by string contents.
 * - A declared column that matches an inherited one is merged into the
 *   inherited definition: it is marked is_local, NOT NULL flags are ORed,
 *   and a declared default replaces the inherited one.
 * - The column limit (MaxHeapAttributeNumber) is tested twice: on the
 *   declared column list up front and again after inherited columns have
 *   been merged in.
 * - Results other than the schema are handed back through the output
 *   pointers supOids, supconstr and supOidCount.
 */
748 MergeAttributes(List *schema, List *supers, bool istemp,
749 List **supOids, List **supconstr, int *supOidCount)
752 List *inhSchema = NIL;
753 List *parentOids = NIL;
754 List *constraints = NIL;
755 int parentsWithOids = 0;
756 bool have_bogus_defaults = false;
757 char *bogus_marker = "Bogus!"; /* marks conflicting defaults */
761 * Check for and reject tables with too many columns. We perform this
762 * check relatively early for two reasons: (a) we don't run the risk of
763 * overflowing an AttrNumber in subsequent code (b) an O(n^2) algorithm is
764 * okay if we're processing <= 1600 columns, but could take minutes to
765 * execute if the user attempts to create a table with hundreds of
766 * thousands of columns.
768 * Note that we also need to check that any we do not exceed this figure
769 * after including columns from inherited relations.
771 if (list_length(schema) > MaxHeapAttributeNumber)
773 (errcode(ERRCODE_TOO_MANY_COLUMNS),
774 errmsg("tables can have at most %d columns",
775 MaxHeapAttributeNumber)));
778 * Check for duplicate names in the explicit list of attributes.
780 * Although we might consider merging such entries in the same way that we
781 * handle name conflicts for inherited attributes, it seems to make more
782 * sense to assume such conflicts are errors.
784 foreach(entry, schema)
786 ColumnDef *coldef = lfirst(entry);
789 for_each_cell(rest, lnext(entry))
791 ColumnDef *restdef = lfirst(rest);
793 if (strcmp(coldef->colname, restdef->colname) == 0)
795 (errcode(ERRCODE_DUPLICATE_COLUMN),
796 errmsg("column \"%s\" duplicated",
802 * Scan the parents left-to-right, and merge their attributes to form a
803 * list of inherited attributes (inhSchema). Also check to see if we need
804 * to inherit an OID column.
807 foreach(entry, supers)
809 RangeVar *parent = (RangeVar *) lfirst(entry);
813 AttrNumber *newattno;
814 AttrNumber parent_attno;
816 relation = heap_openrv(parent, AccessShareLock);
818 if (relation->rd_rel->relkind != RELKIND_RELATION)
820 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
821 errmsg("inherited relation \"%s\" is not a table",
823 /* Permanent rels cannot inherit from temporary ones */
824 if (!istemp && isTempNamespace(RelationGetNamespace(relation)))
826 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
827 errmsg("cannot inherit from temporary relation \"%s\"",
831 * We should have an UNDER permission flag for this, but for now,
832 * demand that creator of a child table own the parent.
834 if (!pg_class_ownercheck(RelationGetRelid(relation), GetUserId()))
835 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
836 RelationGetRelationName(relation));
839 * Reject duplications in the list of parents.
841 if (list_member_oid(parentOids, RelationGetRelid(relation)))
843 (errcode(ERRCODE_DUPLICATE_TABLE),
844 errmsg("inherited relation \"%s\" duplicated",
847 parentOids = lappend_oid(parentOids, RelationGetRelid(relation));
849 if (relation->rd_rel->relhasoids)
852 tupleDesc = RelationGetDescr(relation);
853 constr = tupleDesc->constr;
856 * newattno[] will contain the child-table attribute numbers for the
857 * attributes of this parent table. (They are not the same for
858 * parents after the first one, nor if we have dropped columns.)
860 newattno = (AttrNumber *)
861 palloc(tupleDesc->natts * sizeof(AttrNumber));
863 for (parent_attno = 1; parent_attno <= tupleDesc->natts;
866 Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1];
867 char *attributeName = NameStr(attribute->attname);
873 * Ignore dropped columns in the parent.
875 if (attribute->attisdropped)
878 * change_varattnos_of_a_node asserts that this is greater
879 * than zero, so if anything tries to use it, we should find
882 newattno[parent_attno - 1] = 0;
887 * Does it conflict with some previously inherited column?
889 exist_attno = findAttrByName(attributeName, inhSchema);
893 * Yes, try to merge the two column definitions. They must
894 * have the same type and typmod.
897 (errmsg("merging multiple inherited definitions of column \"%s\"",
899 def = (ColumnDef *) list_nth(inhSchema, exist_attno - 1);
900 if (typenameTypeId(def->typename) != attribute->atttypid ||
901 def->typename->typmod != attribute->atttypmod)
903 (errcode(ERRCODE_DATATYPE_MISMATCH),
904 errmsg("inherited column \"%s\" has a type conflict",
906 errdetail("%s versus %s",
907 TypeNameToString(def->typename),
908 format_type_be(attribute->atttypid))));
910 /* Merge of NOT NULL constraints = OR 'em together */
911 def->is_not_null |= attribute->attnotnull;
912 /* Default and other constraints are handled below */
913 newattno[parent_attno - 1] = exist_attno;
918 * No, create a new inherited column
920 def = makeNode(ColumnDef);
921 def->colname = pstrdup(attributeName);
922 typename = makeNode(TypeName);
923 typename->typeid = attribute->atttypid;
924 typename->typmod = attribute->atttypmod;
925 def->typename = typename;
927 def->is_local = false;
928 def->is_not_null = attribute->attnotnull;
929 def->raw_default = NULL;
930 def->cooked_default = NULL;
931 def->constraints = NIL;
933 inhSchema = lappend(inhSchema, def);
934 newattno[parent_attno - 1] = ++child_attno;
938 * Copy default if any
940 if (attribute->atthasdef)
942 char *this_default = NULL;
943 AttrDefault *attrdef;
946 /* Find default in constraint structure */
947 Assert(constr != NULL);
948 attrdef = constr->defval;
949 for (i = 0; i < constr->num_defval; i++)
951 if (attrdef[i].adnum == parent_attno)
953 this_default = attrdef[i].adbin;
957 Assert(this_default != NULL);
960 * If default expr could contain any vars, we'd need to fix
961 * 'em, but it can't; so default is ready to apply to child.
963 * If we already had a default from some prior parent, check
964 * to see if they are the same. If so, no problem; if not,
965 * mark the column as having a bogus default. Below, we will
966 * complain if the bogus default isn't overridden by the child
969 Assert(def->raw_default == NULL);
970 if (def->cooked_default == NULL)
971 def->cooked_default = pstrdup(this_default);
972 else if (strcmp(def->cooked_default, this_default) != 0)
974 def->cooked_default = bogus_marker;
975 have_bogus_defaults = true;
981 * Now copy the constraints of this parent, adjusting attnos using the
982 * completed newattno[] map
984 if (constr && constr->num_check > 0)
986 ConstrCheck *check = constr->check;
989 for (i = 0; i < constr->num_check; i++)
991 Constraint *cdef = makeNode(Constraint);
994 cdef->contype = CONSTR_CHECK;
995 cdef->name = pstrdup(check[i].ccname);
996 cdef->raw_expr = NULL;
997 /* adjust varattnos of ccbin here */
998 expr = stringToNode(check[i].ccbin);
999 change_varattnos_of_a_node(expr, newattno);
1000 cdef->cooked_expr = nodeToString(expr);
1001 constraints = lappend(constraints, cdef);
1008 * Close the parent rel, but keep our AccessShareLock on it until xact
1009 * commit. That will prevent someone else from deleting or ALTERing
1010 * the parent before the child is committed.
1012 heap_close(relation, NoLock);
1016 * If we had no inherited attributes, the result schema is just the
1017 * explicitly declared columns. Otherwise, we need to merge the declared
1018 * columns into the inherited schema list.
1020 if (inhSchema != NIL)
1022 foreach(entry, schema)
1024 ColumnDef *newdef = lfirst(entry);
1025 char *attributeName = newdef->colname;
1029 * Does it conflict with some previously inherited column?
1031 exist_attno = findAttrByName(attributeName, inhSchema);
1032 if (exist_attno > 0)
1037 * Yes, try to merge the two column definitions. They must
1038 * have the same type and typmod.
1041 (errmsg("merging column \"%s\" with inherited definition",
1043 def = (ColumnDef *) list_nth(inhSchema, exist_attno - 1);
1044 if (typenameTypeId(def->typename) != typenameTypeId(newdef->typename) ||
1045 def->typename->typmod != newdef->typename->typmod)
1047 (errcode(ERRCODE_DATATYPE_MISMATCH),
1048 errmsg("column \"%s\" has a type conflict",
1050 errdetail("%s versus %s",
1051 TypeNameToString(def->typename),
1052 TypeNameToString(newdef->typename))));
1053 /* Mark the column as locally defined */
1054 def->is_local = true;
1055 /* Merge of NOT NULL constraints = OR 'em together */
1056 def->is_not_null |= newdef->is_not_null;
1057 /* If new def has a default, override previous default */
1058 if (newdef->raw_default != NULL)
1060 def->raw_default = newdef->raw_default;
1061 def->cooked_default = newdef->cooked_default;
1067 * No, attach new column to result schema
1069 inhSchema = lappend(inhSchema, newdef);
1076 * Check that we haven't exceeded the legal # of columns after merging
1077 * in inherited columns.
1079 if (list_length(schema) > MaxHeapAttributeNumber)
1081 (errcode(ERRCODE_TOO_MANY_COLUMNS),
1082 errmsg("tables can have at most %d columns",
1083 MaxHeapAttributeNumber)));
1087 * If we found any conflicting parent default values, check to make sure
1088 * they were overridden by the child.
1090 if (have_bogus_defaults)
1092 foreach(entry, schema)
1094 ColumnDef *def = lfirst(entry);
1096 if (def->cooked_default == bogus_marker)
1098 (errcode(ERRCODE_INVALID_COLUMN_DEFINITION),
1099 errmsg("column \"%s\" inherits conflicting default values",
1101 errhint("To resolve the conflict, specify a default explicitly.")));
1105 *supOids = parentOids;
1106 *supconstr = constraints;
1107 *supOidCount = parentsWithOids;
1112 * complementary static functions for MergeAttributes().
1114 * Varattnos of pg_constraint.conbin must be rewritten when subclasses inherit
1115 * constraints from parent classes, since the inherited attributes could
1116 * be given different column numbers in multiple-inheritance cases.
1118 * Note that the passed node tree is modified in place!
1121 change_varattnos_walker(Node *node, const AttrNumber *newattno)
1127 Var *var = (Var *) node;
1129 if (var->varlevelsup == 0 && var->varno == 1 &&
1133 * ??? the following may be a problem when the node is multiply
1134 * referenced though stringToNode() doesn't create such a node
1137 Assert(newattno[var->varattno - 1] > 0);
1138 var->varattno = newattno[var->varattno - 1];
1142 return expression_tree_walker(node, change_varattnos_walker,
1147 change_varattnos_of_a_node(Node *node, const AttrNumber *newattno)
1149 return change_varattnos_walker(node, newattno);
1153 * StoreCatalogInheritance
1154 * Updates the system catalogs with proper inheritance information.
1156 * supers is a list of the OIDs of the new relation's direct ancestors.
1159 StoreCatalogInheritance(Oid relationId, List *supers)
1170 AssertArg(OidIsValid(relationId));
1176 * Store INHERITS information in pg_inherits using direct ancestors only.
1177 * Also enter dependencies on the direct ancestors, and make sure they are
1178 * marked with relhassubclass = true.
1180 * (Once upon a time, both direct and indirect ancestors were found here
1181 * and then entered into pg_ipl. Since that catalog doesn't exist
1182 * anymore, there's no need to look for indirect ancestors.)
1184 relation = heap_open(InheritsRelationId, RowExclusiveLock);
1185 desc = RelationGetDescr(relation);
1188 foreach(entry, supers)
1190 Oid parentOid = lfirst_oid(entry);
1191 Datum datum[Natts_pg_inherits];
1192 char nullarr[Natts_pg_inherits];
1193 ObjectAddress childobject,
1196 datum[0] = ObjectIdGetDatum(relationId); /* inhrel */
1197 datum[1] = ObjectIdGetDatum(parentOid); /* inhparent */
1198 datum[2] = Int16GetDatum(seqNumber); /* inhseqno */
1204 tuple = heap_formtuple(desc, datum, nullarr);
1206 simple_heap_insert(relation, tuple);
1208 CatalogUpdateIndexes(relation, tuple);
1210 heap_freetuple(tuple);
1213 * Store a dependency too
1215 parentobject.classId = RelationRelationId;
1216 parentobject.objectId = parentOid;
1217 parentobject.objectSubId = 0;
1218 childobject.classId = RelationRelationId;
1219 childobject.objectId = relationId;
1220 childobject.objectSubId = 0;
1222 recordDependencyOn(&childobject, &parentobject, DEPENDENCY_NORMAL);
1225 * Mark the parent as having subclasses.
1227 setRelhassubclassInRelation(parentOid, true);
1232 heap_close(relation, RowExclusiveLock);
1236 * Look for an existing schema entry with the given name.
1238 * Returns the index (starting with 1) if attribute already exists in schema,
1242 findAttrByName(const char *attributeName, List *schema)
1249 ColumnDef *def = lfirst(s);
1251 if (strcmp(attributeName, def->colname) == 0)
1260 * Update a relation's pg_class.relhassubclass entry to the given value
1263 setRelhassubclassInRelation(Oid relationId, bool relhassubclass)
1265 Relation relationRelation;
1267 Form_pg_class classtuple;
1270 * Fetch a modifiable copy of the tuple, modify it, update pg_class.
1272 * If the tuple already has the right relhassubclass setting, we don't
1273 * need to update it, but we still need to issue an SI inval message.
1275 relationRelation = heap_open(RelationRelationId, RowExclusiveLock);
1276 tuple = SearchSysCacheCopy(RELOID,
1277 ObjectIdGetDatum(relationId),
1279 if (!HeapTupleIsValid(tuple))
1280 elog(ERROR, "cache lookup failed for relation %u", relationId);
1281 classtuple = (Form_pg_class) GETSTRUCT(tuple);
1283 if (classtuple->relhassubclass != relhassubclass)
1285 classtuple->relhassubclass = relhassubclass;
1286 simple_heap_update(relationRelation, &tuple->t_self, tuple);
1288 /* keep the catalog indexes up to date */
1289 CatalogUpdateIndexes(relationRelation, tuple);
1293 /* no need to change tuple, but force relcache rebuild anyway */
1294 CacheInvalidateRelcacheByTuple(tuple);
1297 heap_freetuple(tuple);
1298 heap_close(relationRelation, RowExclusiveLock);
1303 * renameatt - changes the name of a attribute in a relation
1305 * Attname attribute is changed in attribute catalog.
1306 * No record of the previous attname is kept (correct?).
1308 * get proper relrelation from relation catalog (if not arg)
1309 * scan attribute catalog
1310 * for name conflict (within rel)
1311 * for original attribute (if not arg)
1312 * modify attname in attribute tuple
1313 * insert modified attribute in attribute catalog
1314 * delete original attribute from attribute catalog
1317 renameatt(Oid myrelid,
1318 const char *oldattname,
1319 const char *newattname,
1323 Relation targetrelation;
1324 Relation attrelation;
1326 Form_pg_attribute attform;
1329 ListCell *indexoidscan;
1332 * Grab an exclusive lock on the target table, which we will NOT release
1333 * until end of transaction.
1335 targetrelation = relation_open(myrelid, AccessExclusiveLock);
1338 * permissions checking. this would normally be done in utility.c, but
1339 * this particular routine is recursive.
1341 * normally, only the owner of a class can change its schema.
1343 if (!pg_class_ownercheck(myrelid, GetUserId()))
1344 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
1345 RelationGetRelationName(targetrelation));
1346 if (!allowSystemTableMods && IsSystemRelation(targetrelation))
1348 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1349 errmsg("permission denied: \"%s\" is a system catalog",
1350 RelationGetRelationName(targetrelation))));
1353 * if the 'recurse' flag is set then we are supposed to rename this
1354 * attribute in all classes that inherit from 'relname' (as well as in
1357 * any permissions or problems with duplicate attributes will cause the
1358 * whole transaction to abort, which is what we want -- all or nothing.
1365 /* this routine is actually in the planner */
1366 children = find_all_inheritors(myrelid);
1369 * find_all_inheritors does the recursive search of the inheritance
1370 * hierarchy, so all we have to do is process all of the relids in the
1371 * list that it returns.
1373 foreach(child, children)
1375 Oid childrelid = lfirst_oid(child);
1377 if (childrelid == myrelid)
1379 /* note we need not recurse again */
1380 renameatt(childrelid, oldattname, newattname, false, true);
1386 * If we are told not to recurse, there had better not be any child
1387 * tables; else the rename would put them out of step.
1390 find_inheritance_children(myrelid) != NIL)
1392 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
1393 errmsg("inherited column \"%s\" must be renamed in child tables too",
1397 attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
1399 atttup = SearchSysCacheCopyAttName(myrelid, oldattname);
1400 if (!HeapTupleIsValid(atttup))
1402 (errcode(ERRCODE_UNDEFINED_COLUMN),
1403 errmsg("column \"%s\" does not exist",
1405 attform = (Form_pg_attribute) GETSTRUCT(atttup);
1407 attnum = attform->attnum;
1410 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1411 errmsg("cannot rename system column \"%s\"",
1415 * if the attribute is inherited, forbid the renaming, unless we are
1416 * already inside a recursive rename.
1418 if (attform->attinhcount > 0 && !recursing)
1420 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
1421 errmsg("cannot rename inherited column \"%s\"",
1424 /* should not already exist */
1425 /* this test is deliberately not attisdropped-aware */
1426 if (SearchSysCacheExists(ATTNAME,
1427 ObjectIdGetDatum(myrelid),
1428 PointerGetDatum(newattname),
1431 (errcode(ERRCODE_DUPLICATE_COLUMN),
1432 errmsg("column \"%s\" of relation \"%s\" already exists",
1433 newattname, RelationGetRelationName(targetrelation))));
1435 namestrcpy(&(attform->attname), newattname);
1437 simple_heap_update(attrelation, &atttup->t_self, atttup);
1439 /* keep system catalog indexes current */
1440 CatalogUpdateIndexes(attrelation, atttup);
1442 heap_freetuple(atttup);
1445 * Update column names of indexes that refer to the column being renamed.
1447 indexoidlist = RelationGetIndexList(targetrelation);
1449 foreach(indexoidscan, indexoidlist)
1451 Oid indexoid = lfirst_oid(indexoidscan);
1453 Form_pg_index indexform;
1457 * Scan through index columns to see if there's any simple index
1458 * entries for this attribute. We ignore expressional entries.
1460 indextup = SearchSysCache(INDEXRELID,
1461 ObjectIdGetDatum(indexoid),
1463 if (!HeapTupleIsValid(indextup))
1464 elog(ERROR, "cache lookup failed for index %u", indexoid);
1465 indexform = (Form_pg_index) GETSTRUCT(indextup);
1467 for (i = 0; i < indexform->indnatts; i++)
1469 if (attnum != indexform->indkey.values[i])
1473 * Found one, rename it.
1475 atttup = SearchSysCacheCopy(ATTNUM,
1476 ObjectIdGetDatum(indexoid),
1477 Int16GetDatum(i + 1),
1479 if (!HeapTupleIsValid(atttup))
1480 continue; /* should we raise an error? */
1483 * Update the (copied) attribute tuple.
1485 namestrcpy(&(((Form_pg_attribute) GETSTRUCT(atttup))->attname),
1488 simple_heap_update(attrelation, &atttup->t_self, atttup);
1490 /* keep system catalog indexes current */
1491 CatalogUpdateIndexes(attrelation, atttup);
1493 heap_freetuple(atttup);
1496 ReleaseSysCache(indextup);
1499 list_free(indexoidlist);
1501 heap_close(attrelation, RowExclusiveLock);
1504 * Update att name in any RI triggers associated with the relation.
1506 if (targetrelation->rd_rel->reltriggers > 0)
1508 /* update tgargs column reference where att is primary key */
1509 update_ri_trigger_args(RelationGetRelid(targetrelation),
1510 oldattname, newattname,
1512 /* update tgargs column reference where att is foreign key */
1513 update_ri_trigger_args(RelationGetRelid(targetrelation),
1514 oldattname, newattname,
1518 relation_close(targetrelation, NoLock); /* close rel but keep lock */
1522 * renamerel - change the name of a relation
1524 * XXX - When renaming sequences, we don't bother to modify the
1525 * sequence name that is stored within the sequence itself
1526 * (this would cause problems with MVCC). In the future,
1527 * the sequence name should probably be removed from the
1528 * sequence, AFAIK there's no need for it to be there.
1531 renamerel(Oid myrelid, const char *newrelname)
1533 Relation targetrelation;
1534 Relation relrelation; /* for RELATION relation */
1539 bool relhastriggers;
1542 * Grab an exclusive lock on the target table or index, which we will NOT
1543 * release until end of transaction.
1545 targetrelation = relation_open(myrelid, AccessExclusiveLock);
1547 oldrelname = pstrdup(RelationGetRelationName(targetrelation));
1548 namespaceId = RelationGetNamespace(targetrelation);
1550 if (!allowSystemTableMods && IsSystemRelation(targetrelation))
1552 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1553 errmsg("permission denied: \"%s\" is a system catalog",
1554 RelationGetRelationName(targetrelation))));
1556 relkind = targetrelation->rd_rel->relkind;
1557 relhastriggers = (targetrelation->rd_rel->reltriggers > 0);
1560 * Find relation's pg_class tuple, and make sure newrelname isn't in use.
1562 relrelation = heap_open(RelationRelationId, RowExclusiveLock);
1564 reltup = SearchSysCacheCopy(RELOID,
1565 PointerGetDatum(myrelid),
1567 if (!HeapTupleIsValid(reltup)) /* shouldn't happen */
1568 elog(ERROR, "cache lookup failed for relation %u", myrelid);
1570 if (get_relname_relid(newrelname, namespaceId) != InvalidOid)
1572 (errcode(ERRCODE_DUPLICATE_TABLE),
1573 errmsg("relation \"%s\" already exists",
1577 * Update pg_class tuple with new relname. (Scribbling on reltup is OK
1578 * because it's a copy...)
1580 namestrcpy(&(((Form_pg_class) GETSTRUCT(reltup))->relname), newrelname);
1582 simple_heap_update(relrelation, &reltup->t_self, reltup);
1584 /* keep the system catalog indexes current */
1585 CatalogUpdateIndexes(relrelation, reltup);
1587 heap_freetuple(reltup);
1588 heap_close(relrelation, RowExclusiveLock);
1591 * Also rename the associated type, if any.
1593 if (relkind != RELKIND_INDEX)
1594 TypeRename(oldrelname, namespaceId, newrelname);
1597 * Update rel name in any RI triggers associated with the relation.
1601 /* update tgargs where relname is primary key */
1602 update_ri_trigger_args(myrelid,
1606 /* update tgargs where relname is foreign key */
1607 update_ri_trigger_args(myrelid,
1614 * Close rel, but keep exclusive lock!
1616 relation_close(targetrelation, NoLock);
1620 * Scan pg_trigger for RI triggers that are on the specified relation
1621 * (if fk_scan is false) or have it as the tgconstrrel (if fk_scan
1622 * is true). Update RI trigger args fields matching oldname to contain
1623 * newname instead. If update_relname is true, examine the relname
1624 * fields; otherwise examine the attname fields.
1627 update_ri_trigger_args(Oid relid,
1628 const char *oldname,
1629 const char *newname,
1631 bool update_relname)
1634 ScanKeyData skey[1];
1635 SysScanDesc trigscan;
1637 Datum values[Natts_pg_trigger];
1638 char nulls[Natts_pg_trigger];
1639 char replaces[Natts_pg_trigger];
1641 tgrel = heap_open(TriggerRelationId, RowExclusiveLock);
1644 ScanKeyInit(&skey[0],
1645 Anum_pg_trigger_tgconstrrelid,
1646 BTEqualStrategyNumber, F_OIDEQ,
1647 ObjectIdGetDatum(relid));
1648 trigscan = systable_beginscan(tgrel, TriggerConstrRelidIndexId,
1654 ScanKeyInit(&skey[0],
1655 Anum_pg_trigger_tgrelid,
1656 BTEqualStrategyNumber, F_OIDEQ,
1657 ObjectIdGetDatum(relid));
1658 trigscan = systable_beginscan(tgrel, TriggerRelidNameIndexId,
1663 while ((tuple = systable_getnext(trigscan)) != NULL)
1665 Form_pg_trigger pg_trigger = (Form_pg_trigger) GETSTRUCT(tuple);
1675 const char *arga[RI_MAX_ARGUMENTS];
1678 tg_type = RI_FKey_trigger_type(pg_trigger->tgfoid);
1679 if (tg_type == RI_TRIGGER_NONE)
1681 /* Not an RI trigger, forget it */
1686 * It is an RI trigger, so parse the tgargs bytea.
1688 * NB: we assume the field will never be compressed or moved out of
1689 * line; so does trigger.c ...
1691 tgnargs = pg_trigger->tgnargs;
1693 DatumGetPointer(fastgetattr(tuple,
1694 Anum_pg_trigger_tgargs,
1695 tgrel->rd_att, &isnull));
1696 if (isnull || tgnargs < RI_FIRST_ATTNAME_ARGNO ||
1697 tgnargs > RI_MAX_ARGUMENTS)
1699 /* This probably shouldn't happen, but ignore busted triggers */
1702 argp = (const char *) VARDATA(val);
1703 for (i = 0; i < tgnargs; i++)
1706 argp += strlen(argp) + 1;
1710 * Figure out which item(s) to look at. If the trigger is primary-key
1711 * type and attached to my rel, I should look at the PK fields; if it
1712 * is foreign-key type and attached to my rel, I should look at the FK
1713 * fields. But the opposite rule holds when examining triggers found
1714 * by tgconstrrel search.
1716 examine_pk = (tg_type == RI_TRIGGER_PK) == (!fk_scan);
1721 /* Change the relname if needed */
1722 i = examine_pk ? RI_PK_RELNAME_ARGNO : RI_FK_RELNAME_ARGNO;
1723 if (strcmp(arga[i], oldname) == 0)
1731 /* Change attname(s) if needed */
1732 i = examine_pk ? RI_FIRST_ATTNAME_ARGNO + RI_KEYPAIR_PK_IDX :
1733 RI_FIRST_ATTNAME_ARGNO + RI_KEYPAIR_FK_IDX;
1734 for (; i < tgnargs; i += 2)
1736 if (strcmp(arga[i], oldname) == 0)
1746 /* Don't need to update this tuple */
1751 * Construct modified tgargs bytea.
1754 for (i = 0; i < tgnargs; i++)
1755 newlen += strlen(arga[i]) + 1;
1756 newtgargs = (bytea *) palloc(newlen);
1757 VARATT_SIZEP(newtgargs) = newlen;
1759 for (i = 0; i < tgnargs; i++)
1761 strcpy(((char *) newtgargs) + newlen, arga[i]);
1762 newlen += strlen(arga[i]) + 1;
1766 * Build modified tuple.
1768 for (i = 0; i < Natts_pg_trigger; i++)
1770 values[i] = (Datum) 0;
1774 values[Anum_pg_trigger_tgargs - 1] = PointerGetDatum(newtgargs);
1775 replaces[Anum_pg_trigger_tgargs - 1] = 'r';
1777 tuple = heap_modifytuple(tuple, RelationGetDescr(tgrel), values, nulls, replaces);
1780 * Update pg_trigger and its indexes
1782 simple_heap_update(tgrel, &tuple->t_self, tuple);
1784 CatalogUpdateIndexes(tgrel, tuple);
1787 * Invalidate trigger's relation's relcache entry so that other
1788 * backends (and this one too!) are sent SI message to make them
1789 * rebuild relcache entries. (Ideally this should happen
1792 * We can skip this for triggers on relid itself, since that relcache
1793 * flush will happen anyway due to the table or column rename. We
1794 * just need to catch the far ends of RI relationships.
1796 pg_trigger = (Form_pg_trigger) GETSTRUCT(tuple);
1797 if (pg_trigger->tgrelid != relid)
1798 CacheInvalidateRelcacheByRelid(pg_trigger->tgrelid);
1800 /* free up our scratch memory */
1802 heap_freetuple(tuple);
1805 systable_endscan(trigscan);
1807 heap_close(tgrel, RowExclusiveLock);
1810 * Increment cmd counter to make updates visible; this is needed in case
1811 * the same tuple has to be updated again by next pass (can happen in case
1812 * of a self-referential FK relationship).
1814 CommandCounterIncrement();
1819 * Execute ALTER TABLE, which can be a list of subcommands
1821 * ALTER TABLE is performed in three phases:
1822 * 1. Examine subcommands and perform pre-transformation checking.
1823 * 2. Update system catalogs.
1824 * 3. Scan table(s) to check new constraints, and optionally recopy
1825 * the data into new table(s).
1826 * Phase 3 is not performed unless one or more of the subcommands requires
1827 * it. The intention of this design is to allow multiple independent
1828 * updates of the table schema to be performed with only one pass over the
1831 * ATPrepCmd performs phase 1. A "work queue" entry is created for
1832 * each table to be affected (there may be multiple affected tables if the
1833 * commands traverse a table inheritance hierarchy). Also we do preliminary
1834 * validation of the subcommands, including parse transformation of those
1835 * expressions that need to be evaluated with respect to the old table
1838 * ATRewriteCatalogs performs phase 2 for each affected table (note that
1839 * phases 2 and 3 do no explicit recursion, since phase 1 already did it).
1840 * Certain subcommands need to be performed before others to avoid
1841 * unnecessary conflicts; for example, DROP COLUMN should come before
1842 * ADD COLUMN. Therefore phase 1 divides the subcommands into multiple
1843 * lists, one for each logical "pass" of phase 2.
1845 * ATRewriteTables performs phase 3 for those tables that need it.
1847 * Thanks to the magic of MVCC, an error anywhere along the way rolls back
1848 * the whole operation; we don't have to do anything special to clean up.
1851 AlterTable(AlterTableStmt *stmt)
1853 ATController(relation_openrv(stmt->relation, AccessExclusiveLock),
1855 interpretInhOption(stmt->relation->inhOpt));
1859 * AlterTableInternal
1861 * ALTER TABLE with target specified by OID
1864 AlterTableInternal(Oid relid, List *cmds, bool recurse)
1866 ATController(relation_open(relid, AccessExclusiveLock),
1872 ATController(Relation rel, List *cmds, bool recurse)
1877 /* Phase 1: preliminary examination of commands, create work queue */
1880 AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
1882 ATPrepCmd(&wqueue, rel, cmd, recurse, false);
1885 /* Close the relation, but keep lock until commit */
1886 relation_close(rel, NoLock);
1888 /* Phase 2: update system catalogs */
1889 ATRewriteCatalogs(&wqueue);
1891 /* Phase 3: scan/rewrite tables as needed */
1892 ATRewriteTables(&wqueue);
1898 * Traffic cop for ALTER TABLE Phase 1 operations, including simple
1899 * recursion and permission checks.
1901 * Caller must have acquired AccessExclusiveLock on relation already.
1902 * This lock should be held until commit.
1905 ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
1906 bool recurse, bool recursing)
1908 AlteredTableInfo *tab;
1911 /* Find or create work queue entry for this table */
1912 tab = ATGetQueueEntry(wqueue, rel);
1915 * Copy the original subcommand for each table. This avoids conflicts
1916 * when different child tables need to make different parse
1917 * transformations (for example, the same column may have different column
1918 * numbers in different children).
1920 cmd = copyObject(cmd);
1923 * Do permissions checking, recursion to child tables if needed, and any
1924 * additional phase-1 processing needed.
1926 switch (cmd->subtype)
1928 case AT_AddColumn: /* ADD COLUMN */
1929 ATSimplePermissions(rel, false);
1930 /* Performs own recursion */
1931 ATPrepAddColumn(wqueue, rel, recurse, cmd);
1932 pass = AT_PASS_ADD_COL;
1934 case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */
1937 * We allow defaults on views so that INSERT into a view can have
1938 * default-ish behavior. This works because the rewriter
1939 * substitutes default values into INSERTs before it expands
1942 ATSimplePermissions(rel, true);
1943 ATSimpleRecursion(wqueue, rel, cmd, recurse);
1944 /* No command-specific prep needed */
1945 pass = AT_PASS_ADD_CONSTR;
1947 case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */
1948 ATSimplePermissions(rel, false);
1949 ATSimpleRecursion(wqueue, rel, cmd, recurse);
1950 /* No command-specific prep needed */
1951 pass = AT_PASS_DROP;
1953 case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */
1954 ATSimplePermissions(rel, false);
1955 ATSimpleRecursion(wqueue, rel, cmd, recurse);
1956 /* No command-specific prep needed */
1957 pass = AT_PASS_ADD_CONSTR;
1959 case AT_SetStatistics: /* ALTER COLUMN STATISTICS */
1960 ATSimpleRecursion(wqueue, rel, cmd, recurse);
1961 /* Performs own permission checks */
1962 ATPrepSetStatistics(rel, cmd->name, cmd->def);
1963 pass = AT_PASS_COL_ATTRS;
1965 case AT_SetStorage: /* ALTER COLUMN STORAGE */
1966 ATSimplePermissions(rel, false);
1967 ATSimpleRecursion(wqueue, rel, cmd, recurse);
1968 /* No command-specific prep needed */
1969 pass = AT_PASS_COL_ATTRS;
1971 case AT_DropColumn: /* DROP COLUMN */
1972 ATSimplePermissions(rel, false);
1973 /* Recursion occurs during execution phase */
1974 /* No command-specific prep needed except saving recurse flag */
1976 cmd->subtype = AT_DropColumnRecurse;
1977 pass = AT_PASS_DROP;
1979 case AT_AddIndex: /* ADD INDEX */
1980 ATSimplePermissions(rel, false);
1981 /* This command never recurses */
1982 /* No command-specific prep needed */
1983 pass = AT_PASS_ADD_INDEX;
1985 case AT_AddConstraint: /* ADD CONSTRAINT */
1986 ATSimplePermissions(rel, false);
1989 * Currently we recurse only for CHECK constraints, never for
1990 * foreign-key constraints. UNIQUE/PKEY constraints won't be seen
1993 if (IsA(cmd->def, Constraint))
1994 ATSimpleRecursion(wqueue, rel, cmd, recurse);
1995 /* No command-specific prep needed */
1996 pass = AT_PASS_ADD_CONSTR;
1998 case AT_DropConstraint: /* DROP CONSTRAINT */
1999 ATSimplePermissions(rel, false);
2000 /* Performs own recursion */
2001 ATPrepDropConstraint(wqueue, rel, recurse, cmd);
2002 pass = AT_PASS_DROP;
2004 case AT_DropConstraintQuietly: /* DROP CONSTRAINT for child */
2005 ATSimplePermissions(rel, false);
2006 ATSimpleRecursion(wqueue, rel, cmd, recurse);
2007 /* No command-specific prep needed */
2008 pass = AT_PASS_DROP;
2010 case AT_AlterColumnType: /* ALTER COLUMN TYPE */
2011 ATSimplePermissions(rel, false);
2012 /* Performs own recursion */
2013 ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd);
2014 pass = AT_PASS_ALTER_TYPE;
2016 case AT_ToastTable: /* CREATE TOAST TABLE */
2017 ATSimplePermissions(rel, false);
2018 /* This command never recurses */
2019 /* No command-specific prep needed */
2020 pass = AT_PASS_MISC;
2022 case AT_ChangeOwner: /* ALTER OWNER */
2023 /* This command never recurses */
2024 /* No command-specific prep needed */
2025 pass = AT_PASS_MISC;
2027 case AT_ClusterOn: /* CLUSTER ON */
2028 case AT_DropCluster: /* SET WITHOUT CLUSTER */
2029 ATSimplePermissions(rel, false);
2030 /* These commands never recurse */
2031 /* No command-specific prep needed */
2032 pass = AT_PASS_MISC;
2034 case AT_DropOids: /* SET WITHOUT OIDS */
2035 ATSimplePermissions(rel, false);
2036 /* Performs own recursion */
2037 if (rel->rd_rel->relhasoids)
2039 AlterTableCmd *dropCmd = makeNode(AlterTableCmd);
2041 dropCmd->subtype = AT_DropColumn;
2042 dropCmd->name = pstrdup("oid");
2043 dropCmd->behavior = cmd->behavior;
2044 ATPrepCmd(wqueue, rel, dropCmd, recurse, false);
2046 pass = AT_PASS_DROP;
2048 case AT_SetTableSpace: /* SET TABLESPACE */
2049 /* This command never recurses */
2050 ATPrepSetTableSpace(tab, rel, cmd->name);
2051 pass = AT_PASS_MISC; /* doesn't actually matter */
2053 case AT_EnableTrig: /* ENABLE TRIGGER variants */
2054 case AT_EnableTrigAll:
2055 case AT_EnableTrigUser:
2056 case AT_DisableTrig: /* DISABLE TRIGGER variants */
2057 case AT_DisableTrigAll:
2058 case AT_DisableTrigUser:
2059 ATSimplePermissions(rel, false);
2060 /* These commands never recurse */
2061 /* No command-specific prep needed */
2062 pass = AT_PASS_MISC;
2065 elog(ERROR, "unrecognized alter table type: %d",
2066 (int) cmd->subtype);
2067 pass = 0; /* keep compiler quiet */
2071 /* Add the subcommand to the appropriate list for phase 2 */
2072 tab->subcmds[pass] = lappend(tab->subcmds[pass], cmd);
2078 * Traffic cop for ALTER TABLE Phase 2 operations. Subcommands are
2079 * dispatched in a "safe" execution order (designed to avoid unnecessary
2083 ATRewriteCatalogs(List **wqueue)
2089 * We process all the tables "in parallel", one pass at a time. This is
2090 * needed because we may have to propagate work from one table to another
2091 * (specifically, ALTER TYPE on a foreign key's PK has to dispatch the
2092 * re-adding of the foreign key constraint to the other table). Work can
2093 * only be propagated into later passes, however.
2095 for (pass = 0; pass < AT_NUM_PASSES; pass++)
2097 /* Go through each table that needs to be processed */
2098 foreach(ltab, *wqueue)
2100 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2101 List *subcmds = tab->subcmds[pass];
2109 * Exclusive lock was obtained by phase 1, needn't get it again
2111 rel = relation_open(tab->relid, NoLock);
2113 foreach(lcmd, subcmds)
2114 ATExecCmd(tab, rel, (AlterTableCmd *) lfirst(lcmd));
2117 * After the ALTER TYPE pass, do cleanup work (this is not done in
2118 * ATExecAlterColumnType since it should be done only once if
2119 * multiple columns of a table are altered).
2121 if (pass == AT_PASS_ALTER_TYPE)
2122 ATPostAlterTypeCleanup(wqueue, tab);
2124 relation_close(rel, NoLock);
2129 * Do an implicit CREATE TOAST TABLE if we executed any subcommands that
2130 * might have added a column or changed column storage.
2132 foreach(ltab, *wqueue)
2134 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2136 if (tab->relkind == RELKIND_RELATION &&
2137 (tab->subcmds[AT_PASS_ADD_COL] ||
2138 tab->subcmds[AT_PASS_ALTER_TYPE] ||
2139 tab->subcmds[AT_PASS_COL_ATTRS]))
2140 AlterTableCreateToastTable(tab->relid, true);
2145 * ATExecCmd: dispatch a subcommand to appropriate execution routine
2148 ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd)
2150 switch (cmd->subtype)
2152 case AT_AddColumn: /* ADD COLUMN */
2153 ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def);
2155 case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */
2156 ATExecColumnDefault(rel, cmd->name, cmd->def);
2158 case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */
2159 ATExecDropNotNull(rel, cmd->name);
2161 case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */
2162 ATExecSetNotNull(tab, rel, cmd->name);
2164 case AT_SetStatistics: /* ALTER COLUMN STATISTICS */
2165 ATExecSetStatistics(rel, cmd->name, cmd->def);
2167 case AT_SetStorage: /* ALTER COLUMN STORAGE */
2168 ATExecSetStorage(rel, cmd->name, cmd->def);
2170 case AT_DropColumn: /* DROP COLUMN */
2171 ATExecDropColumn(rel, cmd->name, cmd->behavior, false, false);
2173 case AT_DropColumnRecurse: /* DROP COLUMN with recursion */
2174 ATExecDropColumn(rel, cmd->name, cmd->behavior, true, false);
2176 case AT_AddIndex: /* ADD INDEX */
2177 ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, false);
2179 case AT_ReAddIndex: /* ADD INDEX */
2180 ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true);
2182 case AT_AddConstraint: /* ADD CONSTRAINT */
2183 ATExecAddConstraint(tab, rel, cmd->def);
2185 case AT_DropConstraint: /* DROP CONSTRAINT */
2186 ATExecDropConstraint(rel, cmd->name, cmd->behavior, false);
2188 case AT_DropConstraintQuietly: /* DROP CONSTRAINT for child */
2189 ATExecDropConstraint(rel, cmd->name, cmd->behavior, true);
2191 case AT_AlterColumnType: /* ALTER COLUMN TYPE */
2192 ATExecAlterColumnType(tab, rel, cmd->name, (TypeName *) cmd->def);
2194 case AT_ToastTable: /* CREATE TOAST TABLE */
2195 AlterTableCreateToastTable(RelationGetRelid(rel), false);
2197 case AT_ChangeOwner: /* ALTER OWNER */
2198 ATExecChangeOwner(RelationGetRelid(rel),
2199 get_roleid_checked(cmd->name),
2202 case AT_ClusterOn: /* CLUSTER ON */
2203 ATExecClusterOn(rel, cmd->name);
2205 case AT_DropCluster: /* SET WITHOUT CLUSTER */
2206 ATExecDropCluster(rel);
2208 case AT_DropOids: /* SET WITHOUT OIDS */
2211 * Nothing to do here; we'll have generated a DropColumn
2212 * subcommand to do the real work
2215 case AT_SetTableSpace: /* SET TABLESPACE */
2218 * Nothing to do here; Phase 3 does the work
2221 case AT_EnableTrig: /* ENABLE TRIGGER name */
2222 ATExecEnableDisableTrigger(rel, cmd->name, true, false);
2224 case AT_DisableTrig: /* DISABLE TRIGGER name */
2225 ATExecEnableDisableTrigger(rel, cmd->name, false, false);
2227 case AT_EnableTrigAll: /* ENABLE TRIGGER ALL */
2228 ATExecEnableDisableTrigger(rel, NULL, true, false);
2230 case AT_DisableTrigAll: /* DISABLE TRIGGER ALL */
2231 ATExecEnableDisableTrigger(rel, NULL, false, false);
2233 case AT_EnableTrigUser: /* ENABLE TRIGGER USER */
2234 ATExecEnableDisableTrigger(rel, NULL, true, true);
2236 case AT_DisableTrigUser: /* DISABLE TRIGGER USER */
2237 ATExecEnableDisableTrigger(rel, NULL, false, true);
2240 elog(ERROR, "unrecognized alter table type: %d",
2241 (int) cmd->subtype);
2246 * Bump the command counter to ensure the next subcommand in the sequence
2247 * can see the changes so far
2249 CommandCounterIncrement();
2253 * ATRewriteTables: ALTER TABLE phase 3
2256 ATRewriteTables(List **wqueue)
2260 /* Go through each table that needs to be checked or rewritten */
2261 foreach(ltab, *wqueue)
2263 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2266 * We only need to rewrite the table if at least one column needs to
2269 if (tab->newvals != NIL)
2271 /* Build a temporary relation and copy data */
2273 char NewHeapName[NAMEDATALEN];
2276 ObjectAddress object;
2278 OldHeap = heap_open(tab->relid, NoLock);
2281 * We can never allow rewriting of shared or nailed-in-cache
2282 * relations, because we can't support changing their relfilenode
2285 if (OldHeap->rd_rel->relisshared || OldHeap->rd_isnailed)
2287 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2288 errmsg("cannot rewrite system relation \"%s\"",
2289 RelationGetRelationName(OldHeap))));
2292 * Don't allow rewrite on temp tables of other backends ... their
2293 * local buffer manager is not going to cope.
2295 if (isOtherTempNamespace(RelationGetNamespace(OldHeap)))
2297 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2298 errmsg("cannot rewrite temporary tables of other sessions")));
2301 * Select destination tablespace (same as original unless user
2302 * requested a change)
2304 if (tab->newTableSpace)
2305 NewTableSpace = tab->newTableSpace;
2307 NewTableSpace = OldHeap->rd_rel->reltablespace;
2309 heap_close(OldHeap, NoLock);
2312 * Create the new heap, using a temporary name in the same
2313 * namespace as the existing table. NOTE: there is some risk of
2314 * collision with user relnames. Working around this seems more
2315 * trouble than it's worth; in particular, we can't create the new
2316 * heap in a different namespace from the old, or we will have
2317 * problems with the TEMP status of temp tables.
2319 snprintf(NewHeapName, sizeof(NewHeapName),
2320 "pg_temp_%u", tab->relid);
2322 OIDNewHeap = make_new_heap(tab->relid, NewHeapName, NewTableSpace);
2325 * Copy the heap data into the new table with the desired
2326 * modifications, and test the current data within the table
2327 * against new constraints generated by ALTER TABLE commands.
2329 ATRewriteTable(tab, OIDNewHeap);
2331 /* Swap the physical files of the old and new heaps. */
2332 swap_relation_files(tab->relid, OIDNewHeap);
2334 CommandCounterIncrement();
2336 /* Destroy new heap with old filenode */
2337 object.classId = RelationRelationId;
2338 object.objectId = OIDNewHeap;
2339 object.objectSubId = 0;
2342 * The new relation is local to our transaction and we know
2343 * nothing depends on it, so DROP_RESTRICT should be OK.
2345 performDeletion(&object, DROP_RESTRICT);
2346 /* performDeletion does CommandCounterIncrement at end */
2349 * Rebuild each index on the relation (but not the toast table,
2350 * which is all-new anyway). We do not need
2351 * CommandCounterIncrement() because reindex_relation does it.
2353 reindex_relation(tab->relid, false);
2358 * Test the current data within the table against new constraints
2359 * generated by ALTER TABLE commands, but don't rebuild data.
2361 if (tab->constraints != NIL)
2362 ATRewriteTable(tab, InvalidOid);
2365 * If we had SET TABLESPACE but no reason to reconstruct tuples,
2366 * just do a block-by-block copy.
2368 if (tab->newTableSpace)
2369 ATExecSetTableSpace(tab->relid, tab->newTableSpace);
2374 * Foreign key constraints are checked in a final pass, since (a) it's
2375 * generally best to examine each one separately, and (b) it's at least
2376 * theoretically possible that we have changed both relations of the
2377 * foreign key, and we'd better have finished both rewrites before we try
2378 * to read the tables.
2380 foreach(ltab, *wqueue)
2382 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2383 Relation rel = NULL;
2386 foreach(lcon, tab->constraints)
2388 NewConstraint *con = lfirst(lcon);
2390 if (con->contype == CONSTR_FOREIGN)
2392 FkConstraint *fkconstraint = (FkConstraint *) con->qual;
2397 /* Long since locked, no need for another */
2398 rel = heap_open(tab->relid, NoLock);
2401 refrel = heap_open(con->refrelid, RowShareLock);
2403 validateForeignKeyConstraint(fkconstraint, rel, refrel);
2405 heap_close(refrel, NoLock);
2410 heap_close(rel, NoLock);
2415 * ATRewriteTable: scan or rewrite one table
2417 * OIDNewHeap is InvalidOid if we don't need to rewrite
2420 ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap)
2424 TupleDesc oldTupDesc;
2425 TupleDesc newTupDesc;
2426 bool needscan = false;
2432 * Open the relation(s). We have surely already locked the existing
2435 oldrel = heap_open(tab->relid, NoLock);
2436 oldTupDesc = tab->oldDesc;
2437 newTupDesc = RelationGetDescr(oldrel); /* includes all mods */
2439 if (OidIsValid(OIDNewHeap))
2440 newrel = heap_open(OIDNewHeap, AccessExclusiveLock);
2445 * If we need to rewrite the table, the operation has to be propagated to
2446 * tables that use this table's rowtype as a column type.
2448 * (Eventually this will probably become true for scans as well, but at
2449 * the moment a composite type does not enforce any constraints, so it's
2450 * not necessary/appropriate to enforce them just during ALTER.)
2453 find_composite_type_dependencies(oldrel->rd_rel->reltype,
2454 RelationGetRelationName(oldrel));
2457 * Generate the constraint and default execution states
2460 estate = CreateExecutorState();
2462 /* Build the needed expression execution states */
2463 foreach(l, tab->constraints)
2465 NewConstraint *con = lfirst(l);
2467 switch (con->contype)
2471 con->qualstate = (List *)
2472 ExecPrepareExpr((Expr *) con->qual, estate);
2474 case CONSTR_FOREIGN:
2475 /* Nothing to do here */
2477 case CONSTR_NOTNULL:
2481 elog(ERROR, "unrecognized constraint type: %d",
2482 (int) con->contype);
2486 foreach(l, tab->newvals)
2488 NewColumnValue *ex = lfirst(l);
2492 ex->exprstate = ExecPrepareExpr((Expr *) ex->expr, estate);
2497 ExprContext *econtext;
2500 TupleTableSlot *oldslot;
2501 TupleTableSlot *newslot;
2504 MemoryContext oldCxt;
2505 List *dropped_attrs = NIL;
2508 econtext = GetPerTupleExprContext(estate);
2511 * Make tuple slots for old and new tuples. Note that even when the
2512 * tuples are the same, the tupDescs might not be (consider ADD COLUMN
2513 * without a default).
2515 oldslot = MakeSingleTupleTableSlot(oldTupDesc);
2516 newslot = MakeSingleTupleTableSlot(newTupDesc);
2518 /* Preallocate values/isnull arrays */
2519 i = Max(newTupDesc->natts, oldTupDesc->natts);
2520 values = (Datum *) palloc(i * sizeof(Datum));
2521 isnull = (bool *) palloc(i * sizeof(bool));
2522 memset(values, 0, i * sizeof(Datum));
2523 memset(isnull, true, i * sizeof(bool));
2526 * Any attributes that are dropped according to the new tuple
2527 * descriptor can be set to NULL. We precompute the list of dropped
2528 * attributes to avoid needing to do so in the per-tuple loop.
2530 for (i = 0; i < newTupDesc->natts; i++)
2532 if (newTupDesc->attrs[i]->attisdropped)
2533 dropped_attrs = lappend_int(dropped_attrs, i);
2537 * Scan through the rows, generating a new row if needed and then
2538 * checking all the constraints.
2540 scan = heap_beginscan(oldrel, SnapshotNow, 0, NULL);
2543 * Switch to per-tuple memory context and reset it for each tuple
2544 * produced, so we don't leak memory.
2546 oldCxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2548 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2552 Oid tupOid = InvalidOid;
2554 /* Extract data from old tuple */
2555 heap_deform_tuple(tuple, oldTupDesc, values, isnull);
2556 if (oldTupDesc->tdhasoid)
2557 tupOid = HeapTupleGetOid(tuple);
2559 /* Set dropped attributes to null in new tuple */
2560 foreach(lc, dropped_attrs)
2561 isnull[lfirst_int(lc)] = true;
2564 * Process supplied expressions to replace selected columns.
2565 * Expression inputs come from the old tuple.
2567 ExecStoreTuple(tuple, oldslot, InvalidBuffer, false);
2568 econtext->ecxt_scantuple = oldslot;
2570 foreach(l, tab->newvals)
2572 NewColumnValue *ex = lfirst(l);
2574 values[ex->attnum - 1] = ExecEvalExpr(ex->exprstate,
2576 &isnull[ex->attnum - 1],
2581 * Form the new tuple. Note that we don't explicitly pfree it,
2582 * since the per-tuple memory context will be reset shortly.
2584 tuple = heap_form_tuple(newTupDesc, values, isnull);
2586 /* Preserve OID, if any */
2587 if (newTupDesc->tdhasoid)
2588 HeapTupleSetOid(tuple, tupOid);
2591 /* Now check any constraints on the possibly-changed tuple */
2592 ExecStoreTuple(tuple, newslot, InvalidBuffer, false);
2593 econtext->ecxt_scantuple = newslot;
2595 foreach(l, tab->constraints)
2597 NewConstraint *con = lfirst(l);
2599 switch (con->contype)
2602 if (!ExecQual(con->qualstate, econtext, true))
2604 (errcode(ERRCODE_CHECK_VIOLATION),
2605 errmsg("check constraint \"%s\" is violated by some row",
2608 case CONSTR_NOTNULL:
2613 d = heap_getattr(tuple, con->attnum, newTupDesc,
2617 (errcode(ERRCODE_NOT_NULL_VIOLATION),
2618 errmsg("column \"%s\" contains null values",
2619 get_attname(tab->relid,
2623 case CONSTR_FOREIGN:
2624 /* Nothing to do here */
2627 elog(ERROR, "unrecognized constraint type: %d",
2628 (int) con->contype);
2632 /* Write the tuple out to the new relation */
2634 simple_heap_insert(newrel, tuple);
2636 ResetExprContext(econtext);
2638 CHECK_FOR_INTERRUPTS();
2641 MemoryContextSwitchTo(oldCxt);
2645 FreeExecutorState(estate);
2647 heap_close(oldrel, NoLock);
2649 heap_close(newrel, NoLock);
2653 * ATGetQueueEntry: find or create an entry in the ALTER TABLE work queue
2655 static AlteredTableInfo *
2656 ATGetQueueEntry(List **wqueue, Relation rel)
2658 Oid relid = RelationGetRelid(rel);
2659 AlteredTableInfo *tab;
2662 foreach(ltab, *wqueue)
2664 tab = (AlteredTableInfo *) lfirst(ltab);
2665 if (tab->relid == relid)
2670 * Not there, so add it. Note that we make a copy of the relation's
2671 * existing descriptor before anything interesting can happen to it.
2673 tab = (AlteredTableInfo *) palloc0(sizeof(AlteredTableInfo));
2675 tab->relkind = rel->rd_rel->relkind;
2676 tab->oldDesc = CreateTupleDescCopy(RelationGetDescr(rel));
2678 *wqueue = lappend(*wqueue, tab);
2684 * ATSimplePermissions
2686 * - Ensure that it is a relation (or possibly a view)
2687 * - Ensure this user is the owner
2688 * - Ensure that it is not a system table
2691 ATSimplePermissions(Relation rel, bool allowView)
2693 if (rel->rd_rel->relkind != RELKIND_RELATION)
2697 if (rel->rd_rel->relkind != RELKIND_VIEW)
2699 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2700 errmsg("\"%s\" is not a table or view",
2701 RelationGetRelationName(rel))));
2705 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2706 errmsg("\"%s\" is not a table",
2707 RelationGetRelationName(rel))));
2710 /* Permissions checks */
2711 if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
2712 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
2713 RelationGetRelationName(rel));
2715 if (!allowSystemTableMods && IsSystemRelation(rel))
2717 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2718 errmsg("permission denied: \"%s\" is a system catalog",
2719 RelationGetRelationName(rel))));
2725 * Simple table recursion sufficient for most ALTER TABLE operations.
2726 * All direct and indirect children are processed in an unspecified order.
2727 * Note that if a child inherits from the original table via multiple
2728 * inheritance paths, it will be visited just once.
2731 ATSimpleRecursion(List **wqueue, Relation rel,
2732 AlterTableCmd *cmd, bool recurse)
2735 * Propagate to children if desired. Non-table relations never have
2736 * children, so no need to search in that case.
2738 if (recurse && rel->rd_rel->relkind == RELKIND_RELATION)
2740 Oid relid = RelationGetRelid(rel);
2744 /* this routine is actually in the planner */
2745 children = find_all_inheritors(relid);
2748 * find_all_inheritors does the recursive search of the inheritance
2749 * hierarchy, so all we have to do is process all of the relids in the
2750 * list that it returns.
2752 foreach(child, children)
2754 Oid childrelid = lfirst_oid(child);
2757 if (childrelid == relid)
2759 childrel = relation_open(childrelid, AccessExclusiveLock);
2760 ATPrepCmd(wqueue, childrel, cmd, false, true);
2761 relation_close(childrel, NoLock);
2767 * ATOneLevelRecursion
2769 * Here, we visit only direct inheritance children. It is expected that
2770 * the command's prep routine will recurse again to find indirect children.
2771 * When using this technique, a multiply-inheriting child will be visited
2775 ATOneLevelRecursion(List **wqueue, Relation rel,
2778 Oid relid = RelationGetRelid(rel);
2782 /* this routine is actually in the planner */
2783 children = find_inheritance_children(relid);
2785 foreach(child, children)
2787 Oid childrelid = lfirst_oid(child);
2790 childrel = relation_open(childrelid, AccessExclusiveLock);
2791 ATPrepCmd(wqueue, childrel, cmd, true, true);
2792 relation_close(childrel, NoLock);
2798 * find_composite_type_dependencies
2800 * Check to see if a table's rowtype is being used as a column in some
2801 * other table (possibly nested several levels deep in composite types!).
2802 * Eventually, we'd like to propagate the check or rewrite operation
2803 * into other such tables, but for now, just error out if we find any.
2805 * We assume that functions and views depending on the type are not reasons
2806 * to reject the ALTER. (How safe is this really?)
2809 find_composite_type_dependencies(Oid typeOid, const char *origTblName)
2813 SysScanDesc depScan;
2817 * We scan pg_depend to find those things that depend on the rowtype. (We
2818 * assume we can ignore refobjsubid for a rowtype.)
2820 depRel = heap_open(DependRelationId, AccessShareLock);
2822 ScanKeyInit(&key[0],
2823 Anum_pg_depend_refclassid,
2824 BTEqualStrategyNumber, F_OIDEQ,
2825 ObjectIdGetDatum(TypeRelationId));
2826 ScanKeyInit(&key[1],
2827 Anum_pg_depend_refobjid,
2828 BTEqualStrategyNumber, F_OIDEQ,
2829 ObjectIdGetDatum(typeOid));
2831 depScan = systable_beginscan(depRel, DependReferenceIndexId, true,
2832 SnapshotNow, 2, key);
2834 while (HeapTupleIsValid(depTup = systable_getnext(depScan)))
2836 Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup);
2838 Form_pg_attribute att;
2840 /* Ignore dependees that aren't user columns of relations */
2841 /* (we assume system columns are never of rowtypes) */
2842 if (pg_depend->classid != RelationRelationId ||
2843 pg_depend->objsubid <= 0)
2846 rel = relation_open(pg_depend->objid, AccessShareLock);
2847 att = rel->rd_att->attrs[pg_depend->objsubid - 1];
2849 if (rel->rd_rel->relkind == RELKIND_RELATION)
2852 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2853 errmsg("cannot alter table \"%s\" because column \"%s\".\"%s\" uses its rowtype",
2855 RelationGetRelationName(rel),
2856 NameStr(att->attname))));
2858 else if (OidIsValid(rel->rd_rel->reltype))
2861 * A view or composite type itself isn't a problem, but we must
2862 * recursively check for indirect dependencies via its rowtype.
2864 find_composite_type_dependencies(rel->rd_rel->reltype,
2868 relation_close(rel, AccessShareLock);
2871 systable_endscan(depScan);
2873 relation_close(depRel, AccessShareLock);
2878 * ALTER TABLE ADD COLUMN
2880 * Adds an additional attribute to a relation making the assumption that
2881 * CHECK, NOT NULL, and FOREIGN KEY constraints will be removed from the
2882 * AT_AddColumn AlterTableCmd by analyze.c and added as independent
2886 ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
2890 * Recurse to add the column to child classes, if requested.
2892 * We must recurse one level at a time, so that multiply-inheriting
2893 * children are visited the right number of times and end up with the
2894 * right attinhcount.
2898 AlterTableCmd *childCmd = copyObject(cmd);
2899 ColumnDef *colDefChild = (ColumnDef *) childCmd->def;
2901 /* Child should see column as singly inherited */
2902 colDefChild->inhcount = 1;
2903 colDefChild->is_local = false;
2904 /* and don't make a support dependency on the child */
2905 colDefChild->support = NULL;
2907 ATOneLevelRecursion(wqueue, rel, childCmd);
2912 * If we are told not to recurse, there had better not be any child
2913 * tables; else the addition would put them out of step.
2915 if (find_inheritance_children(RelationGetRelid(rel)) != NIL)
2917 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
2918 errmsg("column must be added to child tables too")));
2923 ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
2926 Oid myrelid = RelationGetRelid(rel);
2930 HeapTuple attributeTuple;
2931 Form_pg_attribute attribute;
2932 FormData_pg_attribute attributeD;
2936 HeapTuple typeTuple;
2941 attrdesc = heap_open(AttributeRelationId, RowExclusiveLock);
2944 * Are we adding the column to a recursion child? If so, check whether to
2945 * merge with an existing definition for the column.
2947 if (colDef->inhcount > 0)
2951 /* Does child already have a column by this name? */
2952 tuple = SearchSysCacheCopyAttName(myrelid, colDef->colname);
2953 if (HeapTupleIsValid(tuple))
2955 Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple);
2957 /* Okay if child matches by type */
2958 if (typenameTypeId(colDef->typename) != childatt->atttypid ||
2959 colDef->typename->typmod != childatt->atttypmod)
2961 (errcode(ERRCODE_DATATYPE_MISMATCH),
2962 errmsg("child table \"%s\" has different type for column \"%s\"",
2963 RelationGetRelationName(rel), colDef->colname)));
2965 /* Bump the existing child att's inhcount */
2966 childatt->attinhcount++;
2967 simple_heap_update(attrdesc, &tuple->t_self, tuple);
2968 CatalogUpdateIndexes(attrdesc, tuple);
2970 heap_freetuple(tuple);
2972 /* Inform the user about the merge */
2974 (errmsg("merging definition of column \"%s\" for child \"%s\"",
2975 colDef->colname, RelationGetRelationName(rel))));
2977 heap_close(attrdesc, RowExclusiveLock);
2982 pgclass = heap_open(RelationRelationId, RowExclusiveLock);
2984 reltup = SearchSysCacheCopy(RELOID,
2985 ObjectIdGetDatum(myrelid),
2987 if (!HeapTupleIsValid(reltup))
2988 elog(ERROR, "cache lookup failed for relation %u", myrelid);
2991 * this test is deliberately not attisdropped-aware, since if one tries to
2992 * add a column matching a dropped column name, it's gonna fail anyway.
2994 if (SearchSysCacheExists(ATTNAME,
2995 ObjectIdGetDatum(myrelid),
2996 PointerGetDatum(colDef->colname),
2999 (errcode(ERRCODE_DUPLICATE_COLUMN),
3000 errmsg("column \"%s\" of relation \"%s\" already exists",
3001 colDef->colname, RelationGetRelationName(rel))));
3003 minattnum = ((Form_pg_class) GETSTRUCT(reltup))->relnatts;
3004 maxatts = minattnum + 1;
3005 if (maxatts > MaxHeapAttributeNumber)
3007 (errcode(ERRCODE_TOO_MANY_COLUMNS),
3008 errmsg("tables can have at most %d columns",
3009 MaxHeapAttributeNumber)));
3012 typeTuple = typenameType(colDef->typename);
3013 tform = (Form_pg_type) GETSTRUCT(typeTuple);
3014 typeOid = HeapTupleGetOid(typeTuple);
3016 /* make sure datatype is legal for a column */
3017 CheckAttributeType(colDef->colname, typeOid);
3019 attributeTuple = heap_addheader(Natts_pg_attribute,
3021 ATTRIBUTE_TUPLE_SIZE,
3022 (void *) &attributeD);
3024 attribute = (Form_pg_attribute) GETSTRUCT(attributeTuple);
3026 attribute->attrelid = myrelid;
3027 namestrcpy(&(attribute->attname), colDef->colname);
3028 attribute->atttypid = typeOid;
3029 attribute->attstattarget = -1;
3030 attribute->attlen = tform->typlen;
3031 attribute->attcacheoff = -1;
3032 attribute->atttypmod = colDef->typename->typmod;
3033 attribute->attnum = i;
3034 attribute->attbyval = tform->typbyval;
3035 attribute->attndims = list_length(colDef->typename->arrayBounds);
3036 attribute->attstorage = tform->typstorage;
3037 attribute->attalign = tform->typalign;
3038 attribute->attnotnull = colDef->is_not_null;
3039 attribute->atthasdef = false;
3040 attribute->attisdropped = false;
3041 attribute->attislocal = colDef->is_local;
3042 attribute->attinhcount = colDef->inhcount;
3044 ReleaseSysCache(typeTuple);
3046 simple_heap_insert(attrdesc, attributeTuple);
3048 /* Update indexes on pg_attribute */
3049 CatalogUpdateIndexes(attrdesc, attributeTuple);
3051 heap_close(attrdesc, RowExclusiveLock);
3054 * Update number of attributes in pg_class tuple
3056 ((Form_pg_class) GETSTRUCT(reltup))->relnatts = maxatts;
3058 simple_heap_update(pgclass, &reltup->t_self, reltup);
3060 /* keep catalog indexes current */
3061 CatalogUpdateIndexes(pgclass, reltup);
3063 heap_freetuple(reltup);
3065 heap_close(pgclass, RowExclusiveLock);
3067 /* Make the attribute's catalog entry visible */
3068 CommandCounterIncrement();
3071 * Store the DEFAULT, if any, in the catalogs
3073 if (colDef->raw_default)
3075 RawColumnDefault *rawEnt;
3077 rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
3078 rawEnt->attnum = attribute->attnum;
3079 rawEnt->raw_default = copyObject(colDef->raw_default);
3082 * This function is intended for CREATE TABLE, so it processes a
3083 * _list_ of defaults, but we just do one.
3085 AddRelationRawConstraints(rel, list_make1(rawEnt), NIL);
3087 /* Make the additional catalog changes visible */
3088 CommandCounterIncrement();
3092 * Tell Phase 3 to fill in the default expression, if there is one.
3094 * If there is no default, Phase 3 doesn't have to do anything, because
3095 * that effectively means that the default is NULL. The heap tuple access
3096 * routines always check for attnum > # of attributes in tuple, and return
3097 * NULL if so, so without any modification of the tuple data we will get
3098 * the effect of NULL values in the new column.
3100 * An exception occurs when the new column is of a domain type: the domain
3101 * might have a NOT NULL constraint, or a check constraint that indirectly
3102 * rejects nulls. If there are any domain constraints then we construct
3103 * an explicit NULL default value that will be passed through
3104 * CoerceToDomain processing. (This is a tad inefficient, since it causes
3105 * rewriting the table which we really don't have to do, but the present
3106 * design of domain processing doesn't offer any simple way of checking
3107 * the constraints more directly.)
3109 * Note: we use build_column_default, and not just the cooked default
3110 * returned by AddRelationRawConstraints, so that the right thing happens
3111 * when a datatype's default applies.
3113 defval = (Expr *) build_column_default(rel, attribute->attnum);
3115 if (!defval && GetDomainConstraints(typeOid) != NIL)
3117 Oid basetype = getBaseType(typeOid);
3119 defval = (Expr *) makeNullConst(basetype);
3120 defval = (Expr *) coerce_to_target_type(NULL,
3124 colDef->typename->typmod,
3125 COERCION_ASSIGNMENT,
3126 COERCE_IMPLICIT_CAST);
3127 if (defval == NULL) /* should not happen */
3128 elog(ERROR, "failed to coerce base type to domain");
3133 NewColumnValue *newval;
3135 newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue));
3136 newval->attnum = attribute->attnum;
3137 newval->expr = defval;
3139 tab->newvals = lappend(tab->newvals, newval);
3143 * Add needed dependency entries for the new column.
3145 add_column_datatype_dependency(myrelid, i, attribute->atttypid);
3146 if (colDef->support != NULL)
3147 add_column_support_dependency(myrelid, i, colDef->support);
3151 * Install a column's dependency on its datatype.
3154 add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid)
3156 ObjectAddress myself,
3159 myself.classId = RelationRelationId;
3160 myself.objectId = relid;
3161 myself.objectSubId = attnum;
3162 referenced.classId = TypeRelationId;
3163 referenced.objectId = typid;
3164 referenced.objectSubId = 0;
3165 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
3169 * Install a dependency for a column's supporting relation (serial sequence).
3172 add_column_support_dependency(Oid relid, int32 attnum, RangeVar *support)
3174 ObjectAddress colobject,
3177 colobject.classId = RelationRelationId;
3178 colobject.objectId = relid;
3179 colobject.objectSubId = attnum;
3180 suppobject.classId = RelationRelationId;
3181 suppobject.objectId = RangeVarGetRelid(support, false);
3182 suppobject.objectSubId = 0;
3183 recordDependencyOn(&suppobject, &colobject, DEPENDENCY_INTERNAL);
3187 * ALTER TABLE ALTER COLUMN DROP NOT NULL
3190 ATExecDropNotNull(Relation rel, const char *colName)
3196 ListCell *indexoidscan;
3199 * lookup the attribute
3201 attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
3203 tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3205 if (!HeapTupleIsValid(tuple))
3207 (errcode(ERRCODE_UNDEFINED_COLUMN),
3208 errmsg("column \"%s\" of relation \"%s\" does not exist",
3209 colName, RelationGetRelationName(rel))));
3211 attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum;
3213 /* Prevent them from altering a system attribute */
3216 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3217 errmsg("cannot alter system column \"%s\"",
3221 * Check that the attribute is not in a primary key
3224 /* Loop over all indexes on the relation */
3225 indexoidlist = RelationGetIndexList(rel);
3227 foreach(indexoidscan, indexoidlist)
3229 Oid indexoid = lfirst_oid(indexoidscan);
3230 HeapTuple indexTuple;
3231 Form_pg_index indexStruct;
3234 indexTuple = SearchSysCache(INDEXRELID,
3235 ObjectIdGetDatum(indexoid),
3237 if (!HeapTupleIsValid(indexTuple))
3238 elog(ERROR, "cache lookup failed for index %u", indexoid);
3239 indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
3241 /* If the index is not a primary key, skip the check */
3242 if (indexStruct->indisprimary)
3245 * Loop over each attribute in the primary key and see if it
3246 * matches the to-be-altered attribute
3248 for (i = 0; i < indexStruct->indnatts; i++)
3250 if (indexStruct->indkey.values[i] == attnum)
3252 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3253 errmsg("column \"%s\" is in a primary key",
3258 ReleaseSysCache(indexTuple);
3261 list_free(indexoidlist);
3264 * Okay, actually perform the catalog change ... if needed
3266 if (((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull)
3268 ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = FALSE;
3270 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3272 /* keep the system catalog indexes current */
3273 CatalogUpdateIndexes(attr_rel, tuple);
3276 heap_close(attr_rel, RowExclusiveLock);
3280 * ALTER TABLE ALTER COLUMN SET NOT NULL
3283 ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
3284 const char *colName)
3289 NewConstraint *newcon;
3292 * lookup the attribute
3294 attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
3296 tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3298 if (!HeapTupleIsValid(tuple))
3300 (errcode(ERRCODE_UNDEFINED_COLUMN),
3301 errmsg("column \"%s\" of relation \"%s\" does not exist",
3302 colName, RelationGetRelationName(rel))));
3304 attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum;
3306 /* Prevent them from altering a system attribute */
3309 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3310 errmsg("cannot alter system column \"%s\"",
3314 * Okay, actually perform the catalog change ... if needed
3316 if (!((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull)
3318 ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = TRUE;
3320 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3322 /* keep the system catalog indexes current */
3323 CatalogUpdateIndexes(attr_rel, tuple);
3325 /* Tell Phase 3 to test the constraint */
3326 newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
3327 newcon->contype = CONSTR_NOTNULL;
3328 newcon->attnum = attnum;
3329 newcon->name = "NOT NULL";
3331 tab->constraints = lappend(tab->constraints, newcon);
3334 heap_close(attr_rel, RowExclusiveLock);
3338 * ALTER TABLE ALTER COLUMN SET/DROP DEFAULT
3341 ATExecColumnDefault(Relation rel, const char *colName,
3347 * get the number of the attribute
3349 attnum = get_attnum(RelationGetRelid(rel), colName);
3350 if (attnum == InvalidAttrNumber)
3352 (errcode(ERRCODE_UNDEFINED_COLUMN),
3353 errmsg("column \"%s\" of relation \"%s\" does not exist",
3354 colName, RelationGetRelationName(rel))));
3356 /* Prevent them from altering a system attribute */
3359 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3360 errmsg("cannot alter system column \"%s\"",
3364 * Remove any old default for the column. We use RESTRICT here for
3365 * safety, but at present we do not expect anything to depend on the
3368 RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, false);
3373 RawColumnDefault *rawEnt;
3375 rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
3376 rawEnt->attnum = attnum;
3377 rawEnt->raw_default = newDefault;
3380 * This function is intended for CREATE TABLE, so it processes a
3381 * _list_ of defaults, but we just do one.
3383 AddRelationRawConstraints(rel, list_make1(rawEnt), NIL);
3388 * ALTER TABLE ALTER COLUMN SET STATISTICS
3391 ATPrepSetStatistics(Relation rel, const char *colName, Node *flagValue)
3394 * We do our own permission checking because (a) we want to allow SET
3395 * STATISTICS on indexes (for expressional index columns), and (b) we want
3396 * to allow SET STATISTICS on system catalogs without requiring
3397 * allowSystemTableMods to be turned on.
3399 if (rel->rd_rel->relkind != RELKIND_RELATION &&
3400 rel->rd_rel->relkind != RELKIND_INDEX)
3402 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
3403 errmsg("\"%s\" is not a table or index",
3404 RelationGetRelationName(rel))));
3406 /* Permissions checks */
3407 if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
3408 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
3409 RelationGetRelationName(rel));
3413 ATExecSetStatistics(Relation rel, const char *colName, Node *newValue)
3416 Relation attrelation;
3418 Form_pg_attribute attrtuple;
3420 Assert(IsA(newValue, Integer));
3421 newtarget = intVal(newValue);
3424 * Limit target to a sane range
3429 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3430 errmsg("statistics target %d is too low",
3433 else if (newtarget > 1000)
3437 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3438 errmsg("lowering statistics target to %d",
3442 attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
3444 tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3446 if (!HeapTupleIsValid(tuple))
3448 (errcode(ERRCODE_UNDEFINED_COLUMN),
3449 errmsg("column \"%s\" of relation \"%s\" does not exist",
3450 colName, RelationGetRelationName(rel))));
3451 attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
3453 if (attrtuple->attnum <= 0)
3455 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3456 errmsg("cannot alter system column \"%s\"",
3459 attrtuple->attstattarget = newtarget;
3461 simple_heap_update(attrelation, &tuple->t_self, tuple);
3463 /* keep system catalog indexes current */
3464 CatalogUpdateIndexes(attrelation, tuple);
3466 heap_freetuple(tuple);
3468 heap_close(attrelation, RowExclusiveLock);
3472 * ALTER TABLE ALTER COLUMN SET STORAGE
3475 ATExecSetStorage(Relation rel, const char *colName, Node *newValue)
3479 Relation attrelation;
3481 Form_pg_attribute attrtuple;
3483 Assert(IsA(newValue, String));
3484 storagemode = strVal(newValue);
3486 if (pg_strcasecmp(storagemode, "plain") == 0)
3488 else if (pg_strcasecmp(storagemode, "external") == 0)
3490 else if (pg_strcasecmp(storagemode, "extended") == 0)
3492 else if (pg_strcasecmp(storagemode, "main") == 0)
3497 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3498 errmsg("invalid storage type \"%s\"",
3500 newstorage = 0; /* keep compiler quiet */
3503 attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
3505 tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3507 if (!HeapTupleIsValid(tuple))
3509 (errcode(ERRCODE_UNDEFINED_COLUMN),
3510 errmsg("column \"%s\" of relation \"%s\" does not exist",
3511 colName, RelationGetRelationName(rel))));
3512 attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
3514 if (attrtuple->attnum <= 0)
3516 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3517 errmsg("cannot alter system column \"%s\"",
3521 * safety check: do not allow toasted storage modes unless column datatype
3524 if (newstorage == 'p' || TypeIsToastable(attrtuple->atttypid))
3525 attrtuple->attstorage = newstorage;
3528 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3529 errmsg("column data type %s can only have storage PLAIN",
3530 format_type_be(attrtuple->atttypid))));
3532 simple_heap_update(attrelation, &tuple->t_self, tuple);
3534 /* keep system catalog indexes current */
3535 CatalogUpdateIndexes(attrelation, tuple);
3537 heap_freetuple(tuple);
3539 heap_close(attrelation, RowExclusiveLock);
3544 * ALTER TABLE DROP COLUMN
3546 * DROP COLUMN cannot use the normal ALTER TABLE recursion mechanism,
3547 * because we have to decide at runtime whether to recurse or not depending
3548 * on whether attinhcount goes to zero or not. (We can't check this in a
3549 * static pre-pass because it won't handle multiple inheritance situations
3550 * correctly.) Since DROP COLUMN doesn't need to create any work queue
3551 * entries for Phase 3, it's okay to recurse internally in this routine
3552 * without considering the work queue.
3555 ATExecDropColumn(Relation rel, const char *colName,
3556 DropBehavior behavior,
3557 bool recurse, bool recursing)
3560 Form_pg_attribute targetatt;
3563 ObjectAddress object;
3565 /* At top level, permission check was done in ATPrepCmd, else do it */
3567 ATSimplePermissions(rel, false);
3570 * get the number of the attribute
3572 tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
3573 if (!HeapTupleIsValid(tuple))
3575 (errcode(ERRCODE_UNDEFINED_COLUMN),
3576 errmsg("column \"%s\" of relation \"%s\" does not exist",
3577 colName, RelationGetRelationName(rel))));
3578 targetatt = (Form_pg_attribute) GETSTRUCT(tuple);
3580 attnum = targetatt->attnum;
3582 /* Can't drop a system attribute, except OID */
3583 if (attnum <= 0 && attnum != ObjectIdAttributeNumber)
3585 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3586 errmsg("cannot drop system column \"%s\"",
3589 /* Don't drop inherited columns */
3590 if (targetatt->attinhcount > 0 && !recursing)
3592 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3593 errmsg("cannot drop inherited column \"%s\"",
3596 ReleaseSysCache(tuple);
3599 * Propagate to children as appropriate. Unlike most other ALTER
3600 * routines, we have to do this one level of recursion at a time; we can't
3601 * use find_all_inheritors to do it in one pass.
3603 children = find_inheritance_children(RelationGetRelid(rel));
3610 attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
3611 foreach(child, children)
3613 Oid childrelid = lfirst_oid(child);
3615 Form_pg_attribute childatt;
3617 childrel = heap_open(childrelid, AccessExclusiveLock);
3619 tuple = SearchSysCacheCopyAttName(childrelid, colName);
3620 if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
3621 elog(ERROR, "cache lookup failed for attribute \"%s\" of relation %u",
3622 colName, childrelid);
3623 childatt = (Form_pg_attribute) GETSTRUCT(tuple);
3625 if (childatt->attinhcount <= 0) /* shouldn't happen */
3626 elog(ERROR, "relation %u has non-inherited attribute \"%s\"",
3627 childrelid, colName);
3632 * If the child column has other definition sources, just
3633 * decrement its inheritance count; if not, recurse to delete
3636 if (childatt->attinhcount == 1 && !childatt->attislocal)
3638 /* Time to delete this child column, too */
3639 ATExecDropColumn(childrel, colName, behavior, true, true);
3643 /* Child column must survive my deletion */
3644 childatt->attinhcount--;
3646 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3648 /* keep the system catalog indexes current */
3649 CatalogUpdateIndexes(attr_rel, tuple);
3651 /* Make update visible */
3652 CommandCounterIncrement();
3658 * If we were told to drop ONLY in this table (no recursion),
3659 * we need to mark the inheritors' attribute as locally
3660 * defined rather than inherited.
3662 childatt->attinhcount--;
3663 childatt->attislocal = true;
3665 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3667 /* keep the system catalog indexes current */
3668 CatalogUpdateIndexes(attr_rel, tuple);
3670 /* Make update visible */
3671 CommandCounterIncrement();
3674 heap_freetuple(tuple);
3676 heap_close(childrel, NoLock);
3678 heap_close(attr_rel, RowExclusiveLock);
3682 * Perform the actual column deletion
3684 object.classId = RelationRelationId;
3685 object.objectId = RelationGetRelid(rel);
3686 object.objectSubId = attnum;
3688 performDeletion(&object, behavior);
3691 * If we dropped the OID column, must adjust pg_class.relhasoids
3693 if (attnum == ObjectIdAttributeNumber)
3696 Form_pg_class tuple_class;
3698 class_rel = heap_open(RelationRelationId, RowExclusiveLock);
3700 tuple = SearchSysCacheCopy(RELOID,
3701 ObjectIdGetDatum(RelationGetRelid(rel)),
3703 if (!HeapTupleIsValid(tuple))
3704 elog(ERROR, "cache lookup failed for relation %u",
3705 RelationGetRelid(rel));
3706 tuple_class = (Form_pg_class) GETSTRUCT(tuple);
3708 tuple_class->relhasoids = false;
3709 simple_heap_update(class_rel, &tuple->t_self, tuple);
3711 /* Keep the catalog indexes up to date */
3712 CatalogUpdateIndexes(class_rel, tuple);
3714 heap_close(class_rel, RowExclusiveLock);
3719 * ALTER TABLE ADD INDEX
3721 * There is no such command in the grammar, but the parser converts UNIQUE
3722 * and PRIMARY KEY constraints into AT_AddIndex subcommands. This lets us
3723 * schedule creation of the index at the appropriate time during ALTER.
3726 ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
3727 IndexStmt *stmt, bool is_rebuild)
3733 Assert(IsA(stmt, IndexStmt));
3735 /* suppress schema rights check when rebuilding existing index */
3736 check_rights = !is_rebuild;
3737 /* skip index build if phase 3 will have to rewrite table anyway */
3738 skip_build = (tab->newvals != NIL);
3739 /* suppress notices when rebuilding existing index */
3742 DefineIndex(stmt->relation, /* relation */
3743 stmt->idxname, /* index name */
3744 InvalidOid, /* no predefined OID */
3745 stmt->accessMethod, /* am name */
3747 stmt->indexParams, /* parameters */
3748 (Expr *) stmt->whereClause,
3753 true, /* is_alter_table */
3760 * ALTER TABLE ADD CONSTRAINT
3763 ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, Node *newConstraint)
3765 switch (nodeTag(newConstraint))
3769 Constraint *constr = (Constraint *) newConstraint;
3772 * Currently, we only expect to see CONSTR_CHECK nodes
3773 * arriving here (see the preprocessing done in
3774 * parser/analyze.c). Use a switch anyway to make it easier
3775 * to add more code later.
3777 switch (constr->contype)
3785 * Call AddRelationRawConstraints to do the work.
3786 * It returns a list of cooked constraints.
3788 newcons = AddRelationRawConstraints(rel, NIL,
3789 list_make1(constr));
3790 /* Add each constraint to Phase 3's queue */
3791 foreach(lcon, newcons)
3793 CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon);
3794 NewConstraint *newcon;
3796 newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
3797 newcon->name = ccon->name;
3798 newcon->contype = ccon->contype;
3799 newcon->attnum = ccon->attnum;
3800 /* ExecQual wants implicit-AND format */
3801 newcon->qual = (Node *)
3802 make_ands_implicit((Expr *) ccon->expr);
3804 tab->constraints = lappend(tab->constraints,
3810 elog(ERROR, "unrecognized constraint type: %d",
3811 (int) constr->contype);
3815 case T_FkConstraint:
3817 FkConstraint *fkconstraint = (FkConstraint *) newConstraint;
3820 * Assign or validate constraint name
3822 if (fkconstraint->constr_name)
3824 if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
3825 RelationGetRelid(rel),
3826 RelationGetNamespace(rel),
3827 fkconstraint->constr_name))
3829 (errcode(ERRCODE_DUPLICATE_OBJECT),
3830 errmsg("constraint \"%s\" for relation \"%s\" already exists",
3831 fkconstraint->constr_name,
3832 RelationGetRelationName(rel))));
3835 fkconstraint->constr_name =
3836 ChooseConstraintName(RelationGetRelationName(rel),
3837 strVal(linitial(fkconstraint->fk_attrs)),
3839 RelationGetNamespace(rel),
3842 ATAddForeignKeyConstraint(tab, rel, fkconstraint);
3847 elog(ERROR, "unrecognized node type: %d",
3848 (int) nodeTag(newConstraint));
3853 * Add a foreign-key constraint to a single table
3855 * Subroutine for ATExecAddConstraint. Must already hold exclusive
3856 * lock on the rel, and have done appropriate validity/permissions checks
3860 ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
3861 FkConstraint *fkconstraint)
3864 AclResult aclresult;
3865 int16 pkattnum[INDEX_MAX_KEYS];
3866 int16 fkattnum[INDEX_MAX_KEYS];
3867 Oid pktypoid[INDEX_MAX_KEYS];
3868 Oid fktypoid[INDEX_MAX_KEYS];
3869 Oid opclasses[INDEX_MAX_KEYS];
3877 * Grab an exclusive lock on the pk table, so that someone doesn't delete
3878 * rows out from under us. (Although a lesser lock would do for that
3879 * purpose, we'll need exclusive lock anyway to add triggers to the pk
3880 * table; trying to start with a lesser lock will just create a risk of
3883 pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock);
3886 * Validity and permissions checks
3888 * Note: REFERENCES permissions checks are redundant with CREATE TRIGGER,
3889 * but we may as well error out sooner instead of later.
3891 if (pkrel->rd_rel->relkind != RELKIND_RELATION)
3893 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
3894 errmsg("referenced relation \"%s\" is not a table",
3895 RelationGetRelationName(pkrel))));
3897 aclresult = pg_class_aclcheck(RelationGetRelid(pkrel), GetUserId(),
3899 if (aclresult != ACLCHECK_OK)
3900 aclcheck_error(aclresult, ACL_KIND_CLASS,
3901 RelationGetRelationName(pkrel));
3903 if (!allowSystemTableMods && IsSystemRelation(pkrel))
3905 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
3906 errmsg("permission denied: \"%s\" is a system catalog",
3907 RelationGetRelationName(pkrel))));
3909 aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
3911 if (aclresult != ACLCHECK_OK)
3912 aclcheck_error(aclresult, ACL_KIND_CLASS,
3913 RelationGetRelationName(rel));
3916 * Disallow reference from permanent table to temp table or vice versa.
3917 * (The ban on perm->temp is for fairly obvious reasons. The ban on
3918 * temp->perm is because other backends might need to run the RI triggers
3919 * on the perm table, but they can't reliably see tuples the owning
3920 * backend has created in the temp table, because non-shared buffers are
3921 * used for temp tables.)
3923 if (isTempNamespace(RelationGetNamespace(pkrel)))
3925 if (!isTempNamespace(RelationGetNamespace(rel)))
3927 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3928 errmsg("cannot reference temporary table from permanent table constraint")));
3932 if (isTempNamespace(RelationGetNamespace(rel)))
3934 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3935 errmsg("cannot reference permanent table from temporary table constraint")));
3939 * Look up the referencing attributes to make sure they exist, and record
3940 * their attnums and type OIDs.
3942 MemSet(pkattnum, 0, sizeof(pkattnum));
3943 MemSet(fkattnum, 0, sizeof(fkattnum));
3944 MemSet(pktypoid, 0, sizeof(pktypoid));
3945 MemSet(fktypoid, 0, sizeof(fktypoid));
3946 MemSet(opclasses, 0, sizeof(opclasses));
3948 numfks = transformColumnNameList(RelationGetRelid(rel),
3949 fkconstraint->fk_attrs,
3950 fkattnum, fktypoid);
3953 * If the attribute list for the referenced table was omitted, lookup the
3954 * definition of the primary key and use it. Otherwise, validate the
3955 * supplied attribute list. In either case, discover the index OID and
3956 * index opclasses, and the attnums and type OIDs of the attributes.
3958 if (fkconstraint->pk_attrs == NIL)
3960 numpks = transformFkeyGetPrimaryKey(pkrel, &indexOid,
3961 &fkconstraint->pk_attrs,
3967 numpks = transformColumnNameList(RelationGetRelid(pkrel),
3968 fkconstraint->pk_attrs,
3969 pkattnum, pktypoid);
3970 /* Look for an index matching the column list */
3971 indexOid = transformFkeyCheckAttrs(pkrel, numpks, pkattnum,
3975 /* Be sure referencing and referenced column types are comparable */
3976 if (numfks != numpks)
3978 (errcode(ERRCODE_INVALID_FOREIGN_KEY),
3979 errmsg("number of referencing and referenced columns for foreign key disagree")));
3981 for (i = 0; i < numpks; i++)
3984 * pktypoid[i] is the primary key table's i'th key's type fktypoid[i]
3985 * is the foreign key table's i'th key's type
3987 * Note that we look for an operator with the PK type on the left;
3988 * when the types are different this is critical because the PK index
3989 * will need operators with the indexkey on the left. (Ordinarily both
3990 * commutator operators will exist if either does, but we won't get
3991 * the right answer from the test below on opclass membership unless
3992 * we select the proper operator.)
3994 Operator o = oper(list_make1(makeString("=")),
3995 pktypoid[i], fktypoid[i], true);
3999 (errcode(ERRCODE_UNDEFINED_FUNCTION),
4000 errmsg("foreign key constraint \"%s\" "
4001 "cannot be implemented",
4002 fkconstraint->constr_name),
4003 errdetail("Key columns \"%s\" and \"%s\" "
4004 "are of incompatible types: %s and %s.",
4005 strVal(list_nth(fkconstraint->fk_attrs, i)),
4006 strVal(list_nth(fkconstraint->pk_attrs, i)),
4007 format_type_be(fktypoid[i]),
4008 format_type_be(pktypoid[i]))));
4011 * Check that the found operator is compatible with the PK index, and
4012 * generate a warning if not, since otherwise costly seqscans will be
4013 * incurred to check FK validity.
4015 if (!op_in_opclass(oprid(o), opclasses[i]))
4017 (errmsg("foreign key constraint \"%s\" "
4018 "will require costly sequential scans",
4019 fkconstraint->constr_name),
4020 errdetail("Key columns \"%s\" and \"%s\" "
4021 "are of different types: %s and %s.",
4022 strVal(list_nth(fkconstraint->fk_attrs, i)),
4023 strVal(list_nth(fkconstraint->pk_attrs, i)),
4024 format_type_be(fktypoid[i]),
4025 format_type_be(pktypoid[i]))));
4031 * Tell Phase 3 to check that the constraint is satisfied by existing rows
4032 * (we can skip this during table creation).
4034 if (!fkconstraint->skip_validation)
4036 NewConstraint *newcon;
4038 newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
4039 newcon->name = fkconstraint->constr_name;
4040 newcon->contype = CONSTR_FOREIGN;
4041 newcon->refrelid = RelationGetRelid(pkrel);
4042 newcon->qual = (Node *) fkconstraint;
4044 tab->constraints = lappend(tab->constraints, newcon);
4048 * Record the FK constraint in pg_constraint.
4050 constrOid = CreateConstraintEntry(fkconstraint->constr_name,
4051 RelationGetNamespace(rel),
4053 fkconstraint->deferrable,
4054 fkconstraint->initdeferred,
4055 RelationGetRelid(rel),
4058 InvalidOid, /* not a domain
4060 RelationGetRelid(pkrel),
4063 fkconstraint->fk_upd_action,
4064 fkconstraint->fk_del_action,
4065 fkconstraint->fk_matchtype,
4067 NULL, /* no check constraint */
4072 * Create the triggers that will enforce the constraint.
4074 createForeignKeyTriggers(rel, fkconstraint, constrOid);
4077 * Close pk table, but keep lock until we've committed.
4079 heap_close(pkrel, NoLock);
4084 * transformColumnNameList - transform list of column names
4086 * Lookup each name and return its attnum and type OID
4089 transformColumnNameList(Oid relId, List *colList,
4090 int16 *attnums, Oid *atttypids)
4098 char *attname = strVal(lfirst(l));
4101 atttuple = SearchSysCacheAttName(relId, attname);
4102 if (!HeapTupleIsValid(atttuple))
4104 (errcode(ERRCODE_UNDEFINED_COLUMN),
4105 errmsg("column \"%s\" referenced in foreign key constraint does not exist",
4107 if (attnum >= INDEX_MAX_KEYS)
4109 (errcode(ERRCODE_TOO_MANY_COLUMNS),
4110 errmsg("cannot have more than %d keys in a foreign key",
4112 attnums[attnum] = ((Form_pg_attribute) GETSTRUCT(atttuple))->attnum;
4113 atttypids[attnum] = ((Form_pg_attribute) GETSTRUCT(atttuple))->atttypid;
4114 ReleaseSysCache(atttuple);
4122 * transformFkeyGetPrimaryKey -
4124 * Look up the names, attnums, and types of the primary key attributes
4125 * for the pkrel. Also return the index OID and index opclasses of the
4126 * index supporting the primary key.
4128 * All parameters except pkrel are output parameters. Also, the function
4129 * return value is the number of attributes in the primary key.
4131 * Used when the column list in the REFERENCES specification is omitted.
4134 transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
4136 int16 *attnums, Oid *atttypids,
4140 ListCell *indexoidscan;
4141 HeapTuple indexTuple = NULL;
4142 Form_pg_index indexStruct = NULL;
4143 Datum indclassDatum;
4145 oidvector *indclass;
4149 * Get the list of index OIDs for the table from the relcache, and look up
4150 * each one in the pg_index syscache until we find one marked primary key
4151 * (hopefully there isn't more than one such).
4153 *indexOid = InvalidOid;
4155 indexoidlist = RelationGetIndexList(pkrel);
4157 foreach(indexoidscan, indexoidlist)
4159 Oid indexoid = lfirst_oid(indexoidscan);
4161 indexTuple = SearchSysCache(INDEXRELID,
4162 ObjectIdGetDatum(indexoid),
4164 if (!HeapTupleIsValid(indexTuple))
4165 elog(ERROR, "cache lookup failed for index %u", indexoid);
4166 indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
4167 if (indexStruct->indisprimary)
4169 *indexOid = indexoid;
4172 ReleaseSysCache(indexTuple);
4175 list_free(indexoidlist);
4178 * Check that we found it
4180 if (!OidIsValid(*indexOid))
4182 (errcode(ERRCODE_UNDEFINED_OBJECT),
4183 errmsg("there is no primary key for referenced table \"%s\"",
4184 RelationGetRelationName(pkrel))));
4186 /* Must get indclass the hard way */
4187 indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple,
4188 Anum_pg_index_indclass, &isnull);
4190 indclass = (oidvector *) DatumGetPointer(indclassDatum);
4193 * Now build the list of PK attributes from the indkey definition (we
4194 * assume a primary key cannot have expressional elements)
4197 for (i = 0; i < indexStruct->indnatts; i++)
4199 int pkattno = indexStruct->indkey.values[i];
4201 attnums[i] = pkattno;
4202 atttypids[i] = attnumTypeId(pkrel, pkattno);
4203 opclasses[i] = indclass->values[i];
4204 *attnamelist = lappend(*attnamelist,
4205 makeString(pstrdup(NameStr(*attnumAttName(pkrel, pkattno)))));
4208 ReleaseSysCache(indexTuple);
4214 * transformFkeyCheckAttrs -
4216 * Make sure that the attributes of a referenced table belong to a unique
4217 * (or primary key) constraint. Return the OID of the index supporting
4218 * the constraint, as well as the opclasses associated with the index
4222 transformFkeyCheckAttrs(Relation pkrel,
4223 int numattrs, int16 *attnums,
4224 Oid *opclasses) /* output parameter */
4226 Oid indexoid = InvalidOid;
4229 ListCell *indexoidscan;
4232 * Get the list of index OIDs for the table from the relcache, and look up
4233 * each one in the pg_index syscache, and match unique indexes to the list
4234 * of attnums we are given.
4236 indexoidlist = RelationGetIndexList(pkrel);
4238 foreach(indexoidscan, indexoidlist)
4240 HeapTuple indexTuple;
4241 Form_pg_index indexStruct;
4245 indexoid = lfirst_oid(indexoidscan);
4246 indexTuple = SearchSysCache(INDEXRELID,
4247 ObjectIdGetDatum(indexoid),
4249 if (!HeapTupleIsValid(indexTuple))
4250 elog(ERROR, "cache lookup failed for index %u", indexoid);
4251 indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
4254 * Must have the right number of columns; must be unique and not a
4255 * partial index; forget it if there are any expressions, too
4257 if (indexStruct->indnatts == numattrs &&
4258 indexStruct->indisunique &&
4259 heap_attisnull(indexTuple, Anum_pg_index_indpred) &&
4260 heap_attisnull(indexTuple, Anum_pg_index_indexprs))
4262 /* Must get indclass the hard way */
4263 Datum indclassDatum;
4265 oidvector *indclass;
4267 indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple,
4268 Anum_pg_index_indclass, &isnull);
4270 indclass = (oidvector *) DatumGetPointer(indclassDatum);
4273 * The given attnum list may match the index columns in any order.
4274 * Check that each list is a subset of the other.
4276 for (i = 0; i < numattrs; i++)
4279 for (j = 0; j < numattrs; j++)
4281 if (attnums[i] == indexStruct->indkey.values[j])
4292 for (i = 0; i < numattrs; i++)
4295 for (j = 0; j < numattrs; j++)
4297 if (attnums[j] == indexStruct->indkey.values[i])
4299 opclasses[j] = indclass->values[i];
4309 ReleaseSysCache(indexTuple);
4316 (errcode(ERRCODE_INVALID_FOREIGN_KEY),
4317 errmsg("there is no unique constraint matching given keys for referenced table \"%s\"",
4318 RelationGetRelationName(pkrel))));
4320 list_free(indexoidlist);
4326 * Scan the existing rows in a table to verify they meet a proposed FK
4329 * Caller must have opened and locked both relations.
4332 validateForeignKeyConstraint(FkConstraint *fkconstraint,
4343 * See if we can do it with a single LEFT JOIN query. A FALSE result
4344 * indicates we must proceed with the fire-the-trigger method.
4346 if (RI_Initial_Check(fkconstraint, rel, pkrel))
4350 * Scan through each tuple, calling RI_FKey_check_ins (insert trigger) as
4351 * if that tuple had just been inserted. If any of those fail, it should
4352 * ereport(ERROR) and that's that.
4354 MemSet(&trig, 0, sizeof(trig));
4355 trig.tgoid = InvalidOid;
4356 trig.tgname = fkconstraint->constr_name;
4357 trig.tgenabled = TRUE;
4358 trig.tgisconstraint = TRUE;
4359 trig.tgconstrrelid = RelationGetRelid(pkrel);
4360 trig.tgdeferrable = FALSE;
4361 trig.tginitdeferred = FALSE;
4363 trig.tgargs = (char **) palloc(sizeof(char *) *
4364 (4 + list_length(fkconstraint->fk_attrs)
4365 + list_length(fkconstraint->pk_attrs)));
4367 trig.tgargs[0] = trig.tgname;
4368 trig.tgargs[1] = RelationGetRelationName(rel);
4369 trig.tgargs[2] = RelationGetRelationName(pkrel);
4370 trig.tgargs[3] = fkMatchTypeToString(fkconstraint->fk_matchtype);
4372 foreach(list, fkconstraint->fk_attrs)
4374 char *fk_at = strVal(lfirst(list));
4376 trig.tgargs[count] = fk_at;
4380 foreach(list, fkconstraint->pk_attrs)
4382 char *pk_at = strVal(lfirst(list));
4384 trig.tgargs[count] = pk_at;
4387 trig.tgnargs = count - 1;
4389 scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
4391 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
4393 FunctionCallInfoData fcinfo;
4394 TriggerData trigdata;
4397 * Make a call to the trigger function
4399 * No parameters are passed, but we do set a context
4401 MemSet(&fcinfo, 0, sizeof(fcinfo));
4404 * We assume RI_FKey_check_ins won't look at flinfo...
4406 trigdata.type = T_TriggerData;
4407 trigdata.tg_event = TRIGGER_EVENT_INSERT | TRIGGER_EVENT_ROW;
4408 trigdata.tg_relation = rel;
4409 trigdata.tg_trigtuple = tuple;
4410 trigdata.tg_newtuple = NULL;
4411 trigdata.tg_trigger = &trig;
4412 trigdata.tg_trigtuplebuf = scan->rs_cbuf;
4413 trigdata.tg_newtuplebuf = InvalidBuffer;
4415 fcinfo.context = (Node *) &trigdata;
4417 RI_FKey_check_ins(&fcinfo);
4426 CreateFKCheckTrigger(RangeVar *myRel, FkConstraint *fkconstraint,
4427 ObjectAddress *constrobj, ObjectAddress *trigobj,
4430 CreateTrigStmt *fk_trigger;
4434 fk_trigger = makeNode(CreateTrigStmt);
4435 fk_trigger->trigname = fkconstraint->constr_name;
4436 fk_trigger->relation = myRel;
4437 fk_trigger->before = false;
4438 fk_trigger->row = true;
4440 /* Either ON INSERT or ON UPDATE */
4443 fk_trigger->funcname = SystemFuncName("RI_FKey_check_ins");
4444 fk_trigger->actions[0] = 'i';
4448 fk_trigger->funcname = SystemFuncName("RI_FKey_check_upd");
4449 fk_trigger->actions[0] = 'u';
4451 fk_trigger->actions[1] = '\0';
4453 fk_trigger->isconstraint = true;
4454 fk_trigger->deferrable = fkconstraint->deferrable;
4455 fk_trigger->initdeferred = fkconstraint->initdeferred;
4456 fk_trigger->constrrel = fkconstraint->pktable;
4458 fk_trigger->args = NIL;
4459 fk_trigger->args = lappend(fk_trigger->args,
4460 makeString(fkconstraint->constr_name));
4461 fk_trigger->args = lappend(fk_trigger->args,
4462 makeString(myRel->relname));
4463 fk_trigger->args = lappend(fk_trigger->args,
4464 makeString(fkconstraint->pktable->relname));
4465 fk_trigger->args = lappend(fk_trigger->args,
4466 makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
4467 if (list_length(fkconstraint->fk_attrs) != list_length(fkconstraint->pk_attrs))
4469 (errcode(ERRCODE_INVALID_FOREIGN_KEY),
4470 errmsg("number of referencing and referenced columns for foreign key disagree")));
4472 forboth(fk_attr, fkconstraint->fk_attrs,
4473 pk_attr, fkconstraint->pk_attrs)
4475 fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
4476 fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
4479 trigobj->objectId = CreateTrigger(fk_trigger, true);
4481 /* Register dependency from trigger to constraint */
4482 recordDependencyOn(trigobj, constrobj, DEPENDENCY_INTERNAL);
4484 /* Make changes-so-far visible */
4485 CommandCounterIncrement();
4489 * Create the triggers that implement an FK constraint.
4492 createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
4496 CreateTrigStmt *fk_trigger;
4499 ObjectAddress trigobj,
4503 * Reconstruct a RangeVar for my relation (not passed in, unfortunately).
4505 myRel = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
4506 pstrdup(RelationGetRelationName(rel)));
4509 * Preset objectAddress fields
4511 constrobj.classId = ConstraintRelationId;
4512 constrobj.objectId = constrOid;
4513 constrobj.objectSubId = 0;
4514 trigobj.classId = TriggerRelationId;
4515 trigobj.objectSubId = 0;
4517 /* Make changes-so-far visible */
4518 CommandCounterIncrement();
4521 * Build and execute a CREATE CONSTRAINT TRIGGER statement for the CHECK
4522 * action for both INSERTs and UPDATEs on the referencing table.
4524 CreateFKCheckTrigger(myRel, fkconstraint, &constrobj, &trigobj, true);
4525 CreateFKCheckTrigger(myRel, fkconstraint, &constrobj, &trigobj, false);
4528 * Build and execute a CREATE CONSTRAINT TRIGGER statement for the ON
4529 * DELETE action on the referenced table.
4531 fk_trigger = makeNode(CreateTrigStmt);
4532 fk_trigger->trigname = fkconstraint->constr_name;
4533 fk_trigger->relation = fkconstraint->pktable;
4534 fk_trigger->before = false;
4535 fk_trigger->row = true;
4536 fk_trigger->actions[0] = 'd';
4537 fk_trigger->actions[1] = '\0';
4539 fk_trigger->isconstraint = true;
4540 fk_trigger->constrrel = myRel;
4541 switch (fkconstraint->fk_del_action)
4543 case FKCONSTR_ACTION_NOACTION:
4544 fk_trigger->deferrable = fkconstraint->deferrable;
4545 fk_trigger->initdeferred = fkconstraint->initdeferred;
4546 fk_trigger->funcname = SystemFuncName("RI_FKey_noaction_del");
4548 case FKCONSTR_ACTION_RESTRICT:
4549 fk_trigger->deferrable = false;
4550 fk_trigger->initdeferred = false;
4551 fk_trigger->funcname = SystemFuncName("RI_FKey_restrict_del");
4553 case FKCONSTR_ACTION_CASCADE:
4554 fk_trigger->deferrable = false;
4555 fk_trigger->initdeferred = false;
4556 fk_trigger->funcname = SystemFuncName("RI_FKey_cascade_del");
4558 case FKCONSTR_ACTION_SETNULL:
4559 fk_trigger->deferrable = false;
4560 fk_trigger->initdeferred = false;
4561 fk_trigger->funcname = SystemFuncName("RI_FKey_setnull_del");
4563 case FKCONSTR_ACTION_SETDEFAULT:
4564 fk_trigger->deferrable = false;
4565 fk_trigger->initdeferred = false;
4566 fk_trigger->funcname = SystemFuncName("RI_FKey_setdefault_del");
4569 elog(ERROR, "unrecognized FK action type: %d",
4570 (int) fkconstraint->fk_del_action);
4574 fk_trigger->args = NIL;
4575 fk_trigger->args = lappend(fk_trigger->args,
4576 makeString(fkconstraint->constr_name));
4577 fk_trigger->args = lappend(fk_trigger->args,
4578 makeString(myRel->relname));
4579 fk_trigger->args = lappend(fk_trigger->args,
4580 makeString(fkconstraint->pktable->relname));
4581 fk_trigger->args = lappend(fk_trigger->args,
4582 makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
4583 forboth(fk_attr, fkconstraint->fk_attrs,
4584 pk_attr, fkconstraint->pk_attrs)
4586 fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
4587 fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
4590 trigobj.objectId = CreateTrigger(fk_trigger, true);
4592 /* Register dependency from trigger to constraint */
4593 recordDependencyOn(&trigobj, &constrobj, DEPENDENCY_INTERNAL);
4595 /* Make changes-so-far visible */
4596 CommandCounterIncrement();
4599 * Build and execute a CREATE CONSTRAINT TRIGGER statement for the ON
4600 * UPDATE action on the referenced table.
4602 fk_trigger = makeNode(CreateTrigStmt);
4603 fk_trigger->trigname = fkconstraint->constr_name;
4604 fk_trigger->relation = fkconstraint->pktable;
4605 fk_trigger->before = false;
4606 fk_trigger->row = true;
4607 fk_trigger->actions[0] = 'u';
4608 fk_trigger->actions[1] = '\0';
4609 fk_trigger->isconstraint = true;
4610 fk_trigger->constrrel = myRel;
4611 switch (fkconstraint->fk_upd_action)
4613 case FKCONSTR_ACTION_NOACTION:
4614 fk_trigger->deferrable = fkconstraint->deferrable;
4615 fk_trigger->initdeferred = fkconstraint->initdeferred;
4616 fk_trigger->funcname = SystemFuncName("RI_FKey_noaction_upd");
4618 case FKCONSTR_ACTION_RESTRICT:
4619 fk_trigger->deferrable = false;
4620 fk_trigger->initdeferred = false;
4621 fk_trigger->funcname = SystemFuncName("RI_FKey_restrict_upd");
4623 case FKCONSTR_ACTION_CASCADE:
4624 fk_trigger->deferrable = false;
4625 fk_trigger->initdeferred = false;
4626 fk_trigger->funcname = SystemFuncName("RI_FKey_cascade_upd");
4628 case FKCONSTR_ACTION_SETNULL:
4629 fk_trigger->deferrable = false;
4630 fk_trigger->initdeferred = false;
4631 fk_trigger->funcname = SystemFuncName("RI_FKey_setnull_upd");
4633 case FKCONSTR_ACTION_SETDEFAULT:
4634 fk_trigger->deferrable = false;
4635 fk_trigger->initdeferred = false;
4636 fk_trigger->funcname = SystemFuncName("RI_FKey_setdefault_upd");
4639 elog(ERROR, "unrecognized FK action type: %d",
4640 (int) fkconstraint->fk_upd_action);
4644 fk_trigger->args = NIL;
4645 fk_trigger->args = lappend(fk_trigger->args,
4646 makeString(fkconstraint->constr_name));
4647 fk_trigger->args = lappend(fk_trigger->args,
4648 makeString(myRel->relname));
4649 fk_trigger->args = lappend(fk_trigger->args,
4650 makeString(fkconstraint->pktable->relname));
4651 fk_trigger->args = lappend(fk_trigger->args,
4652 makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
4653 forboth(fk_attr, fkconstraint->fk_attrs,
4654 pk_attr, fkconstraint->pk_attrs)
4656 fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
4657 fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
4660 trigobj.objectId = CreateTrigger(fk_trigger, true);
4662 /* Register dependency from trigger to constraint */
4663 recordDependencyOn(&trigobj, &constrobj, DEPENDENCY_INTERNAL);
4667 * fkMatchTypeToString -
4668 * convert FKCONSTR_MATCH_xxx code to string to use in trigger args
4671 fkMatchTypeToString(char match_type)
4675 case FKCONSTR_MATCH_FULL:
4676 return pstrdup("FULL");
4677 case FKCONSTR_MATCH_PARTIAL:
4678 return pstrdup("PARTIAL");
4679 case FKCONSTR_MATCH_UNSPECIFIED:
4680 return pstrdup("UNSPECIFIED");
4682 elog(ERROR, "unrecognized match type: %d",
4685 return NULL; /* can't get here */
4689 * ALTER TABLE DROP CONSTRAINT
4692 ATPrepDropConstraint(List **wqueue, Relation rel,
4693 bool recurse, AlterTableCmd *cmd)
4696 * We don't want errors or noise from child tables, so we have to pass
4697 * down a modified command.
4701 AlterTableCmd *childCmd = copyObject(cmd);
4703 childCmd->subtype = AT_DropConstraintQuietly;
4704 ATSimpleRecursion(wqueue, rel, childCmd, recurse);
4709 ATExecDropConstraint(Relation rel, const char *constrName,
4710 DropBehavior behavior, bool quiet)
4714 deleted = RemoveRelConstraints(rel, constrName, behavior);
4718 /* If zero constraints deleted, complain */
4721 (errcode(ERRCODE_UNDEFINED_OBJECT),
4722 errmsg("constraint \"%s\" does not exist",
4724 /* Otherwise if more than one constraint deleted, notify */
4725 else if (deleted > 1)
4727 (errmsg("multiple constraints named \"%s\" were dropped",
/*
 * ATPrepAlterColumnType
 *
 * Preparation step for changing the data type of one column of rel.
 * Going by the statements visible here, it:
 *	- looks up the column named by cmd->name and reports
 *	  ERRCODE_UNDEFINED_COLUMN when the lookup fails;
 *	- carries error reports for system columns and for inherited columns
 *	  (attinhcount > 0 while not recursing);
 *	- resolves the target TypeName taken from cmd->def, reports
 *	  ERRCODE_UNDEFINED_OBJECT when it is unknown, and validates it with
 *	  CheckAttributeType;
 *	- builds the conversion expression, either from cmd->transform (which
 *	  must not return a set, use sublinks or use aggregates) or from a Var
 *	  for the existing column, and coerces it to the target type;
 *	- appends a NewColumnValue (attnum plus expression) to tab->newvals;
 *	- calls ATSimpleRecursion, or else reports an error when rel has
 *	  inheritance children and we are not recursing.
 *
 * NOTE(review): several lines of this definition (braces, short
 * declarations, some conditions) are not shown in this excerpt; confirm the
 * exact guards around the statements below against the full file.
 */
4736 ATPrepAlterColumnType(List **wqueue,
4737 AlteredTableInfo *tab, Relation rel,
4738 bool recurse, bool recursing,
4741 char *colName = cmd->name;
4742 TypeName *typename = (TypeName *) cmd->def;
4744 Form_pg_attribute attTup;
4748 NewColumnValue *newval;
4749 ParseState *pstate = make_parsestate(NULL);
4751 /* lookup the attribute so we can check inheritance status */
4752 tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
4753 if (!HeapTupleIsValid(tuple))
4755 (errcode(ERRCODE_UNDEFINED_COLUMN),
4756 errmsg("column \"%s\" of relation \"%s\" does not exist",
4757 colName, RelationGetRelationName(rel))));
4758 attTup = (Form_pg_attribute) GETSTRUCT(tuple);
4759 attnum = attTup->attnum;
4761 /* Can't alter a system attribute */
4764 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4765 errmsg("cannot alter system column \"%s\"",
4768 /* Don't alter inherited columns */
4769 if (attTup->attinhcount > 0 && !recursing)
4771 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
4772 errmsg("cannot alter inherited column \"%s\"",
4775 /* Look up the target type */
4776 targettype = LookupTypeName(typename);
4777 if (!OidIsValid(targettype))
4779 (errcode(ERRCODE_UNDEFINED_OBJECT),
4780 errmsg("type \"%s\" does not exist",
4781 TypeNameToString(typename))));
4783 /* make sure datatype is legal for a column */
4784 CheckAttributeType(colName, targettype);
4787 * Set up an expression to transform the old data value to the new type.
4788 * If a USING option was given, transform and use that expression, else
4789 * just take the old value and try to coerce it. We do this first so that
4790 * type incompatibility can be detected before we waste effort, and
4791 * because we need the expression to be parsed against the original table
4798 /* Expression must be able to access vars of old table */
4799 rte = addRangeTableEntryForRelation(pstate,
4804 addRTEtoQuery(pstate, rte, false, true, true);
4806 transform = transformExpr(pstate, cmd->transform);
4808 /* It can't return a set */
4809 if (expression_returns_set(transform))
4811 (errcode(ERRCODE_DATATYPE_MISMATCH),
4812 errmsg("transform expression must not return a set")));
4814 /* No subplans or aggregates, either... */
4815 if (pstate->p_hasSubLinks)
4817 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4818 errmsg("cannot use subquery in transform expression")));
4819 if (pstate->p_hasAggs)
4821 (errcode(ERRCODE_GROUPING_ERROR),
4822 errmsg("cannot use aggregate function in transform expression")));
/* Otherwise start from a Var (varno 1) carrying the column's current type and typmod */
4826 transform = (Node *) makeVar(1, attnum,
4827 attTup->atttypid, attTup->atttypmod,
/* Coerce the source expression to the target type and typmod in assignment context */
4831 transform = coerce_to_target_type(pstate,
4832 transform, exprType(transform),
4833 targettype, typename->typmod,
4834 COERCION_ASSIGNMENT,
4835 COERCE_IMPLICIT_CAST);
4836 if (transform == NULL)
4838 (errcode(ERRCODE_DATATYPE_MISMATCH),
4839 errmsg("column \"%s\" cannot be cast to type \"%s\"",
4840 colName, TypeNameToString(typename))));
4843 * Add a work queue item to make ATRewriteTable update the column
4846 newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue));
4847 newval->attnum = attnum;
4848 newval->expr = (Expr *) transform;
4850 tab->newvals = lappend(tab->newvals, newval);
/* attTup points into this syscache entry and is not used past this point */
4852 ReleaseSysCache(tuple);
4855 * The recursion case is handled by ATSimpleRecursion. However, if we are
4856 * told not to recurse, there had better not be any child tables; else the
4857 * alter would put them out of step.
4860 ATSimpleRecursion(wqueue, rel, cmd, recurse);
4861 else if (!recursing &&
4862 find_inheritance_children(RelationGetRelid(rel)) != NIL)
4864 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
4865 errmsg("type of inherited column \"%s\" must be changed in child tables too",
/*
 * ATExecAlterColumnType
 *
 * Execution step that rewrites the catalog entries for column colName of
 * rel so that it has the type described by typename.  The visible
 * statements do the following, in order:
 *	1. Open pg_attribute (RowExclusiveLock) and fetch a modifiable copy of
 *	   the column tuple.
 *	2. Compare the stored atttypid/atttypmod with tab->oldDesc; a mismatch
 *	   is reported as altering the same column twice.
 *	3. Fetch the target type tuple with typenameType.
 *	4. When the column has a default (atthasdef), rebuild it, strip
 *	   top-level implicit coercions and coerce it to the new type; a NULL
 *	   result is reported as a cast failure.
 *	5. Scan pg_depend through DependReferenceIndexId for objects that
 *	   depend on this column.  Index OIDs and definition strings go to
 *	   tab->changedIndexOids / changedIndexDefs; constraint OIDs and
 *	   definitions go to tab->changedConstraintOids / changedConstraintDefs
 *	   (DEPENDENCY_NORMAL entries at the front, the rest at the back);
 *	   rewrite-rule dependencies are rejected; other classes raise elog.
 *	6. Scan pg_depend through DependDependerIndexId for what the column
 *	   itself depends on; only a DEPENDENCY_NORMAL entry on the old type is
 *	   accepted, and it is deleted.
 *	7. Overwrite atttypid, atttypmod, attndims, attlen, attbyval, attalign
 *	   and attstorage in the tuple copy, write it back and update the
 *	   catalog indexes.
 *	8. Record the dependency on the new type and drop the column
 *	   statistics.
 *	9. Re-create the default: CommandCounterIncrement, RemoveAttrDefault
 *	   with DROP_RESTRICT, then StoreAttrDefault with the coerced
 *	   expression.
 *
 * NOTE(review): several lines of this definition (braces, case labels,
 * break statements, short declarations and conditions) are not shown in
 * this excerpt; in particular the guard around step 9 is not visible and
 * should be confirmed against the full file.
 */
4870 ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
4871 const char *colName, TypeName *typename)
4874 Form_pg_attribute attTup;
4876 HeapTuple typeTuple;
4880 Relation attrelation;
4886 attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
4888 /* Look up the target column */
4889 heapTup = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
4890 if (!HeapTupleIsValid(heapTup)) /* shouldn't happen */
4892 (errcode(ERRCODE_UNDEFINED_COLUMN),
4893 errmsg("column \"%s\" of relation \"%s\" does not exist",
4894 colName, RelationGetRelationName(rel))));
4895 attTup = (Form_pg_attribute) GETSTRUCT(heapTup);
4896 attnum = attTup->attnum;
4898 /* Check for multiple ALTER TYPE on same column --- can't cope */
4899 if (attTup->atttypid != tab->oldDesc->attrs[attnum - 1]->atttypid ||
4900 attTup->atttypmod != tab->oldDesc->attrs[attnum - 1]->atttypmod)
4902 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4903 errmsg("cannot alter type of column \"%s\" twice",
4906 /* Look up the target type (should not fail, since prep found it) */
4907 typeTuple = typenameType(typename);
4908 tform = (Form_pg_type) GETSTRUCT(typeTuple);
4909 targettype = HeapTupleGetOid(typeTuple);
4912 * If there is a default expression for the column, get it and ensure we
4913 * can coerce it to the new datatype. (We must do this before changing
4914 * the column type, because build_column_default itself will try to
4915 * coerce, and will not issue the error message we want if it fails.)
4917 * We remove any implicit coercion steps at the top level of the old
4918 * default expression; this has been agreed to satisfy the principle of
4919 * least surprise. (The conversion to the new column type should act like
4920 * it started from what the user sees as the stored expression, and the
4921 * implicit coercions aren't going to be shown.)
4923 if (attTup->atthasdef)
4925 defaultexpr = build_column_default(rel, attnum);
4926 Assert(defaultexpr);
4927 defaultexpr = strip_implicit_coercions(defaultexpr);
4928 defaultexpr = coerce_to_target_type(NULL, /* no UNKNOWN params */
4929 defaultexpr, exprType(defaultexpr),
4930 targettype, typename->typmod,
4931 COERCION_ASSIGNMENT,
4932 COERCE_IMPLICIT_CAST);
4933 if (defaultexpr == NULL)
4935 (errcode(ERRCODE_DATATYPE_MISMATCH),
4936 errmsg("default for column \"%s\" cannot be cast to type \"%s\"",
4937 colName, TypeNameToString(typename))));
4943 * Find everything that depends on the column (constraints, indexes, etc),
4944 * and record enough information to let us recreate the objects.
4946 * The actual recreation does not happen here, but only after we have
4947 * performed all the individual ALTER TYPE operations. We have to save
4948 * the info before executing ALTER TYPE, though, else the deparser will
4951 * There could be multiple entries for the same object, so we must check
4952 * to ensure we process each one only once. Note: we assume that an index
4953 * that implements a constraint will not show a direct dependency on the
4956 depRel = heap_open(DependRelationId, RowExclusiveLock);
4958 ScanKeyInit(&key[0],
4959 Anum_pg_depend_refclassid,
4960 BTEqualStrategyNumber, F_OIDEQ,
4961 ObjectIdGetDatum(RelationRelationId));
4962 ScanKeyInit(&key[1],
4963 Anum_pg_depend_refobjid,
4964 BTEqualStrategyNumber, F_OIDEQ,
4965 ObjectIdGetDatum(RelationGetRelid(rel)));
4966 ScanKeyInit(&key[2],
4967 Anum_pg_depend_refobjsubid,
4968 BTEqualStrategyNumber, F_INT4EQ,
4969 Int32GetDatum((int32) attnum));
4971 scan = systable_beginscan(depRel, DependReferenceIndexId, true,
4972 SnapshotNow, 3, key);
4974 while (HeapTupleIsValid(depTup = systable_getnext(scan)))
4976 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(depTup);
4977 ObjectAddress foundObject;
4979 /* We don't expect any PIN dependencies on columns */
4980 if (foundDep->deptype == DEPENDENCY_PIN)
4981 elog(ERROR, "cannot alter type of a pinned column");
4983 foundObject.classId = foundDep->classid;
4984 foundObject.objectId = foundDep->objid;
4985 foundObject.objectSubId = foundDep->objsubid;
4987 switch (getObjectClass(&foundObject))
4991 char relKind = get_rel_relkind(foundObject.objectId);
4993 if (relKind == RELKIND_INDEX)
4995 Assert(foundObject.objectSubId == 0);
4996 if (!list_member_oid(tab->changedIndexOids, foundObject.objectId))
4998 tab->changedIndexOids = lappend_oid(tab->changedIndexOids,
4999 foundObject.objectId);
5000 tab->changedIndexDefs = lappend(tab->changedIndexDefs,
5001 pg_get_indexdef_string(foundObject.objectId));
5004 else if (relKind == RELKIND_SEQUENCE)
5007 * This must be a SERIAL column's sequence. We need
5008 * not do anything to it.
5010 Assert(foundObject.objectSubId == 0);
5014 /* Not expecting any other direct dependencies... */
5015 elog(ERROR, "unexpected object depending on column: %s",
5016 getObjectDescription(&foundObject));
5021 case OCLASS_CONSTRAINT:
5022 Assert(foundObject.objectSubId == 0);
5023 if (!list_member_oid(tab->changedConstraintOids,
5024 foundObject.objectId))
5026 char *defstring = pg_get_constraintdef_string(foundObject.objectId);
5029 * Put NORMAL dependencies at the front of the list and
5030 * AUTO dependencies at the back. This makes sure that
5031 * foreign-key constraints depending on this column will
5032 * be dropped before unique or primary-key constraints of
5033 * the column; which we must have because the FK
5034 * constraints depend on the indexes belonging to the
5035 * unique constraints.
5037 if (foundDep->deptype == DEPENDENCY_NORMAL)
5039 tab->changedConstraintOids =
5040 lcons_oid(foundObject.objectId,
5041 tab->changedConstraintOids);
5042 tab->changedConstraintDefs =
5044 tab->changedConstraintDefs);
5048 tab->changedConstraintOids =
5049 lappend_oid(tab->changedConstraintOids,
5050 foundObject.objectId);
5051 tab->changedConstraintDefs =
5052 lappend(tab->changedConstraintDefs,
5058 case OCLASS_REWRITE:
5059 /* XXX someday see if we can cope with revising views */
5061 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5062 errmsg("cannot alter type of a column used by a view or rule"),
5063 errdetail("%s depends on column \"%s\"",
5064 getObjectDescription(&foundObject),
5068 case OCLASS_DEFAULT:
5071 * Ignore the column's default expression, since we will fix
5074 Assert(defaultexpr);
5080 case OCLASS_CONVERSION:
5081 case OCLASS_LANGUAGE:
5082 case OCLASS_OPERATOR:
5083 case OCLASS_OPCLASS:
5084 case OCLASS_TRIGGER:
5088 * We don't expect any of these sorts of objects to depend on
5091 elog(ERROR, "unexpected object depending on column: %s",
5092 getObjectDescription(&foundObject));
5096 elog(ERROR, "unrecognized object class: %u",
5097 foundObject.classId);
5101 systable_endscan(scan);
5104 * Now scan for dependencies of this column on other things. The only
5105 * thing we should find is the dependency on the column datatype, which we
5108 ScanKeyInit(&key[0],
5109 Anum_pg_depend_classid,
5110 BTEqualStrategyNumber, F_OIDEQ,
5111 ObjectIdGetDatum(RelationRelationId));
5112 ScanKeyInit(&key[1],
5113 Anum_pg_depend_objid,
5114 BTEqualStrategyNumber, F_OIDEQ,
5115 ObjectIdGetDatum(RelationGetRelid(rel)));
5116 ScanKeyInit(&key[2],
5117 Anum_pg_depend_objsubid,
5118 BTEqualStrategyNumber, F_INT4EQ,
5119 Int32GetDatum((int32) attnum));
5121 scan = systable_beginscan(depRel, DependDependerIndexId, true,
5122 SnapshotNow, 3, key);
5124 while (HeapTupleIsValid(depTup = systable_getnext(scan)))
5126 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(depTup);
5128 if (foundDep->deptype != DEPENDENCY_NORMAL)
5129 elog(ERROR, "found unexpected dependency type '%c'",
5131 if (foundDep->refclassid != TypeRelationId ||
5132 foundDep->refobjid != attTup->atttypid)
5133 elog(ERROR, "found unexpected dependency for column");
5135 simple_heap_delete(depRel, &depTup->t_self);
5138 systable_endscan(scan);
5140 heap_close(depRel, RowExclusiveLock);
5143 * Here we go --- change the recorded column type. (Note heapTup is a
5144 * copy of the syscache entry, so okay to scribble on.)
5146 attTup->atttypid = targettype;
5147 attTup->atttypmod = typename->typmod;
5148 attTup->attndims = list_length(typename->arrayBounds);
5149 attTup->attlen = tform->typlen;
5150 attTup->attbyval = tform->typbyval;
5151 attTup->attalign = tform->typalign;
5152 attTup->attstorage = tform->typstorage;
5154 ReleaseSysCache(typeTuple);
5156 simple_heap_update(attrelation, &heapTup->t_self, heapTup);
5158 /* keep system catalog indexes current */
5159 CatalogUpdateIndexes(attrelation, heapTup);
5161 heap_close(attrelation, RowExclusiveLock);
5163 /* Install dependency on new datatype */
5164 add_column_datatype_dependency(RelationGetRelid(rel), attnum, targettype);
5167 * Drop any pg_statistic entry for the column, since it's now wrong type
5169 RemoveStatistics(RelationGetRelid(rel), attnum);
5172 * Update the default, if present, by brute force --- remove and re-add
5173 * the default. Probably unsafe to take shortcuts, since the new version
5174 * may well have additional dependencies. (It's okay to do this now,
5175 * rather than after other ALTER TYPE commands, since the default won't
5176 * depend on other column types.)
5180 /* Must make new row visible since it will be updated again */
5181 CommandCounterIncrement();
5184 * We use RESTRICT here for safety, but at present we do not expect
5185 * anything to depend on the default.
5187 RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, true);
5189 StoreAttrDefault(rel, attnum, nodeToString(defaultexpr));
5193 heap_freetuple(heapTup);
/*
 * ATPostAlterTypeCleanup
 *
 * Runs after the ALTER TYPE subcommands recorded in tab.  Each saved
 * definition string in tab->changedIndexDefs and tab->changedConstraintDefs
 * is handed to ATPostAlterTypeParse together with the work queue.  After
 * that, every OID in tab->changedConstraintOids, followed by every OID in
 * tab->changedIndexOids, is passed to performDeletion with DROP_RESTRICT.
 * Constraints are addressed with classId ConstraintRelationId and indexes
 * with RelationRelationId; objectSubId is 0 in both cases.
 *
 * NOTE(review): braces and local declarations of this definition are not
 * shown in this excerpt.
 */
5197 * Cleanup after we've finished all the ALTER TYPE operations for a
5198 * particular relation. We have to drop and recreate all the indexes
5199 * and constraints that depend on the altered columns.
5202 ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab)
5208 * Re-parse the index and constraint definitions, and attach them to the
5209 * appropriate work queue entries. We do this before dropping because in
5210 * the case of a FOREIGN KEY constraint, we might not yet have exclusive
5211 * lock on the table the constraint is attached to, and we need to get
5212 * that before dropping. It's safe because the parser won't actually look
5213 * at the catalogs to detect the existing entry.
5215 foreach(l, tab->changedIndexDefs)
5216 ATPostAlterTypeParse((char *) lfirst(l), wqueue);
5217 foreach(l, tab->changedConstraintDefs)
5218 ATPostAlterTypeParse((char *) lfirst(l), wqueue);
5221 * Now we can drop the existing constraints and indexes --- constraints
5222 * first, since some of them might depend on the indexes. In fact, we
5223 * have to delete FOREIGN KEY constraints before UNIQUE constraints,
5224 * but we already ordered the constraint list to ensure that would happen.
5225 * It should be okay to use DROP_RESTRICT here, since nothing else should
5226 * be depending on these objects.
5228 foreach(l, tab->changedConstraintOids)
5230 obj.classId = ConstraintRelationId;
5231 obj.objectId = lfirst_oid(l);
5232 obj.objectSubId = 0;
5233 performDeletion(&obj, DROP_RESTRICT);
5236 foreach(l, tab->changedIndexOids)
5238 obj.classId = RelationRelationId;
5239 obj.objectId = lfirst_oid(l);
5240 obj.objectSubId = 0;
5241 performDeletion(&obj, DROP_RESTRICT);
5245 * The objects will get recreated during subsequent passes over the work
/*
 * ATPostAlterTypeParse
 *
 * Parses one saved definition string (cmd) and files the resulting utility
 * commands in the ALTER TABLE work queue.
 *
 * The string goes through raw_parser and then parse_analyze; every
 * resulting Query is asserted to be CMD_UTILITY.  For an IndexStmt the
 * target relation is opened with AccessExclusiveLock, its queue entry is
 * fetched with ATGetQueueEntry, and a new AT_ReAddIndex command wrapping
 * the statement is appended to the AT_PASS_OLD_INDEX list.  For an
 * AlterTableStmt each subcommand is appended to either AT_PASS_OLD_INDEX
 * (after its subtype is changed to AT_ReAddIndex) or, for AT_AddConstraint,
 * to AT_PASS_OLD_CONSTR.  Anything else raises elog(ERROR).
 *
 * NOTE(review): braces, some case labels and short declarations of this
 * definition are not shown in this excerpt; the case label that selects the
 * AT_ReAddIndex conversion is among the missing lines.
 */
5251 ATPostAlterTypeParse(char *cmd, List **wqueue)
5253 List *raw_parsetree_list;
5254 List *querytree_list;
5255 ListCell *list_item;
5258 * We expect that we only have to do raw parsing and parse analysis, not
5259 * any rule rewriting, since these will all be utility statements.
5261 raw_parsetree_list = raw_parser(cmd);
5262 querytree_list = NIL;
5263 foreach(list_item, raw_parsetree_list)
5265 Node *parsetree = (Node *) lfirst(list_item);
5267 querytree_list = list_concat(querytree_list,
5268 parse_analyze(parsetree, NULL, 0));
5272 * Attach each generated command to the proper place in the work queue.
5273 * Note this could result in creation of entirely new work-queue entries.
5275 foreach(list_item, querytree_list)
5277 Query *query = (Query *) lfirst(list_item);
5279 AlteredTableInfo *tab;
5281 Assert(IsA(query, Query));
5282 Assert(query->commandType == CMD_UTILITY);
5283 switch (nodeTag(query->utilityStmt))
5287 IndexStmt *stmt = (IndexStmt *) query->utilityStmt;
5288 AlterTableCmd *newcmd;
5290 rel = relation_openrv(stmt->relation, AccessExclusiveLock);
5291 tab = ATGetQueueEntry(wqueue, rel);
5292 newcmd = makeNode(AlterTableCmd);
5293 newcmd->subtype = AT_ReAddIndex;
5294 newcmd->def = (Node *) stmt;
5295 tab->subcmds[AT_PASS_OLD_INDEX] =
5296 lappend(tab->subcmds[AT_PASS_OLD_INDEX], newcmd);
/* Closed with NoLock: the lock is not released by this close */
5297 relation_close(rel, NoLock);
5300 case T_AlterTableStmt:
5302 AlterTableStmt *stmt = (AlterTableStmt *) query->utilityStmt;
5305 rel = relation_openrv(stmt->relation, AccessExclusiveLock);
5306 tab = ATGetQueueEntry(wqueue, rel);
5307 foreach(lcmd, stmt->cmds)
5309 AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
5311 switch (cmd->subtype)
5314 cmd->subtype = AT_ReAddIndex;
5315 tab->subcmds[AT_PASS_OLD_INDEX] =
5316 lappend(tab->subcmds[AT_PASS_OLD_INDEX], cmd);
5318 case AT_AddConstraint:
5319 tab->subcmds[AT_PASS_OLD_CONSTR] =
5320 lappend(tab->subcmds[AT_PASS_OLD_CONSTR], cmd);
5323 elog(ERROR, "unexpected statement type: %d",
5324 (int) cmd->subtype);
/* Closed with NoLock: the lock is not released by this close */
5327 relation_close(rel, NoLock);
5331 elog(ERROR, "unexpected statement type: %d",
5332 (int) nodeTag(query->utilityStmt));
/*
 * ATExecChangeOwner
 *
 * Changes the owner of the relation identified by relationOid to
 * newOwnerId.  The visible statements do the following:
 *	- open the target with AccessExclusiveLock and pg_class with
 *	  RowExclusiveLock, then fetch the pg_class row from the RELOID
 *	  syscache (elog on a cache miss);
 *	- switch on relkind: tables and sequences are accepted; for an index
 *	  whose owner would change, a message with ERRCODE_WRONG_OBJECT_TYPE
 *	  is issued and newOwnerId is reset to the current owner so the
 *	  no-op path is taken; other kinds are rejected as not a table, view,
 *	  or sequence;
 *	- when the owner really changes: run the permission checks (current
 *	  user must own the object, must be a member of the new role, and the
 *	  new owner must pass pg_namespace_aclcheck on the namespace), build a
 *	  replacement tuple with the new relowner and, when an ACL is present,
 *	  an ACL rewritten by aclnewowner, store it and update the catalog
 *	  indexes;
 *	- update the owner dependency, change the owner of the row type for
 *	  anything that is not an index, and recurse into indexes, the toast
 *	  table and dependent sequences;
 *	- release the syscache entry, close pg_class, and close the target
 *	  with NoLock.
 *
 * NOTE(review): braces, several case labels, short declarations and some
 * conditions (including the tests on recursing and on superuser status
 * mentioned in the comments) are not shown in this excerpt; confirm the
 * exact guards against the full file.
 */
5341 * recursing is true if we are recursing from a table to its indexes or
5342 * toast table. We don't allow the ownership of those things to be
5343 * changed separately from the parent table. Also, we can skip permission
5344 * checks (this is necessary not just an optimization, else we'd fail to
5345 * handle toast tables properly).
5348 ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing)
5350 Relation target_rel;
5353 Form_pg_class tuple_class;
5356 * Get exclusive lock till end of transaction on the target table. Use
5357 * relation_open so that we can work on indexes and sequences.
5359 target_rel = relation_open(relationOid, AccessExclusiveLock);
5361 /* Get its pg_class tuple, too */
5362 class_rel = heap_open(RelationRelationId, RowExclusiveLock);
5364 tuple = SearchSysCache(RELOID,
5365 ObjectIdGetDatum(relationOid),
5367 if (!HeapTupleIsValid(tuple))
5368 elog(ERROR, "cache lookup failed for relation %u", relationOid);
5369 tuple_class = (Form_pg_class) GETSTRUCT(tuple);
5371 /* Can we change the ownership of this tuple? */
5372 switch (tuple_class->relkind)
5374 case RELKIND_RELATION:
5376 case RELKIND_SEQUENCE:
5377 /* ok to change owner */
5383 * Because ALTER INDEX OWNER used to be allowed, and in fact
5384 * is generated by old versions of pg_dump, we give a warning
5385 * and do nothing rather than erroring out. Also, to avoid
5386 * unnecessary chatter while restoring those old dumps, say
5387 * nothing at all if the command would be a no-op anyway.
5389 if (tuple_class->relowner != newOwnerId)
5391 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
5392 errmsg("cannot change owner of index \"%s\"",
5393 NameStr(tuple_class->relname)),
5394 errhint("Change the ownership of the index's table, instead.")));
5395 /* quick hack to exit via the no-op path */
5396 newOwnerId = tuple_class->relowner;
5399 case RELKIND_TOASTVALUE:
5405 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
5406 errmsg("\"%s\" is not a table, view, or sequence",
5407 NameStr(tuple_class->relname))));
5411 * If the new owner is the same as the existing owner, consider the
5412 * command to have succeeded. This is for dump restoration purposes.
5414 if (tuple_class->relowner != newOwnerId)
5416 Datum repl_val[Natts_pg_class];
5417 char repl_null[Natts_pg_class];
5418 char repl_repl[Natts_pg_class];
5424 /* skip permission checks when recursing to index or toast table */
5427 /* Superusers can always do it */
5430 Oid namespaceOid = tuple_class->relnamespace;
5431 AclResult aclresult;
5433 /* Otherwise, must be owner of the existing object */
5434 if (!pg_class_ownercheck(relationOid, GetUserId()))
5435 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
5436 RelationGetRelationName(target_rel));
5438 /* Must be able to become new owner */
5439 check_is_member_of_role(GetUserId(), newOwnerId);
5441 /* New owner must have CREATE privilege on namespace */
5442 aclresult = pg_namespace_aclcheck(namespaceOid, newOwnerId,
5444 if (aclresult != ACLCHECK_OK)
5445 aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
5446 get_namespace_name(namespaceOid));
5450 memset(repl_null, ' ', sizeof(repl_null));
5451 memset(repl_repl, ' ', sizeof(repl_repl));
5453 repl_repl[Anum_pg_class_relowner - 1] = 'r';
5454 repl_val[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(newOwnerId);
5457 * Determine the modified ACL for the new owner. This is only
5458 * necessary when the ACL is non-null.
5460 aclDatum = SysCacheGetAttr(RELOID, tuple,
5461 Anum_pg_class_relacl,
5465 newAcl = aclnewowner(DatumGetAclP(aclDatum),
5466 tuple_class->relowner, newOwnerId);
5467 repl_repl[Anum_pg_class_relacl - 1] = 'r';
5468 repl_val[Anum_pg_class_relacl - 1] = PointerGetDatum(newAcl);
5471 newtuple = heap_modifytuple(tuple, RelationGetDescr(class_rel), repl_val, repl_null, repl_repl);
5473 simple_heap_update(class_rel, &newtuple->t_self, newtuple);
5474 CatalogUpdateIndexes(class_rel, newtuple);
5476 heap_freetuple(newtuple);
5478 /* Update owner dependency reference */
5479 changeDependencyOnOwner(RelationRelationId, relationOid, newOwnerId);
5482 * Also change the ownership of the table's rowtype, if it has one
5484 if (tuple_class->relkind != RELKIND_INDEX)
5485 AlterTypeOwnerInternal(tuple_class->reltype, newOwnerId);
5488 * If we are operating on a table, also change the ownership of any
5489 * indexes and sequences that belong to the table, as well as the
5490 * table's toast table (if it has one)
5492 if (tuple_class->relkind == RELKIND_RELATION ||
5493 tuple_class->relkind == RELKIND_TOASTVALUE)
5495 List *index_oid_list;
5498 /* Find all the indexes belonging to this relation */
5499 index_oid_list = RelationGetIndexList(target_rel);
5501 /* For each index, recursively change its ownership */
5502 foreach(i, index_oid_list)
5503 ATExecChangeOwner(lfirst_oid(i), newOwnerId, true);
5505 list_free(index_oid_list);
5508 if (tuple_class->relkind == RELKIND_RELATION)
5510 /* If it has a toast table, recurse to change its ownership */
5511 if (tuple_class->reltoastrelid != InvalidOid)
5512 ATExecChangeOwner(tuple_class->reltoastrelid, newOwnerId,
5515 /* If it has dependent sequences, recurse to change them too */
5516 change_owner_recurse_to_sequences(relationOid, newOwnerId);
5520 ReleaseSysCache(tuple);
5521 heap_close(class_rel, RowExclusiveLock);
5522 relation_close(target_rel, NoLock);
/*
 * Summary of the visible logic of change_owner_recurse_to_sequences:
 *
 * pg_depend is opened with AccessShareLock and scanned through
 * DependReferenceIndexId with two keys: refclassid = RelationRelationId and
 * refobjid = relationOid.  An entry is considered only when it refers to a
 * column (refobjsubid != 0), the dependent object is a whole relation
 * (classid = RelationRelationId, objsubid = 0) and the dependency type is
 * DEPENDENCY_INTERNAL.  The dependent relation is opened with
 * AccessExclusiveLock; when it is not a sequence it is closed again,
 * releasing that lock, otherwise ATExecChangeOwner is called on it with
 * recursing = false and it is closed with NoLock.
 *
 * NOTE(review): braces, local declarations and the continue statements
 * that presumably follow the two skip tests are not shown in this excerpt.
 */
5526 * change_owner_recurse_to_sequences
5528 * Helper function for ATExecChangeOwner. Examines pg_depend searching
5529 * for sequences that are dependent on serial columns, and changes their
5533 change_owner_recurse_to_sequences(Oid relationOid, Oid newOwnerId)
5541 * SERIAL sequences are those having an internal dependency on one of the
5542 * table's columns (we don't care *which* column, exactly).
5544 depRel = heap_open(DependRelationId, AccessShareLock);
5546 ScanKeyInit(&key[0],
5547 Anum_pg_depend_refclassid,
5548 BTEqualStrategyNumber, F_OIDEQ,
5549 ObjectIdGetDatum(RelationRelationId));
5550 ScanKeyInit(&key[1],
5551 Anum_pg_depend_refobjid,
5552 BTEqualStrategyNumber, F_OIDEQ,
5553 ObjectIdGetDatum(relationOid));
5554 /* we leave refobjsubid unspecified */
5556 scan = systable_beginscan(depRel, DependReferenceIndexId, true,
5557 SnapshotNow, 2, key);
5559 while (HeapTupleIsValid(tup = systable_getnext(scan)))
5561 Form_pg_depend depForm = (Form_pg_depend) GETSTRUCT(tup);
5564 /* skip dependencies other than internal dependencies on columns */
5565 if (depForm->refobjsubid == 0 ||
5566 depForm->classid != RelationRelationId ||
5567 depForm->objsubid != 0 ||
5568 depForm->deptype != DEPENDENCY_INTERNAL)
5571 /* Use relation_open just in case it's an index */
5572 seqRel = relation_open(depForm->objid, AccessExclusiveLock);
5574 /* skip non-sequence relations */
5575 if (RelationGetForm(seqRel)->relkind != RELKIND_SEQUENCE)
5577 /* No need to keep the lock */
5578 relation_close(seqRel, AccessExclusiveLock);
5582 /* We don't need to close the sequence while we alter it. */
5583 ATExecChangeOwner(depForm->objid, newOwnerId, false);
5585 /* Now we can close it. Keep the lock till end of transaction. */
5586 relation_close(seqRel, NoLock);
5589 systable_endscan(scan);
5591 relation_close(depRel, AccessShareLock);
/*
 * ATExecClusterOn
 *
 * Resolves indexName to an OID with get_relname_relid, searching the
 * namespace that rel itself lives in (rel->rd_rel->relnamespace).  An
 * invalid OID is reported with ERRCODE_UNDEFINED_OBJECT.  The index is then
 * validated with check_index_is_clusterable and recorded through
 * mark_index_clustered.
 *
 * NOTE(review): braces and the declaration of indexOid are not shown in
 * this excerpt.
 */
5595 * ALTER TABLE CLUSTER ON
5597 * The only thing we have to do is to change the indisclustered bits.
5600 ATExecClusterOn(Relation rel, const char *indexName)
5604 indexOid = get_relname_relid(indexName, rel->rd_rel->relnamespace);
5606 if (!OidIsValid(indexOid))
5608 (errcode(ERRCODE_UNDEFINED_OBJECT),
5609 errmsg("index \"%s\" for table \"%s\" does not exist",
5610 indexName, RelationGetRelationName(rel))));
5612 /* Check index is valid to cluster on */
5613 check_index_is_clusterable(rel, indexOid, false);
5615 /* And do the work */
5616 mark_index_clustered(rel, indexOid);
/*
 * ATExecDropCluster
 *
 * Delegates to mark_index_clustered, passing InvalidOid as the index so
 * that no index of rel is selected as the clustered one.
 */
5620 * ALTER TABLE SET WITHOUT CLUSTER
5622 * We have to find any indexes on the table that have indisclustered bit
5623 * set and turn it off.
5626 ATExecDropCluster(Relation rel)
5628 mark_index_clustered(rel, InvalidOid);
/*
 * ATPrepSetTableSpace
 *
 * Preparation step that validates a SET TABLESPACE request and stores the
 * chosen tablespace OID in tab->newTableSpace for later processing.
 *
 * Checks made by the visible statements, in order:
 *	- rel must be a plain table or an index (ERRCODE_WRONG_OBJECT_TYPE);
 *	- the current user must own rel;
 *	- system relations are refused unless allowSystemTableMods is set;
 *	- tablespacename must resolve to a valid OID
 *	  (ERRCODE_UNDEFINED_OBJECT);
 *	- the current user must pass pg_tablespace_aclcheck for ACL_CREATE;
 *	- tab->newTableSpace must not already be set, otherwise the command
 *	  is rejected as containing multiple SET TABLESPACE subcommands.
 *
 * NOTE(review): braces, the declaration of tablespaceId and the ereport
 * opening lines are not shown in this excerpt.
 */
5632 * ALTER TABLE SET TABLESPACE
5635 ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename)
5638 AclResult aclresult;
5641 * We do our own permission checking because we want to allow this on
5644 if (rel->rd_rel->relkind != RELKIND_RELATION &&
5645 rel->rd_rel->relkind != RELKIND_INDEX)
5647 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
5648 errmsg("\"%s\" is not a table or index",
5649 RelationGetRelationName(rel))));
5651 /* Permissions checks */
5652 if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
5653 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
5654 RelationGetRelationName(rel));
5656 if (!allowSystemTableMods && IsSystemRelation(rel))
5658 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
5659 errmsg("permission denied: \"%s\" is a system catalog",
5660 RelationGetRelationName(rel))));
5662 /* Check that the tablespace exists */
5663 tablespaceId = get_tablespace_oid(tablespacename);
5664 if (!OidIsValid(tablespaceId))
5666 (errcode(ERRCODE_UNDEFINED_OBJECT),
5667 errmsg("tablespace \"%s\" does not exist", tablespacename)));
5669 /* Check its permissions */
5670 aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(), ACL_CREATE);
5671 if (aclresult != ACLCHECK_OK)
5672 aclcheck_error(aclresult, ACL_KIND_TABLESPACE, tablespacename);
5674 /* Save info for Phase 3 to do the real work */
5675 if (OidIsValid(tab->newTableSpace))
5677 (errcode(ERRCODE_SYNTAX_ERROR),
5678 errmsg("cannot have multiple SET TABLESPACE subcommands")));
5679 tab->newTableSpace = tablespaceId;
/*
 * ATExecSetTableSpace
 *
 * Moves the storage of relation tableOid to tablespace newTableSpace by
 * copying it.  The visible statements do the following:
 *	- open the relation with NoLock;
 *	- refuse shared or nailed-in-cache relations and temporary tables of
 *	  other sessions (ERRCODE_FEATURE_NOT_SUPPORTED);
 *	- close the relation again when the tablespace would not change,
 *	  which includes the case where newTableSpace is MyDatabaseTableSpace
 *	  and the stored reltablespace is 0;
 *	- remember the toast relation and toast index OIDs, and fetch a
 *	  modifiable copy of the pg_class row;
 *	- build a RelFileNode that differs from the current one only in
 *	  spcNode, create storage for it, copy the data with
 *	  copy_relation_data, and schedule the old file for unlinking;
 *	- store the new reltablespace (InvalidOid when the target is the
 *	  database default), update the catalog indexes, and call
 *	  CommandCounterIncrement;
 *	- recurse for the toast relation and the toast index when present.
 *
 * NOTE(review): braces, short declarations, the early return on the
 * no-change path and the smgr cleanup call announced by the comment about
 * dropping smgr references are not shown in this excerpt.
 */
5683 * Execute ALTER TABLE SET TABLESPACE for cases where there is no tuple
5684 * rewriting to be done, so we just want to copy the data as fast as possible.
5687 ATExecSetTableSpace(Oid tableOid, Oid newTableSpace)
5693 RelFileNode newrnode;
5694 SMgrRelation dstrel;
5697 Form_pg_class rd_rel;
5699 rel = relation_open(tableOid, NoLock);
5702 * We can never allow moving of shared or nailed-in-cache relations,
5703 * because we can't support changing their reltablespace values.
5705 if (rel->rd_rel->relisshared || rel->rd_isnailed)
5707 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5708 errmsg("cannot move system relation \"%s\"",
5709 RelationGetRelationName(rel))));
5712 * Don't allow moving temp tables of other backends ... their local buffer
5713 * manager is not going to cope.
5715 if (isOtherTempNamespace(RelationGetNamespace(rel)))
5717 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5718 errmsg("cannot move temporary tables of other sessions")));
5721 * No work if no change in tablespace.
5723 oldTableSpace = rel->rd_rel->reltablespace;
5724 if (newTableSpace == oldTableSpace ||
5725 (newTableSpace == MyDatabaseTableSpace && oldTableSpace == 0))
5727 relation_close(rel, NoLock);
5731 reltoastrelid = rel->rd_rel->reltoastrelid;
5732 reltoastidxid = rel->rd_rel->reltoastidxid;
5734 /* Get a modifiable copy of the relation's pg_class row */
5735 pg_class = heap_open(RelationRelationId, RowExclusiveLock);
5737 tuple = SearchSysCacheCopy(RELOID,
5738 ObjectIdGetDatum(tableOid),
5740 if (!HeapTupleIsValid(tuple))
5741 elog(ERROR, "cache lookup failed for relation %u", tableOid);
5742 rd_rel = (Form_pg_class) GETSTRUCT(tuple);
5744 /* create another storage file. Is it a little ugly ? */
5745 /* NOTE: any conflict in relfilenode value will be caught here */
5746 newrnode = rel->rd_node;
5747 newrnode.spcNode = newTableSpace;
5749 dstrel = smgropen(newrnode);
5750 smgrcreate(dstrel, rel->rd_istemp, false);
5752 /* copy relation data to the new physical file */
5753 copy_relation_data(rel, dstrel);
5755 /* schedule unlinking old physical file */
5756 RelationOpenSmgr(rel);
5757 smgrscheduleunlink(rel->rd_smgr, rel->rd_istemp);
5760 * Now drop smgr references. The source was already dropped by
5761 * smgrscheduleunlink.
5765 /* update the pg_class row */
5766 rd_rel->reltablespace = (newTableSpace == MyDatabaseTableSpace) ? InvalidOid : newTableSpace;
5767 simple_heap_update(pg_class, &tuple->t_self, tuple);
5768 CatalogUpdateIndexes(pg_class, tuple);
5770 heap_freetuple(tuple);
5772 heap_close(pg_class, RowExclusiveLock);
5774 relation_close(rel, NoLock);
5776 /* Make sure the reltablespace change is visible */
5777 CommandCounterIncrement();
5779 /* Move associated toast relation and/or index, too */
5780 if (OidIsValid(reltoastrelid))
5781 ATExecSetTableSpace(reltoastrelid, newTableSpace);
5782 if (OidIsValid(reltoastidxid))
5783 ATExecSetTableSpace(reltoastidxid, newTableSpace);
/*
 * copy_relation_data
 *
 * Copies every block of rel into the storage manager relation dst.
 *
 * The source buffers are flushed first with FlushRelationBuffers.  WAL
 * logging of the copy is selected when XLogArchivingActive() is true and
 * rel is not temporary.  For each block number below
 * RelationGetNumberOfBlocks(rel) the page is read with smgrread, a
 * XLOG_HEAP_NEWPAGE record made of an xl_heap_newpage header (destination
 * node and block number) plus the BLCKSZ page image is inserted inside a
 * critical section, the page LSN and timeline are stamped, and the page is
 * written to dst with smgrwrite, passing isTemp = true.  The final test on
 * rel->rd_istemp guards the sync step explained in the last comment.
 *
 * NOTE(review): braces, short declarations, the assignment of src, the
 * test that makes the WAL section conditional on use_wal, the end of the
 * critical section and the sync call itself are not shown in this excerpt;
 * confirm them against the full file.
 */
5787 * Copy data, block by block
5790 copy_relation_data(Relation rel, SMgrRelation dst)
5794 BlockNumber nblocks;
5797 Page page = (Page) buf;
5800 * Since we copy the file directly without looking at the shared buffers,
5801 * we'd better first flush out any pages of the source relation that are
5802 * in shared buffers. We assume no new changes will be made while we are
5803 * holding exclusive lock on the rel.
5805 FlushRelationBuffers(rel);
5808 * We need to log the copied data in WAL iff WAL archiving is enabled AND
5809 * it's not a temp rel.
5811 use_wal = XLogArchivingActive() && !rel->rd_istemp;
5813 nblocks = RelationGetNumberOfBlocks(rel);
5814 /* RelationGetNumberOfBlocks will certainly have opened rd_smgr */
5817 for (blkno = 0; blkno < nblocks; blkno++)
5819 smgrread(src, blkno, buf);
5824 xl_heap_newpage xlrec;
5826 XLogRecData rdata[2];
5828 /* NO ELOG(ERROR) from here till newpage op is logged */
5829 START_CRIT_SECTION();
5831 xlrec.node = dst->smgr_rnode;
5832 xlrec.blkno = blkno;
5834 rdata[0].data = (char *) &xlrec;
5835 rdata[0].len = SizeOfHeapNewpage;
5836 rdata[0].buffer = InvalidBuffer;
5837 rdata[0].next = &(rdata[1]);
5839 rdata[1].data = (char *) page;
5840 rdata[1].len = BLCKSZ;
5841 rdata[1].buffer = InvalidBuffer;
5842 rdata[1].next = NULL;
5844 recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
5846 PageSetLSN(page, recptr);
5847 PageSetTLI(page, ThisTimeLineID);
5853 * Now write the page. We say isTemp = true even if it's not a temp
5854 * rel, because there's no need for smgr to schedule an fsync for this
5855 * write; we'll do it ourselves below.
5857 smgrwrite(dst, blkno, buf, true);
5861 * If the rel isn't temp, we must fsync it down to disk before it's safe
5862 * to commit the transaction. (For a temp rel we don't care since the rel
5863 * will be uninteresting after a crash anyway.)
5865 * It's obvious that we must do this when not WAL-logging the copy. It's
5866 * less obvious that we have to do it even if we did WAL-log the copied
5867 * pages. The reason is that since we're copying outside shared buffers, a
5868 * CHECKPOINT occurring during the copy has no way to flush the previously
5869 * written data to disk (indeed it won't know the new rel even exists). A
5870 * crash later on would replay WAL from the checkpoint, therefore it
5871 * wouldn't replay our earlier WAL entries. If we do not fsync those pages
5872 * here, they might still not be on disk when the crash occurs.
5874 if (!rel->rd_istemp)
/*
 * ATExecEnableDisableTrigger
 *
 * Thin wrapper: forwards rel, trigname, enable and skip_system unchanged to
 * EnableDisableTrigger and does nothing else.
 */
5879 * ALTER TABLE ENABLE/DISABLE TRIGGER
5881 * We just pass this off to trigger.c.
5884 ATExecEnableDisableTrigger(Relation rel, char *trigname,
5885 bool enable, bool skip_system)
5887 EnableDisableTrigger(rel, trigname, enable, skip_system);
/*
 * AlterTableCreateToastTable(relOid, silent)
 *
 * Creates the TOAST table and its index for the relation whose OID is
 * relOid, then links the new table to its parent.
 *
 * NOTE(review): a number of short lines of this function (braces, some
 * declarations, several call arguments, comment delimiters) are not present
 * in this listing.  The summary below describes only what the visible lines
 * show.
 *
 * Visible sequence of steps:
 *	- open the parent with AccessExclusiveLock;
 *	- report an error if the parent is shared and IsUnderPostmaster is set;
 *	- handle "already has a TOAST table" and "does not need a TOAST table";
 *	- build the names pg_toast_<relOid> and pg_toast_<relOid>_index;
 *	- build a three-column tuple descriptor with attstorage 'p' everywhere;
 *	- create the toast relation, then CommandCounterIncrement();
 *	- create a unique two-column index on attributes 1 and 2;
 *	- record the index on the toast relation via setRelhasindex();
 *	- store toast_relid in the parent's pg_class row (reltoastrelid);
 *	- record a DEPENDENCY_INTERNAL dependency of the toast table on the
 *	  parent;
 *	- close the parent with NoLock and CommandCounterIncrement() again.
 */
5891 * ALTER TABLE CREATE TOAST TABLE
5893 * Note: this is also invoked from outside this module; in such cases we
5894 * expect the caller to have verified that the relation is a table and we
5895 * have all the right permissions. Callers expect this function
5896 * to end with CommandCounterIncrement if it makes any changes.
5899 AlterTableCreateToastTable(Oid relOid, bool silent)
5904 bool shared_relation;
5908 char toast_relname[NAMEDATALEN];
5909 char toast_idxname[NAMEDATALEN];
5910 IndexInfo *indexInfo;
5911 Oid classObjectId[2];
5912 ObjectAddress baseobject,
5916 * Grab an exclusive lock on the target table, which we will NOT release
5917 * until end of transaction. (This is probably redundant in all present
5920 rel = heap_open(relOid, AccessExclusiveLock);
5923 * Toast table is shared if and only if its parent is.
5925 * We cannot allow toasting a shared relation after initdb (because
5926 * there's no way to mark it toasted in other databases' pg_class).
5927 * Unfortunately we can't distinguish initdb from a manually started
5928 * standalone backend (toasting happens after the bootstrap phase, so
5929 * checking IsBootstrapProcessingMode() won't work). However, we can at
5930 * least prevent this mistake under normal multi-user operation.
5932 shared_relation = rel->rd_rel->relisshared;
5933 if (shared_relation && IsUnderPostmaster)
5935 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5936 errmsg("shared tables cannot be toasted after initdb")));
5939 * Is it already toasted?
5941 if (rel->rd_rel->reltoastrelid != InvalidOid)
/*
 * NOTE(review): the condition guarding this early close is not visible in
 * this listing; presumably it is the "silent" case, which would return
 * quietly instead of raising the error below -- confirm.
 */
5945 heap_close(rel, NoLock);
5950 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5951 errmsg("table \"%s\" already has a TOAST table",
5952 RelationGetRelationName(rel))));
5956 * Check to see whether the table actually needs a TOAST table.
5958 if (!needs_toast_table(rel))
/* NOTE(review): same unseen guard as above; presumably "silent" -- confirm. */
5962 heap_close(rel, NoLock);
5967 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5968 errmsg("table \"%s\" does not need a TOAST table",
5969 RelationGetRelationName(rel))));
5973 * Create the toast table and its index
/* Both generated names embed the parent's OID. */
5975 snprintf(toast_relname, sizeof(toast_relname),
5976 "pg_toast_%u", relOid);
5977 snprintf(toast_idxname, sizeof(toast_idxname),
5978 "pg_toast_%u_index", relOid);
5980 /* this is pretty painful... need a tuple descriptor */
5981 tupdesc = CreateTemplateTupleDesc(3, false);
/*
 * NOTE(review): the name/type arguments of the three TupleDescInitEntry
 * calls are not visible in this listing.
 */
5982 TupleDescInitEntry(tupdesc, (AttrNumber) 1,
5986 TupleDescInitEntry(tupdesc, (AttrNumber) 2,
5990 TupleDescInitEntry(tupdesc, (AttrNumber) 3,
5996 * Ensure that the toast table doesn't itself get toasted, or we'll be
5997 * toast :-(. This is essential for chunk_data because type bytea is
5998 * toastable; hit the other two just to be sure.
6000 tupdesc->attrs[0]->attstorage = 'p';
6001 tupdesc->attrs[1]->attstorage = 'p';
6002 tupdesc->attrs[2]->attstorage = 'p';
6005 * Note: the toast relation is placed in the regular pg_toast namespace
6006 * even if its master relation is a temp table. There cannot be any
6007 * naming collision, and the toast rel will be destroyed when its master
6008 * is, so there's no need to handle the toast rel as temp.
/*
 * The toast relation takes its tablespace and owner from the parent.
 * NOTE(review): the remaining arguments are not visible in this listing.
 */
6010 toast_relid = heap_create_with_catalog(toast_relname,
6012 rel->rd_rel->reltablespace,
6014 rel->rd_rel->relowner,
6023 /* make the toast relation visible, else index creation will fail */
6024 CommandCounterIncrement();
6027 * Create unique index on chunk_id, chunk_seq.
6029 * NOTE: the normal TOAST access routines could actually function with a
6030 * single-column index on chunk_id only. However, the slice access
6031 * routines use both columns for faster access to an individual chunk. In
6032 * addition, we want it to be unique as a check against the possibility of
6033 * duplicate TOAST chunk OIDs. The index might also be a little more
6034 * efficient this way, since btree isn't all that happy with large numbers
/* Plain two-column unique index: no expressions, no predicate. */
6038 indexInfo = makeNode(IndexInfo);
6039 indexInfo->ii_NumIndexAttrs = 2;
6040 indexInfo->ii_KeyAttrNumbers[0] = 1;
6041 indexInfo->ii_KeyAttrNumbers[1] = 2;
6042 indexInfo->ii_Expressions = NIL;
6043 indexInfo->ii_ExpressionsState = NIL;
6044 indexInfo->ii_Predicate = NIL;
6045 indexInfo->ii_PredicateState = NIL;
6046 indexInfo->ii_Unique = true;
/* One operator class per index column, in column order. */
6048 classObjectId[0] = OID_BTREE_OPS_OID;
6049 classObjectId[1] = INT4_BTREE_OPS_OID;
/* The index is placed in the same tablespace as the parent relation. */
6051 toast_idxid = index_create(toast_relid, toast_idxname, InvalidOid,
6054 rel->rd_rel->reltablespace,
6056 true, false, true, false);
6059 * Update toast rel's pg_class entry to show that it has an index. The
6060 * index OID is stored into the reltoastidxid field for easy access by the
6063 setRelhasindex(toast_relid, true, true, toast_idxid);
6066 * Store the toast table's OID in the parent relation's pg_class row
6068 class_rel = heap_open(RelationRelationId, RowExclusiveLock);
/* Work on a private copy of the parent's pg_class tuple. */
6070 reltup = SearchSysCacheCopy(RELOID,
6071 ObjectIdGetDatum(relOid),
6073 if (!HeapTupleIsValid(reltup))
6074 elog(ERROR, "cache lookup failed for relation %u", relOid);
6076 ((Form_pg_class) GETSTRUCT(reltup))->reltoastrelid = toast_relid;
6078 simple_heap_update(class_rel, &reltup->t_self, reltup);
6080 /* Keep catalog indexes current */
6081 CatalogUpdateIndexes(class_rel, reltup);
6083 heap_freetuple(reltup);
6085 heap_close(class_rel, RowExclusiveLock);
6088 * Register dependency from the toast table to the master, so that the
6089 * toast table will be deleted if the master is.
6091 baseobject.classId = RelationRelationId;
6092 baseobject.objectId = relOid;
6093 baseobject.objectSubId = 0;
6094 toastobject.classId = RelationRelationId;
6095 toastobject.objectId = toast_relid;
6096 toastobject.objectSubId = 0;
6098 recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL);
6101 * Clean up and make changes visible
/* NoLock: the AccessExclusiveLock taken on the parent above is not released here. */
6103 heap_close(rel, NoLock);
6105 CommandCounterIncrement();
6109 * Check to see whether the table needs a TOAST table. It does only if
6110 * (1) there are any toastable attributes, and (2) the maximum length
6111 * of a tuple could exceed TOAST_TUPLE_THRESHOLD. (We don't want to
6112 * create a toast table for something like "f1 varchar(20)".)
6115 needs_toast_table(Relation rel)
6117 int32 data_length = 0;
6118 bool maxlength_unknown = false;
6119 bool has_toastable_attrs = false;
6121 Form_pg_attribute *att;
6125 tupdesc = rel->rd_att;
6126 att = tupdesc->attrs;
6128 for (i = 0; i < tupdesc->natts; i++)
6130 if (att[i]->attisdropped)
6132 data_length = att_align(data_length, att[i]->attalign);
6133 if (att[i]->attlen > 0)
6135 /* Fixed-length types are never toastable */
6136 data_length += att[i]->attlen;
6140 int32 maxlen = type_maximum_size(att[i]->atttypid,
6144 maxlength_unknown = true;
6146 data_length += maxlen;
6147 if (att[i]->attstorage != 'p')
6148 has_toastable_attrs = true;
6151 if (!has_toastable_attrs)
6152 return false; /* nothing to toast? */
6153 if (maxlength_unknown)
6154 return true; /* any unlimited-length attrs? */
6155 tuple_length = MAXALIGN(offsetof(HeapTupleHeaderData, t_bits) +
6156 BITMAPLEN(tupdesc->natts)) +
6157 MAXALIGN(data_length);
6158 return (tuple_length > TOAST_TUPLE_THRESHOLD);
6163 * Execute ALTER TABLE SET SCHEMA
6165 * Note: caller must have checked ownership of the relation already
6168 AlterTableNamespace(RangeVar *relation, const char *newschema)
6176 rel = heap_openrv(relation, AccessExclusiveLock);
6178 /* heap_openrv allows TOAST, but we don't want to */
6179 if (rel->rd_rel->relkind == RELKIND_TOASTVALUE)
6181 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
6182 errmsg("\"%s\" is a TOAST relation",
6183 RelationGetRelationName(rel))));
6185 relid = RelationGetRelid(rel);
6186 oldNspOid = RelationGetNamespace(rel);
6188 /* get schema OID and check its permissions */
6189 nspOid = LookupCreationNamespace(newschema);
6191 if (oldNspOid == nspOid)
6193 (errcode(ERRCODE_DUPLICATE_TABLE),
6194 errmsg("relation \"%s\" is already in schema \"%s\"",
6195 RelationGetRelationName(rel),
6198 /* disallow renaming into or out of temp schemas */
6199 if (isAnyTempNamespace(nspOid) || isAnyTempNamespace(oldNspOid))
6201 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
6202 errmsg("cannot move objects into or out of temporary schemas")));
6204 /* same for TOAST schema */
6205 if (nspOid == PG_TOAST_NAMESPACE || oldNspOid == PG_TOAST_NAMESPACE)
6207 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
6208 errmsg("cannot move objects into or out of TOAST schema")));
6210 /* OK, modify the pg_class row and pg_depend entry */
6211 classRel = heap_open(RelationRelationId, RowExclusiveLock);
6213 AlterRelationNamespaceInternal(classRel, relid, oldNspOid, nspOid, true);
6215 /* Fix the table's rowtype too */
6216 AlterTypeNamespaceInternal(rel->rd_rel->reltype, nspOid, false);
6218 /* Fix other dependent stuff */
6219 if (rel->rd_rel->relkind == RELKIND_RELATION)
6221 AlterIndexNamespaces(classRel, rel, oldNspOid, nspOid);
6222 AlterSeqNamespaces(classRel, rel, oldNspOid, nspOid, newschema);
6223 AlterConstraintNamespaces(relid, oldNspOid, nspOid, false);
6226 heap_close(classRel, RowExclusiveLock);
6228 /* close rel, but keep lock until commit */
6229 relation_close(rel, NoLock);
6233 * The guts of relocating a relation to another namespace: fix the pg_class
6234 * entry, and the pg_depend entry if any. Caller must already have
6235 * opened and write-locked pg_class.
6238 AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
6239 Oid oldNspOid, Oid newNspOid,
6240 bool hasDependEntry)
6243 Form_pg_class classForm;
6245 classTup = SearchSysCacheCopy(RELOID,
6246 ObjectIdGetDatum(relOid),
6248 if (!HeapTupleIsValid(classTup))
6249 elog(ERROR, "cache lookup failed for relation %u", relOid);
6250 classForm = (Form_pg_class) GETSTRUCT(classTup);
6252 Assert(classForm->relnamespace == oldNspOid);
6254 /* check for duplicate name (more friendly than unique-index failure) */
6255 if (get_relname_relid(NameStr(classForm->relname),
6256 newNspOid) != InvalidOid)
6258 (errcode(ERRCODE_DUPLICATE_TABLE),
6259 errmsg("relation \"%s\" already exists in schema \"%s\"",
6260 NameStr(classForm->relname),
6261 get_namespace_name(newNspOid))));
6263 /* classTup is a copy, so OK to scribble on */
6264 classForm->relnamespace = newNspOid;
6266 simple_heap_update(classRel, &classTup->t_self, classTup);
6267 CatalogUpdateIndexes(classRel, classTup);
6269 /* Update dependency on schema if caller said so */
6270 if (hasDependEntry &&
6271 changeDependencyFor(RelationRelationId, relOid,
6272 NamespaceRelationId, oldNspOid, newNspOid) != 1)
6273 elog(ERROR, "failed to change schema dependency for relation \"%s\"",
6274 NameStr(classForm->relname));
6276 heap_freetuple(classTup);
6280 * Move all indexes for the specified relation to another namespace.
6282 * Note: we assume adequate permission checking was done by the caller,
6283 * and that the caller has a suitable lock on the owning relation.
6286 AlterIndexNamespaces(Relation classRel, Relation rel,
6287 Oid oldNspOid, Oid newNspOid)
6292 indexList = RelationGetIndexList(rel);
6294 foreach(l, indexList)
6296 Oid indexOid = lfirst_oid(l);
6299 * Note: currently, the index will not have its own dependency on the
6300 * namespace, so we don't need to do changeDependencyFor(). There's no
6301 * rowtype in pg_type, either.
6303 AlterRelationNamespaceInternal(classRel, indexOid,
6304 oldNspOid, newNspOid,
6308 list_free(indexList);
6312 * Move all SERIAL-column sequences of the specified relation to another
6315 * Note: we assume adequate permission checking was done by the caller,
6316 * and that the caller has a suitable lock on the owning relation.
6319 AlterSeqNamespaces(Relation classRel, Relation rel,
6320 Oid oldNspOid, Oid newNspOid, const char *newNspName)
6328 * SERIAL sequences are those having an internal dependency on one of the
6329 * table's columns (we don't care *which* column, exactly).
6331 depRel = heap_open(DependRelationId, AccessShareLock);
6333 ScanKeyInit(&key[0],
6334 Anum_pg_depend_refclassid,
6335 BTEqualStrategyNumber, F_OIDEQ,
6336 ObjectIdGetDatum(RelationRelationId));
6337 ScanKeyInit(&key[1],
6338 Anum_pg_depend_refobjid,
6339 BTEqualStrategyNumber, F_OIDEQ,
6340 ObjectIdGetDatum(RelationGetRelid(rel)));
6341 /* we leave refobjsubid unspecified */
6343 scan = systable_beginscan(depRel, DependReferenceIndexId, true,
6344 SnapshotNow, 2, key);
6346 while (HeapTupleIsValid(tup = systable_getnext(scan)))
6348 Form_pg_depend depForm = (Form_pg_depend) GETSTRUCT(tup);
6351 /* skip dependencies other than internal dependencies on columns */
6352 if (depForm->refobjsubid == 0 ||
6353 depForm->classid != RelationRelationId ||
6354 depForm->objsubid != 0 ||
6355 depForm->deptype != DEPENDENCY_INTERNAL)
6358 /* Use relation_open just in case it's an index */
6359 seqRel = relation_open(depForm->objid, AccessExclusiveLock);
6361 /* skip non-sequence relations */
6362 if (RelationGetForm(seqRel)->relkind != RELKIND_SEQUENCE)
6364 /* No need to keep the lock */
6365 relation_close(seqRel, AccessExclusiveLock);
6369 /* Fix the pg_class and pg_depend entries */
6370 AlterRelationNamespaceInternal(classRel, depForm->objid,
6371 oldNspOid, newNspOid,
6375 * Sequences have entries in pg_type. We need to be careful to move
6376 * them to the new namespace, too.
6378 AlterTypeNamespaceInternal(RelationGetForm(seqRel)->reltype,
6381 /* Now we can close it. Keep the lock till end of transaction. */
6382 relation_close(seqRel, NoLock);
6385 systable_endscan(scan);
6387 relation_close(depRel, AccessShareLock);
/*
 * This code supports
 *	CREATE TEMP TABLE ... ON COMMIT { DROP | PRESERVE ROWS | DELETE ROWS }
 *
 * Because we only support this for TEMP tables, it's sufficient to remember
 * the state in a backend-local data structure.
 */
6400 * Register a newly-created relation's ON COMMIT action.
6403 register_on_commit_action(Oid relid, OnCommitAction action)
6406 MemoryContext oldcxt;
6409 * We needn't bother registering the relation unless there is an ON COMMIT
6410 * action we need to take.
6412 if (action == ONCOMMIT_NOOP || action == ONCOMMIT_PRESERVE_ROWS)
6415 oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
6417 oc = (OnCommitItem *) palloc(sizeof(OnCommitItem));
6419 oc->oncommit = action;
6420 oc->creating_subid = GetCurrentSubTransactionId();
6421 oc->deleting_subid = InvalidSubTransactionId;
6423 on_commits = lcons(oc, on_commits);
6425 MemoryContextSwitchTo(oldcxt);
6429 * Unregister any ON COMMIT action when a relation is deleted.
6431 * Actually, we only mark the OnCommitItem entry as to be deleted after commit.
6434 remove_on_commit_action(Oid relid)
6438 foreach(l, on_commits)
6440 OnCommitItem *oc = (OnCommitItem *) lfirst(l);
6442 if (oc->relid == relid)
6444 oc->deleting_subid = GetCurrentSubTransactionId();
6451 * Perform ON COMMIT actions.
6453 * This is invoked just before actually committing, since it's possible
6454 * to encounter errors.
6457 PreCommit_on_commit_actions(void)
6460 List *oids_to_truncate = NIL;
6462 foreach(l, on_commits)
6464 OnCommitItem *oc = (OnCommitItem *) lfirst(l);
6466 /* Ignore entry if already dropped in this xact */
6467 if (oc->deleting_subid != InvalidSubTransactionId)
6470 switch (oc->oncommit)
6473 case ONCOMMIT_PRESERVE_ROWS:
6474 /* Do nothing (there shouldn't be such entries, actually) */
6476 case ONCOMMIT_DELETE_ROWS:
6477 oids_to_truncate = lappend_oid(oids_to_truncate, oc->relid);
6481 ObjectAddress object;
6483 object.classId = RelationRelationId;
6484 object.objectId = oc->relid;
6485 object.objectSubId = 0;
6486 performDeletion(&object, DROP_CASCADE);
6489 * Note that table deletion will call
6490 * remove_on_commit_action, so the entry should get marked
6493 Assert(oc->deleting_subid != InvalidSubTransactionId);
6498 if (oids_to_truncate != NIL)
6500 heap_truncate(oids_to_truncate);
6501 CommandCounterIncrement(); /* XXX needed? */
6506 * Post-commit or post-abort cleanup for ON COMMIT management.
6508 * All we do here is remove no-longer-needed OnCommitItem entries.
6510 * During commit, remove entries that were deleted during this transaction;
6511 * during abort, remove those created during this transaction.
6514 AtEOXact_on_commit_actions(bool isCommit)
6517 ListCell *prev_item;
6520 cur_item = list_head(on_commits);
6522 while (cur_item != NULL)
6524 OnCommitItem *oc = (OnCommitItem *) lfirst(cur_item);
6526 if (isCommit ? oc->deleting_subid != InvalidSubTransactionId :
6527 oc->creating_subid != InvalidSubTransactionId)
6529 /* cur_item must be removed */
6530 on_commits = list_delete_cell(on_commits, cur_item, prev_item);
6533 cur_item = lnext(prev_item);
6535 cur_item = list_head(on_commits);
6539 /* cur_item must be preserved */
6540 oc->creating_subid = InvalidSubTransactionId;
6541 oc->deleting_subid = InvalidSubTransactionId;
6542 prev_item = cur_item;
6543 cur_item = lnext(prev_item);
6549 * Post-subcommit or post-subabort cleanup for ON COMMIT management.
6551 * During subabort, we can immediately remove entries created during this
6552 * subtransaction. During subcommit, just relabel entries marked during
6553 * this subtransaction as being the parent's responsibility.
6556 AtEOSubXact_on_commit_actions(bool isCommit, SubTransactionId mySubid,
6557 SubTransactionId parentSubid)
6560 ListCell *prev_item;
6563 cur_item = list_head(on_commits);
6565 while (cur_item != NULL)
6567 OnCommitItem *oc = (OnCommitItem *) lfirst(cur_item);
6569 if (!isCommit && oc->creating_subid == mySubid)
6571 /* cur_item must be removed */
6572 on_commits = list_delete_cell(on_commits, cur_item, prev_item);
6575 cur_item = lnext(prev_item);
6577 cur_item = list_head(on_commits);
6581 /* cur_item must be preserved */
6582 if (oc->creating_subid == mySubid)
6583 oc->creating_subid = parentSubid;
6584 if (oc->deleting_subid == mySubid)
6585 oc->deleting_subid = isCommit ? parentSubid : InvalidSubTransactionId;
6586 prev_item = cur_item;
6587 cur_item = lnext(prev_item);