]> granicus.if.org Git - postgresql/blob - src/backend/commands/tablecmds.c
Dept. of second thoughts: rejigger the TRUNCATE ... CASCADE patch so that
[postgresql] / src / backend / commands / tablecmds.c
1 /*-------------------------------------------------------------------------
2  *
3  * tablecmds.c
4  *        Commands for creating and altering table structures and settings
5  *
6  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.179 2006/03/03 18:25:14 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/tuptoaster.h"
19 #include "catalog/catalog.h"
20 #include "catalog/dependency.h"
21 #include "catalog/heap.h"
22 #include "catalog/index.h"
23 #include "catalog/indexing.h"
24 #include "catalog/namespace.h"
25 #include "catalog/pg_constraint.h"
26 #include "catalog/pg_depend.h"
27 #include "catalog/pg_inherits.h"
28 #include "catalog/pg_namespace.h"
29 #include "catalog/pg_opclass.h"
30 #include "catalog/pg_trigger.h"
31 #include "catalog/pg_type.h"
32 #include "commands/cluster.h"
33 #include "commands/defrem.h"
34 #include "commands/tablecmds.h"
35 #include "commands/tablespace.h"
36 #include "commands/trigger.h"
37 #include "commands/typecmds.h"
38 #include "executor/executor.h"
39 #include "lib/stringinfo.h"
40 #include "miscadmin.h"
41 #include "nodes/makefuncs.h"
42 #include "optimizer/clauses.h"
43 #include "optimizer/plancat.h"
44 #include "optimizer/prep.h"
45 #include "parser/analyze.h"
46 #include "parser/gramparse.h"
47 #include "parser/parser.h"
48 #include "parser/parse_clause.h"
49 #include "parser/parse_coerce.h"
50 #include "parser/parse_expr.h"
51 #include "parser/parse_oper.h"
52 #include "parser/parse_relation.h"
53 #include "parser/parse_type.h"
54 #include "rewrite/rewriteHandler.h"
55 #include "storage/smgr.h"
56 #include "utils/acl.h"
57 #include "utils/builtins.h"
58 #include "utils/fmgroids.h"
59 #include "utils/inval.h"
60 #include "utils/lsyscache.h"
61 #include "utils/memutils.h"
62 #include "utils/relcache.h"
63 #include "utils/syscache.h"
64
65
66 /*
67  * ON COMMIT action list
68  */
69 typedef struct OnCommitItem
70 {
71         Oid                     relid;                  /* relid of relation */
72         OnCommitAction oncommit;        /* what to do at end of xact */
73
74         /*
75          * If this entry was created during the current transaction,
76          * creating_subid is the ID of the creating subxact; if created in a prior
77          * transaction, creating_subid is zero.  If deleted during the current
78          * transaction, deleting_subid is the ID of the deleting subxact; if no
79          * deletion request is pending, deleting_subid is zero.
80          */
81         SubTransactionId creating_subid;
82         SubTransactionId deleting_subid;
83 } OnCommitItem;
84
85 static List *on_commits = NIL;
86
87
88 /*
89  * State information for ALTER TABLE
90  *
91  * The pending-work queue for an ALTER TABLE is a List of AlteredTableInfo
92  * structs, one for each table modified by the operation (the named table
93  * plus any child tables that are affected).  We save lists of subcommands
94  * to apply to this table (possibly modified by parse transformation steps);
95  * these lists will be executed in Phase 2.  If a Phase 3 step is needed,
96  * necessary information is stored in the constraints and newvals lists.
97  *
98  * Phase 2 is divided into multiple passes; subcommands are executed in
99  * a pass determined by subcommand type.
100  */
101
/*
 * Pass numbers for Phase 2 of ALTER TABLE.  Every subcommand is filed under
 * exactly one of these, chosen by subcommand type.  AT_NUM_PASSES is the
 * number of passes and is used to size the per-table subcommand array.
 */
