X-Git-Url: https://granicus.if.org/sourcecode?a=blobdiff_plain;f=src%2Fbackend%2Fcommands%2Ftablecmds.c;h=6c60ddd5c104f79909116f844a56df99bc7747bb;hb=65e3ea76417d1baab158fd8305ebed4f43141c7a;hp=c484c148c254acfbde412352687d48b04b9a7fd4;hpb=f2f5b05655afa80377757a2c335c01b28de24429;p=postgresql diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index c484c148c2..6c60ddd5c1 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -3,19 +3,23 @@ * tablecmds.c * Commands for creating and altering table structures and settings * - * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group + * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.180 2006/03/05 15:58:24 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.273 2008/12/13 19:13:44 tgl Exp $ * *------------------------------------------------------------------------- */ #include "postgres.h" #include "access/genam.h" -#include "access/tuptoaster.h" +#include "access/heapam.h" +#include "access/reloptions.h" +#include "access/relscan.h" +#include "access/sysattr.h" +#include "access/xact.h" #include "catalog/catalog.h" #include "catalog/dependency.h" #include "catalog/heap.h" @@ -27,31 +31,40 @@ #include "catalog/pg_inherits.h" #include "catalog/pg_namespace.h" #include "catalog/pg_opclass.h" +#include "catalog/pg_tablespace.h" #include "catalog/pg_trigger.h" #include "catalog/pg_type.h" +#include "catalog/pg_type_fn.h" +#include "catalog/storage.h" +#include "catalog/toasting.h" #include "commands/cluster.h" #include "commands/defrem.h" +#include "commands/sequence.h" #include "commands/tablecmds.h" #include "commands/tablespace.h" #include "commands/trigger.h" #include "commands/typecmds.h" #include "executor/executor.h" -#include "lib/stringinfo.h" #include "miscadmin.h" #include "nodes/makefuncs.h" +#include "nodes/nodeFuncs.h" +#include "nodes/parsenodes.h" #include "optimizer/clauses.h" #include "optimizer/plancat.h" #include "optimizer/prep.h" -#include "parser/analyze.h" #include "parser/gramparse.h" -#include "parser/parser.h" #include "parser/parse_clause.h" #include "parser/parse_coerce.h" #include "parser/parse_expr.h" #include "parser/parse_oper.h" #include "parser/parse_relation.h" #include "parser/parse_type.h" +#include "parser/parse_utilcmd.h" +#include "parser/parser.h" +#include "rewrite/rewriteDefine.h" #include "rewrite/rewriteHandler.h" +#include "storage/bufmgr.h" +#include "storage/lmgr.h" #include "storage/smgr.h" #include "utils/acl.h" #include "utils/builtins.h" @@ -60,7 +73,9 @@ #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/relcache.h" +#include "utils/snapmgr.h" #include "utils/syscache.h" +#include "utils/tqual.h" /* @@ -122,6 +137,7 @@ typedef struct AlteredTableInfo /* Information saved by Phases 1/2 for Phase 3: */ List *constraints; /* List of NewConstraint */ List *newvals; /* List of NewColumnValue */ + bool new_notnull; /* T if we added new NOT NULL constraints */ Oid newTableSpace; /* new tablespace; 0 means no change */ /* Objects to rebuild after completing ALTER TYPE operations */ List *changedConstraintOids; /* OIDs of constraints to rebuild */ @@ -131,12 +147,13 @@ typedef struct AlteredTableInfo } AlteredTableInfo; /* Struct describing one new constraint to check in Phase 3 scan */ +/* Note: new NOT NULL constraints are handled elsewhere */ typedef struct NewConstraint { char *name; /* Constraint name, or NULL if none */ - ConstrType contype; /* CHECK, NOT_NULL, or FOREIGN */ - AttrNumber attnum; /* only relevant for NOT_NULL */ + ConstrType contype; /* CHECK or FOREIGN */ Oid refrelid; /* PK rel, if FOREIGN */ + Oid conid; /* OID of pg_constraint entry, if FOREIGN */ Node *qual; /* Check expr or FkConstraint struct */ List *qualstate; /* Execution state for CHECK */ } NewConstraint; @@ -155,15 +172,66 @@ typedef struct NewColumnValue ExprState *exprstate; /* execution state */ } NewColumnValue; +/* + * Error-reporting support for RemoveRelations + */ +struct dropmsgstrings +{ + char kind; + int nonexistent_code; + const char *nonexistent_msg; + const char *skipping_msg; + const char *nota_msg; + const char *drophint_msg; +}; + +static const struct dropmsgstrings dropmsgstringarray[] = { + {RELKIND_RELATION, + ERRCODE_UNDEFINED_TABLE, + gettext_noop("table \"%s\" does not exist"), + gettext_noop("table \"%s\" does not exist, skipping"), + gettext_noop("\"%s\" is not a table"), + gettext_noop("Use DROP TABLE to remove a table.")}, + {RELKIND_SEQUENCE, + ERRCODE_UNDEFINED_TABLE, + gettext_noop("sequence \"%s\" does not exist"), + gettext_noop("sequence \"%s\" does not exist, skipping"), + gettext_noop("\"%s\" is not a sequence"), + gettext_noop("Use DROP SEQUENCE to remove a sequence.")}, + {RELKIND_VIEW, + ERRCODE_UNDEFINED_TABLE, + gettext_noop("view \"%s\" does not exist"), + gettext_noop("view \"%s\" does not exist, skipping"), + gettext_noop("\"%s\" is not a view"), + gettext_noop("Use DROP VIEW to remove a view.")}, + {RELKIND_INDEX, + ERRCODE_UNDEFINED_OBJECT, + gettext_noop("index \"%s\" does not exist"), + gettext_noop("index \"%s\" does not exist, skipping"), + gettext_noop("\"%s\" is not an index"), + gettext_noop("Use DROP INDEX to remove an index.")}, + {RELKIND_COMPOSITE_TYPE, + ERRCODE_UNDEFINED_OBJECT, + gettext_noop("type \"%s\" does not exist"), + gettext_noop("type \"%s\" does not exist, skipping"), + gettext_noop("\"%s\" is not a type"), + gettext_noop("Use DROP TYPE to remove a type.")}, + {'\0', 0, NULL, NULL, NULL, NULL} +}; + static void truncate_check_rel(Relation rel); static List *MergeAttributes(List *schema, List *supers, bool istemp, List **supOids, List **supconstr, int *supOidCount); -static bool change_varattnos_of_a_node(Node *node, const AttrNumber *newattno); +static bool MergeCheckConstraint(List *constraints, char *name, Node *expr); +static bool change_varattnos_walker(Node *node, const AttrNumber *newattno); +static void MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel); +static void MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel); static void StoreCatalogInheritance(Oid relationId, List *supers); +static void StoreCatalogInheritance1(Oid relationId, Oid parentOid, + int16 seqNumber, Relation inhRelation); static int findAttrByName(const char *attributeName, List *schema); static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass); -static bool needs_toast_table(Relation rel); static void AlterIndexNamespaces(Relation classRel, Relation rel, Oid oldNspOid, Oid newNspOid); static void AlterSeqNamespaces(Relation classRel, Relation rel, @@ -179,32 +247,29 @@ static Oid transformFkeyCheckAttrs(Relation pkrel, int numattrs, int16 *attnums, Oid *opclasses); static void validateForeignKeyConstraint(FkConstraint *fkconstraint, - Relation rel, Relation pkrel); + Relation rel, Relation pkrel, Oid constraintOid); static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint, - Oid constrOid); -static char *fkMatchTypeToString(char match_type); + Oid constraintOid); static void ATController(Relation rel, List *cmds, bool recurse); static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, bool recurse, bool recursing); static void ATRewriteCatalogs(List **wqueue); -static void ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd); +static void ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, + AlterTableCmd *cmd); static void ATRewriteTables(List **wqueue); static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap); static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel); static void ATSimplePermissions(Relation rel, bool allowView); +static void ATSimplePermissionsRelationOrIndex(Relation rel); static void ATSimpleRecursion(List **wqueue, Relation rel, AlterTableCmd *cmd, bool recurse); static void ATOneLevelRecursion(List **wqueue, Relation rel, AlterTableCmd *cmd); -static void find_composite_type_dependencies(Oid typeOid, - const char *origTblName); static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, AlterTableCmd *cmd); static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel, ColumnDef *colDef); static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid); -static void add_column_support_dependency(Oid relid, int32 attnum, - RangeVar *support); static void ATExecDropNotNull(Relation rel, const char *colName); static void ATExecSetNotNull(AlteredTableInfo *tab, Relation rel, const char *colName); @@ -221,14 +286,18 @@ static void ATExecDropColumn(Relation rel, const char *colName, bool recurse, bool recursing); static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel, IndexStmt *stmt, bool is_rebuild); -static void ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, - Node *newConstraint); +static void ATExecAddConstraint(List **wqueue, + AlteredTableInfo *tab, Relation rel, + Node *newConstraint, bool recurse); +static void ATAddCheckConstraint(List **wqueue, + AlteredTableInfo *tab, Relation rel, + Constraint *constr, + bool recurse, bool recursing); static void ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, FkConstraint *fkconstraint); -static void ATPrepDropConstraint(List **wqueue, Relation rel, - bool recurse, AlterTableCmd *cmd); static void ATExecDropConstraint(Relation rel, const char *constrName, - DropBehavior behavior, bool quiet); + DropBehavior behavior, + bool recurse, bool recursing); static void ATPrepAlterColumnType(List **wqueue, AlteredTableInfo *tab, Relation rel, bool recurse, bool recursing, @@ -244,14 +313,15 @@ static void ATExecDropCluster(Relation rel); static void ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename); static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace); +static void ATExecSetRelOptions(Relation rel, List *defList, bool isReset); static void ATExecEnableDisableTrigger(Relation rel, char *trigname, - bool enable, bool skip_system); -static void copy_relation_data(Relation rel, SMgrRelation dst); -static void update_ri_trigger_args(Oid relid, - const char *oldname, - const char *newname, - bool fk_scan, - bool update_relname); + char fires_when, bool skip_system); +static void ATExecEnableDisableRule(Relation rel, char *rulename, + char fires_when); +static void ATExecAddInherit(Relation rel, RangeVar *parent); +static void ATExecDropInherit(Relation rel, RangeVar *parent); +static void copy_relation_data(SMgrRelation rel, SMgrRelation dst, + ForkNumber forkNum, bool istemp); /* ---------------------------------------------------------------- @@ -276,8 +346,9 @@ DefineRelation(CreateStmt *stmt, char relkind) bool localHasOids; int parentOidCount; List *rawDefaults; + List *cookedDefaults; + Datum reloptions; ListCell *listptr; - int i; AttrNumber attnum; /* @@ -313,7 +384,7 @@ DefineRelation(CreateStmt *stmt, char relkind) } /* - * Select tablespace to use. If not specified, use default_tablespace + * Select tablespace to use. If not specified, use default tablespace * (which may in turn default to database's default). */ if (stmt->tablespacename) @@ -327,12 +398,12 @@ DefineRelation(CreateStmt *stmt, char relkind) } else { - tablespaceId = GetDefaultTablespace(); + tablespaceId = GetDefaultTablespace(stmt->relation->istemp); /* note InvalidOid is OK in this case */ } /* Check permissions except when using database's default */ - if (OidIsValid(tablespaceId)) + if (OidIsValid(tablespaceId) && tablespaceId != MyDatabaseTableSpace) { AclResult aclresult; @@ -343,6 +414,13 @@ DefineRelation(CreateStmt *stmt, char relkind) get_tablespace_name(tablespaceId)); } + /* + * Parse and validate reloptions, if any. + */ + reloptions = transformRelOptions((Datum) 0, stmt->options, true, false); + + (void) heap_reloptions(relkind, reloptions, true); + /* * Look up inheritance ancestors and generate relation schema, including * inherited attributes. @@ -352,85 +430,84 @@ DefineRelation(CreateStmt *stmt, char relkind) &inheritOids, &old_constraints, &parentOidCount); /* - * Create a relation descriptor from the relation schema and create the - * relation. Note that in this stage only inherited (pre-cooked) defaults - * and constraints will be included into the new relation. - * (BuildDescForRelation takes care of the inherited defaults, but we have - * to copy inherited constraints here.) + * Create a tuple descriptor from the relation schema. Note that this + * deals with column names, types, and NOT NULL constraints, but not + * default values or CHECK constraints; we handle those below. */ descriptor = BuildDescForRelation(schema); - localHasOids = interpretOidsOption(stmt->hasoids); + localHasOids = interpretOidsOption(stmt->options); descriptor->tdhasoid = (localHasOids || parentOidCount > 0); - if (old_constraints != NIL) + /* + * Find columns with default values and prepare for insertion of the + * defaults. Pre-cooked (that is, inherited) defaults go into a list of + * CookedConstraint structs that we'll pass to heap_create_with_catalog, + * while raw defaults go into a list of RawColumnDefault structs that + * will be processed by AddRelationNewConstraints. (We can't deal with + * raw expressions until we can do transformExpr.) + * + * We can set the atthasdef flags now in the tuple descriptor; this just + * saves StoreAttrDefault from having to do an immediate update of the + * pg_attribute rows. + */ + rawDefaults = NIL; + cookedDefaults = NIL; + attnum = 0; + + foreach(listptr, schema) { - ConstrCheck *check = (ConstrCheck *) - palloc0(list_length(old_constraints) * sizeof(ConstrCheck)); - int ncheck = 0; + ColumnDef *colDef = lfirst(listptr); - foreach(listptr, old_constraints) + attnum++; + + if (colDef->raw_default != NULL) { - Constraint *cdef = (Constraint *) lfirst(listptr); - bool dup = false; + RawColumnDefault *rawEnt; - if (cdef->contype != CONSTR_CHECK) - continue; - Assert(cdef->name != NULL); - Assert(cdef->raw_expr == NULL && cdef->cooked_expr != NULL); + Assert(colDef->cooked_default == NULL); - /* - * In multiple-inheritance situations, it's possible to inherit - * the same grandparent constraint through multiple parents. - * Hence, discard inherited constraints that match as to both name - * and expression. Otherwise, gripe if the names conflict. - */ - for (i = 0; i < ncheck; i++) - { - if (strcmp(check[i].ccname, cdef->name) != 0) - continue; - if (strcmp(check[i].ccbin, cdef->cooked_expr) == 0) - { - dup = true; - break; - } - ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_OBJECT), - errmsg("duplicate check constraint name \"%s\"", - cdef->name))); - } - if (!dup) - { - check[ncheck].ccname = cdef->name; - check[ncheck].ccbin = pstrdup(cdef->cooked_expr); - ncheck++; - } + rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault)); + rawEnt->attnum = attnum; + rawEnt->raw_default = colDef->raw_default; + rawDefaults = lappend(rawDefaults, rawEnt); + descriptor->attrs[attnum - 1]->atthasdef = true; } - if (ncheck > 0) + else if (colDef->cooked_default != NULL) { - if (descriptor->constr == NULL) - { - descriptor->constr = (TupleConstr *) palloc(sizeof(TupleConstr)); - descriptor->constr->defval = NULL; - descriptor->constr->num_defval = 0; - descriptor->constr->has_not_null = false; - } - descriptor->constr->num_check = ncheck; - descriptor->constr->check = check; + CookedConstraint *cooked; + + cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint)); + cooked->contype = CONSTR_DEFAULT; + cooked->name = NULL; + cooked->attnum = attnum; + cooked->expr = stringToNode(colDef->cooked_default); + cooked->is_local = true; /* not used for defaults */ + cooked->inhcount = 0; /* ditto */ + cookedDefaults = lappend(cookedDefaults, cooked); + descriptor->attrs[attnum - 1]->atthasdef = true; } } + /* + * Create the relation. Inherited defaults and constraints are passed + * in for immediate handling --- since they don't need parsing, they + * can be stored immediately. + */ relationId = heap_create_with_catalog(relname, namespaceId, tablespaceId, InvalidOid, GetUserId(), descriptor, + list_concat(cookedDefaults, + old_constraints), relkind, false, localHasOids, parentOidCount, stmt->oncommit, + reloptions, allowSystemTableMods); StoreCatalogInheritance(relationId, inheritOids); @@ -457,44 +534,10 @@ DefineRelation(CreateStmt *stmt, char relkind) * apply the parser's transformExpr routine, but transformExpr doesn't * work unless we have a pre-existing relation. So, the transformation has * to be postponed to this final step of CREATE TABLE. - * - * Another task that's conveniently done at this step is to add dependency - * links between columns and supporting relations (such as SERIAL - * sequences). - * - * First, scan schema to find new column defaults. - */ - rawDefaults = NIL; - attnum = 0; - - foreach(listptr, schema) - { - ColumnDef *colDef = lfirst(listptr); - - attnum++; - - if (colDef->raw_default != NULL) - { - RawColumnDefault *rawEnt; - - Assert(colDef->cooked_default == NULL); - - rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault)); - rawEnt->attnum = attnum; - rawEnt->raw_default = colDef->raw_default; - rawDefaults = lappend(rawDefaults, rawEnt); - } - - /* Create dependency for supporting relation for this column */ - if (colDef->support != NULL) - add_column_support_dependency(relationId, attnum, colDef->support); - } - - /* - * Parse and add the defaults/constraints, if any. */ if (rawDefaults || stmt->constraints) - AddRelationRawConstraints(rel, rawDefaults, stmt->constraints); + AddRelationNewConstraints(rel, rawDefaults, stmt->constraints, + true, true); /* * Clean up. We keep lock on new relation (although it shouldn't be @@ -506,22 +549,197 @@ DefineRelation(CreateStmt *stmt, char relkind) } /* - * RemoveRelation - * Deletes a relation. + * Emit the right error or warning message for a "DROP" command issued on a + * non-existent relation + */ +static void +DropErrorMsgNonExistent(const char *relname, char rightkind, bool missing_ok) +{ + const struct dropmsgstrings *rentry; + + for (rentry = dropmsgstringarray; rentry->kind != '\0'; rentry++) + { + if (rentry->kind == rightkind) + { + if (!missing_ok) + { + ereport(ERROR, + (errcode(rentry->nonexistent_code), + errmsg(rentry->nonexistent_msg, relname))); + } + else + { + ereport(NOTICE, (errmsg(rentry->skipping_msg, relname))); + break; + } + } + } + + Assert(rentry->kind != '\0'); /* Should be impossible */ +} + +/* + * Emit the right error message for a "DROP" command issued on a + * relation of the wrong type + */ +static void +DropErrorMsgWrongType(const char *relname, char wrongkind, char rightkind) +{ + const struct dropmsgstrings *rentry; + const struct dropmsgstrings *wentry; + + for (rentry = dropmsgstringarray; rentry->kind != '\0'; rentry++) + if (rentry->kind == rightkind) + break; + Assert(rentry->kind != '\0'); + + for (wentry = dropmsgstringarray; wentry->kind != '\0'; wentry++) + if (wentry->kind == wrongkind) + break; + /* wrongkind could be something we don't have in our table... */ + + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg(rentry->nota_msg, relname), + (wentry->kind != '\0') ? errhint(wentry->drophint_msg) : 0)); +} + +/* + * RemoveRelations + * Implements DROP TABLE, DROP INDEX, DROP SEQUENCE, DROP VIEW */ void -RemoveRelation(const RangeVar *relation, DropBehavior behavior) +RemoveRelations(DropStmt *drop) { - Oid relOid; - ObjectAddress object; + ObjectAddresses *objects; + char relkind; + ListCell *cell; - relOid = RangeVarGetRelid(relation, false); + /* + * First we identify all the relations, then we delete them in a single + * performMultipleDeletions() call. This is to avoid unwanted + * DROP RESTRICT errors if one of the relations depends on another. + */ - object.classId = RelationRelationId; - object.objectId = relOid; - object.objectSubId = 0; + /* Determine required relkind */ + switch (drop->removeType) + { + case OBJECT_TABLE: + relkind = RELKIND_RELATION; + break; - performDeletion(&object, behavior); + case OBJECT_INDEX: + relkind = RELKIND_INDEX; + break; + + case OBJECT_SEQUENCE: + relkind = RELKIND_SEQUENCE; + break; + + case OBJECT_VIEW: + relkind = RELKIND_VIEW; + break; + + default: + elog(ERROR, "unrecognized drop object type: %d", + (int) drop->removeType); + relkind = 0; /* keep compiler quiet */ + break; + } + + /* Lock and validate each relation; build a list of object addresses */ + objects = new_object_addresses(); + + foreach(cell, drop->objects) + { + RangeVar *rel = makeRangeVarFromNameList((List *) lfirst(cell)); + Oid relOid; + HeapTuple tuple; + Form_pg_class classform; + ObjectAddress obj; + + /* + * These next few steps are a great deal like relation_openrv, but we + * don't bother building a relcache entry since we don't need it. + * + * Check for shared-cache-inval messages before trying to access the + * relation. This is needed to cover the case where the name + * identifies a rel that has been dropped and recreated since the + * start of our transaction: if we don't flush the old syscache entry, + * then we'll latch onto that entry and suffer an error later. + */ + AcceptInvalidationMessages(); + + /* Look up the appropriate relation using namespace search */ + relOid = RangeVarGetRelid(rel, true); + + /* Not there? */ + if (!OidIsValid(relOid)) + { + DropErrorMsgNonExistent(rel->relname, relkind, drop->missing_ok); + continue; + } + + /* + * In DROP INDEX, attempt to acquire lock on the parent table before + * locking the index. index_drop() will need this anyway, and since + * regular queries lock tables before their indexes, we risk deadlock + * if we do it the other way around. No error if we don't find a + * pg_index entry, though --- that most likely means it isn't an + * index, and we'll fail below. + */ + if (relkind == RELKIND_INDEX) + { + tuple = SearchSysCache(INDEXRELID, + ObjectIdGetDatum(relOid), + 0, 0, 0); + if (HeapTupleIsValid(tuple)) + { + Form_pg_index index = (Form_pg_index) GETSTRUCT(tuple); + + LockRelationOid(index->indrelid, AccessExclusiveLock); + ReleaseSysCache(tuple); + } + } + + /* Get the lock before trying to fetch the syscache entry */ + LockRelationOid(relOid, AccessExclusiveLock); + + tuple = SearchSysCache(RELOID, + ObjectIdGetDatum(relOid), + 0, 0, 0); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for relation %u", relOid); + classform = (Form_pg_class) GETSTRUCT(tuple); + + if (classform->relkind != relkind) + DropErrorMsgWrongType(rel->relname, classform->relkind, relkind); + + /* Allow DROP to either table owner or schema owner */ + if (!pg_class_ownercheck(relOid, GetUserId()) && + !pg_namespace_ownercheck(classform->relnamespace, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, + rel->relname); + + if (!allowSystemTableMods && IsSystemClass(classform)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied: \"%s\" is a system catalog", + rel->relname))); + + /* OK, we're ready to delete this one */ + obj.classId = RelationRelationId; + obj.objectId = relOid; + obj.objectSubId = 0; + + add_exact_object_address(&obj, objects); + + ReleaseSysCache(tuple); + } + + performMultipleDeletions(objects, drop->behavior); + + free_object_addresses(objects); } /* @@ -541,6 +759,10 @@ ExecuteTruncate(TruncateStmt *stmt) { List *rels = NIL; List *relids = NIL; + List *seq_relids = NIL; + EState *estate; + ResultRelInfo *resultRelInfos; + ResultRelInfo *resultRelInfo; ListCell *cell; /* @@ -552,24 +774,30 @@ ExecuteTruncate(TruncateStmt *stmt) Relation rel; rel = heap_openrv(rv, AccessExclusiveLock); + /* don't throw error for "TRUNCATE foo, foo" */ + if (list_member_oid(relids, RelationGetRelid(rel))) + { + heap_close(rel, AccessExclusiveLock); + continue; + } truncate_check_rel(rel); rels = lappend(rels, rel); relids = lappend_oid(relids, RelationGetRelid(rel)); } /* - * In CASCADE mode, suck in all referencing relations as well. This - * requires multiple iterations to find indirectly-dependent relations. - * At each phase, we need to exclusive-lock new rels before looking - * for their dependencies, else we might miss something. Also, we - * check each rel as soon as we open it, to avoid a faux pas such as - * holding lock for a long time on a rel we have no permissions for. + * In CASCADE mode, suck in all referencing relations as well. This + * requires multiple iterations to find indirectly-dependent relations. At + * each phase, we need to exclusive-lock new rels before looking for their + * dependencies, else we might miss something. Also, we check each rel as + * soon as we open it, to avoid a faux pas such as holding lock for a long + * time on a rel we have no permissions for. */ if (stmt->behavior == DROP_CASCADE) { for (;;) { - List *newrelids; + List *newrelids; newrelids = heap_truncate_find_FKs(relids); if (newrelids == NIL) @@ -577,7 +805,7 @@ ExecuteTruncate(TruncateStmt *stmt) foreach(cell, newrelids) { - Oid relid = lfirst_oid(cell); + Oid relid = lfirst_oid(cell); Relation rel; rel = heap_open(relid, AccessExclusiveLock); @@ -593,8 +821,8 @@ ExecuteTruncate(TruncateStmt *stmt) /* * Check foreign key references. In CASCADE mode, this should be - * unnecessary since we just pulled in all the references; but as - * a cross-check, do it anyway if in an Assert-enabled build. + * unnecessary since we just pulled in all the references; but as a + * cross-check, do it anyway if in an Assert-enabled build. */ #ifdef USE_ASSERT_CHECKING heap_truncate_check_FKs(rels, false); @@ -603,6 +831,79 @@ ExecuteTruncate(TruncateStmt *stmt) heap_truncate_check_FKs(rels, false); #endif + /* + * If we are asked to restart sequences, find all the sequences, + * lock them (we only need AccessShareLock because that's all that + * ALTER SEQUENCE takes), and check permissions. We want to do this + * early since it's pointless to do all the truncation work only to fail + * on sequence permissions. + */ + if (stmt->restart_seqs) + { + foreach(cell, rels) + { + Relation rel = (Relation) lfirst(cell); + List *seqlist = getOwnedSequences(RelationGetRelid(rel)); + ListCell *seqcell; + + foreach(seqcell, seqlist) + { + Oid seq_relid = lfirst_oid(seqcell); + Relation seq_rel; + + seq_rel = relation_open(seq_relid, AccessShareLock); + + /* This check must match AlterSequence! */ + if (!pg_class_ownercheck(seq_relid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, + RelationGetRelationName(seq_rel)); + + seq_relids = lappend_oid(seq_relids, seq_relid); + + relation_close(seq_rel, NoLock); + } + } + } + + /* Prepare to catch AFTER triggers. */ + AfterTriggerBeginQuery(); + + /* + * To fire triggers, we'll need an EState as well as a ResultRelInfo + * for each relation. + */ + estate = CreateExecutorState(); + resultRelInfos = (ResultRelInfo *) + palloc(list_length(rels) * sizeof(ResultRelInfo)); + resultRelInfo = resultRelInfos; + foreach(cell, rels) + { + Relation rel = (Relation) lfirst(cell); + + InitResultRelInfo(resultRelInfo, + rel, + 0, /* dummy rangetable index */ + CMD_DELETE, /* don't need any index info */ + false); + resultRelInfo++; + } + estate->es_result_relations = resultRelInfos; + estate->es_num_result_relations = list_length(rels); + + /* + * Process all BEFORE STATEMENT TRUNCATE triggers before we begin + * truncating (this is because one of them might throw an error). + * Also, if we were to allow them to prevent statement execution, + * that would need to be handled here. + */ + resultRelInfo = resultRelInfos; + foreach(cell, rels) + { + estate->es_result_relation_info = resultRelInfo; + ExecBSTruncateTriggers(estate, resultRelInfo); + resultRelInfo++; + } + /* * OK, truncate each table. */ @@ -617,20 +918,18 @@ ExecuteTruncate(TruncateStmt *stmt) * the relfilenode value. The old storage file is scheduled for * deletion at commit. */ - setNewRelfilenode(rel); + setNewRelfilenode(rel, RecentXmin); heap_relid = RelationGetRelid(rel); toast_relid = rel->rd_rel->reltoastrelid; - heap_close(rel, NoLock); - /* * The same for the toast table, if any. */ if (OidIsValid(toast_relid)) { rel = relation_open(toast_relid, AccessExclusiveLock); - setNewRelfilenode(rel); + setNewRelfilenode(rel, RecentXmin); heap_close(rel, NoLock); } @@ -639,14 +938,60 @@ ExecuteTruncate(TruncateStmt *stmt) */ reindex_relation(heap_relid, true); } + + /* + * Process all AFTER STATEMENT TRUNCATE triggers. + */ + resultRelInfo = resultRelInfos; + foreach(cell, rels) + { + estate->es_result_relation_info = resultRelInfo; + ExecASTruncateTriggers(estate, resultRelInfo); + resultRelInfo++; + } + + /* Handle queued AFTER triggers */ + AfterTriggerEndQuery(estate); + + /* We can clean up the EState now */ + FreeExecutorState(estate); + + /* And close the rels (can't do this while EState still holds refs) */ + foreach(cell, rels) + { + Relation rel = (Relation) lfirst(cell); + + heap_close(rel, NoLock); + } + + /* + * Lastly, restart any owned sequences if we were asked to. This is done + * last because it's nontransactional: restarts will not roll back if + * we abort later. Hence it's important to postpone them as long as + * possible. (This is also a big reason why we locked and + * permission-checked the sequences beforehand.) + */ + if (stmt->restart_seqs) + { + List *options = list_make1(makeDefElem("restart", NULL)); + + foreach(cell, seq_relids) + { + Oid seq_relid = lfirst_oid(cell); + + AlterSequenceInternal(seq_relid, options); + } + } } /* - * Check that a given rel is safe to truncate. Subroutine for ExecuteTruncate + * Check that a given rel is safe to truncate. Subroutine for ExecuteTruncate */ static void truncate_check_rel(Relation rel) { + AclResult aclresult; + /* Only allow truncate on regular tables */ if (rel->rd_rel->relkind != RELKIND_RELATION) ereport(ERROR, @@ -655,8 +1000,10 @@ truncate_check_rel(Relation rel) RelationGetRelationName(rel)))); /* Permissions checks */ - if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, + aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(), + ACL_TRUNCATE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_CLASS, RelationGetRelationName(rel)); if (!allowSystemTableMods && IsSystemRelation(rel)) @@ -666,9 +1013,8 @@ truncate_check_rel(Relation rel) RelationGetRelationName(rel)))); /* - * We can never allow truncation of shared or nailed-in-cache - * relations, because we can't support changing their relfilenode - * values. + * We can never allow truncation of shared or nailed-in-cache relations, + * because we can't support changing their relfilenode values. */ if (rel->rd_rel->relisshared || rel->rd_isnailed) ereport(ERROR, @@ -677,13 +1023,19 @@ truncate_check_rel(Relation rel) RelationGetRelationName(rel)))); /* - * Don't allow truncate on temp tables of other backends ... their - * local buffer manager is not going to cope. + * Don't allow truncate on temp tables of other backends ... their local + * buffer manager is not going to cope. */ if (isOtherTempNamespace(RelationGetNamespace(rel))) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot truncate temporary tables of other sessions"))); + errmsg("cannot truncate temporary tables of other sessions"))); + + /* + * Also check for active uses of the relation in the current transaction, + * including open scans and pending AFTER trigger events. + */ + CheckTableNotInUse(rel, "TRUNCATE"); } /*---------- @@ -793,7 +1145,7 @@ MergeAttributes(List *schema, List *supers, bool istemp, if (strcmp(coldef->colname, restdef->colname) == 0) ereport(ERROR, (errcode(ERRCODE_DUPLICATE_COLUMN), - errmsg("column \"%s\" duplicated", + errmsg("column \"%s\" specified more than once", coldef->colname))); } } @@ -841,8 +1193,8 @@ MergeAttributes(List *schema, List *supers, bool istemp, if (list_member_oid(parentOids, RelationGetRelid(relation))) ereport(ERROR, (errcode(ERRCODE_DUPLICATE_TABLE), - errmsg("inherited relation \"%s\" duplicated", - parent->relname))); + errmsg("relation \"%s\" would be inherited from more than once", + parent->relname))); parentOids = lappend_oid(parentOids, RelationGetRelid(relation)); @@ -867,7 +1219,6 @@ MergeAttributes(List *schema, List *supers, bool istemp, char *attributeName = NameStr(attribute->attname); int exist_attno; ColumnDef *def; - TypeName *typename; /* * Ignore dropped columns in the parent. @@ -889,6 +1240,9 @@ MergeAttributes(List *schema, List *supers, bool istemp, exist_attno = findAttrByName(attributeName, inhSchema); if (exist_attno > 0) { + Oid defTypeId; + int32 deftypmod; + /* * Yes, try to merge the two column definitions. They must * have the same type and typmod. @@ -897,8 +1251,9 @@ MergeAttributes(List *schema, List *supers, bool istemp, (errmsg("merging multiple inherited definitions of column \"%s\"", attributeName))); def = (ColumnDef *) list_nth(inhSchema, exist_attno - 1); - if (typenameTypeId(def->typename) != attribute->atttypid || - def->typename->typmod != attribute->atttypmod) + defTypeId = typenameTypeId(NULL, def->typename, &deftypmod); + if (defTypeId != attribute->atttypid || + deftypmod != attribute->atttypmod) ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), errmsg("inherited column \"%s\" has a type conflict", @@ -919,17 +1274,14 @@ MergeAttributes(List *schema, List *supers, bool istemp, */ def = makeNode(ColumnDef); def->colname = pstrdup(attributeName); - typename = makeNode(TypeName); - typename->typeid = attribute->atttypid; - typename->typmod = attribute->atttypmod; - def->typename = typename; + def->typename = makeTypeNameFromOid(attribute->atttypid, + attribute->atttypmod); def->inhcount = 1; def->is_local = false; def->is_not_null = attribute->attnotnull; def->raw_default = NULL; def->cooked_default = NULL; def->constraints = NIL; - def->support = NULL; inhSchema = lappend(inhSchema, def); newattno[parent_attno - 1] = ++child_attno; } @@ -978,8 +1330,9 @@ MergeAttributes(List *schema, List *supers, bool istemp, } /* - * Now copy the constraints of this parent, adjusting attnos using the - * completed newattno[] map + * Now copy the CHECK constraints of this parent, adjusting attnos + * using the completed newattno[] map. Identically named constraints + * are merged if possible, else we throw error. */ if (constr && constr->num_check > 0) { @@ -988,17 +1341,28 @@ MergeAttributes(List *schema, List *supers, bool istemp, for (i = 0; i < constr->num_check; i++) { - Constraint *cdef = makeNode(Constraint); + char *name = check[i].ccname; Node *expr; - cdef->contype = CONSTR_CHECK; - cdef->name = pstrdup(check[i].ccname); - cdef->raw_expr = NULL; /* adjust varattnos of ccbin here */ expr = stringToNode(check[i].ccbin); change_varattnos_of_a_node(expr, newattno); - cdef->cooked_expr = nodeToString(expr); - constraints = lappend(constraints, cdef); + + /* check for duplicate */ + if (!MergeCheckConstraint(constraints, name, expr)) + { + /* nope, this is a new one */ + CookedConstraint *cooked; + + cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint)); + cooked->contype = CONSTR_CHECK; + cooked->name = pstrdup(name); + cooked->attnum = 0; /* not used for constraints */ + cooked->expr = expr; + cooked->is_local = false; + cooked->inhcount = 1; + constraints = lappend(constraints, cooked); + } } } @@ -1032,6 +1396,10 @@ MergeAttributes(List *schema, List *supers, bool istemp, if (exist_attno > 0) { ColumnDef *def; + Oid defTypeId, + newTypeId; + int32 deftypmod, + newtypmod; /* * Yes, try to merge the two column definitions. They must @@ -1041,8 +1409,9 @@ MergeAttributes(List *schema, List *supers, bool istemp, (errmsg("merging column \"%s\" with inherited definition", attributeName))); def = (ColumnDef *) list_nth(inhSchema, exist_attno - 1); - if (typenameTypeId(def->typename) != typenameTypeId(newdef->typename) || - def->typename->typmod != newdef->typename->typmod) + defTypeId = typenameTypeId(NULL, def->typename, &deftypmod); + newTypeId = typenameTypeId(NULL, newdef->typename, &newtypmod); + if (defTypeId != newTypeId || deftypmod != newtypmod) ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), errmsg("column \"%s\" has a type conflict", @@ -1108,47 +1477,146 @@ MergeAttributes(List *schema, List *supers, bool istemp, return schema; } + /* - * complementary static functions for MergeAttributes(). + * MergeCheckConstraint + * Try to merge an inherited CHECK constraint with previous ones * - * Varattnos of pg_constraint.conbin must be rewritten when subclasses inherit - * constraints from parent classes, since the inherited attributes could - * be given different column numbers in multiple-inheritance cases. + * If we inherit identically-named constraints from multiple parents, we must + * merge them, or throw an error if they don't have identical definitions. * - * Note that the passed node tree is modified in place! + * constraints is a list of CookedConstraint structs for previous constraints. + * + * Returns TRUE if merged (constraint is a duplicate), or FALSE if it's + * got a so-far-unique name, or throws error if conflict. */ static bool -change_varattnos_walker(Node *node, const AttrNumber *newattno) +MergeCheckConstraint(List *constraints, char *name, Node *expr) { - if (node == NULL) - return false; - if (IsA(node, Var)) + ListCell *lc; + + foreach(lc, constraints) { - Var *var = (Var *) node; + CookedConstraint *ccon = (CookedConstraint *) lfirst(lc); - if (var->varlevelsup == 0 && var->varno == 1 && - var->varattno > 0) + Assert(ccon->contype == CONSTR_CHECK); + + /* Non-matching names never conflict */ + if (strcmp(ccon->name, name) != 0) + continue; + + if (equal(expr, ccon->expr)) { - /* - * ??? the following may be a problem when the node is multiply - * referenced though stringToNode() doesn't create such a node - * currently. - */ - Assert(newattno[var->varattno - 1] > 0); - var->varattno = newattno[var->varattno - 1]; + /* OK to merge */ + ccon->inhcount++; + return true; } - return false; - } + + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("check constraint name \"%s\" appears multiple times but with different expressions", + name))); + } + + return false; +} + + +/* + * Replace varattno values in an expression tree according to the given + * map array, that is, varattno N is replaced by newattno[N-1]. It is + * caller's responsibility to ensure that the array is long enough to + * define values for all user varattnos present in the tree. System column + * attnos remain unchanged. + * + * Note that the passed node tree is modified in-place! + */ +void +change_varattnos_of_a_node(Node *node, const AttrNumber *newattno) +{ + /* no setup needed, so away we go */ + (void) change_varattnos_walker(node, newattno); +} + +static bool +change_varattnos_walker(Node *node, const AttrNumber *newattno) +{ + if (node == NULL) + return false; + if (IsA(node, Var)) + { + Var *var = (Var *) node; + + if (var->varlevelsup == 0 && var->varno == 1 && + var->varattno > 0) + { + /* + * ??? the following may be a problem when the node is multiply + * referenced though stringToNode() doesn't create such a node + * currently. + */ + Assert(newattno[var->varattno - 1] > 0); + var->varattno = var->varoattno = newattno[var->varattno - 1]; + } + return false; + } return expression_tree_walker(node, change_varattnos_walker, (void *) newattno); } -static bool -change_varattnos_of_a_node(Node *node, const AttrNumber *newattno) +/* + * Generate a map for change_varattnos_of_a_node from old and new TupleDesc's, + * matching according to column name. + */ +AttrNumber * +varattnos_map(TupleDesc old, TupleDesc new) +{ + AttrNumber *attmap; + int i, + j; + + attmap = (AttrNumber *) palloc0(sizeof(AttrNumber) * old->natts); + for (i = 1; i <= old->natts; i++) + { + if (old->attrs[i - 1]->attisdropped) + continue; /* leave the entry as zero */ + + for (j = 1; j <= new->natts; j++) + { + if (strcmp(NameStr(old->attrs[i - 1]->attname), + NameStr(new->attrs[j - 1]->attname)) == 0) + { + attmap[i - 1] = j; + break; + } + } + } + return attmap; +} + +/* + * Generate a map for change_varattnos_of_a_node from a TupleDesc and a list + * of ColumnDefs + */ +AttrNumber * +varattnos_map_schema(TupleDesc old, List *schema) { - return change_varattnos_walker(node, newattno); + AttrNumber *attmap; + int i; + + attmap = (AttrNumber *) palloc0(sizeof(AttrNumber) * old->natts); + for (i = 1; i <= old->natts; i++) + { + if (old->attrs[i - 1]->attisdropped) + continue; /* leave the entry as zero */ + + attmap[i - 1] = findAttrByName(NameStr(old->attrs[i - 1]->attname), + schema); + } + return attmap; } + /* * StoreCatalogInheritance * Updates the system catalogs with proper inheritance information. @@ -1159,10 +1627,8 @@ static void StoreCatalogInheritance(Oid relationId, List *supers) { Relation relation; - TupleDesc desc; int16 seqNumber; ListCell *entry; - HeapTuple tuple; /* * sanity checks @@ -1182,54 +1648,69 @@ StoreCatalogInheritance(Oid relationId, List *supers) * anymore, there's no need to look for indirect ancestors.) */ relation = heap_open(InheritsRelationId, RowExclusiveLock); - desc = RelationGetDescr(relation); seqNumber = 1; foreach(entry, supers) { Oid parentOid = lfirst_oid(entry); - Datum datum[Natts_pg_inherits]; - char nullarr[Natts_pg_inherits]; - ObjectAddress childobject, - parentobject; - datum[0] = ObjectIdGetDatum(relationId); /* inhrel */ - datum[1] = ObjectIdGetDatum(parentOid); /* inhparent */ - datum[2] = Int16GetDatum(seqNumber); /* inhseqno */ + StoreCatalogInheritance1(relationId, parentOid, seqNumber, relation); + seqNumber++; + } + + heap_close(relation, RowExclusiveLock); +} - nullarr[0] = ' '; - nullarr[1] = ' '; - nullarr[2] = ' '; +/* + * Make catalog entries showing relationId as being an inheritance child + * of parentOid. inhRelation is the already-opened pg_inherits catalog. + */ +static void +StoreCatalogInheritance1(Oid relationId, Oid parentOid, + int16 seqNumber, Relation inhRelation) +{ + TupleDesc desc = RelationGetDescr(inhRelation); + Datum datum[Natts_pg_inherits]; + bool nullarr[Natts_pg_inherits]; + ObjectAddress childobject, + parentobject; + HeapTuple tuple; - tuple = heap_formtuple(desc, datum, nullarr); + /* + * Make the pg_inherits entry + */ + datum[0] = ObjectIdGetDatum(relationId); /* inhrelid */ + datum[1] = ObjectIdGetDatum(parentOid); /* inhparent */ + datum[2] = Int16GetDatum(seqNumber); /* inhseqno */ - simple_heap_insert(relation, tuple); + nullarr[0] = false; + nullarr[1] = false; + nullarr[2] = false; - CatalogUpdateIndexes(relation, tuple); + tuple = heap_form_tuple(desc, datum, nullarr); - heap_freetuple(tuple); + simple_heap_insert(inhRelation, tuple); - /* - * Store a dependency too - */ - parentobject.classId = RelationRelationId; - parentobject.objectId = parentOid; - parentobject.objectSubId = 0; - childobject.classId = RelationRelationId; - childobject.objectId = relationId; - childobject.objectSubId = 0; + CatalogUpdateIndexes(inhRelation, tuple); - recordDependencyOn(&childobject, &parentobject, DEPENDENCY_NORMAL); + heap_freetuple(tuple); - /* - * Mark the parent as having subclasses. - */ - setRelhassubclassInRelation(parentOid, true); + /* + * Store a dependency too + */ + parentobject.classId = RelationRelationId; + parentobject.objectId = parentOid; + parentobject.objectSubId = 0; + childobject.classId = RelationRelationId; + childobject.objectId = relationId; + childobject.objectSubId = 0; - seqNumber += 1; - } + recordDependencyOn(&childobject, &parentobject, DEPENDENCY_NORMAL); - heap_close(relation, RowExclusiveLock); + /* + * Mark the parent as having subclasses. + */ + setRelhassubclassInRelation(parentOid, true); } /* @@ -1500,26 +1981,69 @@ renameatt(Oid myrelid, heap_close(attrelation, RowExclusiveLock); + relation_close(targetrelation, NoLock); /* close rel but keep lock */ +} + + +/* + * Execute ALTER TABLE/INDEX/SEQUENCE/VIEW RENAME + * + * Caller has already done permissions checks. + */ +void +RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype) +{ + Relation targetrelation; + Oid namespaceId; + char relkind; + /* - * Update att name in any RI triggers associated with the relation. + * Grab an exclusive lock on the target table, index, sequence or view, + * which we will NOT release until end of transaction. */ - if (targetrelation->rd_rel->reltriggers > 0) - { - /* update tgargs column reference where att is primary key */ - update_ri_trigger_args(RelationGetRelid(targetrelation), - oldattname, newattname, - false, false); - /* update tgargs column reference where att is foreign key */ - update_ri_trigger_args(RelationGetRelid(targetrelation), - oldattname, newattname, - true, false); - } + targetrelation = relation_open(myrelid, AccessExclusiveLock); - relation_close(targetrelation, NoLock); /* close rel but keep lock */ + namespaceId = RelationGetNamespace(targetrelation); + relkind = targetrelation->rd_rel->relkind; + + /* + * For compatibility with prior releases, we don't complain if ALTER TABLE + * or ALTER INDEX is used to rename a sequence or view. + */ + if (reltype == OBJECT_SEQUENCE && relkind != RELKIND_SEQUENCE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a sequence", + RelationGetRelationName(targetrelation)))); + + if (reltype == OBJECT_VIEW && relkind != RELKIND_VIEW) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a view", + RelationGetRelationName(targetrelation)))); + + /* + * Don't allow ALTER TABLE on composite types. + * We want people to use ALTER TYPE for that. + */ + if (relkind == RELKIND_COMPOSITE_TYPE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is a composite type", + RelationGetRelationName(targetrelation)), + errhint("Use ALTER TYPE instead."))); + + /* Do the work */ + RenameRelationInternal(myrelid, newrelname, namespaceId); + + /* + * Close rel, but keep exclusive lock! + */ + relation_close(targetrelation, NoLock); } /* - * renamerel - change the name of a relation + * RenameRelationInternal - change the name of a relation * * XXX - When renaming sequences, we don't bother to modify the * sequence name that is stored within the sequence itself @@ -1528,44 +2052,30 @@ renameatt(Oid myrelid, * sequence, AFAIK there's no need for it to be there. */ void -renamerel(Oid myrelid, const char *newrelname) +RenameRelationInternal(Oid myrelid, const char *newrelname, Oid namespaceId) { Relation targetrelation; Relation relrelation; /* for RELATION relation */ HeapTuple reltup; - Oid namespaceId; - char *oldrelname; - char relkind; - bool relhastriggers; + Form_pg_class relform; /* - * Grab an exclusive lock on the target table or index, which we will NOT - * release until end of transaction. + * Grab an exclusive lock on the target table, index, sequence or + * view, which we will NOT release until end of transaction. */ targetrelation = relation_open(myrelid, AccessExclusiveLock); - oldrelname = pstrdup(RelationGetRelationName(targetrelation)); - namespaceId = RelationGetNamespace(targetrelation); - - if (!allowSystemTableMods && IsSystemRelation(targetrelation)) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied: \"%s\" is a system catalog", - RelationGetRelationName(targetrelation)))); - - relkind = targetrelation->rd_rel->relkind; - relhastriggers = (targetrelation->rd_rel->reltriggers > 0); - /* * Find relation's pg_class tuple, and make sure newrelname isn't in use. */ relrelation = heap_open(RelationRelationId, RowExclusiveLock); reltup = SearchSysCacheCopy(RELOID, - PointerGetDatum(myrelid), + ObjectIdGetDatum(myrelid), 0, 0, 0); if (!HeapTupleIsValid(reltup)) /* shouldn't happen */ elog(ERROR, "cache lookup failed for relation %u", myrelid); + relform = (Form_pg_class) GETSTRUCT(reltup); if (get_relname_relid(newrelname, namespaceId) != InvalidOid) ereport(ERROR, @@ -1577,7 +2087,7 @@ renamerel(Oid myrelid, const char *newrelname) * Update pg_class tuple with new relname. (Scribbling on reltup is OK * because it's a copy...) */ - namestrcpy(&(((Form_pg_class) GETSTRUCT(reltup))->relname), newrelname); + namestrcpy(&(relform->relname), newrelname); simple_heap_update(relrelation, &reltup->t_self, reltup); @@ -1590,24 +2100,19 @@ renamerel(Oid myrelid, const char *newrelname) /* * Also rename the associated type, if any. */ - if (relkind != RELKIND_INDEX) - TypeRename(oldrelname, namespaceId, newrelname); + if (OidIsValid(targetrelation->rd_rel->reltype)) + RenameTypeInternal(targetrelation->rd_rel->reltype, + newrelname, namespaceId); /* - * Update rel name in any RI triggers associated with the relation. + * Also rename the associated constraint, if any. */ - if (relhastriggers) + if (targetrelation->rd_rel->relkind == RELKIND_INDEX) { - /* update tgargs where relname is primary key */ - update_ri_trigger_args(myrelid, - oldrelname, - newrelname, - false, true); - /* update tgargs where relname is foreign key */ - update_ri_trigger_args(myrelid, - oldrelname, - newrelname, - true, true); + Oid constraintId = get_index_constraint(myrelid); + + if (OidIsValid(constraintId)) + RenameConstraintById(constraintId, newrelname); } /* @@ -1617,201 +2122,52 @@ renamerel(Oid myrelid, const char *newrelname) } /* - * Scan pg_trigger for RI triggers that are on the specified relation - * (if fk_scan is false) or have it as the tgconstrrel (if fk_scan - * is true). Update RI trigger args fields matching oldname to contain - * newname instead. If update_relname is true, examine the relname - * fields; otherwise examine the attname fields. + * Disallow ALTER TABLE (and similar commands) when the current backend has + * any open reference to the target table besides the one just acquired by + * the calling command; this implies there's an open cursor or active plan. + * We need this check because our AccessExclusiveLock doesn't protect us + * against stomping on our own foot, only other people's feet! + * + * For ALTER TABLE, the only case known to cause serious trouble is ALTER + * COLUMN TYPE, and some changes are obviously pretty benign, so this could + * possibly be relaxed to only error out for certain types of alterations. + * But the use-case for allowing any of these things is not obvious, so we + * won't work hard at it for now. + * + * We also reject these commands if there are any pending AFTER trigger events + * for the rel. This is certainly necessary for the rewriting variants of + * ALTER TABLE, because they don't preserve tuple TIDs and so the pending + * events would try to fetch the wrong tuples. It might be overly cautious + * in other cases, but again it seems better to err on the side of paranoia. + * + * REINDEX calls this with "rel" referencing the index to be rebuilt; here + * we are worried about active indexscans on the index. The trigger-event + * check can be skipped, since we are doing no damage to the parent table. + * + * The statement name (eg, "ALTER TABLE") is passed for use in error messages. */ -static void -update_ri_trigger_args(Oid relid, - const char *oldname, - const char *newname, - bool fk_scan, - bool update_relname) +void +CheckTableNotInUse(Relation rel, const char *stmt) { - Relation tgrel; - ScanKeyData skey[1]; - SysScanDesc trigscan; - HeapTuple tuple; - Datum values[Natts_pg_trigger]; - char nulls[Natts_pg_trigger]; - char replaces[Natts_pg_trigger]; - - tgrel = heap_open(TriggerRelationId, RowExclusiveLock); - if (fk_scan) - { - ScanKeyInit(&skey[0], - Anum_pg_trigger_tgconstrrelid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(relid)); - trigscan = systable_beginscan(tgrel, TriggerConstrRelidIndexId, - true, SnapshotNow, - 1, skey); - } - else - { - ScanKeyInit(&skey[0], - Anum_pg_trigger_tgrelid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(relid)); - trigscan = systable_beginscan(tgrel, TriggerRelidNameIndexId, - true, SnapshotNow, - 1, skey); - } - - while ((tuple = systable_getnext(trigscan)) != NULL) - { - Form_pg_trigger pg_trigger = (Form_pg_trigger) GETSTRUCT(tuple); - bytea *val; - bytea *newtgargs; - bool isnull; - int tg_type; - bool examine_pk; - bool changed; - int tgnargs; - int i; - int newlen; - const char *arga[RI_MAX_ARGUMENTS]; - const char *argp; - - tg_type = RI_FKey_trigger_type(pg_trigger->tgfoid); - if (tg_type == RI_TRIGGER_NONE) - { - /* Not an RI trigger, forget it */ - continue; - } - - /* - * It is an RI trigger, so parse the tgargs bytea. - * - * NB: we assume the field will never be compressed or moved out of - * line; so does trigger.c ... - */ - tgnargs = pg_trigger->tgnargs; - val = (bytea *) - DatumGetPointer(fastgetattr(tuple, - Anum_pg_trigger_tgargs, - tgrel->rd_att, &isnull)); - if (isnull || tgnargs < RI_FIRST_ATTNAME_ARGNO || - tgnargs > RI_MAX_ARGUMENTS) - { - /* This probably shouldn't happen, but ignore busted triggers */ - continue; - } - argp = (const char *) VARDATA(val); - for (i = 0; i < tgnargs; i++) - { - arga[i] = argp; - argp += strlen(argp) + 1; - } - - /* - * Figure out which item(s) to look at. If the trigger is primary-key - * type and attached to my rel, I should look at the PK fields; if it - * is foreign-key type and attached to my rel, I should look at the FK - * fields. But the opposite rule holds when examining triggers found - * by tgconstrrel search. - */ - examine_pk = (tg_type == RI_TRIGGER_PK) == (!fk_scan); - - changed = false; - if (update_relname) - { - /* Change the relname if needed */ - i = examine_pk ? RI_PK_RELNAME_ARGNO : RI_FK_RELNAME_ARGNO; - if (strcmp(arga[i], oldname) == 0) - { - arga[i] = newname; - changed = true; - } - } - else - { - /* Change attname(s) if needed */ - i = examine_pk ? RI_FIRST_ATTNAME_ARGNO + RI_KEYPAIR_PK_IDX : - RI_FIRST_ATTNAME_ARGNO + RI_KEYPAIR_FK_IDX; - for (; i < tgnargs; i += 2) - { - if (strcmp(arga[i], oldname) == 0) - { - arga[i] = newname; - changed = true; - } - } - } - - if (!changed) - { - /* Don't need to update this tuple */ - continue; - } - - /* - * Construct modified tgargs bytea. - */ - newlen = VARHDRSZ; - for (i = 0; i < tgnargs; i++) - newlen += strlen(arga[i]) + 1; - newtgargs = (bytea *) palloc(newlen); - VARATT_SIZEP(newtgargs) = newlen; - newlen = VARHDRSZ; - for (i = 0; i < tgnargs; i++) - { - strcpy(((char *) newtgargs) + newlen, arga[i]); - newlen += strlen(arga[i]) + 1; - } + int expected_refcnt; - /* - * Build modified tuple. - */ - for (i = 0; i < Natts_pg_trigger; i++) - { - values[i] = (Datum) 0; - replaces[i] = ' '; - nulls[i] = ' '; - } - values[Anum_pg_trigger_tgargs - 1] = PointerGetDatum(newtgargs); - replaces[Anum_pg_trigger_tgargs - 1] = 'r'; - - tuple = heap_modifytuple(tuple, RelationGetDescr(tgrel), values, nulls, replaces); - - /* - * Update pg_trigger and its indexes - */ - simple_heap_update(tgrel, &tuple->t_self, tuple); - - CatalogUpdateIndexes(tgrel, tuple); - - /* - * Invalidate trigger's relation's relcache entry so that other - * backends (and this one too!) are sent SI message to make them - * rebuild relcache entries. (Ideally this should happen - * automatically...) - * - * We can skip this for triggers on relid itself, since that relcache - * flush will happen anyway due to the table or column rename. We - * just need to catch the far ends of RI relationships. - */ - pg_trigger = (Form_pg_trigger) GETSTRUCT(tuple); - if (pg_trigger->tgrelid != relid) - CacheInvalidateRelcacheByRelid(pg_trigger->tgrelid); - - /* free up our scratch memory */ - pfree(newtgargs); - heap_freetuple(tuple); - } - - systable_endscan(trigscan); - - heap_close(tgrel, RowExclusiveLock); - - /* - * Increment cmd counter to make updates visible; this is needed in case - * the same tuple has to be updated again by next pass (can happen in case - * of a self-referential FK relationship). - */ - CommandCounterIncrement(); + expected_refcnt = rel->rd_isnailed ? 2 : 1; + if (rel->rd_refcnt != expected_refcnt) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + /* translator: first %s is a SQL command, eg ALTER TABLE */ + errmsg("cannot %s \"%s\" because " + "it is being used by active queries in this session", + stmt, RelationGetRelationName(rel)))); + + if (rel->rd_rel->relkind != RELKIND_INDEX && + AfterTriggerPendingOnRel(RelationGetRelid(rel))) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + /* translator: first %s is a SQL command, eg ALTER TABLE */ + errmsg("cannot %s \"%s\" because " + "it has pending trigger events", + stmt, RelationGetRelationName(rel)))); } /* @@ -1835,8 +2191,9 @@ update_ri_trigger_args(Oid relid, * expressions that need to be evaluated with respect to the old table * schema. * - * ATRewriteCatalogs performs phase 2 for each affected table (note that - * phases 2 and 3 do no explicit recursion, since phase 1 already did it). + * ATRewriteCatalogs performs phase 2 for each affected table. (Note that + * phases 2 and 3 normally do no explicit recursion, since phase 1 already + * did it --- although some subcommands have to recurse in phase 2 instead.) * Certain subcommands need to be performed before others to avoid * unnecessary conflicts; for example, DROP COLUMN should come before * ADD COLUMN. Therefore phase 1 divides the subcommands into multiple @@ -1850,22 +2207,68 @@ update_ri_trigger_args(Oid relid, void AlterTable(AlterTableStmt *stmt) { - ATController(relation_openrv(stmt->relation, AccessExclusiveLock), - stmt->cmds, - interpretInhOption(stmt->relation->inhOpt)); + Relation rel = relation_openrv(stmt->relation, AccessExclusiveLock); + + CheckTableNotInUse(rel, "ALTER TABLE"); + + /* Check relation type against type specified in the ALTER command */ + switch (stmt->relkind) + { + case OBJECT_TABLE: + /* + * For mostly-historical reasons, we allow ALTER TABLE to apply + * to all relation types. + */ + break; + + case OBJECT_INDEX: + if (rel->rd_rel->relkind != RELKIND_INDEX) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not an index", + RelationGetRelationName(rel)))); + break; + + case OBJECT_SEQUENCE: + if (rel->rd_rel->relkind != RELKIND_SEQUENCE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a sequence", + RelationGetRelationName(rel)))); + break; + + case OBJECT_VIEW: + if (rel->rd_rel->relkind != RELKIND_VIEW) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a view", + RelationGetRelationName(rel)))); + break; + + default: + elog(ERROR, "unrecognized object type: %d", (int) stmt->relkind); + } + + ATController(rel, stmt->cmds, interpretInhOption(stmt->relation->inhOpt)); } /* * AlterTableInternal * * ALTER TABLE with target specified by OID + * + * We do not reject if the relation is already open, because it's quite + * likely that one or more layers of caller have it open. That means it + * is unsafe to use this entry point for alterations that could break + * existing query plans. On the assumption it's not used for such, we + * don't have to reject pending AFTER triggers, either. */ void AlterTableInternal(Oid relid, List *cmds, bool recurse) { - ATController(relation_open(relid, AccessExclusiveLock), - cmds, - recurse); + Relation rel = relation_open(relid, AccessExclusiveLock); + + ATController(rel, cmds, recurse); } static void @@ -1931,6 +2334,12 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, ATPrepAddColumn(wqueue, rel, recurse, cmd); pass = AT_PASS_ADD_COL; break; + case AT_AddColumnToView: /* add column via CREATE OR REPLACE VIEW */ + ATSimplePermissions(rel, true); + /* Performs own recursion */ + ATPrepAddColumn(wqueue, rel, recurse, cmd); + pass = AT_PASS_ADD_COL; + break; case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */ /* @@ -1942,7 +2351,7 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, ATSimplePermissions(rel, true); ATSimpleRecursion(wqueue, rel, cmd, recurse); /* No command-specific prep needed */ - pass = AT_PASS_ADD_CONSTR; + pass = cmd->def ? AT_PASS_ADD_CONSTR : AT_PASS_DROP; break; case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */ ATSimplePermissions(rel, false); @@ -1984,27 +2393,18 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, break; case AT_AddConstraint: /* ADD CONSTRAINT */ ATSimplePermissions(rel, false); - - /* - * Currently we recurse only for CHECK constraints, never for - * foreign-key constraints. UNIQUE/PKEY constraints won't be seen - * here. - */ - if (IsA(cmd->def, Constraint)) - ATSimpleRecursion(wqueue, rel, cmd, recurse); - /* No command-specific prep needed */ + /* Recursion occurs during execution phase */ + /* No command-specific prep needed except saving recurse flag */ + if (recurse) + cmd->subtype = AT_AddConstraintRecurse; pass = AT_PASS_ADD_CONSTR; break; case AT_DropConstraint: /* DROP CONSTRAINT */ ATSimplePermissions(rel, false); - /* Performs own recursion */ - ATPrepDropConstraint(wqueue, rel, recurse, cmd); - pass = AT_PASS_DROP; - break; - case AT_DropConstraintQuietly: /* DROP CONSTRAINT for child */ - ATSimplePermissions(rel, false); - ATSimpleRecursion(wqueue, rel, cmd, recurse); - /* No command-specific prep needed */ + /* Recursion occurs during execution phase */ + /* No command-specific prep needed except saving recurse flag */ + if (recurse) + cmd->subtype = AT_DropConstraintRecurse; pass = AT_PASS_DROP; break; case AT_AlterColumnType: /* ALTER COLUMN TYPE */ @@ -2013,12 +2413,6 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd); pass = AT_PASS_ALTER_TYPE; break; - case AT_ToastTable: /* CREATE TOAST TABLE */ - ATSimplePermissions(rel, false); - /* This command never recurses */ - /* No command-specific prep needed */ - pass = AT_PASS_MISC; - break; case AT_ChangeOwner: /* ALTER OWNER */ /* This command never recurses */ /* No command-specific prep needed */ @@ -2046,16 +2440,32 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, pass = AT_PASS_DROP; break; case AT_SetTableSpace: /* SET TABLESPACE */ + ATSimplePermissionsRelationOrIndex(rel); /* This command never recurses */ ATPrepSetTableSpace(tab, rel, cmd->name); pass = AT_PASS_MISC; /* doesn't actually matter */ break; + case AT_SetRelOptions: /* SET (...) */ + case AT_ResetRelOptions: /* RESET (...) */ + ATSimplePermissionsRelationOrIndex(rel); + /* This command never recurses */ + /* No command-specific prep needed */ + pass = AT_PASS_MISC; + break; case AT_EnableTrig: /* ENABLE TRIGGER variants */ + case AT_EnableAlwaysTrig: + case AT_EnableReplicaTrig: case AT_EnableTrigAll: case AT_EnableTrigUser: case AT_DisableTrig: /* DISABLE TRIGGER variants */ case AT_DisableTrigAll: case AT_DisableTrigUser: + case AT_EnableRule: /* ENABLE/DISABLE RULE variants */ + case AT_EnableAlwaysRule: + case AT_EnableReplicaRule: + case AT_DisableRule: + case AT_AddInherit: /* INHERIT / NO INHERIT */ + case AT_DropInherit: ATSimplePermissions(rel, false); /* These commands never recurse */ /* No command-specific prep needed */ @@ -2111,7 +2521,7 @@ ATRewriteCatalogs(List **wqueue) rel = relation_open(tab->relid, NoLock); foreach(lcmd, subcmds) - ATExecCmd(tab, rel, (AlterTableCmd *) lfirst(lcmd)); + ATExecCmd(wqueue, tab, rel, (AlterTableCmd *) lfirst(lcmd)); /* * After the ALTER TYPE pass, do cleanup work (this is not done in @@ -2126,8 +2536,8 @@ ATRewriteCatalogs(List **wqueue) } /* - * Do an implicit CREATE TOAST TABLE if we executed any subcommands that - * might have added a column or changed column storage. + * Check to see if a toast table must be added, if we executed any + * subcommands that might have added a column or changed column storage. */ foreach(ltab, *wqueue) { @@ -2137,7 +2547,7 @@ ATRewriteCatalogs(List **wqueue) (tab->subcmds[AT_PASS_ADD_COL] || tab->subcmds[AT_PASS_ALTER_TYPE] || tab->subcmds[AT_PASS_COL_ATTRS])) - AlterTableCreateToastTable(tab->relid, true); + AlterTableCreateToastTable(tab->relid); } } @@ -2145,11 +2555,13 @@ ATRewriteCatalogs(List **wqueue) * ATExecCmd: dispatch a subcommand to appropriate execution routine */ static void -ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd) +ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, + AlterTableCmd *cmd) { switch (cmd->subtype) { case AT_AddColumn: /* ADD COLUMN */ + case AT_AddColumnToView: /* add column via CREATE OR REPLACE VIEW */ ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def); break; case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */ @@ -2180,20 +2592,20 @@ ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd) ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true); break; case AT_AddConstraint: /* ADD CONSTRAINT */ - ATExecAddConstraint(tab, rel, cmd->def); + ATExecAddConstraint(wqueue, tab, rel, cmd->def, false); + break; + case AT_AddConstraintRecurse: /* ADD CONSTRAINT with recursion */ + ATExecAddConstraint(wqueue, tab, rel, cmd->def, true); break; case AT_DropConstraint: /* DROP CONSTRAINT */ - ATExecDropConstraint(rel, cmd->name, cmd->behavior, false); + ATExecDropConstraint(rel, cmd->name, cmd->behavior, false, false); break; - case AT_DropConstraintQuietly: /* DROP CONSTRAINT for child */ - ATExecDropConstraint(rel, cmd->name, cmd->behavior, true); + case AT_DropConstraintRecurse: /* DROP CONSTRAINT with recursion */ + ATExecDropConstraint(rel, cmd->name, cmd->behavior, true, false); break; case AT_AlterColumnType: /* ALTER COLUMN TYPE */ ATExecAlterColumnType(tab, rel, cmd->name, (TypeName *) cmd->def); break; - case AT_ToastTable: /* CREATE TOAST TABLE */ - AlterTableCreateToastTable(RelationGetRelid(rel), false); - break; case AT_ChangeOwner: /* ALTER OWNER */ ATExecChangeOwner(RelationGetRelid(rel), get_roleid_checked(cmd->name), @@ -2218,23 +2630,68 @@ ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd) * Nothing to do here; Phase 3 does the work */ break; + case AT_SetRelOptions: /* SET (...) */ + ATExecSetRelOptions(rel, (List *) cmd->def, false); + break; + case AT_ResetRelOptions: /* RESET (...) */ + ATExecSetRelOptions(rel, (List *) cmd->def, true); + break; + case AT_EnableTrig: /* ENABLE TRIGGER name */ - ATExecEnableDisableTrigger(rel, cmd->name, true, false); + ATExecEnableDisableTrigger(rel, cmd->name, + TRIGGER_FIRES_ON_ORIGIN, false); + break; + case AT_EnableAlwaysTrig: /* ENABLE ALWAYS TRIGGER name */ + ATExecEnableDisableTrigger(rel, cmd->name, + TRIGGER_FIRES_ALWAYS, false); + break; + case AT_EnableReplicaTrig: /* ENABLE REPLICA TRIGGER name */ + ATExecEnableDisableTrigger(rel, cmd->name, + TRIGGER_FIRES_ON_REPLICA, false); break; case AT_DisableTrig: /* DISABLE TRIGGER name */ - ATExecEnableDisableTrigger(rel, cmd->name, false, false); + ATExecEnableDisableTrigger(rel, cmd->name, + TRIGGER_DISABLED, false); break; case AT_EnableTrigAll: /* ENABLE TRIGGER ALL */ - ATExecEnableDisableTrigger(rel, NULL, true, false); + ATExecEnableDisableTrigger(rel, NULL, + TRIGGER_FIRES_ON_ORIGIN, false); break; case AT_DisableTrigAll: /* DISABLE TRIGGER ALL */ - ATExecEnableDisableTrigger(rel, NULL, false, false); + ATExecEnableDisableTrigger(rel, NULL, + TRIGGER_DISABLED, false); break; case AT_EnableTrigUser: /* ENABLE TRIGGER USER */ - ATExecEnableDisableTrigger(rel, NULL, true, true); + ATExecEnableDisableTrigger(rel, NULL, + TRIGGER_FIRES_ON_ORIGIN, true); break; case AT_DisableTrigUser: /* DISABLE TRIGGER USER */ - ATExecEnableDisableTrigger(rel, NULL, false, true); + ATExecEnableDisableTrigger(rel, NULL, + TRIGGER_DISABLED, true); + break; + + case AT_EnableRule: /* ENABLE RULE name */ + ATExecEnableDisableRule(rel, cmd->name, + RULE_FIRES_ON_ORIGIN); + break; + case AT_EnableAlwaysRule: /* ENABLE ALWAYS RULE name */ + ATExecEnableDisableRule(rel, cmd->name, + RULE_FIRES_ALWAYS); + break; + case AT_EnableReplicaRule: /* ENABLE REPLICA RULE name */ + ATExecEnableDisableRule(rel, cmd->name, + RULE_FIRES_ON_REPLICA); + break; + case AT_DisableRule: /* DISABLE RULE name */ + ATExecEnableDisableRule(rel, cmd->name, + RULE_DISABLED); + break; + + case AT_AddInherit: + ATExecAddInherit(rel, (RangeVar *) cmd->def); + break; + case AT_DropInherit: + ATExecDropInherit(rel, (RangeVar *) cmd->def); break; default: /* oops */ elog(ERROR, "unrecognized alter table type: %d", @@ -2328,8 +2785,13 @@ ATRewriteTables(List **wqueue) */ ATRewriteTable(tab, OIDNewHeap); - /* Swap the physical files of the old and new heaps. */ - swap_relation_files(tab->relid, OIDNewHeap); + /* + * Swap the physical files of the old and new heaps. Since we are + * generating a new heap, we can use RecentXmin for the table's + * new relfrozenxid because we rewrote all the tuples on + * ATRewriteTable, so no older Xid remains on the table. + */ + swap_relation_files(tab->relid, OIDNewHeap, RecentXmin); CommandCounterIncrement(); @@ -2358,7 +2820,7 @@ ATRewriteTables(List **wqueue) * Test the current data within the table against new constraints * generated by ALTER TABLE commands, but don't rebuild data. */ - if (tab->constraints != NIL) + if (tab->constraints != NIL || tab->new_notnull) ATRewriteTable(tab, InvalidOid); /* @@ -2400,7 +2862,8 @@ ATRewriteTables(List **wqueue) refrel = heap_open(con->refrelid, RowShareLock); - validateForeignKeyConstraint(fkconstraint, rel, refrel); + validateForeignKeyConstraint(fkconstraint, rel, refrel, + con->conid); heap_close(refrel, NoLock); } @@ -2424,6 +2887,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap) TupleDesc oldTupDesc; TupleDesc newTupDesc; bool needscan = false; + List *notnull_attrs; int i; ListCell *l; EState *estate; @@ -2451,7 +2915,8 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap) */ if (newrel) find_composite_type_dependencies(oldrel->rd_rel->reltype, - RelationGetRelationName(oldrel)); + RelationGetRelationName(oldrel), + NULL); /* * Generate the constraint and default execution states @@ -2474,9 +2939,6 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap) case CONSTR_FOREIGN: /* Nothing to do here */ break; - case CONSTR_NOTNULL: - needscan = true; - break; default: elog(ERROR, "unrecognized constraint type: %d", (int) con->contype); @@ -2492,6 +2954,25 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap) ex->exprstate = ExecPrepareExpr((Expr *) ex->expr, estate); } + notnull_attrs = NIL; + if (newrel || tab->new_notnull) + { + /* + * If we are rebuilding the tuples OR if we added any new NOT NULL + * constraints, check all not-null constraints. This is a bit of + * overkill but it minimizes risk of bugs, and heap_attisnull is a + * pretty cheap test anyway. + */ + for (i = 0; i < newTupDesc->natts; i++) + { + if (newTupDesc->attrs[i]->attnotnull && + !newTupDesc->attrs[i]->attisdropped) + notnull_attrs = lappend_int(notnull_attrs, i); + } + if (notnull_attrs) + needscan = true; + } + if (needscan) { ExprContext *econtext; @@ -2592,6 +3073,17 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap) ExecStoreTuple(tuple, newslot, InvalidBuffer, false); econtext->ecxt_scantuple = newslot; + foreach(l, notnull_attrs) + { + int attn = lfirst_int(l); + + if (heap_attisnull(tuple, attn + 1)) + ereport(ERROR, + (errcode(ERRCODE_NOT_NULL_VIOLATION), + errmsg("column \"%s\" contains null values", + NameStr(newTupDesc->attrs[attn]->attname)))); + } + foreach(l, tab->constraints) { NewConstraint *con = lfirst(l); @@ -2605,21 +3097,6 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap) errmsg("check constraint \"%s\" is violated by some row", con->name))); break; - case CONSTR_NOTNULL: - { - Datum d; - bool isnull; - - d = heap_getattr(tuple, con->attnum, newTupDesc, - &isnull); - if (isnull) - ereport(ERROR, - (errcode(ERRCODE_NOT_NULL_VIOLATION), - errmsg("column \"%s\" contains null values", - get_attname(tab->relid, - con->attnum)))); - } - break; case CONSTR_FOREIGN: /* Nothing to do here */ break; @@ -2640,6 +3117,9 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap) MemoryContextSwitchTo(oldCxt); heap_endscan(scan); + + ExecDropSingleTupleTableSlot(oldslot); + ExecDropSingleTupleTableSlot(newslot); } FreeExecutorState(estate); @@ -2719,6 +3199,35 @@ ATSimplePermissions(Relation rel, bool allowView) RelationGetRelationName(rel)))); } +/* + * ATSimplePermissionsRelationOrIndex + * + * - Ensure that it is a relation or an index + * - Ensure this user is the owner + * - Ensure that it is not a system table + */ +static void +ATSimplePermissionsRelationOrIndex(Relation rel) +{ + if (rel->rd_rel->relkind != RELKIND_RELATION && + rel->rd_rel->relkind != RELKIND_INDEX) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a table or index", + RelationGetRelationName(rel)))); + + /* Permissions checks */ + if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, + RelationGetRelationName(rel)); + + if (!allowSystemTableMods && IsSystemRelation(rel)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied: \"%s\" is a system catalog", + RelationGetRelationName(rel)))); +} + /* * ATSimpleRecursion * @@ -2757,6 +3266,7 @@ ATSimpleRecursion(List **wqueue, Relation rel, if (childrelid == relid) continue; childrel = relation_open(childrelid, AccessExclusiveLock); + CheckTableNotInUse(childrel, "ALTER TABLE"); ATPrepCmd(wqueue, childrel, cmd, false, true); relation_close(childrel, NoLock); } @@ -2788,6 +3298,7 @@ ATOneLevelRecursion(List **wqueue, Relation rel, Relation childrel; childrel = relation_open(childrelid, AccessExclusiveLock); + CheckTableNotInUse(childrel, "ALTER TABLE"); ATPrepCmd(wqueue, childrel, cmd, true, true); relation_close(childrel, NoLock); } @@ -2797,21 +3308,27 @@ ATOneLevelRecursion(List **wqueue, Relation rel, /* * find_composite_type_dependencies * - * Check to see if a table's rowtype is being used as a column in some + * Check to see if a composite type is being used as a column in some * other table (possibly nested several levels deep in composite types!). * Eventually, we'd like to propagate the check or rewrite operation * into other such tables, but for now, just error out if we find any. * + * Caller should provide either a table name or a type name (not both) to + * report in the error message, if any. + * * We assume that functions and views depending on the type are not reasons * to reject the ALTER. (How safe is this really?) */ -static void -find_composite_type_dependencies(Oid typeOid, const char *origTblName) +void +find_composite_type_dependencies(Oid typeOid, + const char *origTblName, + const char *origTypeName) { Relation depRel; ScanKeyData key[2]; SysScanDesc depScan; HeapTuple depTup; + Oid arrayOid; /* * We scan pg_depend to find those things that depend on the rowtype. (We @@ -2848,12 +3365,20 @@ find_composite_type_dependencies(Oid typeOid, const char *origTblName) if (rel->rd_rel->relkind == RELKIND_RELATION) { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot alter table \"%s\" because column \"%s\".\"%s\" uses its rowtype", - origTblName, - RelationGetRelationName(rel), - NameStr(att->attname)))); + if (origTblName) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot alter table \"%s\" because column \"%s\".\"%s\" uses its rowtype", + origTblName, + RelationGetRelationName(rel), + NameStr(att->attname)))); + else + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot alter type \"%s\" because column \"%s\".\"%s\" uses it", + origTypeName, + RelationGetRelationName(rel), + NameStr(att->attname)))); } else if (OidIsValid(rel->rd_rel->reltype)) { @@ -2862,7 +3387,7 @@ find_composite_type_dependencies(Oid typeOid, const char *origTblName) * recursively check for indirect dependencies via its rowtype. */ find_composite_type_dependencies(rel->rd_rel->reltype, - origTblName); + origTblName, origTypeName); } relation_close(rel, AccessShareLock); @@ -2871,6 +3396,14 @@ find_composite_type_dependencies(Oid typeOid, const char *origTblName) systable_endscan(depScan); relation_close(depRel, AccessShareLock); + + /* + * If there's an array type for the rowtype, must check for uses of it, + * too. + */ + arrayOid = get_array_type(typeOid); + if (OidIsValid(arrayOid)) + find_composite_type_dependencies(arrayOid, origTblName, origTypeName); } @@ -2879,7 +3412,7 @@ find_composite_type_dependencies(Oid typeOid, const char *origTblName) * * Adds an additional attribute to a relation making the assumption that * CHECK, NOT NULL, and FOREIGN KEY constraints will be removed from the - * AT_AddColumn AlterTableCmd by analyze.c and added as independent + * AT_AddColumn AlterTableCmd by parse_utilcmd.c and added as independent * AlterTableCmd's. */ static void @@ -2901,8 +3434,6 @@ ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, /* Child should see column as singly inherited */ colDefChild->inhcount = 1; colDefChild->is_local = false; - /* and don't make a support dependency on the child */ - colDefChild->support = NULL; ATOneLevelRecursion(wqueue, rel, childCmd); } @@ -2927,14 +3458,14 @@ ATExecAddColumn(AlteredTableInfo *tab, Relation rel, Relation pgclass, attrdesc; HeapTuple reltup; - HeapTuple attributeTuple; - Form_pg_attribute attribute; - FormData_pg_attribute attributeD; + FormData_pg_attribute attribute; int i; int minattnum, maxatts; + char relkind; HeapTuple typeTuple; Oid typeOid; + int32 typmod; Form_pg_type tform; Expr *defval; @@ -2953,10 +3484,13 @@ ATExecAddColumn(AlteredTableInfo *tab, Relation rel, if (HeapTupleIsValid(tuple)) { Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple); + Oid ctypeId; + int32 ctypmod; /* Okay if child matches by type */ - if (typenameTypeId(colDef->typename) != childatt->atttypid || - colDef->typename->typmod != childatt->atttypmod) + ctypeId = typenameTypeId(NULL, colDef->typename, &ctypmod); + if (ctypeId != childatt->atttypid || + ctypmod != childatt->atttypmod) ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), errmsg("child table \"%s\" has different type for column \"%s\"", @@ -3001,6 +3535,7 @@ ATExecAddColumn(AlteredTableInfo *tab, Relation rel, colDef->colname, RelationGetRelationName(rel)))); minattnum = ((Form_pg_class) GETSTRUCT(reltup))->relnatts; + relkind = ((Form_pg_class) GETSTRUCT(reltup))->relkind; maxatts = minattnum + 1; if (maxatts > MaxHeapAttributeNumber) ereport(ERROR, @@ -3009,44 +3544,34 @@ ATExecAddColumn(AlteredTableInfo *tab, Relation rel, MaxHeapAttributeNumber))); i = minattnum + 1; - typeTuple = typenameType(colDef->typename); + typeTuple = typenameType(NULL, colDef->typename, &typmod); tform = (Form_pg_type) GETSTRUCT(typeTuple); typeOid = HeapTupleGetOid(typeTuple); /* make sure datatype is legal for a column */ CheckAttributeType(colDef->colname, typeOid); - attributeTuple = heap_addheader(Natts_pg_attribute, - false, - ATTRIBUTE_TUPLE_SIZE, - (void *) &attributeD); - - attribute = (Form_pg_attribute) GETSTRUCT(attributeTuple); - - attribute->attrelid = myrelid; - namestrcpy(&(attribute->attname), colDef->colname); - attribute->atttypid = typeOid; - attribute->attstattarget = -1; - attribute->attlen = tform->typlen; - attribute->attcacheoff = -1; - attribute->atttypmod = colDef->typename->typmod; - attribute->attnum = i; - attribute->attbyval = tform->typbyval; - attribute->attndims = list_length(colDef->typename->arrayBounds); - attribute->attstorage = tform->typstorage; - attribute->attalign = tform->typalign; - attribute->attnotnull = colDef->is_not_null; - attribute->atthasdef = false; - attribute->attisdropped = false; - attribute->attislocal = colDef->is_local; - attribute->attinhcount = colDef->inhcount; + attribute.attrelid = myrelid; + namestrcpy(&(attribute.attname), colDef->colname); + attribute.atttypid = typeOid; + attribute.attstattarget = -1; + attribute.attlen = tform->typlen; + attribute.attcacheoff = -1; + attribute.atttypmod = typmod; + attribute.attnum = i; + attribute.attbyval = tform->typbyval; + attribute.attndims = list_length(colDef->typename->arrayBounds); + attribute.attstorage = tform->typstorage; + attribute.attalign = tform->typalign; + attribute.attnotnull = colDef->is_not_null; + attribute.atthasdef = false; + attribute.attisdropped = false; + attribute.attislocal = colDef->is_local; + attribute.attinhcount = colDef->inhcount; ReleaseSysCache(typeTuple); - simple_heap_insert(attrdesc, attributeTuple); - - /* Update indexes on pg_attribute */ - CatalogUpdateIndexes(attrdesc, attributeTuple); + InsertPgAttributeTuple(attrdesc, &attribute, NULL); heap_close(attrdesc, RowExclusiveLock); @@ -3075,14 +3600,14 @@ ATExecAddColumn(AlteredTableInfo *tab, Relation rel, RawColumnDefault *rawEnt; rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault)); - rawEnt->attnum = attribute->attnum; + rawEnt->attnum = attribute.attnum; rawEnt->raw_default = copyObject(colDef->raw_default); /* * This function is intended for CREATE TABLE, so it processes a * _list_ of defaults, but we just do one. */ - AddRelationRawConstraints(rel, list_make1(rawEnt), NIL); + AddRelationNewConstraints(rel, list_make1(rawEnt), NIL, false, true); /* Make the additional catalog changes visible */ CommandCounterIncrement(); @@ -3107,44 +3632,55 @@ ATExecAddColumn(AlteredTableInfo *tab, Relation rel, * the constraints more directly.) * * Note: we use build_column_default, and not just the cooked default - * returned by AddRelationRawConstraints, so that the right thing happens + * returned by AddRelationNewConstraints, so that the right thing happens * when a datatype's default applies. + * + * We skip this logic completely for views. */ - defval = (Expr *) build_column_default(rel, attribute->attnum); + if (relkind != RELKIND_VIEW) { + defval = (Expr *) build_column_default(rel, attribute.attnum); - if (!defval && GetDomainConstraints(typeOid) != NIL) - { - Oid basetype = getBaseType(typeOid); - - defval = (Expr *) makeNullConst(basetype); - defval = (Expr *) coerce_to_target_type(NULL, - (Node *) defval, - basetype, - typeOid, - colDef->typename->typmod, - COERCION_ASSIGNMENT, - COERCE_IMPLICIT_CAST); - if (defval == NULL) /* should not happen */ - elog(ERROR, "failed to coerce base type to domain"); - } + if (!defval && GetDomainConstraints(typeOid) != NIL) + { + Oid baseTypeId; + int32 baseTypeMod; + + baseTypeMod = typmod; + baseTypeId = getBaseTypeAndTypmod(typeOid, &baseTypeMod); + defval = (Expr *) makeNullConst(baseTypeId, baseTypeMod); + defval = (Expr *) coerce_to_target_type(NULL, + (Node *) defval, + baseTypeId, + typeOid, + typmod, + COERCION_ASSIGNMENT, + COERCE_IMPLICIT_CAST, + -1); + if (defval == NULL) /* should not happen */ + elog(ERROR, "failed to coerce base type to domain"); + } - if (defval) - { - NewColumnValue *newval; + if (defval) + { + NewColumnValue *newval; - newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue)); - newval->attnum = attribute->attnum; - newval->expr = defval; + newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue)); + newval->attnum = attribute.attnum; + newval->expr = defval; + + tab->newvals = lappend(tab->newvals, newval); + } - tab->newvals = lappend(tab->newvals, newval); + /* + * If the new column is NOT NULL, tell Phase 3 it needs to test that. + */ + tab->new_notnull |= colDef->is_not_null; } /* * Add needed dependency entries for the new column. */ - add_column_datatype_dependency(myrelid, i, attribute->atttypid); - if (colDef->support != NULL) - add_column_support_dependency(myrelid, i, colDef->support); + add_column_datatype_dependency(myrelid, i, attribute.atttypid); } /* @@ -3165,24 +3701,6 @@ add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid) recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); } -/* - * Install a dependency for a column's supporting relation (serial sequence). - */ -static void -add_column_support_dependency(Oid relid, int32 attnum, RangeVar *support) -{ - ObjectAddress colobject, - suppobject; - - colobject.classId = RelationRelationId; - colobject.objectId = relid; - colobject.objectSubId = attnum; - suppobject.classId = RelationRelationId; - suppobject.objectId = RangeVarGetRelid(support, false); - suppobject.objectSubId = 0; - recordDependencyOn(&suppobject, &colobject, DEPENDENCY_INTERNAL); -} - /* * ALTER TABLE ALTER COLUMN DROP NOT NULL */ @@ -3286,7 +3804,6 @@ ATExecSetNotNull(AlteredTableInfo *tab, Relation rel, HeapTuple tuple; AttrNumber attnum; Relation attr_rel; - NewConstraint *newcon; /* * lookup the attribute @@ -3322,13 +3839,8 @@ ATExecSetNotNull(AlteredTableInfo *tab, Relation rel, /* keep the system catalog indexes current */ CatalogUpdateIndexes(attr_rel, tuple); - /* Tell Phase 3 to test the constraint */ - newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); - newcon->contype = CONSTR_NOTNULL; - newcon->attnum = attnum; - newcon->name = "NOT NULL"; - - tab->constraints = lappend(tab->constraints, newcon); + /* Tell Phase 3 it needs to test the constraint */ + tab->new_notnull = true; } heap_close(attr_rel, RowExclusiveLock); @@ -3380,7 +3892,7 @@ ATExecColumnDefault(Relation rel, const char *colName, * This function is intended for CREATE TABLE, so it processes a * _list_ of defaults, but we just do one. */ - AddRelationRawConstraints(rel, list_make1(rawEnt), NIL); + AddRelationNewConstraints(rel, list_make1(rawEnt), NIL, false, true); } } @@ -3430,9 +3942,9 @@ ATExecSetStatistics(Relation rel, const char *colName, Node *newValue) errmsg("statistics target %d is too low", newtarget))); } - else if (newtarget > 1000) + else if (newtarget > 10000) { - newtarget = 1000; + newtarget = 10000; ereport(WARNING, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("lowering statistics target to %d", @@ -3615,6 +4127,7 @@ ATExecDropColumn(Relation rel, const char *colName, Form_pg_attribute childatt; childrel = heap_open(childrelid, AccessExclusiveLock); + CheckTableNotInUse(childrel, "ALTER TABLE"); tuple = SearchSysCacheCopyAttName(childrelid, colName); if (!HeapTupleIsValid(tuple)) /* shouldn't happen */ @@ -3656,7 +4169,7 @@ ATExecDropColumn(Relation rel, const char *colName, { /* * If we were told to drop ONLY in this table (no recursion), - * we need to mark the inheritors' attribute as locally + * we need to mark the inheritors' attributes as locally * defined rather than inherited. */ childatt->attinhcount--; @@ -3718,9 +4231,9 @@ ATExecDropColumn(Relation rel, const char *colName, /* * ALTER TABLE ADD INDEX * - * There is no such command in the grammar, but the parser converts UNIQUE - * and PRIMARY KEY constraints into AT_AddIndex subcommands. This lets us - * schedule creation of the index at the appropriate time during ALTER. + * There is no such command in the grammar, but parse_utilcmd.c converts + * UNIQUE and PRIMARY KEY constraints into AT_AddIndex subcommands. This lets + * us schedule creation of the index at the appropriate time during ALTER. */ static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel, @@ -3739,6 +4252,8 @@ ATExecAddIndex(AlteredTableInfo *tab, Relation rel, /* suppress notices when rebuilding existing index */ quiet = is_rebuild; + /* The IndexStmt has already been through transformIndexStmt */ + DefineIndex(stmt->relation, /* relation */ stmt->idxname, /* index name */ InvalidOid, /* no predefined OID */ @@ -3746,21 +4261,23 @@ ATExecAddIndex(AlteredTableInfo *tab, Relation rel, stmt->tableSpace, stmt->indexParams, /* parameters */ (Expr *) stmt->whereClause, - stmt->rangetable, + stmt->options, stmt->unique, stmt->primary, stmt->isconstraint, true, /* is_alter_table */ check_rights, skip_build, - quiet); + quiet, + false); } /* * ALTER TABLE ADD CONSTRAINT */ static void -ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, Node *newConstraint) +ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, + Node *newConstraint, bool recurse) { switch (nodeTag(newConstraint)) { @@ -3771,41 +4288,15 @@ ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, Node *newConstraint) /* * Currently, we only expect to see CONSTR_CHECK nodes * arriving here (see the preprocessing done in - * parser/analyze.c). Use a switch anyway to make it easier - * to add more code later. + * parse_utilcmd.c). Use a switch anyway to make it easier to + * add more code later. */ switch (constr->contype) { case CONSTR_CHECK: - { - List *newcons; - ListCell *lcon; - - /* - * Call AddRelationRawConstraints to do the work. - * It returns a list of cooked constraints. - */ - newcons = AddRelationRawConstraints(rel, NIL, - list_make1(constr)); - /* Add each constraint to Phase 3's queue */ - foreach(lcon, newcons) - { - CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon); - NewConstraint *newcon; - - newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); - newcon->name = ccon->name; - newcon->contype = ccon->contype; - newcon->attnum = ccon->attnum; - /* ExecQual wants implicit-AND format */ - newcon->qual = (Node *) - make_ands_implicit((Expr *) ccon->expr); - - tab->constraints = lappend(tab->constraints, - newcon); - } - break; - } + ATAddCheckConstraint(wqueue, tab, rel, + constr, recurse, false); + break; default: elog(ERROR, "unrecognized constraint type: %d", (int) constr->contype); @@ -3817,6 +4308,9 @@ ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, Node *newConstraint) FkConstraint *fkconstraint = (FkConstraint *) newConstraint; /* + * Note that we currently never recurse for FK constraints, + * so the "recurse" flag is silently ignored. + * * Assign or validate constraint name */ if (fkconstraint->constr_name) @@ -3849,6 +4343,107 @@ ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, Node *newConstraint) } } +/* + * Add a check constraint to a single table and its children + * + * Subroutine for ATExecAddConstraint. + * + * We must recurse to child tables during execution, rather than using + * ALTER TABLE's normal prep-time recursion. The reason is that all the + * constraints *must* be given the same name, else they won't be seen as + * related later. If the user didn't explicitly specify a name, then + * AddRelationNewConstraints would normally assign different names to the + * child constraints. To fix that, we must capture the name assigned at + * the parent table and pass that down. + */ +static void +ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, + Constraint *constr, bool recurse, bool recursing) +{ + List *newcons; + ListCell *lcon; + List *children; + ListCell *child; + + /* At top level, permission check was done in ATPrepCmd, else do it */ + if (recursing) + ATSimplePermissions(rel, false); + + /* + * Call AddRelationNewConstraints to do the work, making sure it works on + * a copy of the Constraint so transformExpr can't modify the original. + * It returns a list of cooked constraints. + * + * If the constraint ends up getting merged with a pre-existing one, it's + * omitted from the returned list, which is what we want: we do not need + * to do any validation work. That can only happen at child tables, + * though, since we disallow merging at the top level. + */ + newcons = AddRelationNewConstraints(rel, NIL, + list_make1(copyObject(constr)), + recursing, !recursing); + + /* Add each constraint to Phase 3's queue */ + foreach(lcon, newcons) + { + CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon); + NewConstraint *newcon; + + newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); + newcon->name = ccon->name; + newcon->contype = ccon->contype; + /* ExecQual wants implicit-AND format */ + newcon->qual = (Node *) make_ands_implicit((Expr *) ccon->expr); + + tab->constraints = lappend(tab->constraints, newcon); + + /* Save the actually assigned name if it was defaulted */ + if (constr->name == NULL) + constr->name = ccon->name; + } + + /* At this point we must have a locked-down name to use */ + Assert(constr->name != NULL); + + /* Advance command counter in case same table is visited multiple times */ + CommandCounterIncrement(); + + /* + * Propagate to children as appropriate. Unlike most other ALTER + * routines, we have to do this one level of recursion at a time; we can't + * use find_all_inheritors to do it in one pass. + */ + children = find_inheritance_children(RelationGetRelid(rel)); + + /* + * If we are told not to recurse, there had better not be any child + * tables; else the addition would put them out of step. + */ + if (children && !recurse) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("constraint must be added to child tables too"))); + + foreach(child, children) + { + Oid childrelid = lfirst_oid(child); + Relation childrel; + AlteredTableInfo *childtab; + + childrel = heap_open(childrelid, AccessExclusiveLock); + CheckTableNotInUse(childrel, "ALTER TABLE"); + + /* Find or create work queue entry for this table */ + childtab = ATGetQueueEntry(wqueue, childrel); + + /* Recurse to child */ + ATAddCheckConstraint(wqueue, childtab, childrel, + constr, recurse, true); + + heap_close(childrel, NoLock); + } +} + /* * Add a foreign-key constraint to a single table * @@ -3867,6 +4462,9 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, Oid pktypoid[INDEX_MAX_KEYS]; Oid fktypoid[INDEX_MAX_KEYS]; Oid opclasses[INDEX_MAX_KEYS]; + Oid pfeqoperators[INDEX_MAX_KEYS]; + Oid ppeqoperators[INDEX_MAX_KEYS]; + Oid ffeqoperators[INDEX_MAX_KEYS]; int i; int numfks, numpks; @@ -3944,6 +4542,9 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, MemSet(pktypoid, 0, sizeof(pktypoid)); MemSet(fktypoid, 0, sizeof(fktypoid)); MemSet(opclasses, 0, sizeof(opclasses)); + MemSet(pfeqoperators, 0, sizeof(pfeqoperators)); + MemSet(ppeqoperators, 0, sizeof(ppeqoperators)); + MemSet(ffeqoperators, 0, sizeof(ffeqoperators)); numfks = transformColumnNameList(RelationGetRelid(rel), fkconstraint->fk_attrs, @@ -3972,7 +4573,14 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, opclasses); } - /* Be sure referencing and referenced column types are comparable */ + /* + * Look up the equality operators to use in the constraint. + * + * Note that we have to be careful about the difference between the actual + * PK column type and the opclass' declared input type, which might be + * only binary-compatible with it. The declared opcintype is the right + * thing to probe pg_amop with. + */ if (numfks != numpks) ereport(ERROR, (errcode(ERRCODE_INVALID_FOREIGN_KEY), @@ -3980,23 +4588,93 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, for (i = 0; i < numpks; i++) { + Oid pktype = pktypoid[i]; + Oid fktype = fktypoid[i]; + Oid fktyped; + HeapTuple cla_ht; + Form_pg_opclass cla_tup; + Oid amid; + Oid opfamily; + Oid opcintype; + Oid pfeqop; + Oid ppeqop; + Oid ffeqop; + int16 eqstrategy; + + /* We need several fields out of the pg_opclass entry */ + cla_ht = SearchSysCache(CLAOID, + ObjectIdGetDatum(opclasses[i]), + 0, 0, 0); + if (!HeapTupleIsValid(cla_ht)) + elog(ERROR, "cache lookup failed for opclass %u", opclasses[i]); + cla_tup = (Form_pg_opclass) GETSTRUCT(cla_ht); + amid = cla_tup->opcmethod; + opfamily = cla_tup->opcfamily; + opcintype = cla_tup->opcintype; + ReleaseSysCache(cla_ht); + /* - * pktypoid[i] is the primary key table's i'th key's type fktypoid[i] - * is the foreign key table's i'th key's type - * - * Note that we look for an operator with the PK type on the left; - * when the types are different this is critical because the PK index - * will need operators with the indexkey on the left. (Ordinarily both - * commutator operators will exist if either does, but we won't get - * the right answer from the test below on opclass membership unless - * we select the proper operator.) + * Check it's a btree; currently this can never fail since no other + * index AMs support unique indexes. If we ever did have other types + * of unique indexes, we'd need a way to determine which operator + * strategy number is equality. (Is it reasonable to insist that + * every such index AM use btree's number for equality?) + */ + if (amid != BTREE_AM_OID) + elog(ERROR, "only b-tree indexes are supported for foreign keys"); + eqstrategy = BTEqualStrategyNumber; + + /* + * There had better be a primary equality operator for the index. + * We'll use it for PK = PK comparisons. + */ + ppeqop = get_opfamily_member(opfamily, opcintype, opcintype, + eqstrategy); + + if (!OidIsValid(ppeqop)) + elog(ERROR, "missing operator %d(%u,%u) in opfamily %u", + eqstrategy, opcintype, opcintype, opfamily); + + /* + * Are there equality operators that take exactly the FK type? Assume + * we should look through any domain here. */ - Operator o = oper(list_make1(makeString("=")), - pktypoid[i], fktypoid[i], true); + fktyped = getBaseType(fktype); + + pfeqop = get_opfamily_member(opfamily, opcintype, fktyped, + eqstrategy); + if (OidIsValid(pfeqop)) + ffeqop = get_opfamily_member(opfamily, fktyped, fktyped, + eqstrategy); + else + ffeqop = InvalidOid; /* keep compiler quiet */ + + if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop))) + { + /* + * Otherwise, look for an implicit cast from the FK type to the + * opcintype, and if found, use the primary equality operator. + * This is a bit tricky because opcintype might be a polymorphic + * type such as ANYARRAY or ANYENUM; so what we have to test is + * whether the two actual column types can be concurrently cast to + * that type. (Otherwise, we'd fail to reject combinations such + * as int[] and point[].) + */ + Oid input_typeids[2]; + Oid target_typeids[2]; + + input_typeids[0] = pktype; + input_typeids[1] = fktype; + target_typeids[0] = opcintype; + target_typeids[1] = opcintype; + if (can_coerce_type(2, input_typeids, target_typeids, + COERCION_IMPLICIT)) + pfeqop = ffeqop = ppeqop; + } - if (o == NULL) + if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop))) ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_FUNCTION), + (errcode(ERRCODE_DATATYPE_MISMATCH), errmsg("foreign key constraint \"%s\" " "cannot be implemented", fkconstraint->constr_name), @@ -4004,44 +4682,12 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, "are of incompatible types: %s and %s.", strVal(list_nth(fkconstraint->fk_attrs, i)), strVal(list_nth(fkconstraint->pk_attrs, i)), - format_type_be(fktypoid[i]), - format_type_be(pktypoid[i])))); - - /* - * Check that the found operator is compatible with the PK index, and - * generate a warning if not, since otherwise costly seqscans will be - * incurred to check FK validity. - */ - if (!op_in_opclass(oprid(o), opclasses[i])) - ereport(WARNING, - (errmsg("foreign key constraint \"%s\" " - "will require costly sequential scans", - fkconstraint->constr_name), - errdetail("Key columns \"%s\" and \"%s\" " - "are of different types: %s and %s.", - strVal(list_nth(fkconstraint->fk_attrs, i)), - strVal(list_nth(fkconstraint->pk_attrs, i)), - format_type_be(fktypoid[i]), - format_type_be(pktypoid[i])))); - - ReleaseSysCache(o); - } - - /* - * Tell Phase 3 to check that the constraint is satisfied by existing rows - * (we can skip this during table creation). - */ - if (!fkconstraint->skip_validation) - { - NewConstraint *newcon; - - newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); - newcon->name = fkconstraint->constr_name; - newcon->contype = CONSTR_FOREIGN; - newcon->refrelid = RelationGetRelid(pkrel); - newcon->qual = (Node *) fkconstraint; + format_type_be(fktype), + format_type_be(pktype)))); - tab->constraints = lappend(tab->constraints, newcon); + pfeqoperators[i] = pfeqop; + ppeqoperators[i] = ppeqop; + ffeqoperators[i] = ffeqop; } /* @@ -4059,6 +4705,9 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, * constraint */ RelationGetRelid(pkrel), pkattnum, + pfeqoperators, + ppeqoperators, + ffeqoperators, numpks, fkconstraint->fk_upd_action, fkconstraint->fk_del_action, @@ -4066,13 +4715,33 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, indexOid, NULL, /* no check constraint */ NULL, - NULL); + NULL, + true, /* islocal */ + 0); /* inhcount */ /* * Create the triggers that will enforce the constraint. */ createForeignKeyTriggers(rel, fkconstraint, constrOid); + /* + * Tell Phase 3 to check that the constraint is satisfied by existing rows + * (we can skip this during table creation). + */ + if (!fkconstraint->skip_validation) + { + NewConstraint *newcon; + + newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); + newcon->name = fkconstraint->constr_name; + newcon->contype = CONSTR_FOREIGN; + newcon->refrelid = RelationGetRelid(pkrel); + newcon->conid = constrOid; + newcon->qual = (Node *) fkconstraint; + + tab->constraints = lappend(tab->constraints, newcon); + } + /* * Close pk table, but keep lock until we've committed. */ @@ -4331,19 +5000,32 @@ transformFkeyCheckAttrs(Relation pkrel, static void validateForeignKeyConstraint(FkConstraint *fkconstraint, Relation rel, - Relation pkrel) + Relation pkrel, + Oid constraintOid) { HeapScanDesc scan; HeapTuple tuple; Trigger trig; - ListCell *list; - int count; + + /* + * Build a trigger call structure; we'll need it either way. + */ + MemSet(&trig, 0, sizeof(trig)); + trig.tgoid = InvalidOid; + trig.tgname = fkconstraint->constr_name; + trig.tgenabled = TRIGGER_FIRES_ON_ORIGIN; + trig.tgisconstraint = TRUE; + trig.tgconstrrelid = RelationGetRelid(pkrel); + trig.tgconstraint = constraintOid; + trig.tgdeferrable = FALSE; + trig.tginitdeferred = FALSE; + /* we needn't fill in tgargs */ /* * See if we can do it with a single LEFT JOIN query. A FALSE result * indicates we must proceed with the fire-the-trigger method. */ - if (RI_Initial_Check(fkconstraint, rel, pkrel)) + if (RI_Initial_Check(&trig, rel, pkrel)) return; /* @@ -4351,44 +5033,9 @@ validateForeignKeyConstraint(FkConstraint *fkconstraint, * if that tuple had just been inserted. If any of those fail, it should * ereport(ERROR) and that's that. */ - MemSet(&trig, 0, sizeof(trig)); - trig.tgoid = InvalidOid; - trig.tgname = fkconstraint->constr_name; - trig.tgenabled = TRUE; - trig.tgisconstraint = TRUE; - trig.tgconstrrelid = RelationGetRelid(pkrel); - trig.tgdeferrable = FALSE; - trig.tginitdeferred = FALSE; - - trig.tgargs = (char **) palloc(sizeof(char *) * - (4 + list_length(fkconstraint->fk_attrs) - + list_length(fkconstraint->pk_attrs))); + scan = heap_beginscan(rel, SnapshotNow, 0, NULL); - trig.tgargs[0] = trig.tgname; - trig.tgargs[1] = RelationGetRelationName(rel); - trig.tgargs[2] = RelationGetRelationName(pkrel); - trig.tgargs[3] = fkMatchTypeToString(fkconstraint->fk_matchtype); - count = 4; - foreach(list, fkconstraint->fk_attrs) - { - char *fk_at = strVal(lfirst(list)); - - trig.tgargs[count] = fk_at; - count += 2; - } - count = 5; - foreach(list, fkconstraint->pk_attrs) - { - char *pk_at = strVal(lfirst(list)); - - trig.tgargs[count] = pk_at; - count += 2; - } - trig.tgnargs = count - 1; - - scan = heap_beginscan(rel, SnapshotNow, 0, NULL); - - while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { FunctionCallInfoData fcinfo; TriggerData trigdata; @@ -4418,18 +5065,13 @@ validateForeignKeyConstraint(FkConstraint *fkconstraint, } heap_endscan(scan); - - pfree(trig.tgargs); } static void CreateFKCheckTrigger(RangeVar *myRel, FkConstraint *fkconstraint, - ObjectAddress *constrobj, ObjectAddress *trigobj, - bool on_insert) + Oid constraintOid, bool on_insert) { CreateTrigStmt *fk_trigger; - ListCell *fk_attr; - ListCell *pk_attr; fk_trigger = makeNode(CreateTrigStmt); fk_trigger->trigname = fkconstraint->constr_name; @@ -4454,32 +5096,9 @@ CreateFKCheckTrigger(RangeVar *myRel, FkConstraint *fkconstraint, fk_trigger->deferrable = fkconstraint->deferrable; fk_trigger->initdeferred = fkconstraint->initdeferred; fk_trigger->constrrel = fkconstraint->pktable; - fk_trigger->args = NIL; - fk_trigger->args = lappend(fk_trigger->args, - makeString(fkconstraint->constr_name)); - fk_trigger->args = lappend(fk_trigger->args, - makeString(myRel->relname)); - fk_trigger->args = lappend(fk_trigger->args, - makeString(fkconstraint->pktable->relname)); - fk_trigger->args = lappend(fk_trigger->args, - makeString(fkMatchTypeToString(fkconstraint->fk_matchtype))); - if (list_length(fkconstraint->fk_attrs) != list_length(fkconstraint->pk_attrs)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_FOREIGN_KEY), - errmsg("number of referencing and referenced columns for foreign key disagree"))); - - forboth(fk_attr, fkconstraint->fk_attrs, - pk_attr, fkconstraint->pk_attrs) - { - fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr)); - fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr)); - } - trigobj->objectId = CreateTrigger(fk_trigger, true); - - /* Register dependency from trigger to constraint */ - recordDependencyOn(trigobj, constrobj, DEPENDENCY_INTERNAL); + (void) CreateTrigger(fk_trigger, constraintOid); /* Make changes-so-far visible */ CommandCounterIncrement(); @@ -4490,29 +5109,17 @@ CreateFKCheckTrigger(RangeVar *myRel, FkConstraint *fkconstraint, */ static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint, - Oid constrOid) + Oid constraintOid) { RangeVar *myRel; CreateTrigStmt *fk_trigger; - ListCell *fk_attr; - ListCell *pk_attr; - ObjectAddress trigobj, - constrobj; /* * Reconstruct a RangeVar for my relation (not passed in, unfortunately). */ myRel = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)), - pstrdup(RelationGetRelationName(rel))); - - /* - * Preset objectAddress fields - */ - constrobj.classId = ConstraintRelationId; - constrobj.objectId = constrOid; - constrobj.objectSubId = 0; - trigobj.classId = TriggerRelationId; - trigobj.objectSubId = 0; + pstrdup(RelationGetRelationName(rel)), + -1); /* Make changes-so-far visible */ CommandCounterIncrement(); @@ -4521,8 +5128,8 @@ createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint, * Build and execute a CREATE CONSTRAINT TRIGGER statement for the CHECK * action for both INSERTs and UPDATEs on the referencing table. */ - CreateFKCheckTrigger(myRel, fkconstraint, &constrobj, &trigobj, true); - CreateFKCheckTrigger(myRel, fkconstraint, &constrobj, &trigobj, false); + CreateFKCheckTrigger(myRel, fkconstraint, constraintOid, true); + CreateFKCheckTrigger(myRel, fkconstraint, constraintOid, false); /* * Build and execute a CREATE CONSTRAINT TRIGGER statement for the ON @@ -4570,27 +5177,9 @@ createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint, (int) fkconstraint->fk_del_action); break; } - fk_trigger->args = NIL; - fk_trigger->args = lappend(fk_trigger->args, - makeString(fkconstraint->constr_name)); - fk_trigger->args = lappend(fk_trigger->args, - makeString(myRel->relname)); - fk_trigger->args = lappend(fk_trigger->args, - makeString(fkconstraint->pktable->relname)); - fk_trigger->args = lappend(fk_trigger->args, - makeString(fkMatchTypeToString(fkconstraint->fk_matchtype))); - forboth(fk_attr, fkconstraint->fk_attrs, - pk_attr, fkconstraint->pk_attrs) - { - fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr)); - fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr)); - } - - trigobj.objectId = CreateTrigger(fk_trigger, true); - /* Register dependency from trigger to constraint */ - recordDependencyOn(&trigobj, &constrobj, DEPENDENCY_INTERNAL); + (void) CreateTrigger(fk_trigger, constraintOid); /* Make changes-so-far visible */ CommandCounterIncrement(); @@ -4640,93 +5229,193 @@ createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint, (int) fkconstraint->fk_upd_action); break; } - fk_trigger->args = NIL; - fk_trigger->args = lappend(fk_trigger->args, - makeString(fkconstraint->constr_name)); - fk_trigger->args = lappend(fk_trigger->args, - makeString(myRel->relname)); - fk_trigger->args = lappend(fk_trigger->args, - makeString(fkconstraint->pktable->relname)); - fk_trigger->args = lappend(fk_trigger->args, - makeString(fkMatchTypeToString(fkconstraint->fk_matchtype))); - forboth(fk_attr, fkconstraint->fk_attrs, - pk_attr, fkconstraint->pk_attrs) - { - fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr)); - fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr)); - } - - trigobj.objectId = CreateTrigger(fk_trigger, true); - - /* Register dependency from trigger to constraint */ - recordDependencyOn(&trigobj, &constrobj, DEPENDENCY_INTERNAL); -} -/* - * fkMatchTypeToString - - * convert FKCONSTR_MATCH_xxx code to string to use in trigger args - */ -static char * -fkMatchTypeToString(char match_type) -{ - switch (match_type) - { - case FKCONSTR_MATCH_FULL: - return pstrdup("FULL"); - case FKCONSTR_MATCH_PARTIAL: - return pstrdup("PARTIAL"); - case FKCONSTR_MATCH_UNSPECIFIED: - return pstrdup("UNSPECIFIED"); - default: - elog(ERROR, "unrecognized match type: %d", - (int) match_type); - } - return NULL; /* can't get here */ + (void) CreateTrigger(fk_trigger, constraintOid); } /* * ALTER TABLE DROP CONSTRAINT + * + * Like DROP COLUMN, we can't use the normal ALTER TABLE recursion mechanism. */ static void -ATPrepDropConstraint(List **wqueue, Relation rel, - bool recurse, AlterTableCmd *cmd) +ATExecDropConstraint(Relation rel, const char *constrName, + DropBehavior behavior, + bool recurse, bool recursing) { + List *children; + ListCell *child; + Relation conrel; + Form_pg_constraint con; + SysScanDesc scan; + ScanKeyData key; + HeapTuple tuple; + bool found = false; + bool is_check_constraint = false; + + /* At top level, permission check was done in ATPrepCmd, else do it */ + if (recursing) + ATSimplePermissions(rel, false); + + conrel = heap_open(ConstraintRelationId, RowExclusiveLock); + /* - * We don't want errors or noise from child tables, so we have to pass - * down a modified command. + * Find and drop the target constraint */ - if (recurse) + ScanKeyInit(&key, + Anum_pg_constraint_conrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); + scan = systable_beginscan(conrel, ConstraintRelidIndexId, + true, SnapshotNow, 1, &key); + + while (HeapTupleIsValid(tuple = systable_getnext(scan))) { - AlterTableCmd *childCmd = copyObject(cmd); + ObjectAddress conobj; + + con = (Form_pg_constraint) GETSTRUCT(tuple); + + if (strcmp(NameStr(con->conname), constrName) != 0) + continue; + + /* Don't drop inherited constraints */ + if (con->coninhcount > 0 && !recursing) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("cannot drop inherited constraint \"%s\" of relation \"%s\"", + constrName, RelationGetRelationName(rel)))); + + /* Right now only CHECK constraints can be inherited */ + if (con->contype == CONSTRAINT_CHECK) + is_check_constraint = true; - childCmd->subtype = AT_DropConstraintQuietly; - ATSimpleRecursion(wqueue, rel, childCmd, recurse); + /* + * Perform the actual constraint deletion + */ + conobj.classId = ConstraintRelationId; + conobj.objectId = HeapTupleGetOid(tuple); + conobj.objectSubId = 0; + + performDeletion(&conobj, behavior); + + found = true; } -} -static void -ATExecDropConstraint(Relation rel, const char *constrName, - DropBehavior behavior, bool quiet) -{ - int deleted; + systable_endscan(scan); + + if (!found) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("constraint \"%s\" of relation \"%s\" does not exist", + constrName, RelationGetRelationName(rel)))); - deleted = RemoveRelConstraints(rel, constrName, behavior); + /* + * Propagate to children as appropriate. Unlike most other ALTER + * routines, we have to do this one level of recursion at a time; we can't + * use find_all_inheritors to do it in one pass. + */ + if (is_check_constraint) + children = find_inheritance_children(RelationGetRelid(rel)); + else + children = NIL; - if (!quiet) + foreach(child, children) { - /* If zero constraints deleted, complain */ - if (deleted == 0) + Oid childrelid = lfirst_oid(child); + Relation childrel; + + childrel = heap_open(childrelid, AccessExclusiveLock); + CheckTableNotInUse(childrel, "ALTER TABLE"); + + ScanKeyInit(&key, + Anum_pg_constraint_conrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(childrelid)); + scan = systable_beginscan(conrel, ConstraintRelidIndexId, + true, SnapshotNow, 1, &key); + + found = false; + + while (HeapTupleIsValid(tuple = systable_getnext(scan))) + { + HeapTuple copy_tuple; + + con = (Form_pg_constraint) GETSTRUCT(tuple); + + /* Right now only CHECK constraints can be inherited */ + if (con->contype != CONSTRAINT_CHECK) + continue; + + if (strcmp(NameStr(con->conname), constrName) != 0) + continue; + + found = true; + + if (con->coninhcount <= 0) /* shouldn't happen */ + elog(ERROR, "relation %u has non-inherited constraint \"%s\"", + childrelid, constrName); + + copy_tuple = heap_copytuple(tuple); + con = (Form_pg_constraint) GETSTRUCT(copy_tuple); + + if (recurse) + { + /* + * If the child constraint has other definition sources, + * just decrement its inheritance count; if not, recurse + * to delete it. + */ + if (con->coninhcount == 1 && !con->conislocal) + { + /* Time to delete this child constraint, too */ + ATExecDropConstraint(childrel, constrName, behavior, + true, true); + } + else + { + /* Child constraint must survive my deletion */ + con->coninhcount--; + simple_heap_update(conrel, ©_tuple->t_self, copy_tuple); + CatalogUpdateIndexes(conrel, copy_tuple); + + /* Make update visible */ + CommandCounterIncrement(); + } + } + else + { + /* + * If we were told to drop ONLY in this table (no + * recursion), we need to mark the inheritors' constraints + * as locally defined rather than inherited. + */ + con->coninhcount--; + con->conislocal = true; + + simple_heap_update(conrel, ©_tuple->t_self, copy_tuple); + CatalogUpdateIndexes(conrel, copy_tuple); + + /* Make update visible */ + CommandCounterIncrement(); + } + + heap_freetuple(copy_tuple); + } + + systable_endscan(scan); + + if (!found) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("constraint \"%s\" does not exist", - constrName))); - /* Otherwise if more than one constraint deleted, notify */ - else if (deleted > 1) - ereport(NOTICE, - (errmsg("multiple constraints named \"%s\" were dropped", - constrName))); + errmsg("constraint \"%s\" of relation \"%s\" does not exist", + constrName, + RelationGetRelationName(childrel)))); + + heap_close(childrel, NoLock); } + + heap_close(conrel, RowExclusiveLock); } /* @@ -4744,6 +5433,7 @@ ATPrepAlterColumnType(List **wqueue, Form_pg_attribute attTup; AttrNumber attnum; Oid targettype; + int32 targettypmod; Node *transform; NewColumnValue *newval; ParseState *pstate = make_parsestate(NULL); @@ -4773,12 +5463,7 @@ ATPrepAlterColumnType(List **wqueue, colName))); /* Look up the target type */ - targettype = LookupTypeName(typename); - if (!OidIsValid(targettype)) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("type \"%s\" does not exist", - TypeNameToString(typename)))); + targettype = typenameTypeId(NULL, typename, &targettypmod); /* make sure datatype is legal for a column */ CheckAttributeType(colName, targettype); @@ -4830,14 +5515,15 @@ ATPrepAlterColumnType(List **wqueue, transform = coerce_to_target_type(pstate, transform, exprType(transform), - targettype, typename->typmod, + targettype, targettypmod, COERCION_ASSIGNMENT, - COERCE_IMPLICIT_CAST); + COERCE_IMPLICIT_CAST, + -1); if (transform == NULL) ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("column \"%s\" cannot be cast to type \"%s\"", - colName, TypeNameToString(typename)))); + errmsg("column \"%s\" cannot be cast to type %s", + colName, format_type_be(targettype)))); /* * Add a work queue item to make ATRewriteTable update the column @@ -4876,6 +5562,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, HeapTuple typeTuple; Form_pg_type tform; Oid targettype; + int32 targettypmod; Node *defaultexpr; Relation attrelation; Relation depRel; @@ -4904,7 +5591,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, colName))); /* Look up the target type (should not fail, since prep found it) */ - typeTuple = typenameType(typename); + typeTuple = typenameType(NULL, typename, &targettypmod); tform = (Form_pg_type) GETSTRUCT(typeTuple); targettype = HeapTupleGetOid(typeTuple); @@ -4927,14 +5614,15 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, defaultexpr = strip_implicit_coercions(defaultexpr); defaultexpr = coerce_to_target_type(NULL, /* no UNKNOWN params */ defaultexpr, exprType(defaultexpr), - targettype, typename->typmod, + targettype, targettypmod, COERCION_ASSIGNMENT, - COERCE_IMPLICIT_CAST); + COERCE_IMPLICIT_CAST, + -1); if (defaultexpr == NULL) ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("default for column \"%s\" cannot be cast to type \"%s\"", - colName, TypeNameToString(typename)))); + errmsg("default for column \"%s\" cannot be cast to type %s", + colName, format_type_be(targettype)))); } else defaultexpr = NULL; @@ -5023,7 +5711,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, if (!list_member_oid(tab->changedConstraintOids, foundObject.objectId)) { - char *defstring = pg_get_constraintdef_string(foundObject.objectId); + char *defstring = pg_get_constraintdef_string(foundObject.objectId); /* * Put NORMAL dependencies at the front of the list and @@ -5081,8 +5769,13 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, case OCLASS_LANGUAGE: case OCLASS_OPERATOR: case OCLASS_OPCLASS: + case OCLASS_OPFAMILY: case OCLASS_TRIGGER: case OCLASS_SCHEMA: + case OCLASS_TSPARSER: + case OCLASS_TSDICT: + case OCLASS_TSTEMPLATE: + case OCLASS_TSCONFIG: /* * We don't expect any of these sorts of objects to depend on @@ -5144,7 +5837,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, * copy of the syscache entry, so okay to scribble on.) */ attTup->atttypid = targettype; - attTup->atttypmod = typename->typmod; + attTup->atttypmod = targettypmod; attTup->attndims = list_length(typename->arrayBounds); attTup->attlen = tform->typlen; attTup->attbyval = tform->typbyval; @@ -5186,7 +5879,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, */ RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, true); - StoreAttrDefault(rel, attnum, nodeToString(defaultexpr)); + StoreAttrDefault(rel, attnum, defaultexpr); } /* Cleanup */ @@ -5220,10 +5913,10 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab) /* * Now we can drop the existing constraints and indexes --- constraints * first, since some of them might depend on the indexes. In fact, we - * have to delete FOREIGN KEY constraints before UNIQUE constraints, - * but we already ordered the constraint list to ensure that would happen. - * It should be okay to use DROP_RESTRICT here, since nothing else should - * be depending on these objects. + * have to delete FOREIGN KEY constraints before UNIQUE constraints, but + * we already ordered the constraint list to ensure that would happen. It + * should be okay to use DROP_RESTRICT here, since nothing else should be + * depending on these objects. */ foreach(l, tab->changedConstraintOids) { @@ -5255,17 +5948,27 @@ ATPostAlterTypeParse(char *cmd, List **wqueue) ListCell *list_item; /* - * We expect that we only have to do raw parsing and parse analysis, not - * any rule rewriting, since these will all be utility statements. + * We expect that we will get only ALTER TABLE and CREATE INDEX + * statements. Hence, there is no need to pass them through + * parse_analyze() or the rewriter, but instead we need to pass them + * through parse_utilcmd.c to make them ready for execution. */ raw_parsetree_list = raw_parser(cmd); querytree_list = NIL; foreach(list_item, raw_parsetree_list) { - Node *parsetree = (Node *) lfirst(list_item); - - querytree_list = list_concat(querytree_list, - parse_analyze(parsetree, NULL, 0)); + Node *stmt = (Node *) lfirst(list_item); + + if (IsA(stmt, IndexStmt)) + querytree_list = lappend(querytree_list, + transformIndexStmt((IndexStmt *) stmt, + cmd)); + else if (IsA(stmt, AlterTableStmt)) + querytree_list = list_concat(querytree_list, + transformAlterTableStmt((AlterTableStmt *) stmt, + cmd)); + else + querytree_list = lappend(querytree_list, stmt); } /* @@ -5274,17 +5977,15 @@ ATPostAlterTypeParse(char *cmd, List **wqueue) */ foreach(list_item, querytree_list) { - Query *query = (Query *) lfirst(list_item); + Node *stm = (Node *) lfirst(list_item); Relation rel; AlteredTableInfo *tab; - Assert(IsA(query, Query)); - Assert(query->commandType == CMD_UTILITY); - switch (nodeTag(query->utilityStmt)) + switch (nodeTag(stm)) { case T_IndexStmt: { - IndexStmt *stmt = (IndexStmt *) query->utilityStmt; + IndexStmt *stmt = (IndexStmt *) stm; AlterTableCmd *newcmd; rel = relation_openrv(stmt->relation, AccessExclusiveLock); @@ -5299,7 +6000,7 @@ ATPostAlterTypeParse(char *cmd, List **wqueue) } case T_AlterTableStmt: { - AlterTableStmt *stmt = (AlterTableStmt *) query->utilityStmt; + AlterTableStmt *stmt = (AlterTableStmt *) stm; ListCell *lcmd; rel = relation_openrv(stmt->relation, AccessExclusiveLock); @@ -5329,7 +6030,7 @@ ATPostAlterTypeParse(char *cmd, List **wqueue) } default: elog(ERROR, "unexpected statement type: %d", - (int) nodeTag(query->utilityStmt)); + (int) nodeTag(stm)); } } } @@ -5338,11 +6039,14 @@ ATPostAlterTypeParse(char *cmd, List **wqueue) /* * ALTER TABLE OWNER * - * recursing is true if we are recursing from a table to its indexes or - * toast table. We don't allow the ownership of those things to be - * changed separately from the parent table. Also, we can skip permission + * recursing is true if we are recursing from a table to its indexes, + * sequences, or toast table. We don't allow the ownership of those things to + * be changed separately from the parent table. Also, we can skip permission * checks (this is necessary not just an optimization, else we'd fail to * handle toast tables properly). + * + * recursing is also true if ALTER TYPE OWNER is calling us to fix up a + * free-standing composite type. */ void ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing) @@ -5373,7 +6077,6 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing) { case RELKIND_RELATION: case RELKIND_VIEW: - case RELKIND_SEQUENCE: /* ok to change owner */ break; case RELKIND_INDEX: @@ -5396,6 +6099,33 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing) newOwnerId = tuple_class->relowner; } break; + case RELKIND_SEQUENCE: + if (!recursing && + tuple_class->relowner != newOwnerId) + { + /* if it's an owned sequence, disallow changing it by itself */ + Oid tableId; + int32 colId; + + if (sequenceIsOwned(relationOid, &tableId, &colId)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot change owner of sequence \"%s\"", + NameStr(tuple_class->relname)), + errdetail("Sequence \"%s\" is linked to table \"%s\".", + NameStr(tuple_class->relname), + get_rel_name(tableId)))); + } + break; + case RELKIND_COMPOSITE_TYPE: + if (recursing) + break; + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is a composite type", + NameStr(tuple_class->relname)), + errhint("Use ALTER TYPE instead."))); + break; case RELKIND_TOASTVALUE: if (recursing) break; @@ -5414,8 +6144,8 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing) if (tuple_class->relowner != newOwnerId) { Datum repl_val[Natts_pg_class]; - char repl_null[Natts_pg_class]; - char repl_repl[Natts_pg_class]; + bool repl_null[Natts_pg_class]; + bool repl_repl[Natts_pg_class]; Acl *newAcl; Datum aclDatum; bool isNull; @@ -5447,10 +6177,10 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing) } } - memset(repl_null, ' ', sizeof(repl_null)); - memset(repl_repl, ' ', sizeof(repl_repl)); + memset(repl_null, false, sizeof(repl_null)); + memset(repl_repl, false, sizeof(repl_repl)); - repl_repl[Anum_pg_class_relowner - 1] = 'r'; + repl_repl[Anum_pg_class_relowner - 1] = true; repl_val[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(newOwnerId); /* @@ -5464,25 +6194,34 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing) { newAcl = aclnewowner(DatumGetAclP(aclDatum), tuple_class->relowner, newOwnerId); - repl_repl[Anum_pg_class_relacl - 1] = 'r'; + repl_repl[Anum_pg_class_relacl - 1] = true; repl_val[Anum_pg_class_relacl - 1] = PointerGetDatum(newAcl); } - newtuple = heap_modifytuple(tuple, RelationGetDescr(class_rel), repl_val, repl_null, repl_repl); + newtuple = heap_modify_tuple(tuple, RelationGetDescr(class_rel), repl_val, repl_null, repl_repl); simple_heap_update(class_rel, &newtuple->t_self, newtuple); CatalogUpdateIndexes(class_rel, newtuple); heap_freetuple(newtuple); - /* Update owner dependency reference */ - changeDependencyOnOwner(RelationRelationId, relationOid, newOwnerId); + /* + * Update owner dependency reference, if any. A composite type has + * none, because it's tracked for the pg_type entry instead of here; + * indexes and TOAST tables don't have their own entries either. + */ + if (tuple_class->relkind != RELKIND_COMPOSITE_TYPE && + tuple_class->relkind != RELKIND_INDEX && + tuple_class->relkind != RELKIND_TOASTVALUE) + changeDependencyOnOwner(RelationRelationId, relationOid, + newOwnerId); /* * Also change the ownership of the table's rowtype, if it has one */ if (tuple_class->relkind != RELKIND_INDEX) - AlterTypeOwnerInternal(tuple_class->reltype, newOwnerId); + AlterTypeOwnerInternal(tuple_class->reltype, newOwnerId, + tuple_class->relkind == RELKIND_COMPOSITE_TYPE); /* * If we are operating on a table, also change the ownership of any @@ -5538,7 +6277,7 @@ change_owner_recurse_to_sequences(Oid relationOid, Oid newOwnerId) HeapTuple tup; /* - * SERIAL sequences are those having an internal dependency on one of the + * SERIAL sequences are those having an auto dependency on one of the * table's columns (we don't care *which* column, exactly). */ depRel = heap_open(DependRelationId, AccessShareLock); @@ -5561,11 +6300,11 @@ change_owner_recurse_to_sequences(Oid relationOid, Oid newOwnerId) Form_pg_depend depForm = (Form_pg_depend) GETSTRUCT(tup); Relation seqRel; - /* skip dependencies other than internal dependencies on columns */ + /* skip dependencies other than auto dependencies on columns */ if (depForm->refobjsubid == 0 || depForm->classid != RelationRelationId || depForm->objsubid != 0 || - depForm->deptype != DEPENDENCY_INTERNAL) + depForm->deptype != DEPENDENCY_AUTO) continue; /* Use relation_open just in case it's an index */ @@ -5580,7 +6319,7 @@ change_owner_recurse_to_sequences(Oid relationOid, Oid newOwnerId) } /* We don't need to close the sequence while we alter it. */ - ATExecChangeOwner(depForm->objid, newOwnerId, false); + ATExecChangeOwner(depForm->objid, newOwnerId, true); /* Now we can close it. Keep the lock till end of transaction. */ relation_close(seqRel, NoLock); @@ -5637,28 +6376,6 @@ ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename) Oid tablespaceId; AclResult aclresult; - /* - * We do our own permission checking because we want to allow this on - * indexes. - */ - if (rel->rd_rel->relkind != RELKIND_RELATION && - rel->rd_rel->relkind != RELKIND_INDEX) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a table or index", - RelationGetRelationName(rel)))); - - /* Permissions checks */ - if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - RelationGetRelationName(rel)); - - if (!allowSystemTableMods && IsSystemRelation(rel)) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied: \"%s\" is a system catalog", - RelationGetRelationName(rel)))); - /* Check that the tablespace exists */ tablespaceId = get_tablespace_oid(tablespacename); if (!OidIsValid(tablespaceId)) @@ -5679,6 +6396,89 @@ ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename) tab->newTableSpace = tablespaceId; } +/* + * ALTER TABLE/INDEX SET (...) or RESET (...) + */ +static void +ATExecSetRelOptions(Relation rel, List *defList, bool isReset) +{ + Oid relid; + Relation pgclass; + HeapTuple tuple; + HeapTuple newtuple; + Datum datum; + bool isnull; + Datum newOptions; + Datum repl_val[Natts_pg_class]; + bool repl_null[Natts_pg_class]; + bool repl_repl[Natts_pg_class]; + + if (defList == NIL) + return; /* nothing to do */ + + pgclass = heap_open(RelationRelationId, RowExclusiveLock); + + /* Get the old reloptions */ + relid = RelationGetRelid(rel); + tuple = SearchSysCache(RELOID, + ObjectIdGetDatum(relid), + 0, 0, 0); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for relation %u", relid); + + datum = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions, &isnull); + + /* Generate new proposed reloptions (text array) */ + newOptions = transformRelOptions(isnull ? (Datum) 0 : datum, + defList, false, isReset); + + /* Validate */ + switch (rel->rd_rel->relkind) + { + case RELKIND_RELATION: + case RELKIND_TOASTVALUE: + (void) heap_reloptions(rel->rd_rel->relkind, newOptions, true); + break; + case RELKIND_INDEX: + (void) index_reloptions(rel->rd_am->amoptions, newOptions, true); + break; + default: + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a table, index, or TOAST table", + RelationGetRelationName(rel)))); + break; + } + + /* + * All we need do here is update the pg_class row; the new options will be + * propagated into relcaches during post-commit cache inval. + */ + memset(repl_val, 0, sizeof(repl_val)); + memset(repl_null, false, sizeof(repl_null)); + memset(repl_repl, false, sizeof(repl_repl)); + + if (newOptions != (Datum) 0) + repl_val[Anum_pg_class_reloptions - 1] = newOptions; + else + repl_null[Anum_pg_class_reloptions - 1] = true; + + repl_repl[Anum_pg_class_reloptions - 1] = true; + + newtuple = heap_modify_tuple(tuple, RelationGetDescr(pgclass), + repl_val, repl_null, repl_repl); + + simple_heap_update(pgclass, &newtuple->t_self, newtuple); + + CatalogUpdateIndexes(pgclass, newtuple); + + heap_freetuple(newtuple); + + ReleaseSysCache(tuple); + + heap_close(pgclass, RowExclusiveLock); +} + /* * Execute ALTER TABLE SET TABLESPACE for cases where there is no tuple * rewriting to be done, so we just want to copy the data as fast as possible. @@ -5690,13 +6490,18 @@ ATExecSetTableSpace(Oid tableOid, Oid newTableSpace) Oid oldTableSpace; Oid reltoastrelid; Oid reltoastidxid; + Oid newrelfilenode; RelFileNode newrnode; SMgrRelation dstrel; Relation pg_class; HeapTuple tuple; Form_pg_class rd_rel; + ForkNumber forkNum; - rel = relation_open(tableOid, NoLock); + /* + * Need lock here in case we are recursing to toast table or index + */ + rel = relation_open(tableOid, AccessExclusiveLock); /* * We can never allow moving of shared or nailed-in-cache relations, @@ -5708,6 +6513,12 @@ ATExecSetTableSpace(Oid tableOid, Oid newTableSpace) errmsg("cannot move system relation \"%s\"", RelationGetRelationName(rel)))); + /* Can't move a non-shared relation into pg_global */ + if (newTableSpace == GLOBALTABLESPACE_OID) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("only shared relations can be placed in pg_global tablespace"))); + /* * Don't allow moving temp tables of other backends ... their local buffer * manager is not going to cope. @@ -5741,29 +6552,59 @@ ATExecSetTableSpace(Oid tableOid, Oid newTableSpace) elog(ERROR, "cache lookup failed for relation %u", tableOid); rd_rel = (Form_pg_class) GETSTRUCT(tuple); - /* create another storage file. Is it a little ugly ? */ - /* NOTE: any conflict in relfilenode value will be caught here */ + /* + * Since we copy the file directly without looking at the shared buffers, + * we'd better first flush out any pages of the source relation that are + * in shared buffers. We assume no new changes will be made while we are + * holding exclusive lock on the rel. + */ + FlushRelationBuffers(rel); + + /* + * Relfilenodes are not unique across tablespaces, so we need to allocate + * a new one in the new tablespace. + */ + newrelfilenode = GetNewRelFileNode(newTableSpace, + rel->rd_rel->relisshared, + NULL); + + /* Open old and new relation */ newrnode = rel->rd_node; + newrnode.relNode = newrelfilenode; newrnode.spcNode = newTableSpace; - dstrel = smgropen(newrnode); - smgrcreate(dstrel, rel->rd_istemp, false); - - /* copy relation data to the new physical file */ - copy_relation_data(rel, dstrel); - /* schedule unlinking old physical file */ RelationOpenSmgr(rel); - smgrscheduleunlink(rel->rd_smgr, rel->rd_istemp); /* - * Now drop smgr references. The source was already dropped by - * smgrscheduleunlink. + * Create and copy all forks of the relation, and schedule unlinking + * of old physical files. + * + * NOTE: any conflict in relfilenode value will be caught in + * RelationCreateStorage(). */ + RelationCreateStorage(newrnode, rel->rd_istemp); + + /* copy main fork */ + copy_relation_data(rel->rd_smgr, dstrel, MAIN_FORKNUM, rel->rd_istemp); + + /* copy those extra forks that exist */ + for (forkNum = MAIN_FORKNUM + 1; forkNum <= MAX_FORKNUM; forkNum++) + { + if (smgrexists(rel->rd_smgr, forkNum)) + { + smgrcreate(dstrel, forkNum, false); + copy_relation_data(rel->rd_smgr, dstrel, forkNum, rel->rd_istemp); + } + } + + /* drop old relation, and close new one */ + RelationDropStorage(rel); smgrclose(dstrel); /* update the pg_class row */ rd_rel->reltablespace = (newTableSpace == MyDatabaseTableSpace) ? InvalidOid : newTableSpace; + rd_rel->relfilenode = newrelfilenode; simple_heap_update(pg_class, &tuple->t_self, tuple); CatalogUpdateIndexes(pg_class, tuple); @@ -5787,74 +6628,37 @@ ATExecSetTableSpace(Oid tableOid, Oid newTableSpace) * Copy data, block by block */ static void -copy_relation_data(Relation rel, SMgrRelation dst) +copy_relation_data(SMgrRelation src, SMgrRelation dst, + ForkNumber forkNum, bool istemp) { - SMgrRelation src; bool use_wal; BlockNumber nblocks; BlockNumber blkno; char buf[BLCKSZ]; Page page = (Page) buf; - /* - * Since we copy the file directly without looking at the shared buffers, - * we'd better first flush out any pages of the source relation that are - * in shared buffers. We assume no new changes will be made while we are - * holding exclusive lock on the rel. - */ - FlushRelationBuffers(rel); - /* * We need to log the copied data in WAL iff WAL archiving is enabled AND * it's not a temp rel. */ - use_wal = XLogArchivingActive() && !rel->rd_istemp; + use_wal = XLogArchivingActive() && !istemp; - nblocks = RelationGetNumberOfBlocks(rel); - /* RelationGetNumberOfBlocks will certainly have opened rd_smgr */ - src = rel->rd_smgr; + nblocks = smgrnblocks(src, forkNum); for (blkno = 0; blkno < nblocks; blkno++) { - smgrread(src, blkno, buf); + smgrread(src, forkNum, blkno, buf); /* XLOG stuff */ if (use_wal) - { - xl_heap_newpage xlrec; - XLogRecPtr recptr; - XLogRecData rdata[2]; - - /* NO ELOG(ERROR) from here till newpage op is logged */ - START_CRIT_SECTION(); - - xlrec.node = dst->smgr_rnode; - xlrec.blkno = blkno; - - rdata[0].data = (char *) &xlrec; - rdata[0].len = SizeOfHeapNewpage; - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); - - rdata[1].data = (char *) page; - rdata[1].len = BLCKSZ; - rdata[1].buffer = InvalidBuffer; - rdata[1].next = NULL; - - recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata); - - PageSetLSN(page, recptr); - PageSetTLI(page, ThisTimeLineID); - - END_CRIT_SECTION(); - } + log_newpage(&dst->smgr_rnode, forkNum, blkno, page); /* * Now write the page. We say isTemp = true even if it's not a temp * rel, because there's no need for smgr to schedule an fsync for this * write; we'll do it ourselves below. */ - smgrwrite(dst, blkno, buf, true); + smgrextend(dst, forkNum, blkno, buf, true); } /* @@ -5871,8 +6675,8 @@ copy_relation_data(Relation rel, SMgrRelation dst) * wouldn't replay our earlier WAL entries. If we do not fsync those pages * here, they might still not be on disk when the crash occurs. */ - if (!rel->rd_istemp) - smgrimmedsync(dst); + if (!istemp) + smgrimmedsync(dst, forkNum); } /* @@ -5882,280 +6686,603 @@ copy_relation_data(Relation rel, SMgrRelation dst) */ static void ATExecEnableDisableTrigger(Relation rel, char *trigname, - bool enable, bool skip_system) + char fires_when, bool skip_system) { - EnableDisableTrigger(rel, trigname, enable, skip_system); + EnableDisableTrigger(rel, trigname, fires_when, skip_system); } /* - * ALTER TABLE CREATE TOAST TABLE + * ALTER TABLE ENABLE/DISABLE RULE * - * Note: this is also invoked from outside this module; in such cases we - * expect the caller to have verified that the relation is a table and we - * have all the right permissions. Callers expect this function - * to end with CommandCounterIncrement if it makes any changes. + * We just pass this off to rewriteDefine.c. */ -void -AlterTableCreateToastTable(Oid relOid, bool silent) +static void +ATExecEnableDisableRule(Relation rel, char *trigname, + char fires_when) { - Relation rel; - HeapTuple reltup; - TupleDesc tupdesc; - bool shared_relation; - Relation class_rel; - Oid toast_relid; - Oid toast_idxid; - char toast_relname[NAMEDATALEN]; - char toast_idxname[NAMEDATALEN]; - IndexInfo *indexInfo; - Oid classObjectId[2]; - ObjectAddress baseobject, - toastobject; + EnableDisableRule(rel, trigname, fires_when); +} + +/* + * ALTER TABLE INHERIT + * + * Add a parent to the child's parents. This verifies that all the columns and + * check constraints of the parent appear in the child and that they have the + * same data types and expressions. + */ +static void +ATExecAddInherit(Relation child_rel, RangeVar *parent) +{ + Relation parent_rel, + catalogRelation; + SysScanDesc scan; + ScanKeyData key; + HeapTuple inheritsTuple; + int32 inhseqno; + List *children; /* - * Grab an exclusive lock on the target table, which we will NOT release - * until end of transaction. (This is probably redundant in all present - * uses...) + * AccessShareLock on the parent is what's obtained during normal CREATE + * TABLE ... INHERITS ..., so should be enough here. */ - rel = heap_open(relOid, AccessExclusiveLock); + parent_rel = heap_openrv(parent, AccessShareLock); /* - * Toast table is shared if and only if its parent is. - * - * We cannot allow toasting a shared relation after initdb (because - * there's no way to mark it toasted in other databases' pg_class). - * Unfortunately we can't distinguish initdb from a manually started - * standalone backend (toasting happens after the bootstrap phase, so - * checking IsBootstrapProcessingMode() won't work). However, we can at - * least prevent this mistake under normal multi-user operation. - */ - shared_relation = rel->rd_rel->relisshared; - if (shared_relation && IsUnderPostmaster) + * Must be owner of both parent and child -- child was checked by + * ATSimplePermissions call in ATPrepCmd + */ + ATSimplePermissions(parent_rel, false); + + /* Permanent rels cannot inherit from temporary ones */ + if (!isTempNamespace(RelationGetNamespace(child_rel)) && + isTempNamespace(RelationGetNamespace(parent_rel))) ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("shared tables cannot be toasted after initdb"))); + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot inherit from temporary relation \"%s\"", + RelationGetRelationName(parent_rel)))); /* - * Is it already toasted? + * Check for duplicates in the list of parents, and determine the highest + * inhseqno already present; we'll use the next one for the new parent. + * (Note: get RowExclusiveLock because we will write pg_inherits below.) + * + * Note: we do not reject the case where the child already inherits from + * the parent indirectly; CREATE TABLE doesn't reject comparable cases. */ - if (rel->rd_rel->reltoastrelid != InvalidOid) + catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock); + ScanKeyInit(&key, + Anum_pg_inherits_inhrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(child_rel))); + scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId, + true, SnapshotNow, 1, &key); + + /* inhseqno sequences start at 1 */ + inhseqno = 0; + while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan))) { - if (silent) - { - heap_close(rel, NoLock); - return; - } + Form_pg_inherits inh = (Form_pg_inherits) GETSTRUCT(inheritsTuple); - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("table \"%s\" already has a TOAST table", - RelationGetRelationName(rel)))); + if (inh->inhparent == RelationGetRelid(parent_rel)) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_TABLE), + errmsg("relation \"%s\" would be inherited from more than once", + RelationGetRelationName(parent_rel)))); + if (inh->inhseqno > inhseqno) + inhseqno = inh->inhseqno; } + systable_endscan(scan); /* - * Check to see whether the table actually needs a TOAST table. + * Prevent circularity by seeing if proposed parent inherits from child. + * (In particular, this disallows making a rel inherit from itself.) + * + * This is not completely bulletproof because of race conditions: in + * multi-level inheritance trees, someone else could concurrently be + * making another inheritance link that closes the loop but does not join + * either of the rels we have locked. Preventing that seems to require + * exclusive locks on the entire inheritance tree, which is a cure worse + * than the disease. find_all_inheritors() will cope with circularity + * anyway, so don't sweat it too much. */ - if (!needs_toast_table(rel)) - { - if (silent) - { - heap_close(rel, NoLock); - return; - } + children = find_all_inheritors(RelationGetRelid(child_rel)); + if (list_member_oid(children, RelationGetRelid(parent_rel))) ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("table \"%s\" does not need a TOAST table", - RelationGetRelationName(rel)))); - } + (errcode(ERRCODE_DUPLICATE_TABLE), + errmsg("circular inheritance not allowed"), + errdetail("\"%s\" is already a child of \"%s\".", + parent->relname, + RelationGetRelationName(child_rel)))); - /* - * Create the toast table and its index - */ - snprintf(toast_relname, sizeof(toast_relname), - "pg_toast_%u", relOid); - snprintf(toast_idxname, sizeof(toast_idxname), - "pg_toast_%u_index", relOid); - - /* this is pretty painful... need a tuple descriptor */ - tupdesc = CreateTemplateTupleDesc(3, false); - TupleDescInitEntry(tupdesc, (AttrNumber) 1, - "chunk_id", - OIDOID, - -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 2, - "chunk_seq", - INT4OID, - -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 3, - "chunk_data", - BYTEAOID, - -1, 0); - - /* - * Ensure that the toast table doesn't itself get toasted, or we'll be - * toast :-(. This is essential for chunk_data because type bytea is - * toastable; hit the other two just to be sure. - */ - tupdesc->attrs[0]->attstorage = 'p'; - tupdesc->attrs[1]->attstorage = 'p'; - tupdesc->attrs[2]->attstorage = 'p'; - - /* - * Note: the toast relation is placed in the regular pg_toast namespace - * even if its master relation is a temp table. There cannot be any - * naming collision, and the toast rel will be destroyed when its master - * is, so there's no need to handle the toast rel as temp. - */ - toast_relid = heap_create_with_catalog(toast_relname, - PG_TOAST_NAMESPACE, - rel->rd_rel->reltablespace, - InvalidOid, - rel->rd_rel->relowner, - tupdesc, - RELKIND_TOASTVALUE, - shared_relation, - true, - 0, - ONCOMMIT_NOOP, - true); - - /* make the toast relation visible, else index creation will fail */ - CommandCounterIncrement(); + /* If parent has OIDs then child must have OIDs */ + if (parent_rel->rd_rel->relhasoids && !child_rel->rd_rel->relhasoids) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("table \"%s\" without OIDs cannot inherit from table \"%s\" with OIDs", + RelationGetRelationName(child_rel), + RelationGetRelationName(parent_rel)))); + + /* Match up the columns and bump attinhcount as needed */ + MergeAttributesIntoExisting(child_rel, parent_rel); + + /* Match up the constraints and bump coninhcount as needed */ + MergeConstraintsIntoExisting(child_rel, parent_rel); /* - * Create unique index on chunk_id, chunk_seq. - * - * NOTE: the normal TOAST access routines could actually function with a - * single-column index on chunk_id only. However, the slice access - * routines use both columns for faster access to an individual chunk. In - * addition, we want it to be unique as a check against the possibility of - * duplicate TOAST chunk OIDs. The index might also be a little more - * efficient this way, since btree isn't all that happy with large numbers - * of equal keys. + * OK, it looks valid. Make the catalog entries that show inheritance. */ + StoreCatalogInheritance1(RelationGetRelid(child_rel), + RelationGetRelid(parent_rel), + inhseqno + 1, + catalogRelation); + + /* Now we're done with pg_inherits */ + heap_close(catalogRelation, RowExclusiveLock); + + /* keep our lock on the parent relation until commit */ + heap_close(parent_rel, NoLock); +} + +/* + * Obtain the source-text form of the constraint expression for a check + * constraint, given its pg_constraint tuple + */ +static char * +decompile_conbin(HeapTuple contup, TupleDesc tupdesc) +{ + Form_pg_constraint con; + bool isnull; + Datum attr; + Datum expr; + + con = (Form_pg_constraint) GETSTRUCT(contup); + attr = heap_getattr(contup, Anum_pg_constraint_conbin, tupdesc, &isnull); + if (isnull) + elog(ERROR, "null conbin for constraint %u", HeapTupleGetOid(contup)); + + expr = DirectFunctionCall2(pg_get_expr, attr, + ObjectIdGetDatum(con->conrelid)); + return TextDatumGetCString(expr); +} + +/* + * Determine whether two check constraints are functionally equivalent + * + * The test we apply is to see whether they reverse-compile to the same + * source string. This insulates us from issues like whether attributes + * have the same physical column numbers in parent and child relations. + */ +static bool +constraints_equivalent(HeapTuple a, HeapTuple b, TupleDesc tupleDesc) +{ + Form_pg_constraint acon = (Form_pg_constraint) GETSTRUCT(a); + Form_pg_constraint bcon = (Form_pg_constraint) GETSTRUCT(b); + + if (acon->condeferrable != bcon->condeferrable || + acon->condeferred != bcon->condeferred || + strcmp(decompile_conbin(a, tupleDesc), + decompile_conbin(b, tupleDesc)) != 0) + return false; + else + return true; +} + +/* + * Check columns in child table match up with columns in parent, and increment + * their attinhcount. + * + * Called by ATExecAddInherit + * + * Currently all parent columns must be found in child. Missing columns are an + * error. One day we might consider creating new columns like CREATE TABLE + * does. However, that is widely unpopular --- in the common use case of + * partitioned tables it's a foot-gun. + * + * The data type must match exactly. If the parent column is NOT NULL then + * the child must be as well. Defaults are not compared, however. + */ +static void +MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel) +{ + Relation attrrel; + AttrNumber parent_attno; + int parent_natts; + TupleDesc tupleDesc; + TupleConstr *constr; + HeapTuple tuple; - indexInfo = makeNode(IndexInfo); - indexInfo->ii_NumIndexAttrs = 2; - indexInfo->ii_KeyAttrNumbers[0] = 1; - indexInfo->ii_KeyAttrNumbers[1] = 2; - indexInfo->ii_Expressions = NIL; - indexInfo->ii_ExpressionsState = NIL; - indexInfo->ii_Predicate = NIL; - indexInfo->ii_PredicateState = NIL; - indexInfo->ii_Unique = true; + attrrel = heap_open(AttributeRelationId, RowExclusiveLock); - classObjectId[0] = OID_BTREE_OPS_OID; - classObjectId[1] = INT4_BTREE_OPS_OID; + tupleDesc = RelationGetDescr(parent_rel); + parent_natts = tupleDesc->natts; + constr = tupleDesc->constr; + + for (parent_attno = 1; parent_attno <= parent_natts; parent_attno++) + { + Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1]; + char *attributeName = NameStr(attribute->attname); - toast_idxid = index_create(toast_relid, toast_idxname, InvalidOid, - indexInfo, - BTREE_AM_OID, - rel->rd_rel->reltablespace, - classObjectId, - true, false, true, false); + /* Ignore dropped columns in the parent. */ + if (attribute->attisdropped) + continue; + + /* Find same column in child (matching on column name). */ + tuple = SearchSysCacheCopyAttName(RelationGetRelid(child_rel), + attributeName); + if (HeapTupleIsValid(tuple)) + { + /* Check they are same type and typmod */ + Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple); + + if (attribute->atttypid != childatt->atttypid || + attribute->atttypmod != childatt->atttypmod) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("child table \"%s\" has different type for column \"%s\"", + RelationGetRelationName(child_rel), + attributeName))); + + if (attribute->attnotnull && !childatt->attnotnull) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("column \"%s\" in child table must be marked NOT NULL", + attributeName))); + + /* + * OK, bump the child column's inheritance count. (If we fail + * later on, this change will just roll back.) + */ + childatt->attinhcount++; + simple_heap_update(attrrel, &tuple->t_self, tuple); + CatalogUpdateIndexes(attrrel, tuple); + heap_freetuple(tuple); + } + else + { + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("child table is missing column \"%s\"", + attributeName))); + } + } + + heap_close(attrrel, RowExclusiveLock); +} + +/* + * Check constraints in child table match up with constraints in parent, + * and increment their coninhcount. + * + * Called by ATExecAddInherit + * + * Currently all constraints in parent must be present in the child. One day we + * may consider adding new constraints like CREATE TABLE does. We may also want + * to allow an optional flag on parent table constraints indicating they are + * intended to ONLY apply to the master table, not to the children. That would + * make it possible to ensure no records are mistakenly inserted into the + * master in partitioned tables rather than the appropriate child. + * + * XXX This is O(N^2) which may be an issue with tables with hundreds of + * constraints. As long as tables have more like 10 constraints it shouldn't be + * a problem though. Even 100 constraints ought not be the end of the world. + */ +static void +MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) +{ + Relation catalog_relation; + TupleDesc tuple_desc; + SysScanDesc parent_scan; + ScanKeyData parent_key; + HeapTuple parent_tuple; + + catalog_relation = heap_open(ConstraintRelationId, RowExclusiveLock); + tuple_desc = RelationGetDescr(catalog_relation); + + /* Outer loop scans through the parent's constraint definitions */ + ScanKeyInit(&parent_key, + Anum_pg_constraint_conrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(parent_rel))); + parent_scan = systable_beginscan(catalog_relation, ConstraintRelidIndexId, + true, SnapshotNow, 1, &parent_key); + + while (HeapTupleIsValid(parent_tuple = systable_getnext(parent_scan))) + { + Form_pg_constraint parent_con = (Form_pg_constraint) GETSTRUCT(parent_tuple); + SysScanDesc child_scan; + ScanKeyData child_key; + HeapTuple child_tuple; + bool found = false; + + if (parent_con->contype != CONSTRAINT_CHECK) + continue; + + /* Search for a child constraint matching this one */ + ScanKeyInit(&child_key, + Anum_pg_constraint_conrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(child_rel))); + child_scan = systable_beginscan(catalog_relation, ConstraintRelidIndexId, + true, SnapshotNow, 1, &child_key); + + while (HeapTupleIsValid(child_tuple = systable_getnext(child_scan))) + { + Form_pg_constraint child_con = (Form_pg_constraint) GETSTRUCT(child_tuple); + HeapTuple child_copy; + + if (child_con->contype != CONSTRAINT_CHECK) + continue; + + if (strcmp(NameStr(parent_con->conname), + NameStr(child_con->conname)) != 0) + continue; + + if (!constraints_equivalent(parent_tuple, child_tuple, tuple_desc)) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("child table \"%s\" has different definition for check constraint \"%s\"", + RelationGetRelationName(child_rel), + NameStr(parent_con->conname)))); + + /* + * OK, bump the child constraint's inheritance count. (If we fail + * later on, this change will just roll back.) + */ + child_copy = heap_copytuple(child_tuple); + child_con = (Form_pg_constraint) GETSTRUCT(child_copy); + child_con->coninhcount++; + simple_heap_update(catalog_relation, &child_copy->t_self, child_copy); + CatalogUpdateIndexes(catalog_relation, child_copy); + heap_freetuple(child_copy); + + found = true; + break; + } + + systable_endscan(child_scan); + + if (!found) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("child table is missing constraint \"%s\"", + NameStr(parent_con->conname)))); + } + + systable_endscan(parent_scan); + heap_close(catalog_relation, RowExclusiveLock); +} + +/* + * ALTER TABLE NO INHERIT + * + * Drop a parent from the child's parents. This just adjusts the attinhcount + * and attislocal of the columns and removes the pg_inherit and pg_depend + * entries. + * + * If attinhcount goes to 0 then attislocal gets set to true. If it goes back + * up attislocal stays true, which means if a child is ever removed from a + * parent then its columns will never be automatically dropped which may + * surprise. But at least we'll never surprise by dropping columns someone + * isn't expecting to be dropped which would actually mean data loss. + * + * coninhcount and conislocal for inherited constraints are adjusted in + * exactly the same way. + */ +static void +ATExecDropInherit(Relation rel, RangeVar *parent) +{ + Relation parent_rel; + Relation catalogRelation; + SysScanDesc scan; + ScanKeyData key[3]; + HeapTuple inheritsTuple, + attributeTuple, + constraintTuple, + depTuple; + List *connames; + bool found = false; /* - * Update toast rel's pg_class entry to show that it has an index. The - * index OID is stored into the reltoastidxid field for easy access by the - * tuple toaster. + * AccessShareLock on the parent is probably enough, seeing that DROP + * TABLE doesn't lock parent tables at all. We need some lock since we'll + * be inspecting the parent's schema. */ - setRelhasindex(toast_relid, true, true, toast_idxid); + parent_rel = heap_openrv(parent, AccessShareLock); /* - * Store the toast table's OID in the parent relation's pg_class row + * We don't bother to check ownership of the parent table --- ownership of + * the child is presumed enough rights. */ - class_rel = heap_open(RelationRelationId, RowExclusiveLock); - reltup = SearchSysCacheCopy(RELOID, - ObjectIdGetDatum(relOid), - 0, 0, 0); - if (!HeapTupleIsValid(reltup)) - elog(ERROR, "cache lookup failed for relation %u", relOid); - - ((Form_pg_class) GETSTRUCT(reltup))->reltoastrelid = toast_relid; + /* + * Find and destroy the pg_inherits entry linking the two, or error out if + * there is none. + */ + catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock); + ScanKeyInit(&key[0], + Anum_pg_inherits_inhrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); + scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId, + true, SnapshotNow, 1, key); - simple_heap_update(class_rel, &reltup->t_self, reltup); + while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan))) + { + Oid inhparent; - /* Keep catalog indexes current */ - CatalogUpdateIndexes(class_rel, reltup); + inhparent = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhparent; + if (inhparent == RelationGetRelid(parent_rel)) + { + simple_heap_delete(catalogRelation, &inheritsTuple->t_self); + found = true; + break; + } + } - heap_freetuple(reltup); + systable_endscan(scan); + heap_close(catalogRelation, RowExclusiveLock); - heap_close(class_rel, RowExclusiveLock); + if (!found) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_TABLE), + errmsg("relation \"%s\" is not a parent of relation \"%s\"", + RelationGetRelationName(parent_rel), + RelationGetRelationName(rel)))); /* - * Register dependency from the toast table to the master, so that the - * toast table will be deleted if the master is. + * Search through child columns looking for ones matching parent rel */ - baseobject.classId = RelationRelationId; - baseobject.objectId = relOid; - baseobject.objectSubId = 0; - toastobject.classId = RelationRelationId; - toastobject.objectId = toast_relid; - toastobject.objectSubId = 0; + catalogRelation = heap_open(AttributeRelationId, RowExclusiveLock); + ScanKeyInit(&key[0], + Anum_pg_attribute_attrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); + scan = systable_beginscan(catalogRelation, AttributeRelidNumIndexId, + true, SnapshotNow, 1, key); + while (HeapTupleIsValid(attributeTuple = systable_getnext(scan))) + { + Form_pg_attribute att = (Form_pg_attribute) GETSTRUCT(attributeTuple); - recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL); + /* Ignore if dropped or not inherited */ + if (att->attisdropped) + continue; + if (att->attinhcount <= 0) + continue; + + if (SearchSysCacheExistsAttName(RelationGetRelid(parent_rel), + NameStr(att->attname))) + { + /* Decrement inhcount and possibly set islocal to true */ + HeapTuple copyTuple = heap_copytuple(attributeTuple); + Form_pg_attribute copy_att = (Form_pg_attribute) GETSTRUCT(copyTuple); + + copy_att->attinhcount--; + if (copy_att->attinhcount == 0) + copy_att->attislocal = true; + + simple_heap_update(catalogRelation, ©Tuple->t_self, copyTuple); + CatalogUpdateIndexes(catalogRelation, copyTuple); + heap_freetuple(copyTuple); + } + } + systable_endscan(scan); + heap_close(catalogRelation, RowExclusiveLock); /* - * Clean up and make changes visible + * Likewise, find inherited check constraints and disinherit them. + * To do this, we first need a list of the names of the parent's check + * constraints. (We cheat a bit by only checking for name matches, + * assuming that the expressions will match.) */ - heap_close(rel, NoLock); + catalogRelation = heap_open(ConstraintRelationId, RowExclusiveLock); + ScanKeyInit(&key[0], + Anum_pg_constraint_conrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(parent_rel))); + scan = systable_beginscan(catalogRelation, ConstraintRelidIndexId, + true, SnapshotNow, 1, key); - CommandCounterIncrement(); -} + connames = NIL; -/* - * Check to see whether the table needs a TOAST table. It does only if - * (1) there are any toastable attributes, and (2) the maximum length - * of a tuple could exceed TOAST_TUPLE_THRESHOLD. (We don't want to - * create a toast table for something like "f1 varchar(20)".) - */ -static bool -needs_toast_table(Relation rel) -{ - int32 data_length = 0; - bool maxlength_unknown = false; - bool has_toastable_attrs = false; - TupleDesc tupdesc; - Form_pg_attribute *att; - int32 tuple_length; - int i; + while (HeapTupleIsValid(constraintTuple = systable_getnext(scan))) + { + Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(constraintTuple); - tupdesc = rel->rd_att; - att = tupdesc->attrs; + if (con->contype == CONSTRAINT_CHECK) + connames = lappend(connames, pstrdup(NameStr(con->conname))); + } - for (i = 0; i < tupdesc->natts; i++) + systable_endscan(scan); + + /* Now scan the child's constraints */ + ScanKeyInit(&key[0], + Anum_pg_constraint_conrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); + scan = systable_beginscan(catalogRelation, ConstraintRelidIndexId, + true, SnapshotNow, 1, key); + + while (HeapTupleIsValid(constraintTuple = systable_getnext(scan))) { - if (att[i]->attisdropped) + Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(constraintTuple); + bool match; + ListCell *lc; + + if (con->contype != CONSTRAINT_CHECK) continue; - data_length = att_align(data_length, att[i]->attalign); - if (att[i]->attlen > 0) + + match = false; + foreach (lc, connames) { - /* Fixed-length types are never toastable */ - data_length += att[i]->attlen; + if (strcmp(NameStr(con->conname), (char *) lfirst(lc)) == 0) + { + match = true; + break; + } } - else - { - int32 maxlen = type_maximum_size(att[i]->atttypid, - att[i]->atttypmod); - if (maxlen < 0) - maxlength_unknown = true; - else - data_length += maxlen; - if (att[i]->attstorage != 'p') - has_toastable_attrs = true; + if (match) + { + /* Decrement inhcount and possibly set islocal to true */ + HeapTuple copyTuple = heap_copytuple(constraintTuple); + Form_pg_constraint copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple); + if (copy_con->coninhcount <= 0) /* shouldn't happen */ + elog(ERROR, "relation %u has non-inherited constraint \"%s\"", + RelationGetRelid(rel), NameStr(copy_con->conname)); + + copy_con->coninhcount--; + if (copy_con->coninhcount == 0) + copy_con->conislocal = true; + + simple_heap_update(catalogRelation, ©Tuple->t_self, copyTuple); + CatalogUpdateIndexes(catalogRelation, copyTuple); + heap_freetuple(copyTuple); } } - if (!has_toastable_attrs) - return false; /* nothing to toast? */ - if (maxlength_unknown) - return true; /* any unlimited-length attrs? */ - tuple_length = MAXALIGN(offsetof(HeapTupleHeaderData, t_bits) + - BITMAPLEN(tupdesc->natts)) + - MAXALIGN(data_length); - return (tuple_length > TOAST_TUPLE_THRESHOLD); + + systable_endscan(scan); + heap_close(catalogRelation, RowExclusiveLock); + + /* + * Drop the dependency + * + * There's no convenient way to do this, so go trawling through pg_depend + */ + catalogRelation = heap_open(DependRelationId, RowExclusiveLock); + + ScanKeyInit(&key[0], + Anum_pg_depend_classid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationRelationId)); + ScanKeyInit(&key[1], + Anum_pg_depend_objid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); + ScanKeyInit(&key[2], + Anum_pg_depend_objsubid, + BTEqualStrategyNumber, F_INT4EQ, + Int32GetDatum(0)); + + scan = systable_beginscan(catalogRelation, DependDependerIndexId, true, + SnapshotNow, 3, key); + + while (HeapTupleIsValid(depTuple = systable_getnext(scan))) + { + Form_pg_depend dep = (Form_pg_depend) GETSTRUCT(depTuple); + + if (dep->refclassid == RelationRelationId && + dep->refobjid == RelationGetRelid(parent_rel) && + dep->refobjsubid == 0 && + dep->deptype == DEPENDENCY_NORMAL) + simple_heap_delete(catalogRelation, &depTuple->t_self); + } + + systable_endscan(scan); + heap_close(catalogRelation, RowExclusiveLock); + + /* keep our lock on the parent relation until commit */ + heap_close(parent_rel, NoLock); } @@ -6165,7 +7292,8 @@ needs_toast_table(Relation rel) * Note: caller must have checked ownership of the relation already */ void -AlterTableNamespace(RangeVar *relation, const char *newschema) +AlterTableNamespace(RangeVar *relation, const char *newschema, + ObjectType stmttype) { Relation rel; Oid relid; @@ -6173,18 +7301,80 @@ AlterTableNamespace(RangeVar *relation, const char *newschema) Oid nspOid; Relation classRel; - rel = heap_openrv(relation, AccessExclusiveLock); - - /* heap_openrv allows TOAST, but we don't want to */ - if (rel->rd_rel->relkind == RELKIND_TOASTVALUE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is a TOAST relation", - RelationGetRelationName(rel)))); + rel = relation_openrv(relation, AccessExclusiveLock); relid = RelationGetRelid(rel); oldNspOid = RelationGetNamespace(rel); + /* Check relation type against type specified in the ALTER command */ + switch (stmttype) + { + case OBJECT_TABLE: + /* + * For mostly-historical reasons, we allow ALTER TABLE to apply + * to all relation types. + */ + break; + + case OBJECT_SEQUENCE: + if (rel->rd_rel->relkind != RELKIND_SEQUENCE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a sequence", + RelationGetRelationName(rel)))); + break; + + case OBJECT_VIEW: + if (rel->rd_rel->relkind != RELKIND_VIEW) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a view", + RelationGetRelationName(rel)))); + break; + + default: + elog(ERROR, "unrecognized object type: %d", (int) stmttype); + } + + /* Can we change the schema of this tuple? */ + switch (rel->rd_rel->relkind) + { + case RELKIND_RELATION: + case RELKIND_VIEW: + /* ok to change schema */ + break; + case RELKIND_SEQUENCE: + { + /* if it's an owned sequence, disallow moving it by itself */ + Oid tableId; + int32 colId; + + if (sequenceIsOwned(relid, &tableId, &colId)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot move an owned sequence into another schema"), + errdetail("Sequence \"%s\" is linked to table \"%s\".", + RelationGetRelationName(rel), + get_rel_name(tableId)))); + } + break; + case RELKIND_COMPOSITE_TYPE: + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is a composite type", + RelationGetRelationName(rel)), + errhint("Use ALTER TYPE instead."))); + break; + case RELKIND_INDEX: + case RELKIND_TOASTVALUE: + /* FALL THRU */ + default: + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a table, view, or sequence", + RelationGetRelationName(rel)))); + } + /* get schema OID and check its permissions */ nspOid = LookupCreationNamespace(newschema); @@ -6213,7 +7403,7 @@ AlterTableNamespace(RangeVar *relation, const char *newschema) AlterRelationNamespaceInternal(classRel, relid, oldNspOid, nspOid, true); /* Fix the table's rowtype too */ - AlterTypeNamespaceInternal(rel->rd_rel->reltype, nspOid, false); + AlterTypeNamespaceInternal(rel->rd_rel->reltype, nspOid, false, false); /* Fix other dependent stuff */ if (rel->rd_rel->relkind == RELKIND_RELATION) @@ -6325,7 +7515,7 @@ AlterSeqNamespaces(Relation classRel, Relation rel, HeapTuple tup; /* - * SERIAL sequences are those having an internal dependency on one of the + * SERIAL sequences are those having an auto dependency on one of the * table's columns (we don't care *which* column, exactly). */ depRel = heap_open(DependRelationId, AccessShareLock); @@ -6348,11 +7538,11 @@ AlterSeqNamespaces(Relation classRel, Relation rel, Form_pg_depend depForm = (Form_pg_depend) GETSTRUCT(tup); Relation seqRel; - /* skip dependencies other than internal dependencies on columns */ + /* skip dependencies other than auto dependencies on columns */ if (depForm->refobjsubid == 0 || depForm->classid != RelationRelationId || depForm->objsubid != 0 || - depForm->deptype != DEPENDENCY_INTERNAL) + depForm->deptype != DEPENDENCY_AUTO) continue; /* Use relation_open just in case it's an index */ @@ -6376,7 +7566,7 @@ AlterSeqNamespaces(Relation classRel, Relation rel, * them to the new namespace, too. */ AlterTypeNamespaceInternal(RelationGetForm(seqRel)->reltype, - newNspOid, false); + newNspOid, false, false); /* Now we can close it. Keep the lock till end of transaction. */ relation_close(seqRel, NoLock);