#define AT_PASS_DROP			0	/* DROP (all flavors) */
#define AT_PASS_ALTER_TYPE		1	/* ALTER COLUMN TYPE */
#define AT_PASS_OLD_INDEX		2	/* re-add existing indexes */
#define AT_PASS_OLD_CONSTR		3	/* re-add existing constraints */
#define AT_PASS_COL_ATTRS		4	/* set other column attributes */
/* A RENAME COLUMN pass could be supported here; it is not currently used */
#define AT_PASS_ADD_COL			5	/* ADD COLUMN */
#define AT_PASS_ADD_INDEX		6	/* ADD indexes */
#define AT_PASS_ADD_CONSTR		7	/* ADD constraints, defaults */
#define AT_PASS_MISC			8	/* other stuff */
#define AT_NUM_PASSES			9
113
114 typedef struct AlteredTableInfo
115 {
116         /* Information saved before any work commences: */
117         Oid                     relid;                  /* Relation to work on */
118         char            relkind;                /* Its relkind */
119         TupleDesc       oldDesc;                /* Pre-modification tuple descriptor */
120         /* Information saved by Phase 1 for Phase 2: */
121         List       *subcmds[AT_NUM_PASSES]; /* Lists of AlterTableCmd */
122         /* Information saved by Phases 1/2 for Phase 3: */
123         List       *constraints;        /* List of NewConstraint */
124         List       *newvals;            /* List of NewColumnValue */
125         Oid                     newTableSpace;  /* new tablespace; 0 means no change */
126         /* Objects to rebuild after completing ALTER TYPE operations */
127         List       *changedConstraintOids;      /* OIDs of constraints to rebuild */
128         List       *changedConstraintDefs;      /* string definitions of same */
129         List       *changedIndexOids;           /* OIDs of indexes to rebuild */
130         List       *changedIndexDefs;           /* string definitions of same */
131 } AlteredTableInfo;
132
133 /* Struct describing one new constraint to check in Phase 3 scan */
134 typedef struct NewConstraint
135 {
136         char       *name;                       /* Constraint name, or NULL if none */
137         ConstrType      contype;                /* CHECK, NOT_NULL, or FOREIGN */
138         AttrNumber      attnum;                 /* only relevant for NOT_NULL */
139         Oid                     refrelid;               /* PK rel, if FOREIGN */
140         Node       *qual;                       /* Check expr or FkConstraint struct */
141         List       *qualstate;          /* Execution state for CHECK */
142 } NewConstraint;
143
144 /*
145  * Struct describing one new column value that needs to be computed during
146  * Phase 3 copy (this could be either a new column with a non-null default, or
147  * a column that we're changing the type of).  Columns without such an entry
148  * are just copied from the old table during ATRewriteTable.  Note that the
149  * expr is an expression over *old* table values.
150  */
151 typedef struct NewColumnValue
152 {
153         AttrNumber      attnum;                 /* which column */
154         Expr       *expr;                       /* expression to compute */
155         ExprState  *exprstate;          /* execution state */
156 } NewColumnValue;
157
158
159 static void truncate_check_rel(Relation rel);
160 static List *MergeAttributes(List *schema, List *supers, bool istemp,
161                                 List **supOids, List **supconstr, int *supOidCount);
162 static bool change_varattnos_of_a_node(Node *node, const AttrNumber *newattno);
163 static void StoreCatalogInheritance(Oid relationId, List *supers);
164 static int      findAttrByName(const char *attributeName, List *schema);
165 static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass);
166 static bool needs_toast_table(Relation rel);
167 static void AlterIndexNamespaces(Relation classRel, Relation rel,
168                                          Oid oldNspOid, Oid newNspOid);
169 static void AlterSeqNamespaces(Relation classRel, Relation rel,
170                                    Oid oldNspOid, Oid newNspOid,
171                                    const char *newNspName);
172 static int transformColumnNameList(Oid relId, List *colList,
173                                                 int16 *attnums, Oid *atttypids);
174 static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
175                                                    List **attnamelist,
176                                                    int16 *attnums, Oid *atttypids,
177                                                    Oid *opclasses);
178 static Oid transformFkeyCheckAttrs(Relation pkrel,
179                                                 int numattrs, int16 *attnums,
180                                                 Oid *opclasses);
181 static void validateForeignKeyConstraint(FkConstraint *fkconstraint,
182                                                          Relation rel, Relation pkrel);
183 static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
184                                                  Oid constrOid);
185 static char *fkMatchTypeToString(char match_type);
186 static void ATController(Relation rel, List *cmds, bool recurse);
187 static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
188                   bool recurse, bool recursing);
189 static void ATRewriteCatalogs(List **wqueue);
190 static void ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd);
191 static void ATRewriteTables(List **wqueue);
192 static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap);
193 static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
194 static void ATSimplePermissions(Relation rel, bool allowView);
195 static void ATSimpleRecursion(List **wqueue, Relation rel,
196                                   AlterTableCmd *cmd, bool recurse);
197 static void ATOneLevelRecursion(List **wqueue, Relation rel,
198                                         AlterTableCmd *cmd);
199 static void find_composite_type_dependencies(Oid typeOid,
200                                                                  const char *origTblName);
201 static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
202                                 AlterTableCmd *cmd);
203 static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
204                                 ColumnDef *colDef);
205 static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid);
206 static void add_column_support_dependency(Oid relid, int32 attnum,
207                                                           RangeVar *support);
208 static void ATExecDropNotNull(Relation rel, const char *colName);
209 static void ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
210                                  const char *colName);
211 static void ATExecColumnDefault(Relation rel, const char *colName,
212                                         Node *newDefault);
213 static void ATPrepSetStatistics(Relation rel, const char *colName,
214                                         Node *flagValue);
215 static void ATExecSetStatistics(Relation rel, const char *colName,
216                                         Node *newValue);
217 static void ATExecSetStorage(Relation rel, const char *colName,
218                                  Node *newValue);
219 static void ATExecDropColumn(Relation rel, const char *colName,
220                                  DropBehavior behavior,
221                                  bool recurse, bool recursing);
222 static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
223                            IndexStmt *stmt, bool is_rebuild);
224 static void ATExecAddConstraint(AlteredTableInfo *tab, Relation rel,
225                                         Node *newConstraint);
226 static void ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
227                                                   FkConstraint *fkconstraint);
228 static void ATPrepDropConstraint(List **wqueue, Relation rel,
229                                          bool recurse, AlterTableCmd *cmd);
230 static void ATExecDropConstraint(Relation rel, const char *constrName,
231                                          DropBehavior behavior, bool quiet);
232 static void ATPrepAlterColumnType(List **wqueue,
233                                           AlteredTableInfo *tab, Relation rel,
234                                           bool recurse, bool recursing,
235                                           AlterTableCmd *cmd);
236 static void ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
237                                           const char *colName, TypeName *typename);
238 static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab);
239 static void ATPostAlterTypeParse(char *cmd, List **wqueue);
240 static void change_owner_recurse_to_sequences(Oid relationOid,
241                                                                   Oid newOwnerId);
242 static void ATExecClusterOn(Relation rel, const char *indexName);
243 static void ATExecDropCluster(Relation rel);
244 static void ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel,
245                                         char *tablespacename);
246 static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace);
247 static void ATExecEnableDisableTrigger(Relation rel, char *trigname,
248                                                    bool enable, bool skip_system);
249 static void copy_relation_data(Relation rel, SMgrRelation dst);
250 static void update_ri_trigger_args(Oid relid,
251                                            const char *oldname,
252                                            const char *newname,
253                                            bool fk_scan,
254                                            bool update_relname);
255
256
257 /* ----------------------------------------------------------------
258  *              DefineRelation
259  *                              Creates a new relation.
260  *
261  * If successful, returns the OID of the new relation.
262  * ----------------------------------------------------------------
263  */
264 Oid
265 DefineRelation(CreateStmt *stmt, char relkind)
266 {
267         char            relname[NAMEDATALEN];
268         Oid                     namespaceId;
269         List       *schema = stmt->tableElts;
270         Oid                     relationId;
271         Oid                     tablespaceId;
272         Relation        rel;
273         TupleDesc       descriptor;
274         List       *inheritOids;
275         List       *old_constraints;
276         bool            localHasOids;
277         int                     parentOidCount;
278         List       *rawDefaults;
279         ListCell   *listptr;
280         int                     i;
281         AttrNumber      attnum;
282
283         /*
284          * Truncate relname to appropriate length (probably a waste of time, as
285          * parser should have done this already).
286          */
287         StrNCpy(relname, stmt->relation->relname, NAMEDATALEN);
288
289         /*
290          * Check consistency of arguments
291          */
292         if (stmt->oncommit != ONCOMMIT_NOOP && !stmt->relation->istemp)
293                 ereport(ERROR,
294                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
295                                  errmsg("ON COMMIT can only be used on temporary tables")));
296
297         /*
298          * Look up the namespace in which we are supposed to create the relation.
299          * Check we have permission to create there. Skip check if bootstrapping,
300          * since permissions machinery may not be working yet.
301          */
302         namespaceId = RangeVarGetCreationNamespace(stmt->relation);
303
304         if (!IsBootstrapProcessingMode())
305         {
306                 AclResult       aclresult;
307
308                 aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
309                                                                                   ACL_CREATE);
310                 if (aclresult != ACLCHECK_OK)
311                         aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
312                                                    get_namespace_name(namespaceId));
313         }
314
315         /*
316          * Select tablespace to use.  If not specified, use default_tablespace
317          * (which may in turn default to database's default).
318          */
319         if (stmt->tablespacename)
320         {
321                 tablespaceId = get_tablespace_oid(stmt->tablespacename);
322                 if (!OidIsValid(tablespaceId))
323                         ereport(ERROR,
324                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
325                                          errmsg("tablespace \"%s\" does not exist",
326                                                         stmt->tablespacename)));
327         }
328         else
329         {
330                 tablespaceId = GetDefaultTablespace();
331                 /* note InvalidOid is OK in this case */
332         }
333
334         /* Check permissions except when using database's default */
335         if (OidIsValid(tablespaceId))
336         {
337                 AclResult       aclresult;
338
339                 aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(),
340                                                                                    ACL_CREATE);
341                 if (aclresult != ACLCHECK_OK)
342                         aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
343                                                    get_tablespace_name(tablespaceId));
344         }
345
346         /*
347          * Look up inheritance ancestors and generate relation schema, including
348          * inherited attributes.
349          */
350         schema = MergeAttributes(schema, stmt->inhRelations,
351                                                          stmt->relation->istemp,
352                                                          &inheritOids, &old_constraints, &parentOidCount);
353
354         /*
355          * Create a relation descriptor from the relation schema and create the
356          * relation.  Note that in this stage only inherited (pre-cooked) defaults
357          * and constraints will be included into the new relation.
358          * (BuildDescForRelation takes care of the inherited defaults, but we have
359          * to copy inherited constraints here.)
360          */
361         descriptor = BuildDescForRelation(schema);
362
363         localHasOids = interpretOidsOption(stmt->hasoids);
364         descriptor->tdhasoid = (localHasOids || parentOidCount > 0);
365
366         if (old_constraints != NIL)
367         {
368                 ConstrCheck *check = (ConstrCheck *)
369                 palloc0(list_length(old_constraints) * sizeof(ConstrCheck));
370                 int                     ncheck = 0;
371
372                 foreach(listptr, old_constraints)
373                 {
374                         Constraint *cdef = (Constraint *) lfirst(listptr);
375                         bool            dup = false;
376
377                         if (cdef->contype != CONSTR_CHECK)
378                                 continue;
379                         Assert(cdef->name != NULL);
380                         Assert(cdef->raw_expr == NULL && cdef->cooked_expr != NULL);
381
382                         /*
383                          * In multiple-inheritance situations, it's possible to inherit
384                          * the same grandparent constraint through multiple parents.
385                          * Hence, discard inherited constraints that match as to both name
386                          * and expression.      Otherwise, gripe if the names conflict.
387                          */
388                         for (i = 0; i < ncheck; i++)
389                         {
390                                 if (strcmp(check[i].ccname, cdef->name) != 0)
391                                         continue;
392                                 if (strcmp(check[i].ccbin, cdef->cooked_expr) == 0)
393                                 {
394                                         dup = true;
395                                         break;
396                                 }
397                                 ereport(ERROR,
398                                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
399                                                  errmsg("duplicate check constraint name \"%s\"",
400                                                                 cdef->name)));
401                         }
402                         if (!dup)
403                         {
404                                 check[ncheck].ccname = cdef->name;
405                                 check[ncheck].ccbin = pstrdup(cdef->cooked_expr);
406                                 ncheck++;
407                         }
408                 }
409                 if (ncheck > 0)
410                 {
411                         if (descriptor->constr == NULL)
412                         {
413                                 descriptor->constr = (TupleConstr *) palloc(sizeof(TupleConstr));
414                                 descriptor->constr->defval = NULL;
415                                 descriptor->constr->num_defval = 0;
416                                 descriptor->constr->has_not_null = false;
417                         }
418                         descriptor->constr->num_check = ncheck;
419                         descriptor->constr->check = check;
420                 }
421         }
422
423         relationId = heap_create_with_catalog(relname,
424                                                                                   namespaceId,
425                                                                                   tablespaceId,
426                                                                                   InvalidOid,
427                                                                                   GetUserId(),
428                                                                                   descriptor,
429                                                                                   relkind,
430                                                                                   false,
431                                                                                   localHasOids,
432                                                                                   parentOidCount,
433                                                                                   stmt->oncommit,
434                                                                                   allowSystemTableMods);
435
436         StoreCatalogInheritance(relationId, inheritOids);
437
438         /*
439          * We must bump the command counter to make the newly-created relation
440          * tuple visible for opening.
441          */
442         CommandCounterIncrement();
443
444         /*
445          * Open the new relation and acquire exclusive lock on it.      This isn't
446          * really necessary for locking out other backends (since they can't see
447          * the new rel anyway until we commit), but it keeps the lock manager from
448          * complaining about deadlock risks.
449          */
450         rel = relation_open(relationId, AccessExclusiveLock);
451
452         /*
453          * Now add any newly specified column default values and CHECK constraints
454          * to the new relation.  These are passed to us in the form of raw
455          * parsetrees; we need to transform them to executable expression trees
456          * before they can be added. The most convenient way to do that is to
457          * apply the parser's transformExpr routine, but transformExpr doesn't
458          * work unless we have a pre-existing relation. So, the transformation has
459          * to be postponed to this final step of CREATE TABLE.
460          *
461          * Another task that's conveniently done at this step is to add dependency
462          * links between columns and supporting relations (such as SERIAL
463          * sequences).
464          *
465          * First, scan schema to find new column defaults.
466          */
467         rawDefaults = NIL;
468         attnum = 0;
469
470         foreach(listptr, schema)
471         {
472                 ColumnDef  *colDef = lfirst(listptr);
473
474                 attnum++;
475
476                 if (colDef->raw_default != NULL)
477                 {
478                         RawColumnDefault *rawEnt;
479
480                         Assert(colDef->cooked_default == NULL);
481
482                         rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
483                         rawEnt->attnum = attnum;
484                         rawEnt->raw_default = colDef->raw_default;
485                         rawDefaults = lappend(rawDefaults, rawEnt);
486                 }
487
488                 /* Create dependency for supporting relation for this column */
489                 if (colDef->support != NULL)
490                         add_column_support_dependency(relationId, attnum, colDef->support);
491         }
492
493         /*
494          * Parse and add the defaults/constraints, if any.
495          */
496         if (rawDefaults || stmt->constraints)
497                 AddRelationRawConstraints(rel, rawDefaults, stmt->constraints);
498
499         /*
500          * Clean up.  We keep lock on new relation (although it shouldn't be
501          * visible to anyone else anyway, until commit).
502          */
503         relation_close(rel, NoLock);
504
505         return relationId;
506 }
507
508 /*
509  * RemoveRelation
510  *              Deletes a relation.
511  */
512 void
513 RemoveRelation(const RangeVar *relation, DropBehavior behavior)
514 {
515         Oid                     relOid;
516         ObjectAddress object;
517
518         relOid = RangeVarGetRelid(relation, false);
519
520         object.classId = RelationRelationId;
521         object.objectId = relOid;
522         object.objectSubId = 0;
523
524         performDeletion(&object, behavior);
525 }
526
527 /*
528  * ExecuteTruncate
529  *              Executes a TRUNCATE command.
530  *
531  * This is a multi-relation truncate.  We first open and grab exclusive
532  * lock on all relations involved, checking permissions and otherwise
533  * verifying that the relation is OK for truncation.  In CASCADE mode,
534  * relations having FK references to the targeted relations are automatically
535  * added to the group; in RESTRICT mode, we check that all FK references are
536  * internal to the group that's being truncated.  Finally all the relations
537  * are truncated and reindexed.
538  */
539 void
540 ExecuteTruncate(TruncateStmt *stmt)
541 {
542         List       *rels = NIL;
543         List       *relids = NIL;
544         ListCell   *cell;
545
546         /*
547          * Open, exclusive-lock, and check all the explicitly-specified relations
548          */
549         foreach(cell, stmt->relations)
550         {
551                 RangeVar   *rv = lfirst(cell);
552                 Relation        rel;
553
554                 rel = heap_openrv(rv, AccessExclusiveLock);
555                 truncate_check_rel(rel);
556                 rels = lappend(rels, rel);
557                 relids = lappend_oid(relids, RelationGetRelid(rel));
558         }
559
560         /*
561          * In CASCADE mode, suck in all referencing relations as well.  This
562          * requires multiple iterations to find indirectly-dependent relations.
563          * At each phase, we need to exclusive-lock new rels before looking
564          * for their dependencies, else we might miss something.  Also, we
565          * check each rel as soon as we open it, to avoid a faux pas such as
566          * holding lock for a long time on a rel we have no permissions for.
567          */
568         if (stmt->behavior == DROP_CASCADE)
569         {
570                 for (;;)
571                 {
572                         List   *newrelids;
573
574                         newrelids = heap_truncate_find_FKs(relids);
575                         if (newrelids == NIL)
576                                 break;                  /* nothing else to add */
577
578                         foreach(cell, newrelids)
579                         {
580                                 Oid             relid = lfirst_oid(cell);
581                                 Relation        rel;
582
583                                 rel = heap_open(relid, AccessExclusiveLock);
584                                 ereport(NOTICE,
585                                                 (errmsg("truncate cascades to table \"%s\"",
586                                                                 RelationGetRelationName(rel))));
587                                 truncate_check_rel(rel);
588                                 rels = lappend(rels, rel);
589                                 relids = lappend_oid(relids, relid);
590                         }
591                 }
592         }
593
594         /*
595          * Check foreign key references.  In CASCADE mode, this should be
596          * unnecessary since we just pulled in all the references; but as
597          * a cross-check, do it anyway if in an Assert-enabled build.
598          */
599 #ifdef USE_ASSERT_CHECKING
600         heap_truncate_check_FKs(rels, false);
601 #else
602         if (stmt->behavior == DROP_RESTRICT)
603                 heap_truncate_check_FKs(rels, false);
604 #endif
605
606         /*
607          * OK, truncate each table.
608          */
609         foreach(cell, rels)
610         {
611                 Relation        rel = (Relation) lfirst(cell);
612                 Oid                     heap_relid;
613                 Oid                     toast_relid;
614
615                 /*
616                  * Create a new empty storage file for the relation, and assign it as
617                  * the relfilenode value.       The old storage file is scheduled for
618                  * deletion at commit.
619                  */
620                 setNewRelfilenode(rel);
621
622                 heap_relid = RelationGetRelid(rel);
623                 toast_relid = rel->rd_rel->reltoastrelid;
624
625                 heap_close(rel, NoLock);
626
627                 /*
628                  * The same for the toast table, if any.
629                  */
630                 if (OidIsValid(toast_relid))
631                 {
632                         rel = relation_open(toast_relid, AccessExclusiveLock);
633                         setNewRelfilenode(rel);
634                         heap_close(rel, NoLock);
635                 }
636
637                 /*
638                  * Reconstruct the indexes to match, and we're done.
639                  */
640                 reindex_relation(heap_relid, true);
641         }
642 }
643
644 /*
645  * Check that a given rel is safe to truncate.  Subroutine for ExecuteTruncate
646  */
647 static void
648 truncate_check_rel(Relation rel)
649 {
650         /* Only allow truncate on regular tables */
651         if (rel->rd_rel->relkind != RELKIND_RELATION)
652                 ereport(ERROR,
653                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
654                                  errmsg("\"%s\" is not a table",
655                                                 RelationGetRelationName(rel))));
656
657         /* Permissions checks */
658         if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
659                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
660                                            RelationGetRelationName(rel));
661
662         if (!allowSystemTableMods && IsSystemRelation(rel))
663                 ereport(ERROR,
664                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
665                                  errmsg("permission denied: \"%s\" is a system catalog",
666                                                 RelationGetRelationName(rel))));
667
668         /*
669          * We can never allow truncation of shared or nailed-in-cache
670          * relations, because we can't support changing their relfilenode
671          * values.
672          */
673         if (rel->rd_rel->relisshared || rel->rd_isnailed)
674                 ereport(ERROR,
675                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
676                                  errmsg("cannot truncate system relation \"%s\"",
677                                                 RelationGetRelationName(rel))));
678
679         /*
680          * Don't allow truncate on temp tables of other backends ... their
681          * local buffer manager is not going to cope.
682          */
683         if (isOtherTempNamespace(RelationGetNamespace(rel)))
684                 ereport(ERROR,
685                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
686                                  errmsg("cannot truncate temporary tables of other sessions")));
687 }
688
689 /*----------
690  * MergeAttributes
691  *              Returns new schema given initial schema and superclasses.
692  *
693  * Input arguments:
694  * 'schema' is the column/attribute definition for the table. (It's a list
695  *              of ColumnDef's.) It is destructively changed.
696  * 'supers' is a list of names (as RangeVar nodes) of parent relations.
697  * 'istemp' is TRUE if we are creating a temp relation.
698  *
699  * Output arguments:
700  * 'supOids' receives a list of the OIDs of the parent relations.
701  * 'supconstr' receives a list of constraints belonging to the parents,
702  *              updated as necessary to be valid for the child.
703  * 'supOidCount' is set to the number of parents that have OID columns.
704  *
705  * Return value:
706  * Completed schema list.
707  *
708  * Notes:
709  *        The order in which the attributes are inherited is very important.
710  *        Intuitively, the inherited attributes should come first. If a table
711  *        inherits from multiple parents, the order of those attributes are
712  *        according to the order of the parents specified in CREATE TABLE.
713  *
714  *        Here's an example:
715  *
716  *              create table person (name text, age int4, location point);
717  *              create table emp (salary int4, manager text) inherits(person);
718  *              create table student (gpa float8) inherits (person);
719  *              create table stud_emp (percent int4) inherits (emp, student);
720  *
721  *        The order of the attributes of stud_emp is:
722  *
723  *                                                      person {1:name, 2:age, 3:location}
724  *                                                      /        \
725  *                         {6:gpa}      student   emp {4:salary, 5:manager}
726  *                                                      \        /
727  *                                                 stud_emp {7:percent}
728  *
729  *         If the same attribute name appears multiple times, then it appears
730  *         in the result table in the proper location for its first appearance.
731  *
732  *         Constraints (including NOT NULL constraints) for the child table
733  *         are the union of all relevant constraints, from both the child schema
734  *         and parent tables.
735  *
736  *         The default value for a child column is defined as:
737  *              (1) If the child schema specifies a default, that value is used.
738  *              (2) If neither the child nor any parent specifies a default, then
739  *                      the column will not have a default.
740  *              (3) If conflicting defaults are inherited from different parents
741  *                      (and not overridden by the child), an error is raised.
742  *              (4) Otherwise the inherited default is used.
743  *              Rule (3) is new in Postgres 7.1; in earlier releases you got a
744  *              rather arbitrary choice of which parent default to use.
745  *----------
746  */
747 static List *
748 MergeAttributes(List *schema, List *supers, bool istemp,
749                                 List **supOids, List **supconstr, int *supOidCount)
750 {
751         ListCell   *entry;
752         List       *inhSchema = NIL;
753         List       *parentOids = NIL;
754         List       *constraints = NIL;
755         int                     parentsWithOids = 0;
756         bool            have_bogus_defaults = false;
757         char       *bogus_marker = "Bogus!";            /* marks conflicting defaults */
758         int                     child_attno;
759
760         /*
761          * Check for and reject tables with too many columns. We perform this
762          * check relatively early for two reasons: (a) we don't run the risk of
763          * overflowing an AttrNumber in subsequent code (b) an O(n^2) algorithm is
764          * okay if we're processing <= 1600 columns, but could take minutes to
765          * execute if the user attempts to create a table with hundreds of
766          * thousands of columns.
767          *
768          * Note that we also need to check that any we do not exceed this figure
769          * after including columns from inherited relations.
770          */
771         if (list_length(schema) > MaxHeapAttributeNumber)
772                 ereport(ERROR,
773                                 (errcode(ERRCODE_TOO_MANY_COLUMNS),
774                                  errmsg("tables can have at most %d columns",
775                                                 MaxHeapAttributeNumber)));
776
777         /*
778          * Check for duplicate names in the explicit list of attributes.
779          *
780          * Although we might consider merging such entries in the same way that we
781          * handle name conflicts for inherited attributes, it seems to make more
782          * sense to assume such conflicts are errors.
783          */
784         foreach(entry, schema)
785         {
786                 ColumnDef  *coldef = lfirst(entry);
787                 ListCell   *rest;
788
789                 for_each_cell(rest, lnext(entry))
790                 {
791                         ColumnDef  *restdef = lfirst(rest);
792
793                         if (strcmp(coldef->colname, restdef->colname) == 0)
794                                 ereport(ERROR,
795                                                 (errcode(ERRCODE_DUPLICATE_COLUMN),
796                                                  errmsg("column \"%s\" duplicated",
797                                                                 coldef->colname)));
798                 }
799         }
800
801         /*
802          * Scan the parents left-to-right, and merge their attributes to form a
803          * list of inherited attributes (inhSchema).  Also check to see if we need
804          * to inherit an OID column.
805          */
806         child_attno = 0;
807         foreach(entry, supers)
808         {
809                 RangeVar   *parent = (RangeVar *) lfirst(entry);
810                 Relation        relation;
811                 TupleDesc       tupleDesc;
812                 TupleConstr *constr;
813                 AttrNumber *newattno;
814                 AttrNumber      parent_attno;
815
816                 relation = heap_openrv(parent, AccessShareLock);
817
818                 if (relation->rd_rel->relkind != RELKIND_RELATION)
819                         ereport(ERROR,
820                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
821                                          errmsg("inherited relation \"%s\" is not a table",
822                                                         parent->relname)));
823                 /* Permanent rels cannot inherit from temporary ones */
824                 if (!istemp && isTempNamespace(RelationGetNamespace(relation)))
825                         ereport(ERROR,
826                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
827                                          errmsg("cannot inherit from temporary relation \"%s\"",
828                                                         parent->relname)));
829
830                 /*
831                  * We should have an UNDER permission flag for this, but for now,
832                  * demand that creator of a child table own the parent.
833                  */
834                 if (!pg_class_ownercheck(RelationGetRelid(relation), GetUserId()))
835                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
836                                                    RelationGetRelationName(relation));
837
838                 /*
839                  * Reject duplications in the list of parents.
840                  */
841                 if (list_member_oid(parentOids, RelationGetRelid(relation)))
842                         ereport(ERROR,
843                                         (errcode(ERRCODE_DUPLICATE_TABLE),
844                                          errmsg("inherited relation \"%s\" duplicated",
845                                                         parent->relname)));
846
847                 parentOids = lappend_oid(parentOids, RelationGetRelid(relation));
848
849                 if (relation->rd_rel->relhasoids)
850                         parentsWithOids++;
851
852                 tupleDesc = RelationGetDescr(relation);
853                 constr = tupleDesc->constr;
854
855                 /*
856                  * newattno[] will contain the child-table attribute numbers for the
857                  * attributes of this parent table.  (They are not the same for
858                  * parents after the first one, nor if we have dropped columns.)
859                  */
860                 newattno = (AttrNumber *)
861                         palloc(tupleDesc->natts * sizeof(AttrNumber));
862
863                 for (parent_attno = 1; parent_attno <= tupleDesc->natts;
864                          parent_attno++)
865                 {
866                         Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1];
867                         char       *attributeName = NameStr(attribute->attname);
868                         int                     exist_attno;
869                         ColumnDef  *def;
870                         TypeName   *typename;
871
872                         /*
873                          * Ignore dropped columns in the parent.
874                          */
875                         if (attribute->attisdropped)
876                         {
877                                 /*
878                                  * change_varattnos_of_a_node asserts that this is greater
879                                  * than zero, so if anything tries to use it, we should find
880                                  * out.
881                                  */
882                                 newattno[parent_attno - 1] = 0;
883                                 continue;
884                         }
885
886                         /*
887                          * Does it conflict with some previously inherited column?
888                          */
889                         exist_attno = findAttrByName(attributeName, inhSchema);
890                         if (exist_attno > 0)
891                         {
892                                 /*
893                                  * Yes, try to merge the two column definitions. They must
894                                  * have the same type and typmod.
895                                  */
896                                 ereport(NOTICE,
897                                                 (errmsg("merging multiple inherited definitions of column \"%s\"",
898                                                                 attributeName)));
899                                 def = (ColumnDef *) list_nth(inhSchema, exist_attno - 1);
900                                 if (typenameTypeId(def->typename) != attribute->atttypid ||
901                                         def->typename->typmod != attribute->atttypmod)
902                                         ereport(ERROR,
903                                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
904                                                 errmsg("inherited column \"%s\" has a type conflict",
905                                                            attributeName),
906                                                          errdetail("%s versus %s",
907                                                                            TypeNameToString(def->typename),
908                                                                            format_type_be(attribute->atttypid))));
909                                 def->inhcount++;
910                                 /* Merge of NOT NULL constraints = OR 'em together */
911                                 def->is_not_null |= attribute->attnotnull;
912                                 /* Default and other constraints are handled below */
913                                 newattno[parent_attno - 1] = exist_attno;
914                         }
915                         else
916                         {
917                                 /*
918                                  * No, create a new inherited column
919                                  */
920                                 def = makeNode(ColumnDef);
921                                 def->colname = pstrdup(attributeName);
922                                 typename = makeNode(TypeName);
923                                 typename->typeid = attribute->atttypid;
924                                 typename->typmod = attribute->atttypmod;
925                                 def->typename = typename;
926                                 def->inhcount = 1;
927                                 def->is_local = false;
928                                 def->is_not_null = attribute->attnotnull;
929                                 def->raw_default = NULL;
930                                 def->cooked_default = NULL;
931                                 def->constraints = NIL;
932                                 def->support = NULL;
933                                 inhSchema = lappend(inhSchema, def);
934                                 newattno[parent_attno - 1] = ++child_attno;
935                         }
936
937                         /*
938                          * Copy default if any
939                          */
940                         if (attribute->atthasdef)
941                         {
942                                 char       *this_default = NULL;
943                                 AttrDefault *attrdef;
944                                 int                     i;
945
946                                 /* Find default in constraint structure */
947                                 Assert(constr != NULL);
948                                 attrdef = constr->defval;
949                                 for (i = 0; i < constr->num_defval; i++)
950                                 {
951                                         if (attrdef[i].adnum == parent_attno)
952                                         {
953                                                 this_default = attrdef[i].adbin;
954                                                 break;
955                                         }
956                                 }
957                                 Assert(this_default != NULL);
958
959                                 /*
960                                  * If default expr could contain any vars, we'd need to fix
961                                  * 'em, but it can't; so default is ready to apply to child.
962                                  *
963                                  * If we already had a default from some prior parent, check
964                                  * to see if they are the same.  If so, no problem; if not,
965                                  * mark the column as having a bogus default. Below, we will
966                                  * complain if the bogus default isn't overridden by the child
967                                  * schema.
968                                  */
969                                 Assert(def->raw_default == NULL);
970                                 if (def->cooked_default == NULL)
971                                         def->cooked_default = pstrdup(this_default);
972                                 else if (strcmp(def->cooked_default, this_default) != 0)
973                                 {
974                                         def->cooked_default = bogus_marker;
975                                         have_bogus_defaults = true;
976                                 }
977                         }
978                 }
979
980                 /*
981                  * Now copy the constraints of this parent, adjusting attnos using the
982                  * completed newattno[] map
983                  */
984                 if (constr && constr->num_check > 0)
985                 {
986                         ConstrCheck *check = constr->check;
987                         int                     i;
988
989                         for (i = 0; i < constr->num_check; i++)
990                         {
991                                 Constraint *cdef = makeNode(Constraint);
992                                 Node       *expr;
993
994                                 cdef->contype = CONSTR_CHECK;
995                                 cdef->name = pstrdup(check[i].ccname);
996                                 cdef->raw_expr = NULL;
997                                 /* adjust varattnos of ccbin here */
998                                 expr = stringToNode(check[i].ccbin);
999                                 change_varattnos_of_a_node(expr, newattno);
1000                                 cdef->cooked_expr = nodeToString(expr);
1001                                 constraints = lappend(constraints, cdef);
1002                         }
1003                 }
1004
1005                 pfree(newattno);
1006
1007                 /*
1008                  * Close the parent rel, but keep our AccessShareLock on it until xact
1009                  * commit.      That will prevent someone else from deleting or ALTERing
1010                  * the parent before the child is committed.
1011                  */
1012                 heap_close(relation, NoLock);
1013         }
1014
1015         /*
1016          * If we had no inherited attributes, the result schema is just the
1017          * explicitly declared columns.  Otherwise, we need to merge the declared
1018          * columns into the inherited schema list.
1019          */
1020         if (inhSchema != NIL)
1021         {
1022                 foreach(entry, schema)
1023                 {
1024                         ColumnDef  *newdef = lfirst(entry);
1025                         char       *attributeName = newdef->colname;
1026                         int                     exist_attno;
1027
1028                         /*
1029                          * Does it conflict with some previously inherited column?
1030                          */
1031                         exist_attno = findAttrByName(attributeName, inhSchema);
1032                         if (exist_attno > 0)
1033                         {
1034                                 ColumnDef  *def;
1035
1036                                 /*
1037                                  * Yes, try to merge the two column definitions. They must
1038                                  * have the same type and typmod.
1039                                  */
1040                                 ereport(NOTICE,
1041                                    (errmsg("merging column \"%s\" with inherited definition",
1042                                                    attributeName)));
1043                                 def = (ColumnDef *) list_nth(inhSchema, exist_attno - 1);
1044                                 if (typenameTypeId(def->typename) != typenameTypeId(newdef->typename) ||
1045                                         def->typename->typmod != newdef->typename->typmod)
1046                                         ereport(ERROR,
1047                                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
1048                                                          errmsg("column \"%s\" has a type conflict",
1049                                                                         attributeName),
1050                                                          errdetail("%s versus %s",
1051                                                                            TypeNameToString(def->typename),
1052                                                                            TypeNameToString(newdef->typename))));
1053                                 /* Mark the column as locally defined */
1054                                 def->is_local = true;
1055                                 /* Merge of NOT NULL constraints = OR 'em together */
1056                                 def->is_not_null |= newdef->is_not_null;
1057                                 /* If new def has a default, override previous default */
1058                                 if (newdef->raw_default != NULL)
1059                                 {
1060                                         def->raw_default = newdef->raw_default;
1061                                         def->cooked_default = newdef->cooked_default;
1062                                 }
1063                         }
1064                         else
1065                         {
1066                                 /*
1067                                  * No, attach new column to result schema
1068                                  */
1069                                 inhSchema = lappend(inhSchema, newdef);
1070                         }
1071                 }
1072
1073                 schema = inhSchema;
1074
1075                 /*
1076                  * Check that we haven't exceeded the legal # of columns after merging
1077                  * in inherited columns.
1078                  */
1079                 if (list_length(schema) > MaxHeapAttributeNumber)
1080                         ereport(ERROR,
1081                                         (errcode(ERRCODE_TOO_MANY_COLUMNS),
1082                                          errmsg("tables can have at most %d columns",
1083                                                         MaxHeapAttributeNumber)));
1084         }
1085
1086         /*
1087          * If we found any conflicting parent default values, check to make sure
1088          * they were overridden by the child.
1089          */
1090         if (have_bogus_defaults)
1091         {
1092                 foreach(entry, schema)
1093                 {
1094                         ColumnDef  *def = lfirst(entry);
1095
1096                         if (def->cooked_default == bogus_marker)
1097                                 ereport(ERROR,
1098                                                 (errcode(ERRCODE_INVALID_COLUMN_DEFINITION),
1099                                   errmsg("column \"%s\" inherits conflicting default values",
1100                                                  def->colname),
1101                                                  errhint("To resolve the conflict, specify a default explicitly.")));
1102                 }
1103         }
1104
1105         *supOids = parentOids;
1106         *supconstr = constraints;
1107         *supOidCount = parentsWithOids;
1108         return schema;
1109 }
1110
1111 /*
1112  * complementary static functions for MergeAttributes().
1113  *
1114  * Varattnos of pg_constraint.conbin must be rewritten when subclasses inherit
1115  * constraints from parent classes, since the inherited attributes could
1116  * be given different column numbers in multiple-inheritance cases.
1117  *
1118  * Note that the passed node tree is modified in place!
1119  */
1120 static bool
1121 change_varattnos_walker(Node *node, const AttrNumber *newattno)
1122 {
1123         if (node == NULL)
1124                 return false;
1125         if (IsA(node, Var))
1126         {
1127                 Var                *var = (Var *) node;
1128
1129                 if (var->varlevelsup == 0 && var->varno == 1 &&
1130                         var->varattno > 0)
1131                 {
1132                         /*
1133                          * ??? the following may be a problem when the node is multiply
1134                          * referenced though stringToNode() doesn't create such a node
1135                          * currently.
1136                          */
1137                         Assert(newattno[var->varattno - 1] > 0);
1138                         var->varattno = newattno[var->varattno - 1];
1139                 }
1140                 return false;
1141         }
1142         return expression_tree_walker(node, change_varattnos_walker,
1143                                                                   (void *) newattno);
1144 }
1145
1146 static bool
1147 change_varattnos_of_a_node(Node *node, const AttrNumber *newattno)
1148 {
1149         return change_varattnos_walker(node, newattno);
1150 }
1151
1152 /*
1153  * StoreCatalogInheritance
1154  *              Updates the system catalogs with proper inheritance information.
1155  *
1156  * supers is a list of the OIDs of the new relation's direct ancestors.
1157  */
1158 static void
1159 StoreCatalogInheritance(Oid relationId, List *supers)
1160 {
1161         Relation        relation;
1162         TupleDesc       desc;
1163         int16           seqNumber;
1164         ListCell   *entry;
1165         HeapTuple       tuple;
1166
1167         /*
1168          * sanity checks
1169          */
1170         AssertArg(OidIsValid(relationId));
1171
1172         if (supers == NIL)
1173                 return;
1174
1175         /*
1176          * Store INHERITS information in pg_inherits using direct ancestors only.
1177          * Also enter dependencies on the direct ancestors, and make sure they are
1178          * marked with relhassubclass = true.
1179          *
1180          * (Once upon a time, both direct and indirect ancestors were found here
1181          * and then entered into pg_ipl.  Since that catalog doesn't exist
1182          * anymore, there's no need to look for indirect ancestors.)
1183          */
1184         relation = heap_open(InheritsRelationId, RowExclusiveLock);
1185         desc = RelationGetDescr(relation);
1186
1187         seqNumber = 1;
1188         foreach(entry, supers)
1189         {
1190                 Oid                     parentOid = lfirst_oid(entry);
1191                 Datum           datum[Natts_pg_inherits];
1192                 char            nullarr[Natts_pg_inherits];
1193                 ObjectAddress childobject,
1194                                         parentobject;
1195
1196                 datum[0] = ObjectIdGetDatum(relationId);                /* inhrel */
1197                 datum[1] = ObjectIdGetDatum(parentOid); /* inhparent */
1198                 datum[2] = Int16GetDatum(seqNumber);    /* inhseqno */
1199
1200                 nullarr[0] = ' ';
1201                 nullarr[1] = ' ';
1202                 nullarr[2] = ' ';
1203
1204                 tuple = heap_formtuple(desc, datum, nullarr);
1205
1206                 simple_heap_insert(relation, tuple);
1207
1208                 CatalogUpdateIndexes(relation, tuple);
1209
1210                 heap_freetuple(tuple);
1211
1212                 /*
1213                  * Store a dependency too
1214                  */
1215                 parentobject.classId = RelationRelationId;
1216                 parentobject.objectId = parentOid;
1217                 parentobject.objectSubId = 0;
1218                 childobject.classId = RelationRelationId;
1219                 childobject.objectId = relationId;
1220                 childobject.objectSubId = 0;
1221
1222                 recordDependencyOn(&childobject, &parentobject, DEPENDENCY_NORMAL);
1223
1224                 /*
1225                  * Mark the parent as having subclasses.
1226                  */
1227                 setRelhassubclassInRelation(parentOid, true);
1228
1229                 seqNumber += 1;
1230         }
1231
1232         heap_close(relation, RowExclusiveLock);
1233 }
1234
1235 /*
1236  * Look for an existing schema entry with the given name.
1237  *
1238  * Returns the index (starting with 1) if attribute already exists in schema,
1239  * 0 if it doesn't.
1240  */
1241 static int
1242 findAttrByName(const char *attributeName, List *schema)
1243 {
1244         ListCell   *s;
1245         int                     i = 1;
1246
1247         foreach(s, schema)
1248         {
1249                 ColumnDef  *def = lfirst(s);
1250
1251                 if (strcmp(attributeName, def->colname) == 0)
1252                         return i;
1253
1254                 i++;
1255         }
1256         return 0;
1257 }
1258
1259 /*
1260  * Update a relation's pg_class.relhassubclass entry to the given value
1261  */
1262 static void
1263 setRelhassubclassInRelation(Oid relationId, bool relhassubclass)
1264 {
1265         Relation        relationRelation;
1266         HeapTuple       tuple;
1267         Form_pg_class classtuple;
1268
1269         /*
1270          * Fetch a modifiable copy of the tuple, modify it, update pg_class.
1271          *
1272          * If the tuple already has the right relhassubclass setting, we don't
1273          * need to update it, but we still need to issue an SI inval message.
1274          */
1275         relationRelation = heap_open(RelationRelationId, RowExclusiveLock);
1276         tuple = SearchSysCacheCopy(RELOID,
1277                                                            ObjectIdGetDatum(relationId),
1278                                                            0, 0, 0);
1279         if (!HeapTupleIsValid(tuple))
1280                 elog(ERROR, "cache lookup failed for relation %u", relationId);
1281         classtuple = (Form_pg_class) GETSTRUCT(tuple);
1282
1283         if (classtuple->relhassubclass != relhassubclass)
1284         {
1285                 classtuple->relhassubclass = relhassubclass;
1286                 simple_heap_update(relationRelation, &tuple->t_self, tuple);
1287
1288                 /* keep the catalog indexes up to date */
1289                 CatalogUpdateIndexes(relationRelation, tuple);
1290         }
1291         else
1292         {
1293                 /* no need to change tuple, but force relcache rebuild anyway */
1294                 CacheInvalidateRelcacheByTuple(tuple);
1295         }
1296
1297         heap_freetuple(tuple);
1298         heap_close(relationRelation, RowExclusiveLock);
1299 }
1300
1301
1302 /*
1303  *              renameatt               - changes the name of a attribute in a relation
1304  *
1305  *              Attname attribute is changed in attribute catalog.
1306  *              No record of the previous attname is kept (correct?).
1307  *
1308  *              get proper relrelation from relation catalog (if not arg)
1309  *              scan attribute catalog
1310  *                              for name conflict (within rel)
1311  *                              for original attribute (if not arg)
1312  *              modify attname in attribute tuple
1313  *              insert modified attribute in attribute catalog
1314  *              delete original attribute from attribute catalog
1315  */
1316 void
1317 renameatt(Oid myrelid,
1318                   const char *oldattname,
1319                   const char *newattname,
1320                   bool recurse,
1321                   bool recursing)
1322 {
1323         Relation        targetrelation;
1324         Relation        attrelation;
1325         HeapTuple       atttup;
1326         Form_pg_attribute attform;
1327         int                     attnum;
1328         List       *indexoidlist;
1329         ListCell   *indexoidscan;
1330
1331         /*
1332          * Grab an exclusive lock on the target table, which we will NOT release
1333          * until end of transaction.
1334          */
1335         targetrelation = relation_open(myrelid, AccessExclusiveLock);
1336
1337         /*
1338          * permissions checking.  this would normally be done in utility.c, but
1339          * this particular routine is recursive.
1340          *
1341          * normally, only the owner of a class can change its schema.
1342          */
1343         if (!pg_class_ownercheck(myrelid, GetUserId()))
1344                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
1345                                            RelationGetRelationName(targetrelation));
1346         if (!allowSystemTableMods && IsSystemRelation(targetrelation))
1347                 ereport(ERROR,
1348                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1349                                  errmsg("permission denied: \"%s\" is a system catalog",
1350                                                 RelationGetRelationName(targetrelation))));
1351
1352         /*
1353          * if the 'recurse' flag is set then we are supposed to rename this
1354          * attribute in all classes that inherit from 'relname' (as well as in
1355          * 'relname').
1356          *
1357          * any permissions or problems with duplicate attributes will cause the
1358          * whole transaction to abort, which is what we want -- all or nothing.
1359          */
1360         if (recurse)
1361         {
1362                 ListCell   *child;
1363                 List       *children;
1364
1365                 /* this routine is actually in the planner */
1366                 children = find_all_inheritors(myrelid);
1367
1368                 /*
1369                  * find_all_inheritors does the recursive search of the inheritance
1370                  * hierarchy, so all we have to do is process all of the relids in the
1371                  * list that it returns.
1372                  */
1373                 foreach(child, children)
1374                 {
1375                         Oid                     childrelid = lfirst_oid(child);
1376
1377                         if (childrelid == myrelid)
1378                                 continue;
1379                         /* note we need not recurse again */
1380                         renameatt(childrelid, oldattname, newattname, false, true);
1381                 }
1382         }
1383         else
1384         {
1385                 /*
1386                  * If we are told not to recurse, there had better not be any child
1387                  * tables; else the rename would put them out of step.
1388                  */
1389                 if (!recursing &&
1390                         find_inheritance_children(myrelid) != NIL)
1391                         ereport(ERROR,
1392                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
1393                                          errmsg("inherited column \"%s\" must be renamed in child tables too",
1394                                                         oldattname)));
1395         }
1396
1397         attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
1398
1399         atttup = SearchSysCacheCopyAttName(myrelid, oldattname);
1400         if (!HeapTupleIsValid(atttup))
1401                 ereport(ERROR,
1402                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
1403                                  errmsg("column \"%s\" does not exist",
1404                                                 oldattname)));
1405         attform = (Form_pg_attribute) GETSTRUCT(atttup);
1406
1407         attnum = attform->attnum;
1408         if (attnum <= 0)
1409                 ereport(ERROR,
1410                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1411                                  errmsg("cannot rename system column \"%s\"",
1412                                                 oldattname)));
1413
1414         /*
1415          * if the attribute is inherited, forbid the renaming, unless we are
1416          * already inside a recursive rename.
1417          */
1418         if (attform->attinhcount > 0 && !recursing)
1419                 ereport(ERROR,
1420                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
1421                                  errmsg("cannot rename inherited column \"%s\"",
1422                                                 oldattname)));
1423
1424         /* should not already exist */
1425         /* this test is deliberately not attisdropped-aware */
1426         if (SearchSysCacheExists(ATTNAME,
1427                                                          ObjectIdGetDatum(myrelid),
1428                                                          PointerGetDatum(newattname),
1429                                                          0, 0))
1430                 ereport(ERROR,
1431                                 (errcode(ERRCODE_DUPLICATE_COLUMN),
1432                                  errmsg("column \"%s\" of relation \"%s\" already exists",
1433                                           newattname, RelationGetRelationName(targetrelation))));
1434
1435         namestrcpy(&(attform->attname), newattname);
1436
1437         simple_heap_update(attrelation, &atttup->t_self, atttup);
1438
1439         /* keep system catalog indexes current */
1440         CatalogUpdateIndexes(attrelation, atttup);
1441
1442         heap_freetuple(atttup);
1443
1444         /*
1445          * Update column names of indexes that refer to the column being renamed.
1446          */
1447         indexoidlist = RelationGetIndexList(targetrelation);
1448
1449         foreach(indexoidscan, indexoidlist)
1450         {
1451                 Oid                     indexoid = lfirst_oid(indexoidscan);
1452                 HeapTuple       indextup;
1453                 Form_pg_index indexform;
1454                 int                     i;
1455
1456                 /*
1457                  * Scan through index columns to see if there's any simple index
1458                  * entries for this attribute.  We ignore expressional entries.
1459                  */
1460                 indextup = SearchSysCache(INDEXRELID,
1461                                                                   ObjectIdGetDatum(indexoid),
1462                                                                   0, 0, 0);
1463                 if (!HeapTupleIsValid(indextup))
1464                         elog(ERROR, "cache lookup failed for index %u", indexoid);
1465                 indexform = (Form_pg_index) GETSTRUCT(indextup);
1466
1467                 for (i = 0; i < indexform->indnatts; i++)
1468                 {
1469                         if (attnum != indexform->indkey.values[i])
1470                                 continue;
1471
1472                         /*
1473                          * Found one, rename it.
1474                          */
1475                         atttup = SearchSysCacheCopy(ATTNUM,
1476                                                                                 ObjectIdGetDatum(indexoid),
1477                                                                                 Int16GetDatum(i + 1),
1478                                                                                 0, 0);
1479                         if (!HeapTupleIsValid(atttup))
1480                                 continue;               /* should we raise an error? */
1481
1482                         /*
1483                          * Update the (copied) attribute tuple.
1484                          */
1485                         namestrcpy(&(((Form_pg_attribute) GETSTRUCT(atttup))->attname),
1486                                            newattname);
1487
1488                         simple_heap_update(attrelation, &atttup->t_self, atttup);
1489
1490                         /* keep system catalog indexes current */
1491                         CatalogUpdateIndexes(attrelation, atttup);
1492
1493                         heap_freetuple(atttup);
1494                 }
1495
1496                 ReleaseSysCache(indextup);
1497         }
1498
1499         list_free(indexoidlist);
1500
1501         heap_close(attrelation, RowExclusiveLock);
1502
1503         /*
1504          * Update att name in any RI triggers associated with the relation.
1505          */
1506         if (targetrelation->rd_rel->reltriggers > 0)
1507         {
1508                 /* update tgargs column reference where att is primary key */
1509                 update_ri_trigger_args(RelationGetRelid(targetrelation),
1510                                                            oldattname, newattname,
1511                                                            false, false);
1512                 /* update tgargs column reference where att is foreign key */
1513                 update_ri_trigger_args(RelationGetRelid(targetrelation),
1514                                                            oldattname, newattname,
1515                                                            true, false);
1516         }
1517
1518         relation_close(targetrelation, NoLock);         /* close rel but keep lock */
1519 }
1520
1521 /*
1522  *              renamerel               - change the name of a relation
1523  *
1524  *              XXX - When renaming sequences, we don't bother to modify the
1525  *                        sequence name that is stored within the sequence itself
1526  *                        (this would cause problems with MVCC). In the future,
1527  *                        the sequence name should probably be removed from the
1528  *                        sequence, AFAIK there's no need for it to be there.
1529  */
1530 void
1531 renamerel(Oid myrelid, const char *newrelname)
1532 {
1533         Relation        targetrelation;
1534         Relation        relrelation;    /* for RELATION relation */
1535         HeapTuple       reltup;
1536         Oid                     namespaceId;
1537         char       *oldrelname;
1538         char            relkind;
1539         bool            relhastriggers;
1540
1541         /*
1542          * Grab an exclusive lock on the target table or index, which we will NOT
1543          * release until end of transaction.
1544          */
1545         targetrelation = relation_open(myrelid, AccessExclusiveLock);
1546
1547         oldrelname = pstrdup(RelationGetRelationName(targetrelation));
1548         namespaceId = RelationGetNamespace(targetrelation);
1549
1550         if (!allowSystemTableMods && IsSystemRelation(targetrelation))
1551                 ereport(ERROR,
1552                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1553                                  errmsg("permission denied: \"%s\" is a system catalog",
1554                                                 RelationGetRelationName(targetrelation))));
1555
1556         relkind = targetrelation->rd_rel->relkind;
1557         relhastriggers = (targetrelation->rd_rel->reltriggers > 0);
1558
1559         /*
1560          * Find relation's pg_class tuple, and make sure newrelname isn't in use.
1561          */
1562         relrelation = heap_open(RelationRelationId, RowExclusiveLock);
1563
1564         reltup = SearchSysCacheCopy(RELOID,
1565                                                                 PointerGetDatum(myrelid),
1566                                                                 0, 0, 0);
1567         if (!HeapTupleIsValid(reltup))          /* shouldn't happen */
1568                 elog(ERROR, "cache lookup failed for relation %u", myrelid);
1569
1570         if (get_relname_relid(newrelname, namespaceId) != InvalidOid)
1571                 ereport(ERROR,
1572                                 (errcode(ERRCODE_DUPLICATE_TABLE),
1573                                  errmsg("relation \"%s\" already exists",
1574                                                 newrelname)));
1575
1576         /*
1577          * Update pg_class tuple with new relname.      (Scribbling on reltup is OK
1578          * because it's a copy...)
1579          */
1580         namestrcpy(&(((Form_pg_class) GETSTRUCT(reltup))->relname), newrelname);
1581
1582         simple_heap_update(relrelation, &reltup->t_self, reltup);
1583
1584         /* keep the system catalog indexes current */
1585         CatalogUpdateIndexes(relrelation, reltup);
1586
1587         heap_freetuple(reltup);
1588         heap_close(relrelation, RowExclusiveLock);
1589
1590         /*
1591          * Also rename the associated type, if any.
1592          */
1593         if (relkind != RELKIND_INDEX)
1594                 TypeRename(oldrelname, namespaceId, newrelname);
1595
1596         /*
1597          * Update rel name in any RI triggers associated with the relation.
1598          */
1599         if (relhastriggers)
1600         {
1601                 /* update tgargs where relname is primary key */
1602                 update_ri_trigger_args(myrelid,
1603                                                            oldrelname,
1604                                                            newrelname,
1605                                                            false, true);
1606                 /* update tgargs where relname is foreign key */
1607                 update_ri_trigger_args(myrelid,
1608                                                            oldrelname,
1609                                                            newrelname,
1610                                                            true, true);
1611         }
1612
1613         /*
1614          * Close rel, but keep exclusive lock!
1615          */
1616         relation_close(targetrelation, NoLock);
1617 }
1618
1619 /*
1620  * Scan pg_trigger for RI triggers that are on the specified relation
1621  * (if fk_scan is false) or have it as the tgconstrrel (if fk_scan
1622  * is true).  Update RI trigger args fields matching oldname to contain
1623  * newname instead.  If update_relname is true, examine the relname
1624  * fields; otherwise examine the attname fields.
1625  */
1626 static void
1627 update_ri_trigger_args(Oid relid,
1628                                            const char *oldname,
1629                                            const char *newname,
1630                                            bool fk_scan,
1631                                            bool update_relname)
1632 {
1633         Relation        tgrel;
1634         ScanKeyData skey[1];
1635         SysScanDesc trigscan;
1636         HeapTuple       tuple;
1637         Datum           values[Natts_pg_trigger];
1638         char            nulls[Natts_pg_trigger];
1639         char            replaces[Natts_pg_trigger];
1640
1641         tgrel = heap_open(TriggerRelationId, RowExclusiveLock);
1642         if (fk_scan)
1643         {
1644                 ScanKeyInit(&skey[0],
1645                                         Anum_pg_trigger_tgconstrrelid,
1646                                         BTEqualStrategyNumber, F_OIDEQ,
1647                                         ObjectIdGetDatum(relid));
1648                 trigscan = systable_beginscan(tgrel, TriggerConstrRelidIndexId,
1649                                                                           true, SnapshotNow,
1650                                                                           1, skey);
1651         }
1652         else
1653         {
1654                 ScanKeyInit(&skey[0],
1655                                         Anum_pg_trigger_tgrelid,
1656                                         BTEqualStrategyNumber, F_OIDEQ,
1657                                         ObjectIdGetDatum(relid));
1658                 trigscan = systable_beginscan(tgrel, TriggerRelidNameIndexId,
1659                                                                           true, SnapshotNow,
1660                                                                           1, skey);
1661         }
1662
1663         while ((tuple = systable_getnext(trigscan)) != NULL)
1664         {
1665                 Form_pg_trigger pg_trigger = (Form_pg_trigger) GETSTRUCT(tuple);
1666                 bytea      *val;
1667                 bytea      *newtgargs;
1668                 bool            isnull;
1669                 int                     tg_type;
1670                 bool            examine_pk;
1671                 bool            changed;
1672                 int                     tgnargs;
1673                 int                     i;
1674                 int                     newlen;
1675                 const char *arga[RI_MAX_ARGUMENTS];
1676                 const char *argp;
1677
1678                 tg_type = RI_FKey_trigger_type(pg_trigger->tgfoid);
1679                 if (tg_type == RI_TRIGGER_NONE)
1680                 {
1681                         /* Not an RI trigger, forget it */
1682                         continue;
1683                 }
1684
1685                 /*
1686                  * It is an RI trigger, so parse the tgargs bytea.
1687                  *
1688                  * NB: we assume the field will never be compressed or moved out of
1689                  * line; so does trigger.c ...
1690                  */
1691                 tgnargs = pg_trigger->tgnargs;
1692                 val = (bytea *)
1693                         DatumGetPointer(fastgetattr(tuple,
1694                                                                                 Anum_pg_trigger_tgargs,
1695                                                                                 tgrel->rd_att, &isnull));
1696                 if (isnull || tgnargs < RI_FIRST_ATTNAME_ARGNO ||
1697                         tgnargs > RI_MAX_ARGUMENTS)
1698                 {
1699                         /* This probably shouldn't happen, but ignore busted triggers */
1700                         continue;
1701                 }
1702                 argp = (const char *) VARDATA(val);
1703                 for (i = 0; i < tgnargs; i++)
1704                 {
1705                         arga[i] = argp;
1706                         argp += strlen(argp) + 1;
1707                 }
1708
1709                 /*
1710                  * Figure out which item(s) to look at.  If the trigger is primary-key
1711                  * type and attached to my rel, I should look at the PK fields; if it
1712                  * is foreign-key type and attached to my rel, I should look at the FK
1713                  * fields.      But the opposite rule holds when examining triggers found
1714                  * by tgconstrrel search.
1715                  */
1716                 examine_pk = (tg_type == RI_TRIGGER_PK) == (!fk_scan);
1717
1718                 changed = false;
1719                 if (update_relname)
1720                 {
1721                         /* Change the relname if needed */
1722                         i = examine_pk ? RI_PK_RELNAME_ARGNO : RI_FK_RELNAME_ARGNO;
1723                         if (strcmp(arga[i], oldname) == 0)
1724                         {
1725                                 arga[i] = newname;
1726                                 changed = true;
1727                         }
1728                 }
1729                 else
1730                 {
1731                         /* Change attname(s) if needed */
1732                         i = examine_pk ? RI_FIRST_ATTNAME_ARGNO + RI_KEYPAIR_PK_IDX :
1733                                 RI_FIRST_ATTNAME_ARGNO + RI_KEYPAIR_FK_IDX;
1734                         for (; i < tgnargs; i += 2)
1735                         {
1736                                 if (strcmp(arga[i], oldname) == 0)
1737                                 {
1738                                         arga[i] = newname;
1739                                         changed = true;
1740                                 }
1741                         }
1742                 }
1743
1744                 if (!changed)
1745                 {
1746                         /* Don't need to update this tuple */
1747                         continue;
1748                 }
1749
1750                 /*
1751                  * Construct modified tgargs bytea.
1752                  */
1753                 newlen = VARHDRSZ;
1754                 for (i = 0; i < tgnargs; i++)
1755                         newlen += strlen(arga[i]) + 1;
1756                 newtgargs = (bytea *) palloc(newlen);
1757                 VARATT_SIZEP(newtgargs) = newlen;
1758                 newlen = VARHDRSZ;
1759                 for (i = 0; i < tgnargs; i++)
1760                 {
1761                         strcpy(((char *) newtgargs) + newlen, arga[i]);
1762                         newlen += strlen(arga[i]) + 1;
1763                 }
1764
1765                 /*
1766                  * Build modified tuple.
1767                  */
1768                 for (i = 0; i < Natts_pg_trigger; i++)
1769                 {
1770                         values[i] = (Datum) 0;
1771                         replaces[i] = ' ';
1772                         nulls[i] = ' ';
1773                 }
1774                 values[Anum_pg_trigger_tgargs - 1] = PointerGetDatum(newtgargs);
1775                 replaces[Anum_pg_trigger_tgargs - 1] = 'r';
1776
1777                 tuple = heap_modifytuple(tuple, RelationGetDescr(tgrel), values, nulls, replaces);
1778
1779                 /*
1780                  * Update pg_trigger and its indexes
1781                  */
1782                 simple_heap_update(tgrel, &tuple->t_self, tuple);
1783
1784                 CatalogUpdateIndexes(tgrel, tuple);
1785
1786                 /*
1787                  * Invalidate trigger's relation's relcache entry so that other
1788                  * backends (and this one too!) are sent SI message to make them
1789                  * rebuild relcache entries.  (Ideally this should happen
1790                  * automatically...)
1791                  *
1792                  * We can skip this for triggers on relid itself, since that relcache
1793                  * flush will happen anyway due to the table or column rename.  We
1794                  * just need to catch the far ends of RI relationships.
1795                  */
1796                 pg_trigger = (Form_pg_trigger) GETSTRUCT(tuple);
1797                 if (pg_trigger->tgrelid != relid)
1798                         CacheInvalidateRelcacheByRelid(pg_trigger->tgrelid);
1799
1800                 /* free up our scratch memory */
1801                 pfree(newtgargs);
1802                 heap_freetuple(tuple);
1803         }
1804
1805         systable_endscan(trigscan);
1806
1807         heap_close(tgrel, RowExclusiveLock);
1808
1809         /*
1810          * Increment cmd counter to make updates visible; this is needed in case
1811          * the same tuple has to be updated again by next pass (can happen in case
1812          * of a self-referential FK relationship).
1813          */
1814         CommandCounterIncrement();
1815 }
1816
1817 /*
1818  * AlterTable
1819  *              Execute ALTER TABLE, which can be a list of subcommands
1820  *
1821  * ALTER TABLE is performed in three phases:
1822  *              1. Examine subcommands and perform pre-transformation checking.
1823  *              2. Update system catalogs.
1824  *              3. Scan table(s) to check new constraints, and optionally recopy
1825  *                 the data into new table(s).
1826  * Phase 3 is not performed unless one or more of the subcommands requires
1827  * it.  The intention of this design is to allow multiple independent
1828  * updates of the table schema to be performed with only one pass over the
1829  * data.
1830  *
1831  * ATPrepCmd performs phase 1.  A "work queue" entry is created for
1832  * each table to be affected (there may be multiple affected tables if the
1833  * commands traverse a table inheritance hierarchy).  Also we do preliminary
1834  * validation of the subcommands, including parse transformation of those
1835  * expressions that need to be evaluated with respect to the old table
1836  * schema.
1837  *
1838  * ATRewriteCatalogs performs phase 2 for each affected table (note that
1839  * phases 2 and 3 do no explicit recursion, since phase 1 already did it).
1840  * Certain subcommands need to be performed before others to avoid
1841  * unnecessary conflicts; for example, DROP COLUMN should come before
1842  * ADD COLUMN.  Therefore phase 1 divides the subcommands into multiple
1843  * lists, one for each logical "pass" of phase 2.
1844  *
1845  * ATRewriteTables performs phase 3 for those tables that need it.
1846  *
1847  * Thanks to the magic of MVCC, an error anywhere along the way rolls back
1848  * the whole operation; we don't have to do anything special to clean up.
1849  */
1850 void
1851 AlterTable(AlterTableStmt *stmt)
1852 {
1853         ATController(relation_openrv(stmt->relation, AccessExclusiveLock),
1854                                  stmt->cmds,
1855                                  interpretInhOption(stmt->relation->inhOpt));
1856 }
1857
1858 /*
1859  * AlterTableInternal
1860  *
1861  * ALTER TABLE with target specified by OID
1862  */
1863 void
1864 AlterTableInternal(Oid relid, List *cmds, bool recurse)
1865 {
1866         ATController(relation_open(relid, AccessExclusiveLock),
1867                                  cmds,
1868                                  recurse);
1869 }
1870
1871 static void
1872 ATController(Relation rel, List *cmds, bool recurse)
1873 {
1874         List       *wqueue = NIL;
1875         ListCell   *lcmd;
1876
1877         /* Phase 1: preliminary examination of commands, create work queue */
1878         foreach(lcmd, cmds)
1879         {
1880                 AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
1881
1882                 ATPrepCmd(&wqueue, rel, cmd, recurse, false);
1883         }
1884
1885         /* Close the relation, but keep lock until commit */
1886         relation_close(rel, NoLock);
1887
1888         /* Phase 2: update system catalogs */
1889         ATRewriteCatalogs(&wqueue);
1890
1891         /* Phase 3: scan/rewrite tables as needed */
1892         ATRewriteTables(&wqueue);
1893 }
1894
1895 /*
1896  * ATPrepCmd
1897  *
1898  * Traffic cop for ALTER TABLE Phase 1 operations, including simple
1899  * recursion and permission checks.
1900  *
1901  * Caller must have acquired AccessExclusiveLock on relation already.
1902  * This lock should be held until commit.
1903  */
1904 static void
1905 ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
1906                   bool recurse, bool recursing)
1907 {
1908         AlteredTableInfo *tab;
1909         int                     pass;
1910
1911         /* Find or create work queue entry for this table */
1912         tab = ATGetQueueEntry(wqueue, rel);
1913
1914         /*
1915          * Copy the original subcommand for each table.  This avoids conflicts
1916          * when different child tables need to make different parse
1917          * transformations (for example, the same column may have different column
1918          * numbers in different children).
1919          */
1920         cmd = copyObject(cmd);
1921
1922         /*
1923          * Do permissions checking, recursion to child tables if needed, and any
1924          * additional phase-1 processing needed.
1925          */
1926         switch (cmd->subtype)
1927         {
1928                 case AT_AddColumn:              /* ADD COLUMN */
1929                         ATSimplePermissions(rel, false);
1930                         /* Performs own recursion */
1931                         ATPrepAddColumn(wqueue, rel, recurse, cmd);
1932                         pass = AT_PASS_ADD_COL;
1933                         break;
1934                 case AT_ColumnDefault:  /* ALTER COLUMN DEFAULT */
1935
1936                         /*
1937                          * We allow defaults on views so that INSERT into a view can have
1938                          * default-ish behavior.  This works because the rewriter
1939                          * substitutes default values into INSERTs before it expands
1940                          * rules.
1941                          */
1942                         ATSimplePermissions(rel, true);
1943                         ATSimpleRecursion(wqueue, rel, cmd, recurse);
1944                         /* No command-specific prep needed */
1945                         pass = AT_PASS_ADD_CONSTR;
1946                         break;
1947                 case AT_DropNotNull:    /* ALTER COLUMN DROP NOT NULL */
1948                         ATSimplePermissions(rel, false);
1949                         ATSimpleRecursion(wqueue, rel, cmd, recurse);
1950                         /* No command-specific prep needed */
1951                         pass = AT_PASS_DROP;
1952                         break;
1953                 case AT_SetNotNull:             /* ALTER COLUMN SET NOT NULL */
1954                         ATSimplePermissions(rel, false);
1955                         ATSimpleRecursion(wqueue, rel, cmd, recurse);
1956                         /* No command-specific prep needed */
1957                         pass = AT_PASS_ADD_CONSTR;
1958                         break;
1959                 case AT_SetStatistics:  /* ALTER COLUMN STATISTICS */
1960                         ATSimpleRecursion(wqueue, rel, cmd, recurse);
1961                         /* Performs own permission checks */
1962                         ATPrepSetStatistics(rel, cmd->name, cmd->def);
1963                         pass = AT_PASS_COL_ATTRS;
1964                         break;
1965                 case AT_SetStorage:             /* ALTER COLUMN STORAGE */
1966                         ATSimplePermissions(rel, false);
1967                         ATSimpleRecursion(wqueue, rel, cmd, recurse);
1968                         /* No command-specific prep needed */
1969                         pass = AT_PASS_COL_ATTRS;
1970                         break;
1971                 case AT_DropColumn:             /* DROP COLUMN */
1972                         ATSimplePermissions(rel, false);
1973                         /* Recursion occurs during execution phase */
1974                         /* No command-specific prep needed except saving recurse flag */
1975                         if (recurse)
1976                                 cmd->subtype = AT_DropColumnRecurse;
1977                         pass = AT_PASS_DROP;
1978                         break;
1979                 case AT_AddIndex:               /* ADD INDEX */
1980                         ATSimplePermissions(rel, false);
1981                         /* This command never recurses */
1982                         /* No command-specific prep needed */
1983                         pass = AT_PASS_ADD_INDEX;
1984                         break;
1985                 case AT_AddConstraint:  /* ADD CONSTRAINT */
1986                         ATSimplePermissions(rel, false);
1987
1988                         /*
1989                          * Currently we recurse only for CHECK constraints, never for
1990                          * foreign-key constraints.  UNIQUE/PKEY constraints won't be seen
1991                          * here.
1992                          */
1993                         if (IsA(cmd->def, Constraint))
1994                                 ATSimpleRecursion(wqueue, rel, cmd, recurse);
1995                         /* No command-specific prep needed */
1996                         pass = AT_PASS_ADD_CONSTR;
1997                         break;
1998                 case AT_DropConstraint: /* DROP CONSTRAINT */
1999                         ATSimplePermissions(rel, false);
2000                         /* Performs own recursion */
2001                         ATPrepDropConstraint(wqueue, rel, recurse, cmd);
2002                         pass = AT_PASS_DROP;
2003                         break;
2004                 case AT_DropConstraintQuietly:  /* DROP CONSTRAINT for child */
2005                         ATSimplePermissions(rel, false);
2006                         ATSimpleRecursion(wqueue, rel, cmd, recurse);
2007                         /* No command-specific prep needed */
2008                         pass = AT_PASS_DROP;
2009                         break;
2010                 case AT_AlterColumnType:                /* ALTER COLUMN TYPE */
2011                         ATSimplePermissions(rel, false);
2012                         /* Performs own recursion */
2013                         ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd);
2014                         pass = AT_PASS_ALTER_TYPE;
2015                         break;
2016                 case AT_ToastTable:             /* CREATE TOAST TABLE */
2017                         ATSimplePermissions(rel, false);
2018                         /* This command never recurses */
2019                         /* No command-specific prep needed */
2020                         pass = AT_PASS_MISC;
2021                         break;
2022                 case AT_ChangeOwner:    /* ALTER OWNER */
2023                         /* This command never recurses */
2024                         /* No command-specific prep needed */
2025                         pass = AT_PASS_MISC;
2026                         break;
2027                 case AT_ClusterOn:              /* CLUSTER ON */
2028                 case AT_DropCluster:    /* SET WITHOUT CLUSTER */
2029                         ATSimplePermissions(rel, false);
2030                         /* These commands never recurse */
2031                         /* No command-specific prep needed */
2032                         pass = AT_PASS_MISC;
2033                         break;
2034                 case AT_DropOids:               /* SET WITHOUT OIDS */
2035                         ATSimplePermissions(rel, false);
2036                         /* Performs own recursion */
2037                         if (rel->rd_rel->relhasoids)
2038                         {
2039                                 AlterTableCmd *dropCmd = makeNode(AlterTableCmd);
2040
2041                                 dropCmd->subtype = AT_DropColumn;
2042                                 dropCmd->name = pstrdup("oid");
2043                                 dropCmd->behavior = cmd->behavior;
2044                                 ATPrepCmd(wqueue, rel, dropCmd, recurse, false);
2045                         }
2046                         pass = AT_PASS_DROP;
2047                         break;
2048                 case AT_SetTableSpace:  /* SET TABLESPACE */
2049                         /* This command never recurses */
2050                         ATPrepSetTableSpace(tab, rel, cmd->name);
2051                         pass = AT_PASS_MISC;    /* doesn't actually matter */
2052                         break;
2053                 case AT_EnableTrig:             /* ENABLE TRIGGER variants */
2054                 case AT_EnableTrigAll:
2055                 case AT_EnableTrigUser:
2056                 case AT_DisableTrig:    /* DISABLE TRIGGER variants */
2057                 case AT_DisableTrigAll:
2058                 case AT_DisableTrigUser:
2059                         ATSimplePermissions(rel, false);
2060                         /* These commands never recurse */
2061                         /* No command-specific prep needed */
2062                         pass = AT_PASS_MISC;
2063                         break;
2064                 default:                                /* oops */
2065                         elog(ERROR, "unrecognized alter table type: %d",
2066                                  (int) cmd->subtype);
2067                         pass = 0;                       /* keep compiler quiet */
2068                         break;
2069         }
2070
2071         /* Add the subcommand to the appropriate list for phase 2 */
2072         tab->subcmds[pass] = lappend(tab->subcmds[pass], cmd);
2073 }
2074
2075 /*
2076  * ATRewriteCatalogs
2077  *
2078  * Traffic cop for ALTER TABLE Phase 2 operations.      Subcommands are
2079  * dispatched in a "safe" execution order (designed to avoid unnecessary
2080  * conflicts).
2081  */
2082 static void
2083 ATRewriteCatalogs(List **wqueue)
2084 {
2085         int                     pass;
2086         ListCell   *ltab;
2087
2088         /*
2089          * We process all the tables "in parallel", one pass at a time.  This is
2090          * needed because we may have to propagate work from one table to another
2091          * (specifically, ALTER TYPE on a foreign key's PK has to dispatch the
2092          * re-adding of the foreign key constraint to the other table).  Work can
2093          * only be propagated into later passes, however.
2094          */
2095         for (pass = 0; pass < AT_NUM_PASSES; pass++)
2096         {
2097                 /* Go through each table that needs to be processed */
2098                 foreach(ltab, *wqueue)
2099                 {
2100                         AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2101                         List       *subcmds = tab->subcmds[pass];
2102                         Relation        rel;
2103                         ListCell   *lcmd;
2104
2105                         if (subcmds == NIL)
2106                                 continue;
2107
2108                         /*
2109                          * Exclusive lock was obtained by phase 1, needn't get it again
2110                          */
2111                         rel = relation_open(tab->relid, NoLock);
2112
2113                         foreach(lcmd, subcmds)
2114                                 ATExecCmd(tab, rel, (AlterTableCmd *) lfirst(lcmd));
2115
2116                         /*
2117                          * After the ALTER TYPE pass, do cleanup work (this is not done in
2118                          * ATExecAlterColumnType since it should be done only once if
2119                          * multiple columns of a table are altered).
2120                          */
2121                         if (pass == AT_PASS_ALTER_TYPE)
2122                                 ATPostAlterTypeCleanup(wqueue, tab);
2123
2124                         relation_close(rel, NoLock);
2125                 }
2126         }
2127
2128         /*
2129          * Do an implicit CREATE TOAST TABLE if we executed any subcommands that
2130          * might have added a column or changed column storage.
2131          */
2132         foreach(ltab, *wqueue)
2133         {
2134                 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2135
2136                 if (tab->relkind == RELKIND_RELATION &&
2137                         (tab->subcmds[AT_PASS_ADD_COL] ||
2138                          tab->subcmds[AT_PASS_ALTER_TYPE] ||
2139                          tab->subcmds[AT_PASS_COL_ATTRS]))
2140                         AlterTableCreateToastTable(tab->relid, true);
2141         }
2142 }
2143
2144 /*
2145  * ATExecCmd: dispatch a subcommand to appropriate execution routine
2146  */
2147 static void
2148 ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd)
2149 {
2150         switch (cmd->subtype)
2151         {
2152                 case AT_AddColumn:              /* ADD COLUMN */
2153                         ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def);
2154                         break;
2155                 case AT_ColumnDefault:  /* ALTER COLUMN DEFAULT */
2156                         ATExecColumnDefault(rel, cmd->name, cmd->def);
2157                         break;
2158                 case AT_DropNotNull:    /* ALTER COLUMN DROP NOT NULL */
2159                         ATExecDropNotNull(rel, cmd->name);
2160                         break;
2161                 case AT_SetNotNull:             /* ALTER COLUMN SET NOT NULL */
2162                         ATExecSetNotNull(tab, rel, cmd->name);
2163                         break;
2164                 case AT_SetStatistics:  /* ALTER COLUMN STATISTICS */
2165                         ATExecSetStatistics(rel, cmd->name, cmd->def);
2166                         break;
2167                 case AT_SetStorage:             /* ALTER COLUMN STORAGE */
2168                         ATExecSetStorage(rel, cmd->name, cmd->def);
2169                         break;
2170                 case AT_DropColumn:             /* DROP COLUMN */
2171                         ATExecDropColumn(rel, cmd->name, cmd->behavior, false, false);
2172                         break;
2173                 case AT_DropColumnRecurse:              /* DROP COLUMN with recursion */
2174                         ATExecDropColumn(rel, cmd->name, cmd->behavior, true, false);
2175                         break;
2176                 case AT_AddIndex:               /* ADD INDEX */
2177                         ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, false);
2178                         break;
2179                 case AT_ReAddIndex:             /* ADD INDEX */
2180                         ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true);
2181                         break;
2182                 case AT_AddConstraint:  /* ADD CONSTRAINT */
2183                         ATExecAddConstraint(tab, rel, cmd->def);
2184                         break;
2185                 case AT_DropConstraint: /* DROP CONSTRAINT */
2186                         ATExecDropConstraint(rel, cmd->name, cmd->behavior, false);
2187                         break;
2188                 case AT_DropConstraintQuietly:  /* DROP CONSTRAINT for child */
2189                         ATExecDropConstraint(rel, cmd->name, cmd->behavior, true);
2190                         break;
2191                 case AT_AlterColumnType:                /* ALTER COLUMN TYPE */
2192                         ATExecAlterColumnType(tab, rel, cmd->name, (TypeName *) cmd->def);
2193                         break;
2194                 case AT_ToastTable:             /* CREATE TOAST TABLE */
2195                         AlterTableCreateToastTable(RelationGetRelid(rel), false);
2196                         break;
2197                 case AT_ChangeOwner:    /* ALTER OWNER */
2198                         ATExecChangeOwner(RelationGetRelid(rel),
2199                                                           get_roleid_checked(cmd->name),
2200                                                           false);
2201                         break;
2202                 case AT_ClusterOn:              /* CLUSTER ON */
2203                         ATExecClusterOn(rel, cmd->name);
2204                         break;
2205                 case AT_DropCluster:    /* SET WITHOUT CLUSTER */
2206                         ATExecDropCluster(rel);
2207                         break;
2208                 case AT_DropOids:               /* SET WITHOUT OIDS */
2209
2210                         /*
2211                          * Nothing to do here; we'll have generated a DropColumn
2212                          * subcommand to do the real work
2213                          */
2214                         break;
2215                 case AT_SetTableSpace:  /* SET TABLESPACE */
2216
2217                         /*
2218                          * Nothing to do here; Phase 3 does the work
2219                          */
2220                         break;
2221                 case AT_EnableTrig:             /* ENABLE TRIGGER name */
2222                         ATExecEnableDisableTrigger(rel, cmd->name, true, false);
2223                         break;
2224                 case AT_DisableTrig:    /* DISABLE TRIGGER name */
2225                         ATExecEnableDisableTrigger(rel, cmd->name, false, false);
2226                         break;
2227                 case AT_EnableTrigAll:  /* ENABLE TRIGGER ALL */
2228                         ATExecEnableDisableTrigger(rel, NULL, true, false);
2229                         break;
2230                 case AT_DisableTrigAll: /* DISABLE TRIGGER ALL */
2231                         ATExecEnableDisableTrigger(rel, NULL, false, false);
2232                         break;
2233                 case AT_EnableTrigUser: /* ENABLE TRIGGER USER */
2234                         ATExecEnableDisableTrigger(rel, NULL, true, true);
2235                         break;
2236                 case AT_DisableTrigUser:                /* DISABLE TRIGGER USER */
2237                         ATExecEnableDisableTrigger(rel, NULL, false, true);
2238                         break;
2239                 default:                                /* oops */
2240                         elog(ERROR, "unrecognized alter table type: %d",
2241                                  (int) cmd->subtype);
2242                         break;
2243         }
2244
2245         /*
2246          * Bump the command counter to ensure the next subcommand in the sequence
2247          * can see the changes so far
2248          */
2249         CommandCounterIncrement();
2250 }
2251
2252 /*
2253  * ATRewriteTables: ALTER TABLE phase 3
2254  */
2255 static void
2256 ATRewriteTables(List **wqueue)
2257 {
2258         ListCell   *ltab;
2259
2260         /* Go through each table that needs to be checked or rewritten */
2261         foreach(ltab, *wqueue)
2262         {
2263                 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2264
2265                 /*
2266                  * We only need to rewrite the table if at least one column needs to
2267                  * be recomputed.
2268                  */
2269                 if (tab->newvals != NIL)
2270                 {
2271                         /* Build a temporary relation and copy data */
2272                         Oid                     OIDNewHeap;
2273                         char            NewHeapName[NAMEDATALEN];
2274                         Oid                     NewTableSpace;
2275                         Relation        OldHeap;
2276                         ObjectAddress object;
2277
2278                         OldHeap = heap_open(tab->relid, NoLock);
2279
2280                         /*
2281                          * We can never allow rewriting of shared or nailed-in-cache
2282                          * relations, because we can't support changing their relfilenode
2283                          * values.
2284                          */
2285                         if (OldHeap->rd_rel->relisshared || OldHeap->rd_isnailed)
2286                                 ereport(ERROR,
2287                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2288                                                  errmsg("cannot rewrite system relation \"%s\"",
2289                                                                 RelationGetRelationName(OldHeap))));
2290
2291                         /*
2292                          * Don't allow rewrite on temp tables of other backends ... their
2293                          * local buffer manager is not going to cope.
2294                          */
2295                         if (isOtherTempNamespace(RelationGetNamespace(OldHeap)))
2296                                 ereport(ERROR,
2297                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2298                                 errmsg("cannot rewrite temporary tables of other sessions")));
2299
2300                         /*
2301                          * Select destination tablespace (same as original unless user
2302                          * requested a change)
2303                          */
2304                         if (tab->newTableSpace)
2305                                 NewTableSpace = tab->newTableSpace;
2306                         else
2307                                 NewTableSpace = OldHeap->rd_rel->reltablespace;
2308
2309                         heap_close(OldHeap, NoLock);
2310
2311                         /*
2312                          * Create the new heap, using a temporary name in the same
2313                          * namespace as the existing table.  NOTE: there is some risk of
2314                          * collision with user relnames.  Working around this seems more
2315                          * trouble than it's worth; in particular, we can't create the new
2316                          * heap in a different namespace from the old, or we will have
2317                          * problems with the TEMP status of temp tables.
2318                          */
2319                         snprintf(NewHeapName, sizeof(NewHeapName),
2320                                          "pg_temp_%u", tab->relid);
2321
2322                         OIDNewHeap = make_new_heap(tab->relid, NewHeapName, NewTableSpace);
2323
2324                         /*
2325                          * Copy the heap data into the new table with the desired
2326                          * modifications, and test the current data within the table
2327                          * against new constraints generated by ALTER TABLE commands.
2328                          */
2329                         ATRewriteTable(tab, OIDNewHeap);
2330
2331                         /* Swap the physical files of the old and new heaps. */
2332                         swap_relation_files(tab->relid, OIDNewHeap);
2333
2334                         CommandCounterIncrement();
2335
2336                         /* Destroy new heap with old filenode */
2337                         object.classId = RelationRelationId;
2338                         object.objectId = OIDNewHeap;
2339                         object.objectSubId = 0;
2340
2341                         /*
2342                          * The new relation is local to our transaction and we know
2343                          * nothing depends on it, so DROP_RESTRICT should be OK.
2344                          */
2345                         performDeletion(&object, DROP_RESTRICT);
2346                         /* performDeletion does CommandCounterIncrement at end */
2347
2348                         /*
2349                          * Rebuild each index on the relation (but not the toast table,
2350                          * which is all-new anyway).  We do not need
2351                          * CommandCounterIncrement() because reindex_relation does it.
2352                          */
2353                         reindex_relation(tab->relid, false);
2354                 }
2355                 else
2356                 {
2357                         /*
2358                          * Test the current data within the table against new constraints
2359                          * generated by ALTER TABLE commands, but don't rebuild data.
2360                          */
2361                         if (tab->constraints != NIL)
2362                                 ATRewriteTable(tab, InvalidOid);
2363
2364                         /*
2365                          * If we had SET TABLESPACE but no reason to reconstruct tuples,
2366                          * just do a block-by-block copy.
2367                          */
2368                         if (tab->newTableSpace)
2369                                 ATExecSetTableSpace(tab->relid, tab->newTableSpace);
2370                 }
2371         }
2372
2373         /*
2374          * Foreign key constraints are checked in a final pass, since (a) it's
2375          * generally best to examine each one separately, and (b) it's at least
2376          * theoretically possible that we have changed both relations of the
2377          * foreign key, and we'd better have finished both rewrites before we try
2378          * to read the tables.
2379          */
2380         foreach(ltab, *wqueue)
2381         {
2382                 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2383                 Relation        rel = NULL;
2384                 ListCell   *lcon;
2385
2386                 foreach(lcon, tab->constraints)
2387                 {
2388                         NewConstraint *con = lfirst(lcon);
2389
2390                         if (con->contype == CONSTR_FOREIGN)
2391                         {
2392                                 FkConstraint *fkconstraint = (FkConstraint *) con->qual;
2393                                 Relation        refrel;
2394
2395                                 if (rel == NULL)
2396                                 {
2397                                         /* Long since locked, no need for another */
2398                                         rel = heap_open(tab->relid, NoLock);
2399                                 }
2400
2401                                 refrel = heap_open(con->refrelid, RowShareLock);
2402
2403                                 validateForeignKeyConstraint(fkconstraint, rel, refrel);
2404
2405                                 heap_close(refrel, NoLock);
2406                         }
2407                 }
2408
2409                 if (rel)
2410                         heap_close(rel, NoLock);
2411         }
2412 }
2413
2414 /*
2415  * ATRewriteTable: scan or rewrite one table
2416  *
2417  * OIDNewHeap is InvalidOid if we don't need to rewrite
2418  */
2419 static void
2420 ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap)
2421 {
2422         Relation        oldrel;
2423         Relation        newrel;
2424         TupleDesc       oldTupDesc;
2425         TupleDesc       newTupDesc;
2426         bool            needscan = false;
2427         int                     i;
2428         ListCell   *l;
2429         EState     *estate;
2430
2431         /*
2432          * Open the relation(s).  We have surely already locked the existing
2433          * table.
2434          */
2435         oldrel = heap_open(tab->relid, NoLock);
2436         oldTupDesc = tab->oldDesc;
2437         newTupDesc = RelationGetDescr(oldrel);          /* includes all mods */
2438
2439         if (OidIsValid(OIDNewHeap))
2440                 newrel = heap_open(OIDNewHeap, AccessExclusiveLock);
2441         else
2442                 newrel = NULL;
2443
2444         /*
2445          * If we need to rewrite the table, the operation has to be propagated to
2446          * tables that use this table's rowtype as a column type.
2447          *
2448          * (Eventually this will probably become true for scans as well, but at
2449          * the moment a composite type does not enforce any constraints, so it's
2450          * not necessary/appropriate to enforce them just during ALTER.)
2451          */
2452         if (newrel)
2453                 find_composite_type_dependencies(oldrel->rd_rel->reltype,
2454                                                                                  RelationGetRelationName(oldrel));
2455
2456         /*
2457          * Generate the constraint and default execution states
2458          */
2459
2460         estate = CreateExecutorState();
2461
2462         /* Build the needed expression execution states */
2463         foreach(l, tab->constraints)
2464         {
2465                 NewConstraint *con = lfirst(l);
2466
2467                 switch (con->contype)
2468                 {
2469                         case CONSTR_CHECK:
2470                                 needscan = true;
2471                                 con->qualstate = (List *)
2472                                         ExecPrepareExpr((Expr *) con->qual, estate);
2473                                 break;
2474                         case CONSTR_FOREIGN:
2475                                 /* Nothing to do here */
2476                                 break;
2477                         case CONSTR_NOTNULL:
2478                                 needscan = true;
2479                                 break;
2480                         default:
2481                                 elog(ERROR, "unrecognized constraint type: %d",
2482                                          (int) con->contype);
2483                 }
2484         }
2485
2486         foreach(l, tab->newvals)
2487         {
2488                 NewColumnValue *ex = lfirst(l);
2489
2490                 needscan = true;
2491
2492                 ex->exprstate = ExecPrepareExpr((Expr *) ex->expr, estate);
2493         }
2494
2495         if (needscan)
2496         {
2497                 ExprContext *econtext;
2498                 Datum      *values;
2499                 bool       *isnull;
2500                 TupleTableSlot *oldslot;
2501                 TupleTableSlot *newslot;
2502                 HeapScanDesc scan;
2503                 HeapTuple       tuple;
2504                 MemoryContext oldCxt;
2505                 List       *dropped_attrs = NIL;
2506                 ListCell   *lc;
2507
2508                 econtext = GetPerTupleExprContext(estate);
2509
2510                 /*
2511                  * Make tuple slots for old and new tuples.  Note that even when the
2512                  * tuples are the same, the tupDescs might not be (consider ADD COLUMN
2513                  * without a default).
2514                  */
2515                 oldslot = MakeSingleTupleTableSlot(oldTupDesc);
2516                 newslot = MakeSingleTupleTableSlot(newTupDesc);
2517
2518                 /* Preallocate values/isnull arrays */
2519                 i = Max(newTupDesc->natts, oldTupDesc->natts);
2520                 values = (Datum *) palloc(i * sizeof(Datum));
2521                 isnull = (bool *) palloc(i * sizeof(bool));
2522                 memset(values, 0, i * sizeof(Datum));
2523                 memset(isnull, true, i * sizeof(bool));
2524
2525                 /*
2526                  * Any attributes that are dropped according to the new tuple
2527                  * descriptor can be set to NULL. We precompute the list of dropped
2528                  * attributes to avoid needing to do so in the per-tuple loop.
2529                  */
2530                 for (i = 0; i < newTupDesc->natts; i++)
2531                 {
2532                         if (newTupDesc->attrs[i]->attisdropped)
2533                                 dropped_attrs = lappend_int(dropped_attrs, i);
2534                 }
2535
2536                 /*
2537                  * Scan through the rows, generating a new row if needed and then
2538                  * checking all the constraints.
2539                  */
2540                 scan = heap_beginscan(oldrel, SnapshotNow, 0, NULL);
2541
2542                 /*
2543                  * Switch to per-tuple memory context and reset it for each tuple
2544                  * produced, so we don't leak memory.
2545                  */
2546                 oldCxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2547
2548                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2549                 {
2550                         if (newrel)
2551                         {
2552                                 Oid                     tupOid = InvalidOid;
2553
2554                                 /* Extract data from old tuple */
2555                                 heap_deform_tuple(tuple, oldTupDesc, values, isnull);
2556                                 if (oldTupDesc->tdhasoid)
2557                                         tupOid = HeapTupleGetOid(tuple);
2558
2559                                 /* Set dropped attributes to null in new tuple */
2560                                 foreach(lc, dropped_attrs)
2561                                         isnull[lfirst_int(lc)] = true;
2562
2563                                 /*
2564                                  * Process supplied expressions to replace selected columns.
2565                                  * Expression inputs come from the old tuple.
2566                                  */
2567                                 ExecStoreTuple(tuple, oldslot, InvalidBuffer, false);
2568                                 econtext->ecxt_scantuple = oldslot;
2569
2570                                 foreach(l, tab->newvals)
2571                                 {
2572                                         NewColumnValue *ex = lfirst(l);
2573
2574                                         values[ex->attnum - 1] = ExecEvalExpr(ex->exprstate,
2575                                                                                                                   econtext,
2576                                                                                                          &isnull[ex->attnum - 1],
2577                                                                                                                   NULL);
2578                                 }
2579
2580                                 /*
2581                                  * Form the new tuple. Note that we don't explicitly pfree it,
2582                                  * since the per-tuple memory context will be reset shortly.
2583                                  */
2584                                 tuple = heap_form_tuple(newTupDesc, values, isnull);
2585
2586                                 /* Preserve OID, if any */
2587                                 if (newTupDesc->tdhasoid)
2588                                         HeapTupleSetOid(tuple, tupOid);
2589                         }
2590
2591                         /* Now check any constraints on the possibly-changed tuple */
2592                         ExecStoreTuple(tuple, newslot, InvalidBuffer, false);
2593                         econtext->ecxt_scantuple = newslot;
2594
2595                         foreach(l, tab->constraints)
2596                         {
2597                                 NewConstraint *con = lfirst(l);
2598
2599                                 switch (con->contype)
2600                                 {
2601                                         case CONSTR_CHECK:
2602                                                 if (!ExecQual(con->qualstate, econtext, true))
2603                                                         ereport(ERROR,
2604                                                                         (errcode(ERRCODE_CHECK_VIOLATION),
2605                                                                          errmsg("check constraint \"%s\" is violated by some row",
2606                                                                                         con->name)));
2607                                                 break;
2608                                         case CONSTR_NOTNULL:
2609                                                 {
2610                                                         Datum           d;
2611                                                         bool            isnull;
2612
2613                                                         d = heap_getattr(tuple, con->attnum, newTupDesc,
2614                                                                                          &isnull);
2615                                                         if (isnull)
2616                                                                 ereport(ERROR,
2617                                                                                 (errcode(ERRCODE_NOT_NULL_VIOLATION),
2618                                                                  errmsg("column \"%s\" contains null values",
2619                                                                                 get_attname(tab->relid,
2620                                                                                                         con->attnum))));
2621                                                 }
2622                                                 break;
2623                                         case CONSTR_FOREIGN:
2624                                                 /* Nothing to do here */
2625                                                 break;
2626                                         default:
2627                                                 elog(ERROR, "unrecognized constraint type: %d",
2628                                                          (int) con->contype);
2629                                 }
2630                         }
2631
2632                         /* Write the tuple out to the new relation */
2633                         if (newrel)
2634                                 simple_heap_insert(newrel, tuple);
2635
2636                         ResetExprContext(econtext);
2637
2638                         CHECK_FOR_INTERRUPTS();
2639                 }
2640
2641                 MemoryContextSwitchTo(oldCxt);
2642                 heap_endscan(scan);
2643         }
2644
2645         FreeExecutorState(estate);
2646
2647         heap_close(oldrel, NoLock);
2648         if (newrel)
2649                 heap_close(newrel, NoLock);
2650 }
2651
2652 /*
2653  * ATGetQueueEntry: find or create an entry in the ALTER TABLE work queue
2654  */
2655 static AlteredTableInfo *
2656 ATGetQueueEntry(List **wqueue, Relation rel)
2657 {
2658         Oid                     relid = RelationGetRelid(rel);
2659         AlteredTableInfo *tab;
2660         ListCell   *ltab;
2661
2662         foreach(ltab, *wqueue)
2663         {
2664                 tab = (AlteredTableInfo *) lfirst(ltab);
2665                 if (tab->relid == relid)
2666                         return tab;
2667         }
2668
2669         /*
2670          * Not there, so add it.  Note that we make a copy of the relation's
2671          * existing descriptor before anything interesting can happen to it.
2672          */
2673         tab = (AlteredTableInfo *) palloc0(sizeof(AlteredTableInfo));
2674         tab->relid = relid;
2675         tab->relkind = rel->rd_rel->relkind;
2676         tab->oldDesc = CreateTupleDescCopy(RelationGetDescr(rel));
2677
2678         *wqueue = lappend(*wqueue, tab);
2679
2680         return tab;
2681 }
2682
2683 /*
2684  * ATSimplePermissions
2685  *
2686  * - Ensure that it is a relation (or possibly a view)
2687  * - Ensure this user is the owner
2688  * - Ensure that it is not a system table
2689  */
2690 static void
2691 ATSimplePermissions(Relation rel, bool allowView)
2692 {
2693         if (rel->rd_rel->relkind != RELKIND_RELATION)
2694         {
2695                 if (allowView)
2696                 {
2697                         if (rel->rd_rel->relkind != RELKIND_VIEW)
2698                                 ereport(ERROR,
2699                                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2700                                                  errmsg("\"%s\" is not a table or view",
2701                                                                 RelationGetRelationName(rel))));
2702                 }
2703                 else
2704                         ereport(ERROR,
2705                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2706                                          errmsg("\"%s\" is not a table",
2707                                                         RelationGetRelationName(rel))));
2708         }
2709
2710         /* Permissions checks */
2711         if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
2712                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
2713                                            RelationGetRelationName(rel));
2714
2715         if (!allowSystemTableMods && IsSystemRelation(rel))
2716                 ereport(ERROR,
2717                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2718                                  errmsg("permission denied: \"%s\" is a system catalog",
2719                                                 RelationGetRelationName(rel))));
2720 }
2721
2722 /*
2723  * ATSimpleRecursion
2724  *
2725  * Simple table recursion sufficient for most ALTER TABLE operations.
2726  * All direct and indirect children are processed in an unspecified order.
2727  * Note that if a child inherits from the original table via multiple
2728  * inheritance paths, it will be visited just once.
2729  */
2730 static void
2731 ATSimpleRecursion(List **wqueue, Relation rel,
2732                                   AlterTableCmd *cmd, bool recurse)
2733 {
2734         /*
2735          * Propagate to children if desired.  Non-table relations never have
2736          * children, so no need to search in that case.
2737          */
2738         if (recurse && rel->rd_rel->relkind == RELKIND_RELATION)
2739         {
2740                 Oid                     relid = RelationGetRelid(rel);
2741                 ListCell   *child;
2742                 List       *children;
2743
2744                 /* this routine is actually in the planner */
2745                 children = find_all_inheritors(relid);
2746
2747                 /*
2748                  * find_all_inheritors does the recursive search of the inheritance
2749                  * hierarchy, so all we have to do is process all of the relids in the
2750                  * list that it returns.
2751                  */
2752                 foreach(child, children)
2753                 {
2754                         Oid                     childrelid = lfirst_oid(child);
2755                         Relation        childrel;
2756
2757                         if (childrelid == relid)
2758                                 continue;
2759                         childrel = relation_open(childrelid, AccessExclusiveLock);
2760                         ATPrepCmd(wqueue, childrel, cmd, false, true);
2761                         relation_close(childrel, NoLock);
2762                 }
2763         }
2764 }
2765
2766 /*
2767  * ATOneLevelRecursion
2768  *
2769  * Here, we visit only direct inheritance children.  It is expected that
2770  * the command's prep routine will recurse again to find indirect children.
2771  * When using this technique, a multiply-inheriting child will be visited
2772  * multiple times.
2773  */
2774 static void
2775 ATOneLevelRecursion(List **wqueue, Relation rel,
2776                                         AlterTableCmd *cmd)
2777 {
2778         Oid                     relid = RelationGetRelid(rel);
2779         ListCell   *child;
2780         List       *children;
2781
2782         /* this routine is actually in the planner */
2783         children = find_inheritance_children(relid);
2784
2785         foreach(child, children)
2786         {
2787                 Oid                     childrelid = lfirst_oid(child);
2788                 Relation        childrel;
2789
2790                 childrel = relation_open(childrelid, AccessExclusiveLock);
2791                 ATPrepCmd(wqueue, childrel, cmd, true, true);
2792                 relation_close(childrel, NoLock);
2793         }
2794 }
2795
2796
2797 /*
2798  * find_composite_type_dependencies
2799  *
2800  * Check to see if a table's rowtype is being used as a column in some
2801  * other table (possibly nested several levels deep in composite types!).
2802  * Eventually, we'd like to propagate the check or rewrite operation
2803  * into other such tables, but for now, just error out if we find any.
2804  *
2805  * We assume that functions and views depending on the type are not reasons
2806  * to reject the ALTER.  (How safe is this really?)
2807  */
2808 static void
2809 find_composite_type_dependencies(Oid typeOid, const char *origTblName)
2810 {
2811         Relation        depRel;
2812         ScanKeyData key[2];
2813         SysScanDesc depScan;
2814         HeapTuple       depTup;
2815
2816         /*
2817          * We scan pg_depend to find those things that depend on the rowtype. (We
2818          * assume we can ignore refobjsubid for a rowtype.)
2819          */
2820         depRel = heap_open(DependRelationId, AccessShareLock);
2821
2822         ScanKeyInit(&key[0],
2823                                 Anum_pg_depend_refclassid,
2824                                 BTEqualStrategyNumber, F_OIDEQ,
2825                                 ObjectIdGetDatum(TypeRelationId));
2826         ScanKeyInit(&key[1],
2827                                 Anum_pg_depend_refobjid,
2828                                 BTEqualStrategyNumber, F_OIDEQ,
2829                                 ObjectIdGetDatum(typeOid));
2830
2831         depScan = systable_beginscan(depRel, DependReferenceIndexId, true,
2832                                                                  SnapshotNow, 2, key);
2833
2834         while (HeapTupleIsValid(depTup = systable_getnext(depScan)))
2835         {
2836                 Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup);
2837                 Relation        rel;
2838                 Form_pg_attribute att;
2839
2840                 /* Ignore dependees that aren't user columns of relations */
2841                 /* (we assume system columns are never of rowtypes) */
2842                 if (pg_depend->classid != RelationRelationId ||
2843                         pg_depend->objsubid <= 0)
2844                         continue;
2845
2846                 rel = relation_open(pg_depend->objid, AccessShareLock);
2847                 att = rel->rd_att->attrs[pg_depend->objsubid - 1];
2848
2849                 if (rel->rd_rel->relkind == RELKIND_RELATION)
2850                 {
2851                         ereport(ERROR,
2852                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2853                                          errmsg("cannot alter table \"%s\" because column \"%s\".\"%s\" uses its rowtype",
2854                                                         origTblName,
2855                                                         RelationGetRelationName(rel),
2856                                                         NameStr(att->attname))));
2857                 }
2858                 else if (OidIsValid(rel->rd_rel->reltype))
2859                 {
2860                         /*
2861                          * A view or composite type itself isn't a problem, but we must
2862                          * recursively check for indirect dependencies via its rowtype.
2863                          */
2864                         find_composite_type_dependencies(rel->rd_rel->reltype,
2865                                                                                          origTblName);
2866                 }
2867
2868                 relation_close(rel, AccessShareLock);
2869         }
2870
2871         systable_endscan(depScan);
2872
2873         relation_close(depRel, AccessShareLock);
2874 }
2875
2876
2877 /*
2878  * ALTER TABLE ADD COLUMN
2879  *
2880  * Adds an additional attribute to a relation making the assumption that
2881  * CHECK, NOT NULL, and FOREIGN KEY constraints will be removed from the
2882  * AT_AddColumn AlterTableCmd by analyze.c and added as independent
2883  * AlterTableCmd's.
2884  */
2885 static void
2886 ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
2887                                 AlterTableCmd *cmd)
2888 {
2889         /*
2890          * Recurse to add the column to child classes, if requested.
2891          *
2892          * We must recurse one level at a time, so that multiply-inheriting
2893          * children are visited the right number of times and end up with the
2894          * right attinhcount.
2895          */
2896         if (recurse)
2897         {
2898                 AlterTableCmd *childCmd = copyObject(cmd);
2899                 ColumnDef  *colDefChild = (ColumnDef *) childCmd->def;
2900
2901                 /* Child should see column as singly inherited */
2902                 colDefChild->inhcount = 1;
2903                 colDefChild->is_local = false;
2904                 /* and don't make a support dependency on the child */
2905                 colDefChild->support = NULL;
2906
2907                 ATOneLevelRecursion(wqueue, rel, childCmd);
2908         }
2909         else
2910         {
2911                 /*
2912                  * If we are told not to recurse, there had better not be any child
2913                  * tables; else the addition would put them out of step.
2914                  */
2915                 if (find_inheritance_children(RelationGetRelid(rel)) != NIL)
2916                         ereport(ERROR,
2917                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
2918                                          errmsg("column must be added to child tables too")));
2919         }
2920 }
2921
2922 static void
2923 ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
2924                                 ColumnDef *colDef)
2925 {
2926         Oid                     myrelid = RelationGetRelid(rel);
2927         Relation        pgclass,
2928                                 attrdesc;
2929         HeapTuple       reltup;
2930         HeapTuple       attributeTuple;
2931         Form_pg_attribute attribute;
2932         FormData_pg_attribute attributeD;
2933         int                     i;
2934         int                     minattnum,
2935                                 maxatts;
2936         HeapTuple       typeTuple;
2937         Oid                     typeOid;
2938         Form_pg_type tform;
2939         Expr       *defval;
2940
2941         attrdesc = heap_open(AttributeRelationId, RowExclusiveLock);
2942
2943         /*
2944          * Are we adding the column to a recursion child?  If so, check whether to
2945          * merge with an existing definition for the column.
2946          */
2947         if (colDef->inhcount > 0)
2948         {
2949                 HeapTuple       tuple;
2950
2951                 /* Does child already have a column by this name? */
2952                 tuple = SearchSysCacheCopyAttName(myrelid, colDef->colname);
2953                 if (HeapTupleIsValid(tuple))
2954                 {
2955                         Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple);
2956
2957                         /* Okay if child matches by type */
2958                         if (typenameTypeId(colDef->typename) != childatt->atttypid ||
2959                                 colDef->typename->typmod != childatt->atttypmod)
2960                                 ereport(ERROR,
2961                                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
2962                                                  errmsg("child table \"%s\" has different type for column \"%s\"",
2963                                                         RelationGetRelationName(rel), colDef->colname)));
2964
2965                         /* Bump the existing child att's inhcount */
2966                         childatt->attinhcount++;
2967                         simple_heap_update(attrdesc, &tuple->t_self, tuple);
2968                         CatalogUpdateIndexes(attrdesc, tuple);
2969
2970                         heap_freetuple(tuple);
2971
2972                         /* Inform the user about the merge */
2973                         ereport(NOTICE,
2974                           (errmsg("merging definition of column \"%s\" for child \"%s\"",
2975                                           colDef->colname, RelationGetRelationName(rel))));
2976
2977                         heap_close(attrdesc, RowExclusiveLock);
2978                         return;
2979                 }
2980         }
2981
2982         pgclass = heap_open(RelationRelationId, RowExclusiveLock);
2983
2984         reltup = SearchSysCacheCopy(RELOID,
2985                                                                 ObjectIdGetDatum(myrelid),
2986                                                                 0, 0, 0);
2987         if (!HeapTupleIsValid(reltup))
2988                 elog(ERROR, "cache lookup failed for relation %u", myrelid);
2989
2990         /*
2991          * this test is deliberately not attisdropped-aware, since if one tries to
2992          * add a column matching a dropped column name, it's gonna fail anyway.
2993          */
2994         if (SearchSysCacheExists(ATTNAME,
2995                                                          ObjectIdGetDatum(myrelid),
2996                                                          PointerGetDatum(colDef->colname),
2997                                                          0, 0))
2998                 ereport(ERROR,
2999                                 (errcode(ERRCODE_DUPLICATE_COLUMN),
3000                                  errmsg("column \"%s\" of relation \"%s\" already exists",
3001                                                 colDef->colname, RelationGetRelationName(rel))));
3002
3003         minattnum = ((Form_pg_class) GETSTRUCT(reltup))->relnatts;
3004         maxatts = minattnum + 1;
3005         if (maxatts > MaxHeapAttributeNumber)
3006                 ereport(ERROR,
3007                                 (errcode(ERRCODE_TOO_MANY_COLUMNS),
3008                                  errmsg("tables can have at most %d columns",
3009                                                 MaxHeapAttributeNumber)));
3010         i = minattnum + 1;
3011
3012         typeTuple = typenameType(colDef->typename);
3013         tform = (Form_pg_type) GETSTRUCT(typeTuple);
3014         typeOid = HeapTupleGetOid(typeTuple);
3015
3016         /* make sure datatype is legal for a column */
3017         CheckAttributeType(colDef->colname, typeOid);
3018
3019         attributeTuple = heap_addheader(Natts_pg_attribute,
3020                                                                         false,
3021                                                                         ATTRIBUTE_TUPLE_SIZE,
3022                                                                         (void *) &attributeD);
3023
3024         attribute = (Form_pg_attribute) GETSTRUCT(attributeTuple);
3025
3026         attribute->attrelid = myrelid;
3027         namestrcpy(&(attribute->attname), colDef->colname);
3028         attribute->atttypid = typeOid;
3029         attribute->attstattarget = -1;
3030         attribute->attlen = tform->typlen;
3031         attribute->attcacheoff = -1;
3032         attribute->atttypmod = colDef->typename->typmod;
3033         attribute->attnum = i;
3034         attribute->attbyval = tform->typbyval;
3035         attribute->attndims = list_length(colDef->typename->arrayBounds);
3036         attribute->attstorage = tform->typstorage;
3037         attribute->attalign = tform->typalign;
3038         attribute->attnotnull = colDef->is_not_null;
3039         attribute->atthasdef = false;
3040         attribute->attisdropped = false;
3041         attribute->attislocal = colDef->is_local;
3042         attribute->attinhcount = colDef->inhcount;
3043
3044         ReleaseSysCache(typeTuple);
3045
3046         simple_heap_insert(attrdesc, attributeTuple);
3047
3048         /* Update indexes on pg_attribute */
3049         CatalogUpdateIndexes(attrdesc, attributeTuple);
3050
3051         heap_close(attrdesc, RowExclusiveLock);
3052
3053         /*
3054          * Update number of attributes in pg_class tuple
3055          */
3056         ((Form_pg_class) GETSTRUCT(reltup))->relnatts = maxatts;
3057
3058         simple_heap_update(pgclass, &reltup->t_self, reltup);
3059
3060         /* keep catalog indexes current */
3061         CatalogUpdateIndexes(pgclass, reltup);
3062
3063         heap_freetuple(reltup);
3064
3065         heap_close(pgclass, RowExclusiveLock);
3066
3067         /* Make the attribute's catalog entry visible */
3068         CommandCounterIncrement();
3069
3070         /*
3071          * Store the DEFAULT, if any, in the catalogs
3072          */
3073         if (colDef->raw_default)
3074         {
3075                 RawColumnDefault *rawEnt;
3076
3077                 rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
3078                 rawEnt->attnum = attribute->attnum;
3079                 rawEnt->raw_default = copyObject(colDef->raw_default);
3080
3081                 /*
3082                  * This function is intended for CREATE TABLE, so it processes a
3083                  * _list_ of defaults, but we just do one.
3084                  */
3085                 AddRelationRawConstraints(rel, list_make1(rawEnt), NIL);
3086
3087                 /* Make the additional catalog changes visible */
3088                 CommandCounterIncrement();
3089         }
3090
3091         /*
3092          * Tell Phase 3 to fill in the default expression, if there is one.
3093          *
3094          * If there is no default, Phase 3 doesn't have to do anything, because
3095          * that effectively means that the default is NULL.  The heap tuple access
3096          * routines always check for attnum > # of attributes in tuple, and return
3097          * NULL if so, so without any modification of the tuple data we will get
3098          * the effect of NULL values in the new column.
3099          *
3100          * An exception occurs when the new column is of a domain type: the domain
3101          * might have a NOT NULL constraint, or a check constraint that indirectly
3102          * rejects nulls.  If there are any domain constraints then we construct
3103          * an explicit NULL default value that will be passed through
3104          * CoerceToDomain processing.  (This is a tad inefficient, since it causes
3105          * rewriting the table which we really don't have to do, but the present
3106          * design of domain processing doesn't offer any simple way of checking
3107          * the constraints more directly.)
3108          *
3109          * Note: we use build_column_default, and not just the cooked default
3110          * returned by AddRelationRawConstraints, so that the right thing happens
3111          * when a datatype's default applies.
3112          */
3113         defval = (Expr *) build_column_default(rel, attribute->attnum);
3114
3115         if (!defval && GetDomainConstraints(typeOid) != NIL)
3116         {
3117                 Oid                     basetype = getBaseType(typeOid);
3118
3119                 defval = (Expr *) makeNullConst(basetype);
3120                 defval = (Expr *) coerce_to_target_type(NULL,
3121                                                                                                 (Node *) defval,
3122                                                                                                 basetype,
3123                                                                                                 typeOid,
3124                                                                                                 colDef->typename->typmod,
3125                                                                                                 COERCION_ASSIGNMENT,
3126                                                                                                 COERCE_IMPLICIT_CAST);
3127                 if (defval == NULL)             /* should not happen */
3128                         elog(ERROR, "failed to coerce base type to domain");
3129         }
3130
3131         if (defval)
3132         {
3133                 NewColumnValue *newval;
3134
3135                 newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue));
3136                 newval->attnum = attribute->attnum;
3137                 newval->expr = defval;
3138
3139                 tab->newvals = lappend(tab->newvals, newval);
3140         }
3141
3142         /*
3143          * Add needed dependency entries for the new column.
3144          */
3145         add_column_datatype_dependency(myrelid, i, attribute->atttypid);
3146         if (colDef->support != NULL)
3147                 add_column_support_dependency(myrelid, i, colDef->support);
3148 }
3149
3150 /*
3151  * Install a column's dependency on its datatype.
3152  */
3153 static void
3154 add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid)
3155 {
3156         ObjectAddress myself,
3157                                 referenced;
3158
3159         myself.classId = RelationRelationId;
3160         myself.objectId = relid;
3161         myself.objectSubId = attnum;
3162         referenced.classId = TypeRelationId;
3163         referenced.objectId = typid;
3164         referenced.objectSubId = 0;
3165         recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
3166 }
3167
3168 /*
3169  * Install a dependency for a column's supporting relation (serial sequence).
3170  */
3171 static void
3172 add_column_support_dependency(Oid relid, int32 attnum, RangeVar *support)
3173 {
3174         ObjectAddress colobject,
3175                                 suppobject;
3176
3177         colobject.classId = RelationRelationId;
3178         colobject.objectId = relid;
3179         colobject.objectSubId = attnum;
3180         suppobject.classId = RelationRelationId;
3181         suppobject.objectId = RangeVarGetRelid(support, false);
3182         suppobject.objectSubId = 0;
3183         recordDependencyOn(&suppobject, &colobject, DEPENDENCY_INTERNAL);
3184 }
3185
3186 /*
3187  * ALTER TABLE ALTER COLUMN DROP NOT NULL
3188  */
3189 static void
3190 ATExecDropNotNull(Relation rel, const char *colName)
3191 {
3192         HeapTuple       tuple;
3193         AttrNumber      attnum;
3194         Relation        attr_rel;
3195         List       *indexoidlist;
3196         ListCell   *indexoidscan;
3197
3198         /*
3199          * lookup the attribute
3200          */
3201         attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
3202
3203         tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3204
3205         if (!HeapTupleIsValid(tuple))
3206                 ereport(ERROR,
3207                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
3208                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
3209                                                 colName, RelationGetRelationName(rel))));
3210
3211         attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum;
3212
3213         /* Prevent them from altering a system attribute */
3214         if (attnum <= 0)
3215                 ereport(ERROR,
3216                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3217                                  errmsg("cannot alter system column \"%s\"",
3218                                                 colName)));
3219
3220         /*
3221          * Check that the attribute is not in a primary key
3222          */
3223
3224         /* Loop over all indexes on the relation */
3225         indexoidlist = RelationGetIndexList(rel);
3226
3227         foreach(indexoidscan, indexoidlist)
3228         {
3229                 Oid                     indexoid = lfirst_oid(indexoidscan);
3230                 HeapTuple       indexTuple;
3231                 Form_pg_index indexStruct;
3232                 int                     i;
3233
3234                 indexTuple = SearchSysCache(INDEXRELID,
3235                                                                         ObjectIdGetDatum(indexoid),
3236                                                                         0, 0, 0);
3237                 if (!HeapTupleIsValid(indexTuple))
3238                         elog(ERROR, "cache lookup failed for index %u", indexoid);
3239                 indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
3240
3241                 /* If the index is not a primary key, skip the check */
3242                 if (indexStruct->indisprimary)
3243                 {
3244                         /*
3245                          * Loop over each attribute in the primary key and see if it
3246                          * matches the to-be-altered attribute
3247                          */
3248                         for (i = 0; i < indexStruct->indnatts; i++)
3249                         {
3250                                 if (indexStruct->indkey.values[i] == attnum)
3251                                         ereport(ERROR,
3252                                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3253                                                          errmsg("column \"%s\" is in a primary key",
3254                                                                         colName)));
3255                         }
3256                 }
3257
3258                 ReleaseSysCache(indexTuple);
3259         }
3260
3261         list_free(indexoidlist);
3262
3263         /*
3264          * Okay, actually perform the catalog change ... if needed
3265          */
3266         if (((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull)
3267         {
3268                 ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = FALSE;
3269
3270                 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3271
3272                 /* keep the system catalog indexes current */
3273                 CatalogUpdateIndexes(attr_rel, tuple);
3274         }
3275
3276         heap_close(attr_rel, RowExclusiveLock);
3277 }
3278
3279 /*
3280  * ALTER TABLE ALTER COLUMN SET NOT NULL
3281  */
3282 static void
3283 ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
3284                                  const char *colName)
3285 {
3286         HeapTuple       tuple;
3287         AttrNumber      attnum;
3288         Relation        attr_rel;
3289         NewConstraint *newcon;
3290
3291         /*
3292          * lookup the attribute
3293          */
3294         attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
3295
3296         tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3297
3298         if (!HeapTupleIsValid(tuple))
3299                 ereport(ERROR,
3300                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
3301                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
3302                                                 colName, RelationGetRelationName(rel))));
3303
3304         attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum;
3305
3306         /* Prevent them from altering a system attribute */
3307         if (attnum <= 0)
3308                 ereport(ERROR,
3309                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3310                                  errmsg("cannot alter system column \"%s\"",
3311                                                 colName)));
3312
3313         /*
3314          * Okay, actually perform the catalog change ... if needed
3315          */
3316         if (!((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull)
3317         {
3318                 ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = TRUE;
3319
3320                 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3321
3322                 /* keep the system catalog indexes current */
3323                 CatalogUpdateIndexes(attr_rel, tuple);
3324
3325                 /* Tell Phase 3 to test the constraint */
3326                 newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
3327                 newcon->contype = CONSTR_NOTNULL;
3328                 newcon->attnum = attnum;
3329                 newcon->name = "NOT NULL";
3330
3331                 tab->constraints = lappend(tab->constraints, newcon);
3332         }
3333
3334         heap_close(attr_rel, RowExclusiveLock);
3335 }
3336
3337 /*
3338  * ALTER TABLE ALTER COLUMN SET/DROP DEFAULT
3339  */
3340 static void
3341 ATExecColumnDefault(Relation rel, const char *colName,
3342                                         Node *newDefault)
3343 {
3344         AttrNumber      attnum;
3345
3346         /*
3347          * get the number of the attribute
3348          */
3349         attnum = get_attnum(RelationGetRelid(rel), colName);
3350         if (attnum == InvalidAttrNumber)
3351                 ereport(ERROR,
3352                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
3353                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
3354                                                 colName, RelationGetRelationName(rel))));
3355
3356         /* Prevent them from altering a system attribute */
3357         if (attnum <= 0)
3358                 ereport(ERROR,
3359                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3360                                  errmsg("cannot alter system column \"%s\"",
3361                                                 colName)));
3362
3363         /*
3364          * Remove any old default for the column.  We use RESTRICT here for
3365          * safety, but at present we do not expect anything to depend on the
3366          * default.
3367          */
3368         RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, false);
3369
3370         if (newDefault)
3371         {
3372                 /* SET DEFAULT */
3373                 RawColumnDefault *rawEnt;
3374
3375                 rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
3376                 rawEnt->attnum = attnum;
3377                 rawEnt->raw_default = newDefault;
3378
3379                 /*
3380                  * This function is intended for CREATE TABLE, so it processes a
3381                  * _list_ of defaults, but we just do one.
3382                  */
3383                 AddRelationRawConstraints(rel, list_make1(rawEnt), NIL);
3384         }
3385 }
3386
3387 /*
3388  * ALTER TABLE ALTER COLUMN SET STATISTICS
3389  */
3390 static void
3391 ATPrepSetStatistics(Relation rel, const char *colName, Node *flagValue)
3392 {
3393         /*
3394          * We do our own permission checking because (a) we want to allow SET
3395          * STATISTICS on indexes (for expressional index columns), and (b) we want
3396          * to allow SET STATISTICS on system catalogs without requiring
3397          * allowSystemTableMods to be turned on.
3398          */
3399         if (rel->rd_rel->relkind != RELKIND_RELATION &&
3400                 rel->rd_rel->relkind != RELKIND_INDEX)
3401                 ereport(ERROR,
3402                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
3403                                  errmsg("\"%s\" is not a table or index",
3404                                                 RelationGetRelationName(rel))));
3405
3406         /* Permissions checks */
3407         if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
3408                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
3409                                            RelationGetRelationName(rel));
3410 }
3411
3412 static void
3413 ATExecSetStatistics(Relation rel, const char *colName, Node *newValue)
3414 {
3415         int                     newtarget;
3416         Relation        attrelation;
3417         HeapTuple       tuple;
3418         Form_pg_attribute attrtuple;
3419
3420         Assert(IsA(newValue, Integer));
3421         newtarget = intVal(newValue);
3422
3423         /*
3424          * Limit target to a sane range
3425          */
3426         if (newtarget < -1)
3427         {
3428                 ereport(ERROR,
3429                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3430                                  errmsg("statistics target %d is too low",
3431                                                 newtarget)));
3432         }
3433         else if (newtarget > 1000)
3434         {
3435                 newtarget = 1000;
3436                 ereport(WARNING,
3437                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3438                                  errmsg("lowering statistics target to %d",
3439                                                 newtarget)));
3440         }
3441
3442         attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
3443
3444         tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3445
3446         if (!HeapTupleIsValid(tuple))
3447                 ereport(ERROR,
3448                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
3449                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
3450                                                 colName, RelationGetRelationName(rel))));
3451         attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
3452
3453         if (attrtuple->attnum <= 0)
3454                 ereport(ERROR,
3455                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3456                                  errmsg("cannot alter system column \"%s\"",
3457                                                 colName)));
3458
3459         attrtuple->attstattarget = newtarget;
3460
3461         simple_heap_update(attrelation, &tuple->t_self, tuple);
3462
3463         /* keep system catalog indexes current */
3464         CatalogUpdateIndexes(attrelation, tuple);
3465
3466         heap_freetuple(tuple);
3467
3468         heap_close(attrelation, RowExclusiveLock);
3469 }
3470
3471 /*
3472  * ALTER TABLE ALTER COLUMN SET STORAGE
3473  */
3474 static void
3475 ATExecSetStorage(Relation rel, const char *colName, Node *newValue)
3476 {
3477         char       *storagemode;
3478         char            newstorage;
3479         Relation        attrelation;
3480         HeapTuple       tuple;
3481         Form_pg_attribute attrtuple;
3482
3483         Assert(IsA(newValue, String));
3484         storagemode = strVal(newValue);
3485
3486         if (pg_strcasecmp(storagemode, "plain") == 0)
3487                 newstorage = 'p';
3488         else if (pg_strcasecmp(storagemode, "external") == 0)
3489                 newstorage = 'e';
3490         else if (pg_strcasecmp(storagemode, "extended") == 0)
3491                 newstorage = 'x';
3492         else if (pg_strcasecmp(storagemode, "main") == 0)
3493                 newstorage = 'm';
3494         else
3495         {
3496                 ereport(ERROR,
3497                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3498                                  errmsg("invalid storage type \"%s\"",
3499                                                 storagemode)));
3500                 newstorage = 0;                 /* keep compiler quiet */
3501         }
3502
3503         attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
3504
3505         tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3506
3507         if (!HeapTupleIsValid(tuple))
3508                 ereport(ERROR,
3509                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
3510                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
3511                                                 colName, RelationGetRelationName(rel))));
3512         attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
3513
3514         if (attrtuple->attnum <= 0)
3515                 ereport(ERROR,
3516                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3517                                  errmsg("cannot alter system column \"%s\"",
3518                                                 colName)));
3519
3520         /*
3521          * safety check: do not allow toasted storage modes unless column datatype
3522          * is TOAST-aware.
3523          */
3524         if (newstorage == 'p' || TypeIsToastable(attrtuple->atttypid))
3525                 attrtuple->attstorage = newstorage;
3526         else
3527                 ereport(ERROR,
3528                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3529                                  errmsg("column data type %s can only have storage PLAIN",
3530                                                 format_type_be(attrtuple->atttypid))));
3531
3532         simple_heap_update(attrelation, &tuple->t_self, tuple);
3533
3534         /* keep system catalog indexes current */
3535         CatalogUpdateIndexes(attrelation, tuple);
3536
3537         heap_freetuple(tuple);
3538
3539         heap_close(attrelation, RowExclusiveLock);
3540 }
3541
3542
3543 /*
3544  * ALTER TABLE DROP COLUMN
3545  *
3546  * DROP COLUMN cannot use the normal ALTER TABLE recursion mechanism,
3547  * because we have to decide at runtime whether to recurse or not depending
3548  * on whether attinhcount goes to zero or not.  (We can't check this in a
3549  * static pre-pass because it won't handle multiple inheritance situations
3550  * correctly.)  Since DROP COLUMN doesn't need to create any work queue
3551  * entries for Phase 3, it's okay to recurse internally in this routine
3552  * without considering the work queue.
3553  */
3554 static void
3555 ATExecDropColumn(Relation rel, const char *colName,
3556                                  DropBehavior behavior,
3557                                  bool recurse, bool recursing)
3558 {
3559         HeapTuple       tuple;
3560         Form_pg_attribute targetatt;
3561         AttrNumber      attnum;
3562         List       *children;
3563         ObjectAddress object;
3564
3565         /* At top level, permission check was done in ATPrepCmd, else do it */
3566         if (recursing)
3567                 ATSimplePermissions(rel, false);
3568
3569         /*
3570          * get the number of the attribute
3571          */
3572         tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
3573         if (!HeapTupleIsValid(tuple))
3574                 ereport(ERROR,
3575                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
3576                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
3577                                                 colName, RelationGetRelationName(rel))));
3578         targetatt = (Form_pg_attribute) GETSTRUCT(tuple);
3579
3580         attnum = targetatt->attnum;
3581
3582         /* Can't drop a system attribute, except OID */
3583         if (attnum <= 0 && attnum != ObjectIdAttributeNumber)
3584                 ereport(ERROR,
3585                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3586                                  errmsg("cannot drop system column \"%s\"",
3587                                                 colName)));
3588
3589         /* Don't drop inherited columns */
3590         if (targetatt->attinhcount > 0 && !recursing)
3591                 ereport(ERROR,
3592                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3593                                  errmsg("cannot drop inherited column \"%s\"",
3594                                                 colName)));
3595
3596         ReleaseSysCache(tuple);
3597
3598         /*
3599          * Propagate to children as appropriate.  Unlike most other ALTER
3600          * routines, we have to do this one level of recursion at a time; we can't
3601          * use find_all_inheritors to do it in one pass.
3602          */
3603         children = find_inheritance_children(RelationGetRelid(rel));
3604
3605         if (children)
3606         {
3607                 Relation        attr_rel;
3608                 ListCell   *child;
3609
3610                 attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
3611                 foreach(child, children)
3612                 {
3613                         Oid                     childrelid = lfirst_oid(child);
3614                         Relation        childrel;
3615                         Form_pg_attribute childatt;
3616
3617                         childrel = heap_open(childrelid, AccessExclusiveLock);
3618
3619                         tuple = SearchSysCacheCopyAttName(childrelid, colName);
3620                         if (!HeapTupleIsValid(tuple))           /* shouldn't happen */
3621                                 elog(ERROR, "cache lookup failed for attribute \"%s\" of relation %u",
3622                                          colName, childrelid);
3623                         childatt = (Form_pg_attribute) GETSTRUCT(tuple);
3624
3625                         if (childatt->attinhcount <= 0)         /* shouldn't happen */
3626                                 elog(ERROR, "relation %u has non-inherited attribute \"%s\"",
3627                                          childrelid, colName);
3628
3629                         if (recurse)
3630                         {
3631                                 /*
3632                                  * If the child column has other definition sources, just
3633                                  * decrement its inheritance count; if not, recurse to delete
3634                                  * it.
3635                                  */
3636                                 if (childatt->attinhcount == 1 && !childatt->attislocal)
3637                                 {
3638                                         /* Time to delete this child column, too */
3639                                         ATExecDropColumn(childrel, colName, behavior, true, true);
3640                                 }
3641                                 else
3642                                 {
3643                                         /* Child column must survive my deletion */
3644                                         childatt->attinhcount--;
3645
3646                                         simple_heap_update(attr_rel, &tuple->t_self, tuple);
3647
3648                                         /* keep the system catalog indexes current */
3649                                         CatalogUpdateIndexes(attr_rel, tuple);
3650
3651                                         /* Make update visible */
3652                                         CommandCounterIncrement();
3653                                 }
3654                         }
3655                         else
3656                         {
3657                                 /*
3658                                  * If we were told to drop ONLY in this table (no recursion),
3659                                  * we need to mark the inheritors' attribute as locally
3660                                  * defined rather than inherited.
3661                                  */
3662                                 childatt->attinhcount--;
3663                                 childatt->attislocal = true;
3664
3665                                 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3666
3667                                 /* keep the system catalog indexes current */
3668                                 CatalogUpdateIndexes(attr_rel, tuple);
3669
3670                                 /* Make update visible */
3671                                 CommandCounterIncrement();
3672                         }
3673
3674                         heap_freetuple(tuple);
3675
3676                         heap_close(childrel, NoLock);
3677                 }
3678                 heap_close(attr_rel, RowExclusiveLock);
3679         }
3680
3681         /*
3682          * Perform the actual column deletion
3683          */
3684         object.classId = RelationRelationId;
3685         object.objectId = RelationGetRelid(rel);
3686         object.objectSubId = attnum;
3687
3688         performDeletion(&object, behavior);
3689
3690         /*
3691          * If we dropped the OID column, must adjust pg_class.relhasoids
3692          */
3693         if (attnum == ObjectIdAttributeNumber)
3694         {
3695                 Relation        class_rel;
3696                 Form_pg_class tuple_class;
3697
3698                 class_rel = heap_open(RelationRelationId, RowExclusiveLock);
3699
3700                 tuple = SearchSysCacheCopy(RELOID,
3701                                                                    ObjectIdGetDatum(RelationGetRelid(rel)),
3702                                                                    0, 0, 0);
3703                 if (!HeapTupleIsValid(tuple))
3704                         elog(ERROR, "cache lookup failed for relation %u",
3705                                  RelationGetRelid(rel));
3706                 tuple_class = (Form_pg_class) GETSTRUCT(tuple);
3707
3708                 tuple_class->relhasoids = false;
3709                 simple_heap_update(class_rel, &tuple->t_self, tuple);
3710
3711                 /* Keep the catalog indexes up to date */
3712                 CatalogUpdateIndexes(class_rel, tuple);
3713
3714                 heap_close(class_rel, RowExclusiveLock);
3715         }
3716 }
3717
3718 /*
3719  * ALTER TABLE ADD INDEX
3720  *
3721  * There is no such command in the grammar, but the parser converts UNIQUE
3722  * and PRIMARY KEY constraints into AT_AddIndex subcommands.  This lets us
3723  * schedule creation of the index at the appropriate time during ALTER.
3724  */
3725 static void
3726 ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
3727                            IndexStmt *stmt, bool is_rebuild)
3728 {
3729         bool            check_rights;
3730         bool            skip_build;
3731         bool            quiet;
3732
3733         Assert(IsA(stmt, IndexStmt));
3734
3735         /* suppress schema rights check when rebuilding existing index */
3736         check_rights = !is_rebuild;
3737         /* skip index build if phase 3 will have to rewrite table anyway */
3738         skip_build = (tab->newvals != NIL);
3739         /* suppress notices when rebuilding existing index */
3740         quiet = is_rebuild;
3741
3742         DefineIndex(stmt->relation, /* relation */
3743                                 stmt->idxname,  /* index name */
3744                                 InvalidOid,             /* no predefined OID */
3745                                 stmt->accessMethod,             /* am name */
3746                                 stmt->tableSpace,
3747                                 stmt->indexParams,              /* parameters */
3748                                 (Expr *) stmt->whereClause,
3749                                 stmt->rangetable,
3750                                 stmt->unique,
3751                                 stmt->primary,
3752                                 stmt->isconstraint,
3753                                 true,                   /* is_alter_table */
3754                                 check_rights,
3755                                 skip_build,
3756                                 quiet);
3757 }
3758
3759 /*
3760  * ALTER TABLE ADD CONSTRAINT
3761  */
3762 static void
3763 ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, Node *newConstraint)
3764 {
3765         switch (nodeTag(newConstraint))
3766         {
3767                 case T_Constraint:
3768                         {
3769                                 Constraint *constr = (Constraint *) newConstraint;
3770
3771                                 /*
3772                                  * Currently, we only expect to see CONSTR_CHECK nodes
3773                                  * arriving here (see the preprocessing done in
3774                                  * parser/analyze.c).  Use a switch anyway to make it easier
3775                                  * to add more code later.
3776                                  */
3777                                 switch (constr->contype)
3778                                 {
3779                                         case CONSTR_CHECK:
3780                                                 {
3781                                                         List       *newcons;
3782                                                         ListCell   *lcon;
3783
3784                                                         /*
3785                                                          * Call AddRelationRawConstraints to do the work.
3786                                                          * It returns a list of cooked constraints.
3787                                                          */
3788                                                         newcons = AddRelationRawConstraints(rel, NIL,
3789                                                                                                                  list_make1(constr));
3790                                                         /* Add each constraint to Phase 3's queue */
3791                                                         foreach(lcon, newcons)
3792                                                         {
3793                                                                 CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon);
3794                                                                 NewConstraint *newcon;
3795
3796                                                                 newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
3797                                                                 newcon->name = ccon->name;
3798                                                                 newcon->contype = ccon->contype;
3799                                                                 newcon->attnum = ccon->attnum;
3800                                                                 /* ExecQual wants implicit-AND format */
3801                                                                 newcon->qual = (Node *)
3802                                                                         make_ands_implicit((Expr *) ccon->expr);
3803
3804                                                                 tab->constraints = lappend(tab->constraints,
3805                                                                                                                    newcon);
3806                                                         }
3807                                                         break;
3808                                                 }
3809                                         default:
3810                                                 elog(ERROR, "unrecognized constraint type: %d",
3811                                                          (int) constr->contype);
3812                                 }
3813                                 break;
3814                         }
3815                 case T_FkConstraint:
3816                         {
3817                                 FkConstraint *fkconstraint = (FkConstraint *) newConstraint;
3818
3819                                 /*
3820                                  * Assign or validate constraint name
3821                                  */
3822                                 if (fkconstraint->constr_name)
3823                                 {
3824                                         if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
3825                                                                                          RelationGetRelid(rel),
3826                                                                                          RelationGetNamespace(rel),
3827                                                                                          fkconstraint->constr_name))
3828                                                 ereport(ERROR,
3829                                                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
3830                                                                  errmsg("constraint \"%s\" for relation \"%s\" already exists",
3831                                                                                 fkconstraint->constr_name,
3832                                                                                 RelationGetRelationName(rel))));
3833                                 }
3834                                 else
3835                                         fkconstraint->constr_name =
3836                                                 ChooseConstraintName(RelationGetRelationName(rel),
3837                                                                         strVal(linitial(fkconstraint->fk_attrs)),
3838                                                                                          "fkey",
3839                                                                                          RelationGetNamespace(rel),
3840                                                                                          NIL);
3841
3842                                 ATAddForeignKeyConstraint(tab, rel, fkconstraint);
3843
3844                                 break;
3845                         }
3846                 default:
3847                         elog(ERROR, "unrecognized node type: %d",
3848                                  (int) nodeTag(newConstraint));
3849         }
3850 }
3851
3852 /*
3853  * Add a foreign-key constraint to a single table
3854  *
3855  * Subroutine for ATExecAddConstraint.  Must already hold exclusive
3856  * lock on the rel, and have done appropriate validity/permissions checks
3857  * for it.
3858  */
3859 static void
3860 ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
3861                                                   FkConstraint *fkconstraint)
3862 {
3863         Relation        pkrel;
3864         AclResult       aclresult;
3865         int16           pkattnum[INDEX_MAX_KEYS];
3866         int16           fkattnum[INDEX_MAX_KEYS];
3867         Oid                     pktypoid[INDEX_MAX_KEYS];
3868         Oid                     fktypoid[INDEX_MAX_KEYS];
3869         Oid                     opclasses[INDEX_MAX_KEYS];
3870         int                     i;
3871         int                     numfks,
3872                                 numpks;
3873         Oid                     indexOid;
3874         Oid                     constrOid;
3875
3876         /*
3877          * Grab an exclusive lock on the pk table, so that someone doesn't delete
3878          * rows out from under us. (Although a lesser lock would do for that
3879          * purpose, we'll need exclusive lock anyway to add triggers to the pk
3880          * table; trying to start with a lesser lock will just create a risk of
3881          * deadlock.)
3882          */
3883         pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock);
3884
3885         /*
3886          * Validity and permissions checks
3887          *
3888          * Note: REFERENCES permissions checks are redundant with CREATE TRIGGER,
3889          * but we may as well error out sooner instead of later.
3890          */
3891         if (pkrel->rd_rel->relkind != RELKIND_RELATION)
3892                 ereport(ERROR,
3893                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
3894                                  errmsg("referenced relation \"%s\" is not a table",
3895                                                 RelationGetRelationName(pkrel))));
3896
3897         aclresult = pg_class_aclcheck(RelationGetRelid(pkrel), GetUserId(),
3898                                                                   ACL_REFERENCES);
3899         if (aclresult != ACLCHECK_OK)
3900                 aclcheck_error(aclresult, ACL_KIND_CLASS,
3901                                            RelationGetRelationName(pkrel));
3902
3903         if (!allowSystemTableMods && IsSystemRelation(pkrel))
3904                 ereport(ERROR,
3905                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
3906                                  errmsg("permission denied: \"%s\" is a system catalog",
3907                                                 RelationGetRelationName(pkrel))));
3908
3909         aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
3910                                                                   ACL_REFERENCES);
3911         if (aclresult != ACLCHECK_OK)
3912                 aclcheck_error(aclresult, ACL_KIND_CLASS,
3913                                            RelationGetRelationName(rel));
3914
3915         /*
3916          * Disallow reference from permanent table to temp table or vice versa.
3917          * (The ban on perm->temp is for fairly obvious reasons.  The ban on
3918          * temp->perm is because other backends might need to run the RI triggers
3919          * on the perm table, but they can't reliably see tuples the owning
3920          * backend has created in the temp table, because non-shared buffers are
3921          * used for temp tables.)
3922          */
3923         if (isTempNamespace(RelationGetNamespace(pkrel)))
3924         {
3925                 if (!isTempNamespace(RelationGetNamespace(rel)))
3926                         ereport(ERROR,
3927                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3928                                          errmsg("cannot reference temporary table from permanent table constraint")));
3929         }
3930         else
3931         {
3932                 if (isTempNamespace(RelationGetNamespace(rel)))
3933                         ereport(ERROR,
3934                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3935                                          errmsg("cannot reference permanent table from temporary table constraint")));
3936         }
3937
3938         /*
3939          * Look up the referencing attributes to make sure they exist, and record
3940          * their attnums and type OIDs.
3941          */
3942         MemSet(pkattnum, 0, sizeof(pkattnum));
3943         MemSet(fkattnum, 0, sizeof(fkattnum));
3944         MemSet(pktypoid, 0, sizeof(pktypoid));
3945         MemSet(fktypoid, 0, sizeof(fktypoid));
3946         MemSet(opclasses, 0, sizeof(opclasses));
3947
3948         numfks = transformColumnNameList(RelationGetRelid(rel),
3949                                                                          fkconstraint->fk_attrs,
3950                                                                          fkattnum, fktypoid);
3951
3952         /*
3953          * If the attribute list for the referenced table was omitted, lookup the
3954          * definition of the primary key and use it.  Otherwise, validate the
3955          * supplied attribute list.  In either case, discover the index OID and
3956          * index opclasses, and the attnums and type OIDs of the attributes.
3957          */
3958         if (fkconstraint->pk_attrs == NIL)
3959         {
3960                 numpks = transformFkeyGetPrimaryKey(pkrel, &indexOid,
3961                                                                                         &fkconstraint->pk_attrs,
3962                                                                                         pkattnum, pktypoid,
3963                                                                                         opclasses);
3964         }
3965         else
3966         {
3967                 numpks = transformColumnNameList(RelationGetRelid(pkrel),
3968                                                                                  fkconstraint->pk_attrs,
3969                                                                                  pkattnum, pktypoid);
3970                 /* Look for an index matching the column list */
3971                 indexOid = transformFkeyCheckAttrs(pkrel, numpks, pkattnum,
3972                                                                                    opclasses);
3973         }
3974
3975         /* Be sure referencing and referenced column types are comparable */
3976         if (numfks != numpks)
3977                 ereport(ERROR,
3978                                 (errcode(ERRCODE_INVALID_FOREIGN_KEY),
3979                                  errmsg("number of referencing and referenced columns for foreign key disagree")));
3980
3981         for (i = 0; i < numpks; i++)
3982         {
3983                 /*
3984                  * pktypoid[i] is the primary key table's i'th key's type fktypoid[i]
3985                  * is the foreign key table's i'th key's type
3986                  *
3987                  * Note that we look for an operator with the PK type on the left;
3988                  * when the types are different this is critical because the PK index
3989                  * will need operators with the indexkey on the left. (Ordinarily both
3990                  * commutator operators will exist if either does, but we won't get
3991                  * the right answer from the test below on opclass membership unless
3992                  * we select the proper operator.)
3993                  */
3994                 Operator        o = oper(list_make1(makeString("=")),
3995                                                          pktypoid[i], fktypoid[i], true);
3996
3997                 if (o == NULL)
3998                         ereport(ERROR,
3999                                         (errcode(ERRCODE_UNDEFINED_FUNCTION),
4000                                          errmsg("foreign key constraint \"%s\" "
4001                                                         "cannot be implemented",
4002                                                         fkconstraint->constr_name),
4003                                          errdetail("Key columns \"%s\" and \"%s\" "
4004                                                            "are of incompatible types: %s and %s.",
4005                                                            strVal(list_nth(fkconstraint->fk_attrs, i)),
4006                                                            strVal(list_nth(fkconstraint->pk_attrs, i)),
4007                                                            format_type_be(fktypoid[i]),
4008                                                            format_type_be(pktypoid[i]))));
4009
4010                 /*
4011                  * Check that the found operator is compatible with the PK index, and
4012                  * generate a warning if not, since otherwise costly seqscans will be
4013                  * incurred to check FK validity.
4014                  */
4015                 if (!op_in_opclass(oprid(o), opclasses[i]))
4016                         ereport(WARNING,
4017                                         (errmsg("foreign key constraint \"%s\" "
4018                                                         "will require costly sequential scans",
4019                                                         fkconstraint->constr_name),
4020                                          errdetail("Key columns \"%s\" and \"%s\" "
4021                                                            "are of different types: %s and %s.",
4022                                                            strVal(list_nth(fkconstraint->fk_attrs, i)),
4023                                                            strVal(list_nth(fkconstraint->pk_attrs, i)),
4024                                                            format_type_be(fktypoid[i]),
4025                                                            format_type_be(pktypoid[i]))));
4026
4027                 ReleaseSysCache(o);
4028         }
4029
4030         /*
4031          * Tell Phase 3 to check that the constraint is satisfied by existing rows
4032          * (we can skip this during table creation).
4033          */
4034         if (!fkconstraint->skip_validation)
4035         {
4036                 NewConstraint *newcon;
4037
4038                 newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
4039                 newcon->name = fkconstraint->constr_name;
4040                 newcon->contype = CONSTR_FOREIGN;
4041                 newcon->refrelid = RelationGetRelid(pkrel);
4042                 newcon->qual = (Node *) fkconstraint;
4043
4044                 tab->constraints = lappend(tab->constraints, newcon);
4045         }
4046
4047         /*
4048          * Record the FK constraint in pg_constraint.
4049          */
4050         constrOid = CreateConstraintEntry(fkconstraint->constr_name,
4051                                                                           RelationGetNamespace(rel),
4052                                                                           CONSTRAINT_FOREIGN,
4053                                                                           fkconstraint->deferrable,
4054                                                                           fkconstraint->initdeferred,
4055                                                                           RelationGetRelid(rel),
4056                                                                           fkattnum,
4057                                                                           numfks,
4058                                                                           InvalidOid,           /* not a domain
4059                                                                                                                  * constraint */
4060                                                                           RelationGetRelid(pkrel),
4061                                                                           pkattnum,
4062                                                                           numpks,
4063                                                                           fkconstraint->fk_upd_action,
4064                                                                           fkconstraint->fk_del_action,
4065                                                                           fkconstraint->fk_matchtype,
4066                                                                           indexOid,
4067                                                                           NULL,         /* no check constraint */
4068                                                                           NULL,
4069                                                                           NULL);
4070
4071         /*
4072          * Create the triggers that will enforce the constraint.
4073          */
4074         createForeignKeyTriggers(rel, fkconstraint, constrOid);
4075
4076         /*
4077          * Close pk table, but keep lock until we've committed.
4078          */
4079         heap_close(pkrel, NoLock);
4080 }
4081
4082
4083 /*
4084  * transformColumnNameList - transform list of column names
4085  *
4086  * Lookup each name and return its attnum and type OID
4087  */
4088 static int
4089 transformColumnNameList(Oid relId, List *colList,
4090                                                 int16 *attnums, Oid *atttypids)
4091 {
4092         ListCell   *l;
4093         int                     attnum;
4094
4095         attnum = 0;
4096         foreach(l, colList)
4097         {
4098                 char       *attname = strVal(lfirst(l));
4099                 HeapTuple       atttuple;
4100
4101                 atttuple = SearchSysCacheAttName(relId, attname);
4102                 if (!HeapTupleIsValid(atttuple))
4103                         ereport(ERROR,
4104                                         (errcode(ERRCODE_UNDEFINED_COLUMN),
4105                                          errmsg("column \"%s\" referenced in foreign key constraint does not exist",
4106                                                         attname)));
4107                 if (attnum >= INDEX_MAX_KEYS)
4108                         ereport(ERROR,
4109                                         (errcode(ERRCODE_TOO_MANY_COLUMNS),
4110                                          errmsg("cannot have more than %d keys in a foreign key",
4111                                                         INDEX_MAX_KEYS)));
4112                 attnums[attnum] = ((Form_pg_attribute) GETSTRUCT(atttuple))->attnum;
4113                 atttypids[attnum] = ((Form_pg_attribute) GETSTRUCT(atttuple))->atttypid;
4114                 ReleaseSysCache(atttuple);
4115                 attnum++;
4116         }
4117
4118         return attnum;
4119 }
4120
4121 /*
4122  * transformFkeyGetPrimaryKey -
4123  *
4124  *      Look up the names, attnums, and types of the primary key attributes
4125  *      for the pkrel.  Also return the index OID and index opclasses of the
4126  *      index supporting the primary key.
4127  *
4128  *      All parameters except pkrel are output parameters.      Also, the function
4129  *      return value is the number of attributes in the primary key.
4130  *
4131  *      Used when the column list in the REFERENCES specification is omitted.
4132  */
4133 static int
4134 transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
4135                                                    List **attnamelist,
4136                                                    int16 *attnums, Oid *atttypids,
4137                                                    Oid *opclasses)
4138 {
4139         List       *indexoidlist;
4140         ListCell   *indexoidscan;
4141         HeapTuple       indexTuple = NULL;
4142         Form_pg_index indexStruct = NULL;
4143         Datum           indclassDatum;
4144         bool            isnull;
4145         oidvector  *indclass;
4146         int                     i;
4147
4148         /*
4149          * Get the list of index OIDs for the table from the relcache, and look up
4150          * each one in the pg_index syscache until we find one marked primary key
4151          * (hopefully there isn't more than one such).
4152          */
4153         *indexOid = InvalidOid;
4154
4155         indexoidlist = RelationGetIndexList(pkrel);
4156
4157         foreach(indexoidscan, indexoidlist)
4158         {
4159                 Oid                     indexoid = lfirst_oid(indexoidscan);
4160
4161                 indexTuple = SearchSysCache(INDEXRELID,
4162                                                                         ObjectIdGetDatum(indexoid),
4163                                                                         0, 0, 0);
4164                 if (!HeapTupleIsValid(indexTuple))
4165                         elog(ERROR, "cache lookup failed for index %u", indexoid);
4166                 indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
4167                 if (indexStruct->indisprimary)
4168                 {
4169                         *indexOid = indexoid;
4170                         break;
4171                 }
4172                 ReleaseSysCache(indexTuple);
4173         }
4174
4175         list_free(indexoidlist);
4176
4177         /*
4178          * Check that we found it
4179          */
4180         if (!OidIsValid(*indexOid))
4181                 ereport(ERROR,
4182                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
4183                                  errmsg("there is no primary key for referenced table \"%s\"",
4184                                                 RelationGetRelationName(pkrel))));
4185
4186         /* Must get indclass the hard way */
4187         indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple,
4188                                                                         Anum_pg_index_indclass, &isnull);
4189         Assert(!isnull);
4190         indclass = (oidvector *) DatumGetPointer(indclassDatum);
4191
4192         /*
4193          * Now build the list of PK attributes from the indkey definition (we
4194          * assume a primary key cannot have expressional elements)
4195          */
4196         *attnamelist = NIL;
4197         for (i = 0; i < indexStruct->indnatts; i++)
4198         {
4199                 int                     pkattno = indexStruct->indkey.values[i];
4200
4201                 attnums[i] = pkattno;
4202                 atttypids[i] = attnumTypeId(pkrel, pkattno);
4203                 opclasses[i] = indclass->values[i];
4204                 *attnamelist = lappend(*attnamelist,
4205                            makeString(pstrdup(NameStr(*attnumAttName(pkrel, pkattno)))));
4206         }
4207
4208         ReleaseSysCache(indexTuple);
4209
4210         return i;
4211 }
4212
4213 /*
4214  * transformFkeyCheckAttrs -
4215  *
4216  *      Make sure that the attributes of a referenced table belong to a unique
4217  *      (or primary key) constraint.  Return the OID of the index supporting
4218  *      the constraint, as well as the opclasses associated with the index
4219  *      columns.
4220  */
4221 static Oid
4222 transformFkeyCheckAttrs(Relation pkrel,
4223                                                 int numattrs, int16 *attnums,
4224                                                 Oid *opclasses) /* output parameter */
4225 {
4226         Oid                     indexoid = InvalidOid;
4227         bool            found = false;
4228         List       *indexoidlist;
4229         ListCell   *indexoidscan;
4230
4231         /*
4232          * Get the list of index OIDs for the table from the relcache, and look up
4233          * each one in the pg_index syscache, and match unique indexes to the list
4234          * of attnums we are given.
4235          */
4236         indexoidlist = RelationGetIndexList(pkrel);
4237
4238         foreach(indexoidscan, indexoidlist)
4239         {
4240                 HeapTuple       indexTuple;
4241                 Form_pg_index indexStruct;
4242                 int                     i,
4243                                         j;
4244
4245                 indexoid = lfirst_oid(indexoidscan);
4246                 indexTuple = SearchSysCache(INDEXRELID,
4247                                                                         ObjectIdGetDatum(indexoid),
4248                                                                         0, 0, 0);
4249                 if (!HeapTupleIsValid(indexTuple))
4250                         elog(ERROR, "cache lookup failed for index %u", indexoid);
4251                 indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
4252
4253                 /*
4254                  * Must have the right number of columns; must be unique and not a
4255                  * partial index; forget it if there are any expressions, too
4256                  */
4257                 if (indexStruct->indnatts == numattrs &&
4258                         indexStruct->indisunique &&
4259                         heap_attisnull(indexTuple, Anum_pg_index_indpred) &&
4260                         heap_attisnull(indexTuple, Anum_pg_index_indexprs))
4261                 {
4262                         /* Must get indclass the hard way */
4263                         Datum           indclassDatum;
4264                         bool            isnull;
4265                         oidvector  *indclass;
4266
4267                         indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple,
4268                                                                                         Anum_pg_index_indclass, &isnull);
4269                         Assert(!isnull);
4270                         indclass = (oidvector *) DatumGetPointer(indclassDatum);
4271
4272                         /*
4273                          * The given attnum list may match the index columns in any order.
4274                          * Check that each list is a subset of the other.
4275                          */
4276                         for (i = 0; i < numattrs; i++)
4277                         {
4278                                 found = false;
4279                                 for (j = 0; j < numattrs; j++)
4280                                 {
4281                                         if (attnums[i] == indexStruct->indkey.values[j])
4282                                         {
4283                                                 found = true;
4284                                                 break;
4285                                         }
4286                                 }
4287                                 if (!found)
4288                                         break;
4289                         }
4290                         if (found)
4291                         {
4292                                 for (i = 0; i < numattrs; i++)
4293                                 {
4294                                         found = false;
4295                                         for (j = 0; j < numattrs; j++)
4296                                         {
4297                                                 if (attnums[j] == indexStruct->indkey.values[i])
4298                                                 {
4299                                                         opclasses[j] = indclass->values[i];
4300                                                         found = true;
4301                                                         break;
4302                                                 }
4303                                         }
4304                                         if (!found)
4305                                                 break;
4306                                 }
4307                         }
4308                 }
4309                 ReleaseSysCache(indexTuple);
4310                 if (found)
4311                         break;
4312         }
4313
4314         if (!found)
4315                 ereport(ERROR,
4316                                 (errcode(ERRCODE_INVALID_FOREIGN_KEY),
4317                                  errmsg("there is no unique constraint matching given keys for referenced table \"%s\"",
4318                                                 RelationGetRelationName(pkrel))));
4319
4320         list_free(indexoidlist);
4321
4322         return indexoid;
4323 }
4324
4325 /*
4326  * Scan the existing rows in a table to verify they meet a proposed FK
4327  * constraint.
4328  *
4329  * Caller must have opened and locked both relations.
4330  */
4331 static void
4332 validateForeignKeyConstraint(FkConstraint *fkconstraint,
4333                                                          Relation rel,
4334                                                          Relation pkrel)
4335 {
4336         HeapScanDesc scan;
4337         HeapTuple       tuple;
4338         Trigger         trig;
4339         ListCell   *list;
4340         int                     count;
4341
4342         /*
4343          * See if we can do it with a single LEFT JOIN query.  A FALSE result
4344          * indicates we must proceed with the fire-the-trigger method.
4345          */
4346         if (RI_Initial_Check(fkconstraint, rel, pkrel))
4347                 return;
4348
4349         /*
4350          * Scan through each tuple, calling RI_FKey_check_ins (insert trigger) as
4351          * if that tuple had just been inserted.  If any of those fail, it should
4352          * ereport(ERROR) and that's that.
4353          */
4354         MemSet(&trig, 0, sizeof(trig));
4355         trig.tgoid = InvalidOid;
4356         trig.tgname = fkconstraint->constr_name;
4357         trig.tgenabled = TRUE;
4358         trig.tgisconstraint = TRUE;
4359         trig.tgconstrrelid = RelationGetRelid(pkrel);
4360         trig.tgdeferrable = FALSE;
4361         trig.tginitdeferred = FALSE;
4362
4363         trig.tgargs = (char **) palloc(sizeof(char *) *
4364                                                                    (4 + list_length(fkconstraint->fk_attrs)
4365                                                                         + list_length(fkconstraint->pk_attrs)));
4366
4367         trig.tgargs[0] = trig.tgname;
4368         trig.tgargs[1] = RelationGetRelationName(rel);
4369         trig.tgargs[2] = RelationGetRelationName(pkrel);
4370         trig.tgargs[3] = fkMatchTypeToString(fkconstraint->fk_matchtype);
4371         count = 4;
4372         foreach(list, fkconstraint->fk_attrs)
4373         {
4374                 char       *fk_at = strVal(lfirst(list));
4375
4376                 trig.tgargs[count] = fk_at;
4377                 count += 2;
4378         }
4379         count = 5;
4380         foreach(list, fkconstraint->pk_attrs)
4381         {
4382                 char       *pk_at = strVal(lfirst(list));
4383
4384                 trig.tgargs[count] = pk_at;
4385                 count += 2;
4386         }
4387         trig.tgnargs = count - 1;
4388
4389         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
4390
4391         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
4392         {
4393                 FunctionCallInfoData fcinfo;
4394                 TriggerData trigdata;
4395
4396                 /*
4397                  * Make a call to the trigger function
4398                  *
4399                  * No parameters are passed, but we do set a context
4400                  */
4401                 MemSet(&fcinfo, 0, sizeof(fcinfo));
4402
4403                 /*
4404                  * We assume RI_FKey_check_ins won't look at flinfo...
4405                  */
4406                 trigdata.type = T_TriggerData;
4407                 trigdata.tg_event = TRIGGER_EVENT_INSERT | TRIGGER_EVENT_ROW;
4408                 trigdata.tg_relation = rel;
4409                 trigdata.tg_trigtuple = tuple;
4410                 trigdata.tg_newtuple = NULL;
4411                 trigdata.tg_trigger = &trig;
4412                 trigdata.tg_trigtuplebuf = scan->rs_cbuf;
4413                 trigdata.tg_newtuplebuf = InvalidBuffer;
4414
4415                 fcinfo.context = (Node *) &trigdata;
4416
4417                 RI_FKey_check_ins(&fcinfo);
4418         }
4419
4420         heap_endscan(scan);
4421
4422         pfree(trig.tgargs);
4423 }
4424
4425 static void
4426 CreateFKCheckTrigger(RangeVar *myRel, FkConstraint *fkconstraint,
4427                                          ObjectAddress *constrobj, ObjectAddress *trigobj,
4428                                          bool on_insert)
4429 {
4430         CreateTrigStmt *fk_trigger;
4431         ListCell   *fk_attr;
4432         ListCell   *pk_attr;
4433
4434         fk_trigger = makeNode(CreateTrigStmt);
4435         fk_trigger->trigname = fkconstraint->constr_name;
4436         fk_trigger->relation = myRel;
4437         fk_trigger->before = false;
4438         fk_trigger->row = true;
4439
4440         /* Either ON INSERT or ON UPDATE */
4441         if (on_insert)
4442         {
4443                 fk_trigger->funcname = SystemFuncName("RI_FKey_check_ins");
4444                 fk_trigger->actions[0] = 'i';
4445         }
4446         else
4447         {
4448                 fk_trigger->funcname = SystemFuncName("RI_FKey_check_upd");
4449                 fk_trigger->actions[0] = 'u';
4450         }
4451         fk_trigger->actions[1] = '\0';
4452
4453         fk_trigger->isconstraint = true;
4454         fk_trigger->deferrable = fkconstraint->deferrable;
4455         fk_trigger->initdeferred = fkconstraint->initdeferred;
4456         fk_trigger->constrrel = fkconstraint->pktable;
4457
4458         fk_trigger->args = NIL;
4459         fk_trigger->args = lappend(fk_trigger->args,
4460                                                            makeString(fkconstraint->constr_name));
4461         fk_trigger->args = lappend(fk_trigger->args,
4462                                                            makeString(myRel->relname));
4463         fk_trigger->args = lappend(fk_trigger->args,
4464                                                            makeString(fkconstraint->pktable->relname));
4465         fk_trigger->args = lappend(fk_trigger->args,
4466                                 makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
4467         if (list_length(fkconstraint->fk_attrs) != list_length(fkconstraint->pk_attrs))
4468                 ereport(ERROR,
4469                                 (errcode(ERRCODE_INVALID_FOREIGN_KEY),
4470                                  errmsg("number of referencing and referenced columns for foreign key disagree")));
4471
4472         forboth(fk_attr, fkconstraint->fk_attrs,
4473                         pk_attr, fkconstraint->pk_attrs)
4474         {
4475                 fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
4476                 fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
4477         }
4478
4479         trigobj->objectId = CreateTrigger(fk_trigger, true);
4480
4481         /* Register dependency from trigger to constraint */
4482         recordDependencyOn(trigobj, constrobj, DEPENDENCY_INTERNAL);
4483
4484         /* Make changes-so-far visible */
4485         CommandCounterIncrement();
4486 }
4487
4488 /*
4489  * Create the triggers that implement an FK constraint.
4490  */
4491 static void
4492 createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
4493                                                  Oid constrOid)
4494 {
4495         RangeVar   *myRel;
4496         CreateTrigStmt *fk_trigger;
4497         ListCell   *fk_attr;
4498         ListCell   *pk_attr;
4499         ObjectAddress trigobj,
4500                                 constrobj;
4501
4502         /*
4503          * Reconstruct a RangeVar for my relation (not passed in, unfortunately).
4504          */
4505         myRel = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
4506                                                  pstrdup(RelationGetRelationName(rel)));
4507
4508         /*
4509          * Preset objectAddress fields
4510          */
4511         constrobj.classId = ConstraintRelationId;
4512         constrobj.objectId = constrOid;
4513         constrobj.objectSubId = 0;
4514         trigobj.classId = TriggerRelationId;
4515         trigobj.objectSubId = 0;
4516
4517         /* Make changes-so-far visible */
4518         CommandCounterIncrement();
4519
4520         /*
4521          * Build and execute a CREATE CONSTRAINT TRIGGER statement for the CHECK
4522          * action for both INSERTs and UPDATEs on the referencing table.
4523          */
4524         CreateFKCheckTrigger(myRel, fkconstraint, &constrobj, &trigobj, true);
4525         CreateFKCheckTrigger(myRel, fkconstraint, &constrobj, &trigobj, false);
4526
4527         /*
4528          * Build and execute a CREATE CONSTRAINT TRIGGER statement for the ON
4529          * DELETE action on the referenced table.
4530          */
4531         fk_trigger = makeNode(CreateTrigStmt);
4532         fk_trigger->trigname = fkconstraint->constr_name;
4533         fk_trigger->relation = fkconstraint->pktable;
4534         fk_trigger->before = false;
4535         fk_trigger->row = true;
4536         fk_trigger->actions[0] = 'd';
4537         fk_trigger->actions[1] = '\0';
4538
4539         fk_trigger->isconstraint = true;
4540         fk_trigger->constrrel = myRel;
4541         switch (fkconstraint->fk_del_action)
4542         {
4543                 case FKCONSTR_ACTION_NOACTION:
4544                         fk_trigger->deferrable = fkconstraint->deferrable;
4545                         fk_trigger->initdeferred = fkconstraint->initdeferred;
4546                         fk_trigger->funcname = SystemFuncName("RI_FKey_noaction_del");
4547                         break;
4548                 case FKCONSTR_ACTION_RESTRICT:
4549                         fk_trigger->deferrable = false;
4550                         fk_trigger->initdeferred = false;
4551                         fk_trigger->funcname = SystemFuncName("RI_FKey_restrict_del");
4552                         break;
4553                 case FKCONSTR_ACTION_CASCADE:
4554                         fk_trigger->deferrable = false;
4555                         fk_trigger->initdeferred = false;
4556                         fk_trigger->funcname = SystemFuncName("RI_FKey_cascade_del");
4557                         break;
4558                 case FKCONSTR_ACTION_SETNULL:
4559                         fk_trigger->deferrable = false;
4560                         fk_trigger->initdeferred = false;
4561                         fk_trigger->funcname = SystemFuncName("RI_FKey_setnull_del");
4562                         break;
4563                 case FKCONSTR_ACTION_SETDEFAULT:
4564                         fk_trigger->deferrable = false;
4565                         fk_trigger->initdeferred = false;
4566                         fk_trigger->funcname = SystemFuncName("RI_FKey_setdefault_del");
4567                         break;
4568                 default:
4569                         elog(ERROR, "unrecognized FK action type: %d",
4570                                  (int) fkconstraint->fk_del_action);
4571                         break;
4572         }
4573
4574         fk_trigger->args = NIL;
4575         fk_trigger->args = lappend(fk_trigger->args,
4576                                                            makeString(fkconstraint->constr_name));
4577         fk_trigger->args = lappend(fk_trigger->args,
4578                                                            makeString(myRel->relname));
4579         fk_trigger->args = lappend(fk_trigger->args,
4580                                                            makeString(fkconstraint->pktable->relname));
4581         fk_trigger->args = lappend(fk_trigger->args,
4582                                 makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
4583         forboth(fk_attr, fkconstraint->fk_attrs,
4584                         pk_attr, fkconstraint->pk_attrs)
4585         {
4586                 fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
4587                 fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
4588         }
4589
4590         trigobj.objectId = CreateTrigger(fk_trigger, true);
4591
4592         /* Register dependency from trigger to constraint */
4593         recordDependencyOn(&trigobj, &constrobj, DEPENDENCY_INTERNAL);
4594
4595         /* Make changes-so-far visible */
4596         CommandCounterIncrement();
4597
4598         /*
4599          * Build and execute a CREATE CONSTRAINT TRIGGER statement for the ON
4600          * UPDATE action on the referenced table.
4601          */
4602         fk_trigger = makeNode(CreateTrigStmt);
4603         fk_trigger->trigname = fkconstraint->constr_name;
4604         fk_trigger->relation = fkconstraint->pktable;
4605         fk_trigger->before = false;
4606         fk_trigger->row = true;
4607         fk_trigger->actions[0] = 'u';
4608         fk_trigger->actions[1] = '\0';
4609         fk_trigger->isconstraint = true;
4610         fk_trigger->constrrel = myRel;
4611         switch (fkconstraint->fk_upd_action)
4612         {
4613                 case FKCONSTR_ACTION_NOACTION:
4614                         fk_trigger->deferrable = fkconstraint->deferrable;
4615                         fk_trigger->initdeferred = fkconstraint->initdeferred;
4616                         fk_trigger->funcname = SystemFuncName("RI_FKey_noaction_upd");
4617                         break;
4618                 case FKCONSTR_ACTION_RESTRICT:
4619                         fk_trigger->deferrable = false;
4620                         fk_trigger->initdeferred = false;
4621                         fk_trigger->funcname = SystemFuncName("RI_FKey_restrict_upd");
4622                         break;
4623                 case FKCONSTR_ACTION_CASCADE:
4624                         fk_trigger->deferrable = false;
4625                         fk_trigger->initdeferred = false;
4626                         fk_trigger->funcname = SystemFuncName("RI_FKey_cascade_upd");
4627                         break;
4628                 case FKCONSTR_ACTION_SETNULL:
4629                         fk_trigger->deferrable = false;
4630                         fk_trigger->initdeferred = false;
4631                         fk_trigger->funcname = SystemFuncName("RI_FKey_setnull_upd");
4632                         break;
4633                 case FKCONSTR_ACTION_SETDEFAULT:
4634                         fk_trigger->deferrable = false;
4635                         fk_trigger->initdeferred = false;
4636                         fk_trigger->funcname = SystemFuncName("RI_FKey_setdefault_upd");
4637                         break;
4638                 default:
4639                         elog(ERROR, "unrecognized FK action type: %d",
4640                                  (int) fkconstraint->fk_upd_action);
4641                         break;
4642         }
4643
4644         fk_trigger->args = NIL;
4645         fk_trigger->args = lappend(fk_trigger->args,
4646                                                            makeString(fkconstraint->constr_name));
4647         fk_trigger->args = lappend(fk_trigger->args,
4648                                                            makeString(myRel->relname));
4649         fk_trigger->args = lappend(fk_trigger->args,
4650                                                            makeString(fkconstraint->pktable->relname));
4651         fk_trigger->args = lappend(fk_trigger->args,
4652                                 makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
4653         forboth(fk_attr, fkconstraint->fk_attrs,
4654                         pk_attr, fkconstraint->pk_attrs)
4655         {
4656                 fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
4657                 fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
4658         }
4659
4660         trigobj.objectId = CreateTrigger(fk_trigger, true);
4661
4662         /* Register dependency from trigger to constraint */
4663         recordDependencyOn(&trigobj, &constrobj, DEPENDENCY_INTERNAL);
4664 }
4665
4666 /*
4667  * fkMatchTypeToString -
4668  *        convert FKCONSTR_MATCH_xxx code to string to use in trigger args
4669  */
4670 static char *
4671 fkMatchTypeToString(char match_type)
4672 {
4673         switch (match_type)
4674         {
4675                 case FKCONSTR_MATCH_FULL:
4676                         return pstrdup("FULL");
4677                 case FKCONSTR_MATCH_PARTIAL:
4678                         return pstrdup("PARTIAL");
4679                 case FKCONSTR_MATCH_UNSPECIFIED:
4680                         return pstrdup("UNSPECIFIED");
4681                 default:
4682                         elog(ERROR, "unrecognized match type: %d",
4683                                  (int) match_type);
4684         }
4685         return NULL;                            /* can't get here */
4686 }
4687
4688 /*
4689  * ALTER TABLE DROP CONSTRAINT
4690  */
4691 static void
4692 ATPrepDropConstraint(List **wqueue, Relation rel,
4693                                          bool recurse, AlterTableCmd *cmd)
4694 {
4695         /*
4696          * We don't want errors or noise from child tables, so we have to pass
4697          * down a modified command.
4698          */
4699         if (recurse)
4700         {
4701                 AlterTableCmd *childCmd = copyObject(cmd);
4702
4703                 childCmd->subtype = AT_DropConstraintQuietly;
4704                 ATSimpleRecursion(wqueue, rel, childCmd, recurse);
4705         }
4706 }
4707
4708 static void
4709 ATExecDropConstraint(Relation rel, const char *constrName,
4710                                          DropBehavior behavior, bool quiet)
4711 {
4712         int                     deleted;
4713
4714         deleted = RemoveRelConstraints(rel, constrName, behavior);
4715
4716         if (!quiet)
4717         {
4718                 /* If zero constraints deleted, complain */
4719                 if (deleted == 0)
4720                         ereport(ERROR,
4721                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
4722                                          errmsg("constraint \"%s\" does not exist",
4723                                                         constrName)));
4724                 /* Otherwise if more than one constraint deleted, notify */
4725                 else if (deleted > 1)
4726                         ereport(NOTICE,
4727                                         (errmsg("multiple constraints named \"%s\" were dropped",
4728                                                         constrName)));
4729         }
4730 }
4731
4732 /*
4733  * ALTER COLUMN TYPE
4734  */
4735 static void
4736 ATPrepAlterColumnType(List **wqueue,
4737                                           AlteredTableInfo *tab, Relation rel,
4738                                           bool recurse, bool recursing,
4739                                           AlterTableCmd *cmd)
4740 {
4741         char       *colName = cmd->name;
4742         TypeName   *typename = (TypeName *) cmd->def;
4743         HeapTuple       tuple;
4744         Form_pg_attribute attTup;
4745         AttrNumber      attnum;
4746         Oid                     targettype;
4747         Node       *transform;
4748         NewColumnValue *newval;
4749         ParseState *pstate = make_parsestate(NULL);
4750
4751         /* lookup the attribute so we can check inheritance status */
4752         tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
4753         if (!HeapTupleIsValid(tuple))
4754                 ereport(ERROR,
4755                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
4756                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
4757                                                 colName, RelationGetRelationName(rel))));
4758         attTup = (Form_pg_attribute) GETSTRUCT(tuple);
4759         attnum = attTup->attnum;
4760
4761         /* Can't alter a system attribute */
4762         if (attnum <= 0)
4763                 ereport(ERROR,
4764                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4765                                  errmsg("cannot alter system column \"%s\"",
4766                                                 colName)));
4767
4768         /* Don't alter inherited columns */
4769         if (attTup->attinhcount > 0 && !recursing)
4770                 ereport(ERROR,
4771                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
4772                                  errmsg("cannot alter inherited column \"%s\"",
4773                                                 colName)));
4774
4775         /* Look up the target type */
4776         targettype = LookupTypeName(typename);
4777         if (!OidIsValid(targettype))
4778                 ereport(ERROR,
4779                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
4780                                  errmsg("type \"%s\" does not exist",
4781                                                 TypeNameToString(typename))));
4782
4783         /* make sure datatype is legal for a column */
4784         CheckAttributeType(colName, targettype);
4785
4786         /*
4787          * Set up an expression to transform the old data value to the new type.
4788          * If a USING option was given, transform and use that expression, else
4789          * just take the old value and try to coerce it.  We do this first so that
4790          * type incompatibility can be detected before we waste effort, and
4791          * because we need the expression to be parsed against the original table
4792          * rowtype.
4793          */
4794         if (cmd->transform)
4795         {
4796                 RangeTblEntry *rte;
4797
4798                 /* Expression must be able to access vars of old table */
4799                 rte = addRangeTableEntryForRelation(pstate,
4800                                                                                         rel,
4801                                                                                         NULL,
4802                                                                                         false,
4803                                                                                         true);
4804                 addRTEtoQuery(pstate, rte, false, true, true);
4805
4806                 transform = transformExpr(pstate, cmd->transform);
4807
4808                 /* It can't return a set */
4809                 if (expression_returns_set(transform))
4810                         ereport(ERROR,
4811                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
4812                                          errmsg("transform expression must not return a set")));
4813
4814                 /* No subplans or aggregates, either... */
4815                 if (pstate->p_hasSubLinks)
4816                         ereport(ERROR,
4817                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4818                                          errmsg("cannot use subquery in transform expression")));
4819                 if (pstate->p_hasAggs)
4820                         ereport(ERROR,
4821                                         (errcode(ERRCODE_GROUPING_ERROR),
4822                         errmsg("cannot use aggregate function in transform expression")));
4823         }
4824         else
4825         {
4826                 transform = (Node *) makeVar(1, attnum,
4827                                                                          attTup->atttypid, attTup->atttypmod,
4828                                                                          0);
4829         }
4830
4831         transform = coerce_to_target_type(pstate,
4832                                                                           transform, exprType(transform),
4833                                                                           targettype, typename->typmod,
4834                                                                           COERCION_ASSIGNMENT,
4835                                                                           COERCE_IMPLICIT_CAST);
4836         if (transform == NULL)
4837                 ereport(ERROR,
4838                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
4839                                  errmsg("column \"%s\" cannot be cast to type \"%s\"",
4840                                                 colName, TypeNameToString(typename))));
4841
4842         /*
4843          * Add a work queue item to make ATRewriteTable update the column
4844          * contents.
4845          */
4846         newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue));
4847         newval->attnum = attnum;
4848         newval->expr = (Expr *) transform;
4849
4850         tab->newvals = lappend(tab->newvals, newval);
4851
4852         ReleaseSysCache(tuple);
4853
4854         /*
4855          * The recursion case is handled by ATSimpleRecursion.  However, if we are
4856          * told not to recurse, there had better not be any child tables; else the
4857          * alter would put them out of step.
4858          */
4859         if (recurse)
4860                 ATSimpleRecursion(wqueue, rel, cmd, recurse);
4861         else if (!recursing &&
4862                          find_inheritance_children(RelationGetRelid(rel)) != NIL)
4863                 ereport(ERROR,
4864                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
4865                                  errmsg("type of inherited column \"%s\" must be changed in child tables too",
4866                                                 colName)));
4867 }
4868
/*
 * ATExecAlterColumnType
 *
 * Execution step of ALTER COLUMN TYPE for one column: rewrites the column's
 * pg_attribute row to describe the new datatype and fixes up the catalog
 * entries that hang off the column.
 *
 * tab:      per-table work info.  tab->oldDesc is consulted to detect a second
 *           type change of the same column; tab->changedIndexOids/Defs and
 *           tab->changedConstraintOids/Defs receive the indexes and
 *           constraints found to depend on the column.
 * rel:      the table being altered.
 * colName:  name of the column.
 * typename: the requested new type (typmod and arrayBounds are read from it).
 *
 * In order, the function:
 *   - looks up the column and the target type;
 *   - if the column has a default, builds it and coerces it to the new type
 *     (ERROR if that is impossible);
 *   - scans pg_depend for objects referencing the column, recording the
 *     definitions of dependent indexes and constraints and raising an ERROR
 *     for a dependent view or rule;
 *   - deletes the column's own pg_depend entries, which are expected to point
 *     only at the old datatype;
 *   - updates the pg_attribute row and its catalog indexes;
 *   - records the dependency on the new datatype, removes the column's
 *     statistics, and removes and re-stores the default if there is one.
 */
4869 static void
4870 ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
4871                                           const char *colName, TypeName *typename)
4872 {
4873         HeapTuple       heapTup;
4874         Form_pg_attribute attTup;
4875         AttrNumber      attnum;
4876         HeapTuple       typeTuple;
4877         Form_pg_type tform;
4878         Oid                     targettype;
4879         Node       *defaultexpr;
4880         Relation        attrelation;
4881         Relation        depRel;
4882         ScanKeyData key[3];
4883         SysScanDesc scan;
4884         HeapTuple       depTup;
4885
4886         attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
4887
4888         /* Look up the target column */
4889         heapTup = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
4890         if (!HeapTupleIsValid(heapTup))         /* shouldn't happen */
4891                 ereport(ERROR,
4892                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
4893                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
4894                                                 colName, RelationGetRelationName(rel))));
4895         attTup = (Form_pg_attribute) GETSTRUCT(heapTup);
4896         attnum = attTup->attnum;
4897
4898         /* Check for multiple ALTER TYPE on same column --- can't cope */
4899         if (attTup->atttypid != tab->oldDesc->attrs[attnum - 1]->atttypid ||
4900                 attTup->atttypmod != tab->oldDesc->attrs[attnum - 1]->atttypmod)
4901                 ereport(ERROR,
4902                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4903                                  errmsg("cannot alter type of column \"%s\" twice",
4904                                                 colName)));
4905
4906         /* Look up the target type (should not fail, since prep found it) */
4907         typeTuple = typenameType(typename);
4908         tform = (Form_pg_type) GETSTRUCT(typeTuple);
4909         targettype = HeapTupleGetOid(typeTuple);
4910
4911         /*
4912          * If there is a default expression for the column, get it and ensure we
4913          * can coerce it to the new datatype.  (We must do this before changing
4914          * the column type, because build_column_default itself will try to
4915          * coerce, and will not issue the error message we want if it fails.)
4916          *
4917          * We remove any implicit coercion steps at the top level of the old
4918          * default expression; this has been agreed to satisfy the principle of
4919          * least surprise.      (The conversion to the new column type should act like
4920          * it started from what the user sees as the stored expression, and the
4921          * implicit coercions aren't going to be shown.)
4922          */
4923         if (attTup->atthasdef)
4924         {
4925                 defaultexpr = build_column_default(rel, attnum);
4926                 Assert(defaultexpr);
4927                 defaultexpr = strip_implicit_coercions(defaultexpr);
4928                 defaultexpr = coerce_to_target_type(NULL,               /* no UNKNOWN params */
4929                                                                                   defaultexpr, exprType(defaultexpr),
4930                                                                                         targettype, typename->typmod,
4931                                                                                         COERCION_ASSIGNMENT,
4932                                                                                         COERCE_IMPLICIT_CAST);
4933                 if (defaultexpr == NULL)
4934                         ereport(ERROR,
4935                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
4936                         errmsg("default for column \"%s\" cannot be cast to type \"%s\"",
4937                                    colName, TypeNameToString(typename))));
4938         }
4939         else
4940                 defaultexpr = NULL;
4941
4942         /*
4943          * Find everything that depends on the column (constraints, indexes, etc),
4944          * and record enough information to let us recreate the objects.
4945          *
4946          * The actual recreation does not happen here, but only after we have
4947          * performed all the individual ALTER TYPE operations.  We have to save
4948          * the info before executing ALTER TYPE, though, else the deparser will
4949          * get confused.
4950          *
4951          * There could be multiple entries for the same object, so we must check
4952          * to ensure we process each one only once.  Note: we assume that an index
4953          * that implements a constraint will not show a direct dependency on the
4954          * column.
4955          */
4956         depRel = heap_open(DependRelationId, RowExclusiveLock);
4957
             /* Scan keys: pg_depend rows that reference (pg_class, this table, this column) */
4958         ScanKeyInit(&key[0],
4959                                 Anum_pg_depend_refclassid,
4960                                 BTEqualStrategyNumber, F_OIDEQ,
4961                                 ObjectIdGetDatum(RelationRelationId));
4962         ScanKeyInit(&key[1],
4963                                 Anum_pg_depend_refobjid,
4964                                 BTEqualStrategyNumber, F_OIDEQ,
4965                                 ObjectIdGetDatum(RelationGetRelid(rel)));
4966         ScanKeyInit(&key[2],
4967                                 Anum_pg_depend_refobjsubid,
4968                                 BTEqualStrategyNumber, F_INT4EQ,
4969                                 Int32GetDatum((int32) attnum));
4970
4971         scan = systable_beginscan(depRel, DependReferenceIndexId, true,
4972                                                           SnapshotNow, 3, key);
4973
4974         while (HeapTupleIsValid(depTup = systable_getnext(scan)))
4975         {
4976                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(depTup);
4977                 ObjectAddress foundObject;
4978
4979                 /* We don't expect any PIN dependencies on columns */
4980                 if (foundDep->deptype == DEPENDENCY_PIN)
4981                         elog(ERROR, "cannot alter type of a pinned column");
4982
4983                 foundObject.classId = foundDep->classid;
4984                 foundObject.objectId = foundDep->objid;
4985                 foundObject.objectSubId = foundDep->objsubid;
4986
4987                 switch (getObjectClass(&foundObject))
4988                 {
4989                         case OCLASS_CLASS:
4990                                 {
4991                                         char            relKind = get_rel_relkind(foundObject.objectId);
4992
4993                                         if (relKind == RELKIND_INDEX)
4994                                         {
4995                                                 Assert(foundObject.objectSubId == 0);
                                                     /* Remember each index once: its OID and its definition string */
4996                                                 if (!list_member_oid(tab->changedIndexOids, foundObject.objectId))
4997                                                 {
4998                                                         tab->changedIndexOids = lappend_oid(tab->changedIndexOids,
4999                                                                                                            foundObject.objectId);
5000                                                         tab->changedIndexDefs = lappend(tab->changedIndexDefs,
5001                                                            pg_get_indexdef_string(foundObject.objectId));
5002                                                 }
5003                                         }
5004                                         else if (relKind == RELKIND_SEQUENCE)
5005                                         {
5006                                                 /*
5007                                                  * This must be a SERIAL column's sequence.  We need
5008                                                  * not do anything to it.
5009                                                  */
5010                                                 Assert(foundObject.objectSubId == 0);
5011                                         }
5012                                         else
5013                                         {
5014                                                 /* Not expecting any other direct dependencies... */
5015                                                 elog(ERROR, "unexpected object depending on column: %s",
5016                                                          getObjectDescription(&foundObject));
5017                                         }
5018                                         break;
5019                                 }
5020
5021                         case OCLASS_CONSTRAINT:
5022                                 Assert(foundObject.objectSubId == 0);
5023                                 if (!list_member_oid(tab->changedConstraintOids,
5024                                                                          foundObject.objectId))
5025                                 {
5026                                         char *defstring = pg_get_constraintdef_string(foundObject.objectId);
5027
5028                                         /*
5029                                          * Put NORMAL dependencies at the front of the list and
5030                                          * AUTO dependencies at the back.  This makes sure that
5031                                          * foreign-key constraints depending on this column will
5032                                          * be dropped before unique or primary-key constraints of
5033                                          * the column; which we must have because the FK
5034                                          * constraints depend on the indexes belonging to the
5035                                          * unique constraints.
5036                                          */
5037                                         if (foundDep->deptype == DEPENDENCY_NORMAL)
5038                                         {
5039                                                 tab->changedConstraintOids =
5040                                                         lcons_oid(foundObject.objectId,
5041                                                                           tab->changedConstraintOids);
5042                                                 tab->changedConstraintDefs =
5043                                                         lcons(defstring,
5044                                                                   tab->changedConstraintDefs);
5045                                         }
5046                                         else
5047                                         {
5048                                                 tab->changedConstraintOids =
5049                                                         lappend_oid(tab->changedConstraintOids,
5050                                                                                 foundObject.objectId);
5051                                                 tab->changedConstraintDefs =
5052                                                         lappend(tab->changedConstraintDefs,
5053                                                                         defstring);
5054                                         }
5055                                 }
5056                                 break;
5057
5058                         case OCLASS_REWRITE:
5059                                 /* XXX someday see if we can cope with revising views */
5060                                 ereport(ERROR,
5061                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5062                                                  errmsg("cannot alter type of a column used by a view or rule"),
5063                                                  errdetail("%s depends on column \"%s\"",
5064                                                                    getObjectDescription(&foundObject),
5065                                                                    colName)));
5066                                 break;
5067
5068                         case OCLASS_DEFAULT:
5069
5070                                 /*
5071                                  * Ignore the column's default expression, since we will fix
5072                                  * it below.
5073                                  */
5074                                 Assert(defaultexpr);
5075                                 break;
5076
5077                         case OCLASS_PROC:
5078                         case OCLASS_TYPE:
5079                         case OCLASS_CAST:
5080                         case OCLASS_CONVERSION:
5081                         case OCLASS_LANGUAGE:
5082                         case OCLASS_OPERATOR:
5083                         case OCLASS_OPCLASS:
5084                         case OCLASS_TRIGGER:
5085                         case OCLASS_SCHEMA:
5086
5087                                 /*
5088                                  * We don't expect any of these sorts of objects to depend on
5089                                  * a column.
5090                                  */
5091                                 elog(ERROR, "unexpected object depending on column: %s",
5092                                          getObjectDescription(&foundObject));
5093                                 break;
5094
5095                         default:
5096                                 elog(ERROR, "unrecognized object class: %u",
5097                                          foundObject.classId);
5098                 }
5099         }
5100
5101         systable_endscan(scan);
5102
5103         /*
5104          * Now scan for dependencies of this column on other things.  The only
5105          * thing we should find is the dependency on the column datatype, which we
5106          * want to remove.
5107          */
             /* The key array is reused, this time matching on the depender side */
5108         ScanKeyInit(&key[0],
5109                                 Anum_pg_depend_classid,
5110                                 BTEqualStrategyNumber, F_OIDEQ,
5111                                 ObjectIdGetDatum(RelationRelationId));
5112         ScanKeyInit(&key[1],
5113                                 Anum_pg_depend_objid,
5114                                 BTEqualStrategyNumber, F_OIDEQ,
5115                                 ObjectIdGetDatum(RelationGetRelid(rel)));
5116         ScanKeyInit(&key[2],
5117                                 Anum_pg_depend_objsubid,
5118                                 BTEqualStrategyNumber, F_INT4EQ,
5119                                 Int32GetDatum((int32) attnum));
5120
5121         scan = systable_beginscan(depRel, DependDependerIndexId, true,
5122                                                           SnapshotNow, 3, key);
5123
5124         while (HeapTupleIsValid(depTup = systable_getnext(scan)))
5125         {
5126                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(depTup);
5127
5128                 if (foundDep->deptype != DEPENDENCY_NORMAL)
5129                         elog(ERROR, "found unexpected dependency type '%c'",
5130                                  foundDep->deptype);
                     /* attTup still holds the old type OID here; it is overwritten further down */
5131                 if (foundDep->refclassid != TypeRelationId ||
5132                         foundDep->refobjid != attTup->atttypid)
5133                         elog(ERROR, "found unexpected dependency for column");
5134
5135                 simple_heap_delete(depRel, &depTup->t_self);
5136         }
5137
5138         systable_endscan(scan);
5139
5140         heap_close(depRel, RowExclusiveLock);
5141
5142         /*
5143          * Here we go --- change the recorded column type.      (Note heapTup is a
5144          * copy of the syscache entry, so okay to scribble on.)
5145          */
5146         attTup->atttypid = targettype;
5147         attTup->atttypmod = typename->typmod;
5148         attTup->attndims = list_length(typename->arrayBounds);
5149         attTup->attlen = tform->typlen;
5150         attTup->attbyval = tform->typbyval;
5151         attTup->attalign = tform->typalign;
5152         attTup->attstorage = tform->typstorage;
5153
             /* tform is not read past this point, so the type tuple can be released */
5154         ReleaseSysCache(typeTuple);
5155
5156         simple_heap_update(attrelation, &heapTup->t_self, heapTup);
5157
5158         /* keep system catalog indexes current */
5159         CatalogUpdateIndexes(attrelation, heapTup);
5160
5161         heap_close(attrelation, RowExclusiveLock);
5162
5163         /* Install dependency on new datatype */
5164         add_column_datatype_dependency(RelationGetRelid(rel), attnum, targettype);
5165
5166         /*
5167          * Drop any pg_statistic entry for the column, since it's now wrong type
5168          */
5169         RemoveStatistics(RelationGetRelid(rel), attnum);
5170
5171         /*
5172          * Update the default, if present, by brute force --- remove and re-add
5173          * the default.  Probably unsafe to take shortcuts, since the new version
5174          * may well have additional dependencies.  (It's okay to do this now,
5175          * rather than after other ALTER TYPE commands, since the default won't
5176          * depend on other column types.)
5177          */
5178         if (defaultexpr)
5179         {
5180                 /* Must make new row visible since it will be updated again */
5181                 CommandCounterIncrement();
5182
5183                 /*
5184                  * We use RESTRICT here for safety, but at present we do not expect
5185                  * anything to depend on the default.
5186                  */
5187                 RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, true);
5188
5189                 StoreAttrDefault(rel, attnum, nodeToString(defaultexpr));
5190         }
5191
5192         /* Cleanup */
5193         heap_freetuple(heapTup);
5194 }
5195
5196 /*
5197  * Cleanup after we've finished all the ALTER TYPE operations for a
5198  * particular relation.  We have to drop and recreate all the indexes
5199  * and constraints that depend on the altered columns.
5200  */
5201 static void
5202 ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab)
5203 {
5204         ObjectAddress obj;
5205         ListCell   *l;
5206
5207         /*
5208          * Re-parse the index and constraint definitions, and attach them to the
5209          * appropriate work queue entries.      We do this before dropping because in
5210          * the case of a FOREIGN KEY constraint, we might not yet have exclusive
5211          * lock on the table the constraint is attached to, and we need to get
5212          * that before dropping.  It's safe because the parser won't actually look
5213          * at the catalogs to detect the existing entry.
5214          */
5215         foreach(l, tab->changedIndexDefs)
5216                 ATPostAlterTypeParse((char *) lfirst(l), wqueue);
5217         foreach(l, tab->changedConstraintDefs)
5218                 ATPostAlterTypeParse((char *) lfirst(l), wqueue);
5219
5220         /*
5221          * Now we can drop the existing constraints and indexes --- constraints
5222          * first, since some of them might depend on the indexes.  In fact, we
5223          * have to delete FOREIGN KEY constraints before UNIQUE constraints,
5224          * but we already ordered the constraint list to ensure that would happen.
5225          * It should be okay to use DROP_RESTRICT here, since nothing else should
5226          * be depending on these objects.
5227          */
5228         foreach(l, tab->changedConstraintOids)
5229         {
5230                 obj.classId = ConstraintRelationId;
5231                 obj.objectId = lfirst_oid(l);
5232                 obj.objectSubId = 0;
5233                 performDeletion(&obj, DROP_RESTRICT);
5234         }
5235
5236         foreach(l, tab->changedIndexOids)
5237         {
5238                 obj.classId = RelationRelationId;
5239                 obj.objectId = lfirst_oid(l);
5240                 obj.objectSubId = 0;
5241                 performDeletion(&obj, DROP_RESTRICT);
5242         }
5243
5244         /*
5245          * The objects will get recreated during subsequent passes over the work
5246          * queue.
5247          */
5248 }
5249
5250 static void
5251 ATPostAlterTypeParse(char *cmd, List **wqueue)
5252 {
5253         List       *raw_parsetree_list;
5254         List       *querytree_list;
5255         ListCell   *list_item;
5256
5257         /*
5258          * We expect that we only have to do raw parsing and parse analysis, not
5259          * any rule rewriting, since these will all be utility statements.
5260          */
5261         raw_parsetree_list = raw_parser(cmd);
5262         querytree_list = NIL;
5263         foreach(list_item, raw_parsetree_list)
5264         {
5265                 Node       *parsetree = (Node *) lfirst(list_item);
5266
5267                 querytree_list = list_concat(querytree_list,
5268                                                                          parse_analyze(parsetree, NULL, 0));
5269         }
5270
5271         /*
5272          * Attach each generated command to the proper place in the work queue.
5273          * Note this could result in creation of entirely new work-queue entries.
5274          */
5275         foreach(list_item, querytree_list)
5276         {
5277                 Query      *query = (Query *) lfirst(list_item);
5278                 Relation        rel;
5279                 AlteredTableInfo *tab;
5280
5281                 Assert(IsA(query, Query));
5282                 Assert(query->commandType == CMD_UTILITY);
5283                 switch (nodeTag(query->utilityStmt))
5284                 {
5285                         case T_IndexStmt:
5286                                 {
5287                                         IndexStmt  *stmt = (IndexStmt *) query->utilityStmt;
5288                                         AlterTableCmd *newcmd;
5289
5290                                         rel = relation_openrv(stmt->relation, AccessExclusiveLock);
5291                                         tab = ATGetQueueEntry(wqueue, rel);
5292                                         newcmd = makeNode(AlterTableCmd);
5293                                         newcmd->subtype = AT_ReAddIndex;
5294                                         newcmd->def = (Node *) stmt;
5295                                         tab->subcmds[AT_PASS_OLD_INDEX] =
5296                                                 lappend(tab->subcmds[AT_PASS_OLD_INDEX], newcmd);
5297                                         relation_close(rel, NoLock);
5298                                         break;
5299                                 }
5300                         case T_AlterTableStmt:
5301                                 {
5302                                         AlterTableStmt *stmt = (AlterTableStmt *) query->utilityStmt;
5303                                         ListCell   *lcmd;
5304
5305                                         rel = relation_openrv(stmt->relation, AccessExclusiveLock);
5306                                         tab = ATGetQueueEntry(wqueue, rel);
5307                                         foreach(lcmd, stmt->cmds)
5308                                         {
5309                                                 AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
5310
5311                                                 switch (cmd->subtype)
5312                                                 {
5313                                                         case AT_AddIndex:
5314                                                                 cmd->subtype = AT_ReAddIndex;
5315                                                                 tab->subcmds[AT_PASS_OLD_INDEX] =
5316                                                                         lappend(tab->subcmds[AT_PASS_OLD_INDEX], cmd);
5317                                                                 break;
5318                                                         case AT_AddConstraint:
5319                                                                 tab->subcmds[AT_PASS_OLD_CONSTR] =
5320                                                                         lappend(tab->subcmds[AT_PASS_OLD_CONSTR], cmd);
5321                                                                 break;
5322                                                         default:
5323                                                                 elog(ERROR, "unexpected statement type: %d",
5324                                                                          (int) cmd->subtype);
5325                                                 }
5326                                         }
5327                                         relation_close(rel, NoLock);
5328                                         break;
5329                                 }
5330                         default:
5331                                 elog(ERROR, "unexpected statement type: %d",
5332                                          (int) nodeTag(query->utilityStmt));
5333                 }
5334         }
5335 }
5336
5337
5338 /*
5339  * ALTER TABLE OWNER
5340  *
5341  * recursing is true if we are recursing from a table to its indexes or
5342  * toast table.  We don't allow the ownership of those things to be
5343  * changed separately from the parent table.  Also, we can skip permission
5344  * checks (this is necessary not just an optimization, else we'd fail to
5345  * handle toast tables properly).
5346  */
5347 void
5348 ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing)
5349 {
5350         Relation        target_rel;
5351         Relation        class_rel;
5352         HeapTuple       tuple;
5353         Form_pg_class tuple_class;
5354
5355         /*
5356          * Get exclusive lock till end of transaction on the target table. Use
5357          * relation_open so that we can work on indexes and sequences.
5358          */
5359         target_rel = relation_open(relationOid, AccessExclusiveLock);
5360
5361         /* Get its pg_class tuple, too */
5362         class_rel = heap_open(RelationRelationId, RowExclusiveLock);
5363
5364         tuple = SearchSysCache(RELOID,
5365                                                    ObjectIdGetDatum(relationOid),
5366                                                    0, 0, 0);
5367         if (!HeapTupleIsValid(tuple))
5368                 elog(ERROR, "cache lookup failed for relation %u", relationOid);
5369         tuple_class = (Form_pg_class) GETSTRUCT(tuple);
5370
5371         /* Can we change the ownership of this tuple? */
5372         switch (tuple_class->relkind)
5373         {
5374                 case RELKIND_RELATION:
5375                 case RELKIND_VIEW:
5376                 case RELKIND_SEQUENCE:
5377                         /* ok to change owner */
5378                         break;
5379                 case RELKIND_INDEX:
5380                         if (!recursing)
5381                         {
5382                                 /*
5383                                  * Because ALTER INDEX OWNER used to be allowed, and in fact
5384                                  * is generated by old versions of pg_dump, we give a warning
5385                                  * and do nothing rather than erroring out.  Also, to avoid
5386                                  * unnecessary chatter while restoring those old dumps, say
5387                                  * nothing at all if the command would be a no-op anyway.
5388                                  */
5389                                 if (tuple_class->relowner != newOwnerId)
5390                                         ereport(WARNING,
5391                                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
5392                                                          errmsg("cannot change owner of index \"%s\"",
5393                                                                         NameStr(tuple_class->relname)),
5394                                                          errhint("Change the ownership of the index's table, instead.")));
5395                                 /* quick hack to exit via the no-op path */
5396                                 newOwnerId = tuple_class->relowner;
5397                         }
5398                         break;
5399                 case RELKIND_TOASTVALUE:
5400                         if (recursing)
5401                                 break;
5402                         /* FALL THRU */
5403                 default:
5404                         ereport(ERROR,
5405                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
5406                                          errmsg("\"%s\" is not a table, view, or sequence",
5407                                                         NameStr(tuple_class->relname))));
5408         }
5409
5410         /*
5411          * If the new owner is the same as the existing owner, consider the
5412          * command to have succeeded.  This is for dump restoration purposes.
5413          */
5414         if (tuple_class->relowner != newOwnerId)
5415         {
5416                 Datum           repl_val[Natts_pg_class];
5417                 char            repl_null[Natts_pg_class];
5418                 char            repl_repl[Natts_pg_class];
5419                 Acl                *newAcl;
5420                 Datum           aclDatum;
5421                 bool            isNull;
5422                 HeapTuple       newtuple;
5423
5424                 /* skip permission checks when recursing to index or toast table */
5425                 if (!recursing)
5426                 {
5427                         /* Superusers can always do it */
5428                         if (!superuser())
5429                         {
5430                                 Oid                     namespaceOid = tuple_class->relnamespace;
5431                                 AclResult       aclresult;
5432
5433                                 /* Otherwise, must be owner of the existing object */
5434                                 if (!pg_class_ownercheck(relationOid, GetUserId()))
5435                                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
5436                                                                    RelationGetRelationName(target_rel));
5437
5438                                 /* Must be able to become new owner */
5439                                 check_is_member_of_role(GetUserId(), newOwnerId);
5440
5441                                 /* New owner must have CREATE privilege on namespace */
5442                                 aclresult = pg_namespace_aclcheck(namespaceOid, newOwnerId,
5443                                                                                                   ACL_CREATE);
5444                                 if (aclresult != ACLCHECK_OK)
5445                                         aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
5446                                                                    get_namespace_name(namespaceOid));
5447                         }
5448                 }
5449
5450                 memset(repl_null, ' ', sizeof(repl_null));
5451                 memset(repl_repl, ' ', sizeof(repl_repl));
5452
5453                 repl_repl[Anum_pg_class_relowner - 1] = 'r';
5454                 repl_val[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(newOwnerId);
5455
5456                 /*
5457                  * Determine the modified ACL for the new owner.  This is only
5458                  * necessary when the ACL is non-null.
5459                  */
5460                 aclDatum = SysCacheGetAttr(RELOID, tuple,
5461                                                                    Anum_pg_class_relacl,
5462                                                                    &isNull);
5463                 if (!isNull)
5464                 {
5465                         newAcl = aclnewowner(DatumGetAclP(aclDatum),
5466                                                                  tuple_class->relowner, newOwnerId);
5467                         repl_repl[Anum_pg_class_relacl - 1] = 'r';
5468                         repl_val[Anum_pg_class_relacl - 1] = PointerGetDatum(newAcl);
5469                 }
5470
5471                 newtuple = heap_modifytuple(tuple, RelationGetDescr(class_rel), repl_val, repl_null, repl_repl);
5472
5473                 simple_heap_update(class_rel, &newtuple->t_self, newtuple);
5474                 CatalogUpdateIndexes(class_rel, newtuple);
5475
5476                 heap_freetuple(newtuple);
5477
5478                 /* Update owner dependency reference */
5479                 changeDependencyOnOwner(RelationRelationId, relationOid, newOwnerId);
5480
5481                 /*
5482                  * Also change the ownership of the table's rowtype, if it has one
5483                  */
5484                 if (tuple_class->relkind != RELKIND_INDEX)
5485                         AlterTypeOwnerInternal(tuple_class->reltype, newOwnerId);
5486
5487                 /*
5488                  * If we are operating on a table, also change the ownership of any
5489                  * indexes and sequences that belong to the table, as well as the
5490                  * table's toast table (if it has one)
5491                  */
5492                 if (tuple_class->relkind == RELKIND_RELATION ||
5493                         tuple_class->relkind == RELKIND_TOASTVALUE)
5494                 {
5495                         List       *index_oid_list;
5496                         ListCell   *i;
5497
5498                         /* Find all the indexes belonging to this relation */
5499                         index_oid_list = RelationGetIndexList(target_rel);
5500
5501                         /* For each index, recursively change its ownership */
5502                         foreach(i, index_oid_list)
5503                                 ATExecChangeOwner(lfirst_oid(i), newOwnerId, true);
5504
5505                         list_free(index_oid_list);
5506                 }
5507
5508                 if (tuple_class->relkind == RELKIND_RELATION)
5509                 {
5510                         /* If it has a toast table, recurse to change its ownership */
5511                         if (tuple_class->reltoastrelid != InvalidOid)
5512                                 ATExecChangeOwner(tuple_class->reltoastrelid, newOwnerId,
5513                                                                   true);
5514
5515                         /* If it has dependent sequences, recurse to change them too */
5516                         change_owner_recurse_to_sequences(relationOid, newOwnerId);
5517                 }
5518         }
5519
5520         ReleaseSysCache(tuple);
5521         heap_close(class_rel, RowExclusiveLock);
5522         relation_close(target_rel, NoLock);
5523 }
5524
5525 /*
5526  * change_owner_recurse_to_sequences
5527  *
5528  * Helper function for ATExecChangeOwner.  Examines pg_depend searching
5529  * for sequences that are dependent on serial columns, and changes their
5530  * ownership.
5531  */
5532 static void
5533 change_owner_recurse_to_sequences(Oid relationOid, Oid newOwnerId)
5534 {
5535         Relation        depRel;
5536         SysScanDesc scan;
5537         ScanKeyData key[2];
5538         HeapTuple       tup;
5539
5540         /*
5541          * SERIAL sequences are those having an internal dependency on one of the
5542          * table's columns (we don't care *which* column, exactly).
5543          */
5544         depRel = heap_open(DependRelationId, AccessShareLock);
5545
5546         ScanKeyInit(&key[0],
5547                                 Anum_pg_depend_refclassid,
5548                                 BTEqualStrategyNumber, F_OIDEQ,
5549                                 ObjectIdGetDatum(RelationRelationId));
5550         ScanKeyInit(&key[1],
5551                                 Anum_pg_depend_refobjid,
5552                                 BTEqualStrategyNumber, F_OIDEQ,
5553                                 ObjectIdGetDatum(relationOid));
5554         /* we leave refobjsubid unspecified */
5555
5556         scan = systable_beginscan(depRel, DependReferenceIndexId, true,
5557                                                           SnapshotNow, 2, key);
5558
5559         while (HeapTupleIsValid(tup = systable_getnext(scan)))
5560         {
5561                 Form_pg_depend depForm = (Form_pg_depend) GETSTRUCT(tup);
5562                 Relation        seqRel;
5563
5564                 /* skip dependencies other than internal dependencies on columns */
5565                 if (depForm->refobjsubid == 0 ||
5566                         depForm->classid != RelationRelationId ||
5567                         depForm->objsubid != 0 ||
5568                         depForm->deptype != DEPENDENCY_INTERNAL)
5569                         continue;
5570
5571                 /* Use relation_open just in case it's an index */
5572                 seqRel = relation_open(depForm->objid, AccessExclusiveLock);
5573
5574                 /* skip non-sequence relations */
5575                 if (RelationGetForm(seqRel)->relkind != RELKIND_SEQUENCE)
5576                 {
5577                         /* No need to keep the lock */
5578                         relation_close(seqRel, AccessExclusiveLock);
5579                         continue;
5580                 }
5581
5582                 /* We don't need to close the sequence while we alter it. */
5583                 ATExecChangeOwner(depForm->objid, newOwnerId, false);
5584
5585                 /* Now we can close it.  Keep the lock till end of transaction. */
5586                 relation_close(seqRel, NoLock);
5587         }
5588
5589         systable_endscan(scan);
5590
5591         relation_close(depRel, AccessShareLock);
5592 }
5593
5594 /*
5595  * ALTER TABLE CLUSTER ON
5596  *
5597  * The only thing we have to do is to change the indisclustered bits.
5598  */
5599 static void
5600 ATExecClusterOn(Relation rel, const char *indexName)
5601 {
5602         Oid                     indexOid;
5603
5604         indexOid = get_relname_relid(indexName, rel->rd_rel->relnamespace);
5605
5606         if (!OidIsValid(indexOid))
5607                 ereport(ERROR,
5608                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
5609                                  errmsg("index \"%s\" for table \"%s\" does not exist",
5610                                                 indexName, RelationGetRelationName(rel))));
5611
5612         /* Check index is valid to cluster on */
5613         check_index_is_clusterable(rel, indexOid, false);
5614
5615         /* And do the work */
5616         mark_index_clustered(rel, indexOid);
5617 }
5618
5619 /*
5620  * ALTER TABLE SET WITHOUT CLUSTER
5621  *
5622  * We have to find any indexes on the table that have indisclustered bit
5623  * set and turn it off.
5624  */
5625 static void
5626 ATExecDropCluster(Relation rel)
5627 {
5628         mark_index_clustered(rel, InvalidOid);
5629 }
5630
5631 /*
5632  * ALTER TABLE SET TABLESPACE
5633  */
5634 static void
5635 ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename)
5636 {
5637         Oid                     tablespaceId;
5638         AclResult       aclresult;
5639
5640         /*
5641          * We do our own permission checking because we want to allow this on
5642          * indexes.
5643          */
5644         if (rel->rd_rel->relkind != RELKIND_RELATION &&
5645                 rel->rd_rel->relkind != RELKIND_INDEX)
5646                 ereport(ERROR,
5647                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
5648                                  errmsg("\"%s\" is not a table or index",
5649                                                 RelationGetRelationName(rel))));
5650
5651         /* Permissions checks */
5652         if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
5653                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
5654                                            RelationGetRelationName(rel));
5655
5656         if (!allowSystemTableMods && IsSystemRelation(rel))
5657                 ereport(ERROR,
5658                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
5659                                  errmsg("permission denied: \"%s\" is a system catalog",
5660                                                 RelationGetRelationName(rel))));
5661
5662         /* Check that the tablespace exists */
5663         tablespaceId = get_tablespace_oid(tablespacename);
5664         if (!OidIsValid(tablespaceId))
5665                 ereport(ERROR,
5666                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
5667                                  errmsg("tablespace \"%s\" does not exist", tablespacename)));
5668
5669         /* Check its permissions */
5670         aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(), ACL_CREATE);
5671         if (aclresult != ACLCHECK_OK)
5672                 aclcheck_error(aclresult, ACL_KIND_TABLESPACE, tablespacename);
5673
5674         /* Save info for Phase 3 to do the real work */
5675         if (OidIsValid(tab->newTableSpace))
5676                 ereport(ERROR,
5677                                 (errcode(ERRCODE_SYNTAX_ERROR),
5678                                  errmsg("cannot have multiple SET TABLESPACE subcommands")));
5679         tab->newTableSpace = tablespaceId;
5680 }
5681
5682 /*
5683  * Execute ALTER TABLE SET TABLESPACE for cases where there is no tuple
5684  * rewriting to be done, so we just want to copy the data as fast as possible.
5685  */
5686 static void
5687 ATExecSetTableSpace(Oid tableOid, Oid newTableSpace)
5688 {
5689         Relation        rel;
5690         Oid                     oldTableSpace;
5691         Oid                     reltoastrelid;
5692         Oid                     reltoastidxid;
5693         RelFileNode newrnode;
5694         SMgrRelation dstrel;
5695         Relation        pg_class;
5696         HeapTuple       tuple;
5697         Form_pg_class rd_rel;
5698
5699         rel = relation_open(tableOid, NoLock);
5700
5701         /*
5702          * We can never allow moving of shared or nailed-in-cache relations,
5703          * because we can't support changing their reltablespace values.
5704          */
5705         if (rel->rd_rel->relisshared || rel->rd_isnailed)
5706                 ereport(ERROR,
5707                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5708                                  errmsg("cannot move system relation \"%s\"",
5709                                                 RelationGetRelationName(rel))));
5710
5711         /*
5712          * Don't allow moving temp tables of other backends ... their local buffer
5713          * manager is not going to cope.
5714          */
5715         if (isOtherTempNamespace(RelationGetNamespace(rel)))
5716                 ereport(ERROR,
5717                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5718                                  errmsg("cannot move temporary tables of other sessions")));
5719
5720         /*
5721          * No work if no change in tablespace.
5722          */
5723         oldTableSpace = rel->rd_rel->reltablespace;
5724         if (newTableSpace == oldTableSpace ||
5725                 (newTableSpace == MyDatabaseTableSpace && oldTableSpace == 0))
5726         {
5727                 relation_close(rel, NoLock);
5728                 return;
5729         }
5730
5731         reltoastrelid = rel->rd_rel->reltoastrelid;
5732         reltoastidxid = rel->rd_rel->reltoastidxid;
5733
5734         /* Get a modifiable copy of the relation's pg_class row */
5735         pg_class = heap_open(RelationRelationId, RowExclusiveLock);
5736
5737         tuple = SearchSysCacheCopy(RELOID,
5738                                                            ObjectIdGetDatum(tableOid),
5739                                                            0, 0, 0);
5740         if (!HeapTupleIsValid(tuple))
5741                 elog(ERROR, "cache lookup failed for relation %u", tableOid);
5742         rd_rel = (Form_pg_class) GETSTRUCT(tuple);
5743
5744         /* create another storage file. Is it a little ugly ? */
5745         /* NOTE: any conflict in relfilenode value will be caught here */
5746         newrnode = rel->rd_node;
5747         newrnode.spcNode = newTableSpace;
5748
5749         dstrel = smgropen(newrnode);
5750         smgrcreate(dstrel, rel->rd_istemp, false);
5751
5752         /* copy relation data to the new physical file */
5753         copy_relation_data(rel, dstrel);
5754
5755         /* schedule unlinking old physical file */
5756         RelationOpenSmgr(rel);
5757         smgrscheduleunlink(rel->rd_smgr, rel->rd_istemp);
5758
5759         /*
5760          * Now drop smgr references.  The source was already dropped by
5761          * smgrscheduleunlink.
5762          */
5763         smgrclose(dstrel);
5764
5765         /* update the pg_class row */
5766         rd_rel->reltablespace = (newTableSpace == MyDatabaseTableSpace) ? InvalidOid : newTableSpace;
5767         simple_heap_update(pg_class, &tuple->t_self, tuple);
5768         CatalogUpdateIndexes(pg_class, tuple);
5769
5770         heap_freetuple(tuple);
5771
5772         heap_close(pg_class, RowExclusiveLock);
5773
5774         relation_close(rel, NoLock);
5775
5776         /* Make sure the reltablespace change is visible */
5777         CommandCounterIncrement();
5778
5779         /* Move associated toast relation and/or index, too */
5780         if (OidIsValid(reltoastrelid))
5781                 ATExecSetTableSpace(reltoastrelid, newTableSpace);
5782         if (OidIsValid(reltoastidxid))
5783                 ATExecSetTableSpace(reltoastidxid, newTableSpace);
5784 }
5785
5786 /*
5787  * Copy data, block by block
5788  */
5789 static void
5790 copy_relation_data(Relation rel, SMgrRelation dst)
5791 {
5792         SMgrRelation src;
5793         bool            use_wal;
5794         BlockNumber nblocks;
5795         BlockNumber blkno;
5796         char            buf[BLCKSZ];
5797         Page            page = (Page) buf;
5798
5799         /*
5800          * Since we copy the file directly without looking at the shared buffers,
5801          * we'd better first flush out any pages of the source relation that are
5802          * in shared buffers.  We assume no new changes will be made while we are
5803          * holding exclusive lock on the rel.
5804          */
5805         FlushRelationBuffers(rel);
5806
5807         /*
5808          * We need to log the copied data in WAL iff WAL archiving is enabled AND
5809          * it's not a temp rel.
5810          */
5811         use_wal = XLogArchivingActive() && !rel->rd_istemp;
5812
5813         nblocks = RelationGetNumberOfBlocks(rel);
5814         /* RelationGetNumberOfBlocks will certainly have opened rd_smgr */
5815         src = rel->rd_smgr;
5816
5817         for (blkno = 0; blkno < nblocks; blkno++)
5818         {
5819                 smgrread(src, blkno, buf);
5820
5821                 /* XLOG stuff */
5822                 if (use_wal)
5823                 {
5824                         xl_heap_newpage xlrec;
5825                         XLogRecPtr      recptr;
5826                         XLogRecData rdata[2];
5827
5828                         /* NO ELOG(ERROR) from here till newpage op is logged */
5829                         START_CRIT_SECTION();
5830
5831                         xlrec.node = dst->smgr_rnode;
5832                         xlrec.blkno = blkno;
5833
5834                         rdata[0].data = (char *) &xlrec;
5835                         rdata[0].len = SizeOfHeapNewpage;
5836                         rdata[0].buffer = InvalidBuffer;
5837                         rdata[0].next = &(rdata[1]);
5838
5839                         rdata[1].data = (char *) page;
5840                         rdata[1].len = BLCKSZ;
5841                         rdata[1].buffer = InvalidBuffer;
5842                         rdata[1].next = NULL;
5843
5844                         recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
5845
5846                         PageSetLSN(page, recptr);
5847                         PageSetTLI(page, ThisTimeLineID);
5848
5849                         END_CRIT_SECTION();
5850                 }
5851
5852                 /*
5853                  * Now write the page.  We say isTemp = true even if it's not a temp
5854                  * rel, because there's no need for smgr to schedule an fsync for this
5855                  * write; we'll do it ourselves below.
5856                  */
5857                 smgrwrite(dst, blkno, buf, true);
5858         }
5859
5860         /*
5861          * If the rel isn't temp, we must fsync it down to disk before it's safe
5862          * to commit the transaction.  (For a temp rel we don't care since the rel
5863          * will be uninteresting after a crash anyway.)
5864          *
5865          * It's obvious that we must do this when not WAL-logging the copy. It's
5866          * less obvious that we have to do it even if we did WAL-log the copied
5867          * pages. The reason is that since we're copying outside shared buffers, a
5868          * CHECKPOINT occurring during the copy has no way to flush the previously
5869          * written data to disk (indeed it won't know the new rel even exists).  A
5870          * crash later on would replay WAL from the checkpoint, therefore it
5871          * wouldn't replay our earlier WAL entries. If we do not fsync those pages
5872          * here, they might still not be on disk when the crash occurs.
5873          */
5874         if (!rel->rd_istemp)
5875                 smgrimmedsync(dst);
5876 }
5877
5878 /*
5879  * ALTER TABLE ENABLE/DISABLE TRIGGER
5880  *
5881  * We just pass this off to trigger.c.
5882  */
5883 static void
5884 ATExecEnableDisableTrigger(Relation rel, char *trigname,
5885                                                    bool enable, bool skip_system)
5886 {
5887         EnableDisableTrigger(rel, trigname, enable, skip_system);
5888 }
5889
5890 /*
5891  * ALTER TABLE CREATE TOAST TABLE
5892  *
5893  * Note: this is also invoked from outside this module; in such cases we
5894  * expect the caller to have verified that the relation is a table and we
5895  * have all the right permissions.      Callers expect this function
5896  * to end with CommandCounterIncrement if it makes any changes.
5897  */
5898 void
5899 AlterTableCreateToastTable(Oid relOid, bool silent)
5900 {
5901         Relation        rel;
5902         HeapTuple       reltup;
5903         TupleDesc       tupdesc;
5904         bool            shared_relation;
5905         Relation        class_rel;
5906         Oid                     toast_relid;
5907         Oid                     toast_idxid;
5908         char            toast_relname[NAMEDATALEN];
5909         char            toast_idxname[NAMEDATALEN];
5910         IndexInfo  *indexInfo;
5911         Oid                     classObjectId[2];
5912         ObjectAddress baseobject,
5913                                 toastobject;
5914
5915         /*
5916          * Grab an exclusive lock on the target table, which we will NOT release
5917          * until end of transaction.  (This is probably redundant in all present
5918          * uses...)
5919          */
5920         rel = heap_open(relOid, AccessExclusiveLock);
5921
5922         /*
5923          * Toast table is shared if and only if its parent is.
5924          *
5925          * We cannot allow toasting a shared relation after initdb (because
5926          * there's no way to mark it toasted in other databases' pg_class).
5927          * Unfortunately we can't distinguish initdb from a manually started
5928          * standalone backend (toasting happens after the bootstrap phase, so
5929          * checking IsBootstrapProcessingMode() won't work).  However, we can at
5930          * least prevent this mistake under normal multi-user operation.
5931          */
5932         shared_relation = rel->rd_rel->relisshared;
5933         if (shared_relation && IsUnderPostmaster)
5934                 ereport(ERROR,
5935                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5936                                  errmsg("shared tables cannot be toasted after initdb")));
5937
5938         /*
5939          * Is it already toasted?
5940          */
5941         if (rel->rd_rel->reltoastrelid != InvalidOid)
5942         {
5943                 if (silent)
5944                 {
5945                         heap_close(rel, NoLock);
5946                         return;
5947                 }
5948
5949                 ereport(ERROR,
5950                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5951                                  errmsg("table \"%s\" already has a TOAST table",
5952                                                 RelationGetRelationName(rel))));
5953         }
5954
5955         /*
5956          * Check to see whether the table actually needs a TOAST table.
5957          */
5958         if (!needs_toast_table(rel))
5959         {
5960                 if (silent)
5961                 {
5962                         heap_close(rel, NoLock);
5963                         return;
5964                 }
5965
5966                 ereport(ERROR,
5967                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5968                                  errmsg("table \"%s\" does not need a TOAST table",
5969                                                 RelationGetRelationName(rel))));
5970         }
5971
5972         /*
5973          * Create the toast table and its index
5974          */
5975         snprintf(toast_relname, sizeof(toast_relname),
5976                          "pg_toast_%u", relOid);
5977         snprintf(toast_idxname, sizeof(toast_idxname),
5978                          "pg_toast_%u_index", relOid);
5979
5980         /* this is pretty painful...  need a tuple descriptor */
5981         tupdesc = CreateTemplateTupleDesc(3, false);
5982         TupleDescInitEntry(tupdesc, (AttrNumber) 1,
5983                                            "chunk_id",
5984                                            OIDOID,
5985                                            -1, 0);
5986         TupleDescInitEntry(tupdesc, (AttrNumber) 2,
5987                                            "chunk_seq",
5988                                            INT4OID,
5989                                            -1, 0);
5990         TupleDescInitEntry(tupdesc, (AttrNumber) 3,
5991                                            "chunk_data",
5992                                            BYTEAOID,
5993                                            -1, 0);
5994
5995         /*
5996          * Ensure that the toast table doesn't itself get toasted, or we'll be
5997          * toast :-(.  This is essential for chunk_data because type bytea is
5998          * toastable; hit the other two just to be sure.
5999          */
6000         tupdesc->attrs[0]->attstorage = 'p';
6001         tupdesc->attrs[1]->attstorage = 'p';
6002         tupdesc->attrs[2]->attstorage = 'p';
6003
6004         /*
6005          * Note: the toast relation is placed in the regular pg_toast namespace
6006          * even if its master relation is a temp table.  There cannot be any
6007          * naming collision, and the toast rel will be destroyed when its master
6008          * is, so there's no need to handle the toast rel as temp.
6009          */
6010         toast_relid = heap_create_with_catalog(toast_relname,
6011                                                                                    PG_TOAST_NAMESPACE,
6012                                                                                    rel->rd_rel->reltablespace,
6013                                                                                    InvalidOid,
6014                                                                                    rel->rd_rel->relowner,
6015                                                                                    tupdesc,
6016                                                                                    RELKIND_TOASTVALUE,
6017                                                                                    shared_relation,
6018                                                                                    true,
6019                                                                                    0,
6020                                                                                    ONCOMMIT_NOOP,
6021                                                                                    true);
6022
6023         /* make the toast relation visible, else index creation will fail */
6024         CommandCounterIncrement();
6025
6026         /*
6027          * Create unique index on chunk_id, chunk_seq.
6028          *
6029          * NOTE: the normal TOAST access routines could actually function with a
6030          * single-column index on chunk_id only. However, the slice access
6031          * routines use both columns for faster access to an individual chunk. In
6032          * addition, we want it to be unique as a check against the possibility of
6033          * duplicate TOAST chunk OIDs. The index might also be a little more
6034          * efficient this way, since btree isn't all that happy with large numbers
6035          * of equal keys.
6036          */
6037
6038         indexInfo = makeNode(IndexInfo);
6039         indexInfo->ii_NumIndexAttrs = 2;
6040         indexInfo->ii_KeyAttrNumbers[0] = 1;
6041         indexInfo->ii_KeyAttrNumbers[1] = 2;
6042         indexInfo->ii_Expressions = NIL;
6043         indexInfo->ii_ExpressionsState = NIL;
6044         indexInfo->ii_Predicate = NIL;
6045         indexInfo->ii_PredicateState = NIL;
6046         indexInfo->ii_Unique = true;
6047
6048         classObjectId[0] = OID_BTREE_OPS_OID;
6049         classObjectId[1] = INT4_BTREE_OPS_OID;
6050
6051         toast_idxid = index_create(toast_relid, toast_idxname, InvalidOid,
6052                                                            indexInfo,
6053                                                            BTREE_AM_OID,
6054                                                            rel->rd_rel->reltablespace,
6055                                                            classObjectId,
6056                                                            true, false, true, false);
6057
6058         /*
6059          * Update toast rel's pg_class entry to show that it has an index. The
6060          * index OID is stored into the reltoastidxid field for easy access by the
6061          * tuple toaster.
6062          */
6063         setRelhasindex(toast_relid, true, true, toast_idxid);
6064
6065         /*
6066          * Store the toast table's OID in the parent relation's pg_class row
6067          */
6068         class_rel = heap_open(RelationRelationId, RowExclusiveLock);
6069
6070         reltup = SearchSysCacheCopy(RELOID,
6071                                                                 ObjectIdGetDatum(relOid),
6072                                                                 0, 0, 0);
6073         if (!HeapTupleIsValid(reltup))
6074                 elog(ERROR, "cache lookup failed for relation %u", relOid);
6075
6076         ((Form_pg_class) GETSTRUCT(reltup))->reltoastrelid = toast_relid;
6077
6078         simple_heap_update(class_rel, &reltup->t_self, reltup);
6079
6080         /* Keep catalog indexes current */
6081         CatalogUpdateIndexes(class_rel, reltup);
6082
6083         heap_freetuple(reltup);
6084
6085         heap_close(class_rel, RowExclusiveLock);
6086
6087         /*
6088          * Register dependency from the toast table to the master, so that the
6089          * toast table will be deleted if the master is.
6090          */
6091         baseobject.classId = RelationRelationId;
6092         baseobject.objectId = relOid;
6093         baseobject.objectSubId = 0;
6094         toastobject.classId = RelationRelationId;
6095         toastobject.objectId = toast_relid;
6096         toastobject.objectSubId = 0;
6097
6098         recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL);
6099
6100         /*
6101          * Clean up and make changes visible
6102          */
6103         heap_close(rel, NoLock);
6104
6105         CommandCounterIncrement();
6106 }
6107
6108 /*
6109  * Check to see whether the table needs a TOAST table.  It does only if
6110  * (1) there are any toastable attributes, and (2) the maximum length
6111  * of a tuple could exceed TOAST_TUPLE_THRESHOLD.  (We don't want to
6112  * create a toast table for something like "f1 varchar(20)".)
6113  */
6114 static bool
6115 needs_toast_table(Relation rel)
6116 {
6117         int32           data_length = 0;
6118         bool            maxlength_unknown = false;
6119         bool            has_toastable_attrs = false;
6120         TupleDesc       tupdesc;
6121         Form_pg_attribute *att;
6122         int32           tuple_length;
6123         int                     i;
6124
6125         tupdesc = rel->rd_att;
6126         att = tupdesc->attrs;
6127
6128         for (i = 0; i < tupdesc->natts; i++)
6129         {
6130                 if (att[i]->attisdropped)
6131                         continue;
6132                 data_length = att_align(data_length, att[i]->attalign);
6133                 if (att[i]->attlen > 0)
6134                 {
6135                         /* Fixed-length types are never toastable */
6136                         data_length += att[i]->attlen;
6137                 }
6138                 else
6139                 {
6140                         int32           maxlen = type_maximum_size(att[i]->atttypid,
6141                                                                                                    att[i]->atttypmod);
6142
6143                         if (maxlen < 0)
6144                                 maxlength_unknown = true;
6145                         else
6146                                 data_length += maxlen;
6147                         if (att[i]->attstorage != 'p')
6148                                 has_toastable_attrs = true;
6149                 }
6150         }
6151         if (!has_toastable_attrs)
6152                 return false;                   /* nothing to toast? */
6153         if (maxlength_unknown)
6154                 return true;                    /* any unlimited-length attrs? */
6155         tuple_length = MAXALIGN(offsetof(HeapTupleHeaderData, t_bits) +
6156                                                         BITMAPLEN(tupdesc->natts)) +
6157                 MAXALIGN(data_length);
6158         return (tuple_length > TOAST_TUPLE_THRESHOLD);
6159 }
6160
6161
6162 /*
6163  * Execute ALTER TABLE SET SCHEMA
6164  *
6165  * Note: caller must have checked ownership of the relation already
6166  */
6167 void
6168 AlterTableNamespace(RangeVar *relation, const char *newschema)
6169 {
6170         Relation        rel;
6171         Oid                     relid;
6172         Oid                     oldNspOid;
6173         Oid                     nspOid;
6174         Relation        classRel;
6175
6176         rel = heap_openrv(relation, AccessExclusiveLock);
6177
6178         /* heap_openrv allows TOAST, but we don't want to */
6179         if (rel->rd_rel->relkind == RELKIND_TOASTVALUE)
6180                 ereport(ERROR,
6181                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
6182                                  errmsg("\"%s\" is a TOAST relation",
6183                                                 RelationGetRelationName(rel))));
6184
6185         relid = RelationGetRelid(rel);
6186         oldNspOid = RelationGetNamespace(rel);
6187
6188         /* get schema OID and check its permissions */
6189         nspOid = LookupCreationNamespace(newschema);
6190
6191         if (oldNspOid == nspOid)
6192                 ereport(ERROR,
6193                                 (errcode(ERRCODE_DUPLICATE_TABLE),
6194                                  errmsg("relation \"%s\" is already in schema \"%s\"",
6195                                                 RelationGetRelationName(rel),
6196                                                 newschema)));
6197
6198         /* disallow renaming into or out of temp schemas */
6199         if (isAnyTempNamespace(nspOid) || isAnyTempNamespace(oldNspOid))
6200                 ereport(ERROR,
6201                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
6202                         errmsg("cannot move objects into or out of temporary schemas")));
6203
6204         /* same for TOAST schema */
6205         if (nspOid == PG_TOAST_NAMESPACE || oldNspOid == PG_TOAST_NAMESPACE)
6206                 ereport(ERROR,
6207                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
6208                                  errmsg("cannot move objects into or out of TOAST schema")));
6209
6210         /* OK, modify the pg_class row and pg_depend entry */
6211         classRel = heap_open(RelationRelationId, RowExclusiveLock);
6212
6213         AlterRelationNamespaceInternal(classRel, relid, oldNspOid, nspOid, true);
6214
6215         /* Fix the table's rowtype too */
6216         AlterTypeNamespaceInternal(rel->rd_rel->reltype, nspOid, false);
6217
6218         /* Fix other dependent stuff */
6219         if (rel->rd_rel->relkind == RELKIND_RELATION)
6220         {
6221                 AlterIndexNamespaces(classRel, rel, oldNspOid, nspOid);
6222                 AlterSeqNamespaces(classRel, rel, oldNspOid, nspOid, newschema);
6223                 AlterConstraintNamespaces(relid, oldNspOid, nspOid, false);
6224         }
6225
6226         heap_close(classRel, RowExclusiveLock);
6227
6228         /* close rel, but keep lock until commit */
6229         relation_close(rel, NoLock);
6230 }
6231
6232 /*
6233  * The guts of relocating a relation to another namespace: fix the pg_class
6234  * entry, and the pg_depend entry if any.  Caller must already have
6235  * opened and write-locked pg_class.
6236  */
6237 void
6238 AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
6239                                                            Oid oldNspOid, Oid newNspOid,
6240                                                            bool hasDependEntry)
6241 {
6242         HeapTuple       classTup;
6243         Form_pg_class classForm;
6244
6245         classTup = SearchSysCacheCopy(RELOID,
6246                                                                   ObjectIdGetDatum(relOid),
6247                                                                   0, 0, 0);
6248         if (!HeapTupleIsValid(classTup))
6249                 elog(ERROR, "cache lookup failed for relation %u", relOid);
6250         classForm = (Form_pg_class) GETSTRUCT(classTup);
6251
6252         Assert(classForm->relnamespace == oldNspOid);
6253
6254         /* check for duplicate name (more friendly than unique-index failure) */
6255         if (get_relname_relid(NameStr(classForm->relname),
6256                                                   newNspOid) != InvalidOid)
6257                 ereport(ERROR,
6258                                 (errcode(ERRCODE_DUPLICATE_TABLE),
6259                                  errmsg("relation \"%s\" already exists in schema \"%s\"",
6260                                                 NameStr(classForm->relname),
6261                                                 get_namespace_name(newNspOid))));
6262
6263         /* classTup is a copy, so OK to scribble on */
6264         classForm->relnamespace = newNspOid;
6265
6266         simple_heap_update(classRel, &classTup->t_self, classTup);
6267         CatalogUpdateIndexes(classRel, classTup);
6268
6269         /* Update dependency on schema if caller said so */
6270         if (hasDependEntry &&
6271                 changeDependencyFor(RelationRelationId, relOid,
6272                                                         NamespaceRelationId, oldNspOid, newNspOid) != 1)
6273                 elog(ERROR, "failed to change schema dependency for relation \"%s\"",
6274                          NameStr(classForm->relname));
6275
6276         heap_freetuple(classTup);
6277 }
6278
6279 /*
6280  * Move all indexes for the specified relation to another namespace.
6281  *
6282  * Note: we assume adequate permission checking was done by the caller,
6283  * and that the caller has a suitable lock on the owning relation.
6284  */
6285 static void
6286 AlterIndexNamespaces(Relation classRel, Relation rel,
6287                                          Oid oldNspOid, Oid newNspOid)
6288 {
6289         List       *indexList;
6290         ListCell   *l;
6291
6292         indexList = RelationGetIndexList(rel);
6293
6294         foreach(l, indexList)
6295         {
6296                 Oid                     indexOid = lfirst_oid(l);
6297
6298                 /*
6299                  * Note: currently, the index will not have its own dependency on the
6300                  * namespace, so we don't need to do changeDependencyFor(). There's no
6301                  * rowtype in pg_type, either.
6302                  */
6303                 AlterRelationNamespaceInternal(classRel, indexOid,
6304                                                                            oldNspOid, newNspOid,
6305                                                                            false);
6306         }
6307
6308         list_free(indexList);
6309 }
6310
6311 /*
6312  * Move all SERIAL-column sequences of the specified relation to another
6313  * namespace.
6314  *
6315  * Note: we assume adequate permission checking was done by the caller,
6316  * and that the caller has a suitable lock on the owning relation.
6317  */
6318 static void
6319 AlterSeqNamespaces(Relation classRel, Relation rel,
6320                                    Oid oldNspOid, Oid newNspOid, const char *newNspName)
6321 {
6322         Relation        depRel;
6323         SysScanDesc scan;
6324         ScanKeyData key[2];
6325         HeapTuple       tup;
6326
6327         /*
6328          * SERIAL sequences are those having an internal dependency on one of the
6329          * table's columns (we don't care *which* column, exactly).
6330          */
6331         depRel = heap_open(DependRelationId, AccessShareLock);
6332
6333         ScanKeyInit(&key[0],
6334                                 Anum_pg_depend_refclassid,
6335                                 BTEqualStrategyNumber, F_OIDEQ,
6336                                 ObjectIdGetDatum(RelationRelationId));
6337         ScanKeyInit(&key[1],
6338                                 Anum_pg_depend_refobjid,
6339                                 BTEqualStrategyNumber, F_OIDEQ,
6340                                 ObjectIdGetDatum(RelationGetRelid(rel)));
6341         /* we leave refobjsubid unspecified */
6342
6343         scan = systable_beginscan(depRel, DependReferenceIndexId, true,
6344                                                           SnapshotNow, 2, key);
6345
6346         while (HeapTupleIsValid(tup = systable_getnext(scan)))
6347         {
6348                 Form_pg_depend depForm = (Form_pg_depend) GETSTRUCT(tup);
6349                 Relation        seqRel;
6350
6351                 /* skip dependencies other than internal dependencies on columns */
6352                 if (depForm->refobjsubid == 0 ||
6353                         depForm->classid != RelationRelationId ||
6354                         depForm->objsubid != 0 ||
6355                         depForm->deptype != DEPENDENCY_INTERNAL)
6356                         continue;
6357
6358                 /* Use relation_open just in case it's an index */
6359                 seqRel = relation_open(depForm->objid, AccessExclusiveLock);
6360
6361                 /* skip non-sequence relations */
6362                 if (RelationGetForm(seqRel)->relkind != RELKIND_SEQUENCE)
6363                 {
6364                         /* No need to keep the lock */
6365                         relation_close(seqRel, AccessExclusiveLock);
6366                         continue;
6367                 }
6368
6369                 /* Fix the pg_class and pg_depend entries */
6370                 AlterRelationNamespaceInternal(classRel, depForm->objid,
6371                                                                            oldNspOid, newNspOid,
6372                                                                            true);
6373
6374                 /*
6375                  * Sequences have entries in pg_type. We need to be careful to move
6376                  * them to the new namespace, too.
6377                  */
6378                 AlterTypeNamespaceInternal(RelationGetForm(seqRel)->reltype,
6379                                                                    newNspOid, false);
6380
6381                 /* Now we can close it.  Keep the lock till end of transaction. */
6382                 relation_close(seqRel, NoLock);
6383         }
6384
6385         systable_endscan(scan);
6386
6387         relation_close(depRel, AccessShareLock);
6388 }
6389
6390
6391 /*
6392  * This code supports
6393  *      CREATE TEMP TABLE ... ON COMMIT { DROP | PRESERVE ROWS | DELETE ROWS }
6394  *
6395  * Because we only support this for TEMP tables, it's sufficient to remember
6396  * the state in a backend-local data structure.
6397  */
6398
6399 /*
6400  * Register a newly-created relation's ON COMMIT action.
6401  */
6402 void
6403 register_on_commit_action(Oid relid, OnCommitAction action)
6404 {
6405         OnCommitItem *oc;
6406         MemoryContext oldcxt;
6407
6408         /*
6409          * We needn't bother registering the relation unless there is an ON COMMIT
6410          * action we need to take.
6411          */
6412         if (action == ONCOMMIT_NOOP || action == ONCOMMIT_PRESERVE_ROWS)
6413                 return;
6414
6415         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
6416
6417         oc = (OnCommitItem *) palloc(sizeof(OnCommitItem));
6418         oc->relid = relid;
6419         oc->oncommit = action;
6420         oc->creating_subid = GetCurrentSubTransactionId();
6421         oc->deleting_subid = InvalidSubTransactionId;
6422
6423         on_commits = lcons(oc, on_commits);
6424
6425         MemoryContextSwitchTo(oldcxt);
6426 }
6427
6428 /*
6429  * Unregister any ON COMMIT action when a relation is deleted.
6430  *
6431  * Actually, we only mark the OnCommitItem entry as to be deleted after commit.
6432  */
6433 void
6434 remove_on_commit_action(Oid relid)
6435 {
6436         ListCell   *l;
6437
6438         foreach(l, on_commits)
6439         {
6440                 OnCommitItem *oc = (OnCommitItem *) lfirst(l);
6441
6442                 if (oc->relid == relid)
6443                 {
6444                         oc->deleting_subid = GetCurrentSubTransactionId();
6445                         break;
6446                 }
6447         }
6448 }
6449
6450 /*
6451  * Perform ON COMMIT actions.
6452  *
6453  * This is invoked just before actually committing, since it's possible
6454  * to encounter errors.
6455  */
6456 void
6457 PreCommit_on_commit_actions(void)
6458 {
6459         ListCell   *l;
6460         List       *oids_to_truncate = NIL;
6461
6462         foreach(l, on_commits)
6463         {
6464                 OnCommitItem *oc = (OnCommitItem *) lfirst(l);
6465
6466                 /* Ignore entry if already dropped in this xact */
6467                 if (oc->deleting_subid != InvalidSubTransactionId)
6468                         continue;
6469
6470                 switch (oc->oncommit)
6471                 {
6472                         case ONCOMMIT_NOOP:
6473                         case ONCOMMIT_PRESERVE_ROWS:
6474                                 /* Do nothing (there shouldn't be such entries, actually) */
6475                                 break;
6476                         case ONCOMMIT_DELETE_ROWS:
6477                                 oids_to_truncate = lappend_oid(oids_to_truncate, oc->relid);
6478                                 break;
6479                         case ONCOMMIT_DROP:
6480                                 {
6481                                         ObjectAddress object;
6482
6483                                         object.classId = RelationRelationId;
6484                                         object.objectId = oc->relid;
6485                                         object.objectSubId = 0;
6486                                         performDeletion(&object, DROP_CASCADE);
6487
6488                                         /*
6489                                          * Note that table deletion will call
6490                                          * remove_on_commit_action, so the entry should get marked
6491                                          * as deleted.
6492                                          */
6493                                         Assert(oc->deleting_subid != InvalidSubTransactionId);
6494                                         break;
6495                                 }
6496                 }
6497         }
6498         if (oids_to_truncate != NIL)
6499         {
6500                 heap_truncate(oids_to_truncate);
6501                 CommandCounterIncrement();              /* XXX needed? */
6502         }
6503 }
6504
6505 /*
6506  * Post-commit or post-abort cleanup for ON COMMIT management.
6507  *
6508  * All we do here is remove no-longer-needed OnCommitItem entries.
6509  *
6510  * During commit, remove entries that were deleted during this transaction;
6511  * during abort, remove those created during this transaction.
6512  */
6513 void
6514 AtEOXact_on_commit_actions(bool isCommit)
6515 {
6516         ListCell   *cur_item;
6517         ListCell   *prev_item;
6518
6519         prev_item = NULL;
6520         cur_item = list_head(on_commits);
6521
6522         while (cur_item != NULL)
6523         {
6524                 OnCommitItem *oc = (OnCommitItem *) lfirst(cur_item);
6525
6526                 if (isCommit ? oc->deleting_subid != InvalidSubTransactionId :
6527                         oc->creating_subid != InvalidSubTransactionId)
6528                 {
6529                         /* cur_item must be removed */
6530                         on_commits = list_delete_cell(on_commits, cur_item, prev_item);
6531                         pfree(oc);
6532                         if (prev_item)
6533                                 cur_item = lnext(prev_item);
6534                         else
6535                                 cur_item = list_head(on_commits);
6536                 }
6537                 else
6538                 {
6539                         /* cur_item must be preserved */
6540                         oc->creating_subid = InvalidSubTransactionId;
6541                         oc->deleting_subid = InvalidSubTransactionId;
6542                         prev_item = cur_item;
6543                         cur_item = lnext(prev_item);
6544                 }
6545         }
6546 }
6547
6548 /*
6549  * Post-subcommit or post-subabort cleanup for ON COMMIT management.
6550  *
6551  * During subabort, we can immediately remove entries created during this
6552  * subtransaction.      During subcommit, just relabel entries marked during
6553  * this subtransaction as being the parent's responsibility.
6554  */
6555 void
6556 AtEOSubXact_on_commit_actions(bool isCommit, SubTransactionId mySubid,
6557                                                           SubTransactionId parentSubid)
6558 {
6559         ListCell   *cur_item;
6560         ListCell   *prev_item;
6561
6562         prev_item = NULL;
6563         cur_item = list_head(on_commits);
6564
6565         while (cur_item != NULL)
6566         {
6567                 OnCommitItem *oc = (OnCommitItem *) lfirst(cur_item);
6568
6569                 if (!isCommit && oc->creating_subid == mySubid)
6570                 {
6571                         /* cur_item must be removed */
6572                         on_commits = list_delete_cell(on_commits, cur_item, prev_item);
6573                         pfree(oc);
6574                         if (prev_item)
6575                                 cur_item = lnext(prev_item);
6576                         else
6577                                 cur_item = list_head(on_commits);
6578                 }
6579                 else
6580                 {
6581                         /* cur_item must be preserved */
6582                         if (oc->creating_subid == mySubid)
6583                                 oc->creating_subid = parentSubid;
6584                         if (oc->deleting_subid == mySubid)
6585                                 oc->deleting_subid = isCommit ? parentSubid : InvalidSubTransactionId;
6586                         prev_item = cur_item;
6587                         cur_item = lnext(prev_item);
6588                 }
6589         }
6590 }