X-Git-Url: https://granicus.if.org/sourcecode?a=blobdiff_plain;f=src%2Fbackend%2Fcommands%2Ftablecmds.c;h=6c60ddd5c104f79909116f844a56df99bc7747bb;hb=65e3ea76417d1baab158fd8305ebed4f43141c7a;hp=1aaa0bd86fe91b9663a193ff5633fc33646c9d23;hpb=cd902b331dc4b0c170e800441a98f9213d98b46b;p=postgresql diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 1aaa0bd86f..6c60ddd5c1 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.252 2008/05/09 23:32:04 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.273 2008/12/13 19:13:44 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -17,6 +17,8 @@ #include "access/genam.h" #include "access/heapam.h" #include "access/reloptions.h" +#include "access/relscan.h" +#include "access/sysattr.h" #include "access/xact.h" #include "catalog/catalog.h" #include "catalog/dependency.h" @@ -33,9 +35,11 @@ #include "catalog/pg_trigger.h" #include "catalog/pg_type.h" #include "catalog/pg_type_fn.h" +#include "catalog/storage.h" #include "catalog/toasting.h" #include "commands/cluster.h" #include "commands/defrem.h" +#include "commands/sequence.h" #include "commands/tablecmds.h" #include "commands/tablespace.h" #include "commands/trigger.h" @@ -43,6 +47,7 @@ #include "executor/executor.h" #include "miscadmin.h" #include "nodes/makefuncs.h" +#include "nodes/nodeFuncs.h" #include "nodes/parsenodes.h" #include "optimizer/clauses.h" #include "optimizer/plancat.h" @@ -58,6 +63,8 @@ #include "parser/parser.h" #include "rewrite/rewriteDefine.h" #include "rewrite/rewriteHandler.h" +#include "storage/bufmgr.h" +#include "storage/lmgr.h" #include "storage/smgr.h" #include "utils/acl.h" #include "utils/builtins.h" @@ -165,6 +172,53 @@ typedef struct NewColumnValue ExprState *exprstate; /* execution state */ } NewColumnValue; +/* + * Error-reporting support for RemoveRelations + */ +struct dropmsgstrings +{ + char kind; + int nonexistent_code; + const char *nonexistent_msg; + const char *skipping_msg; + const char *nota_msg; + const char *drophint_msg; +}; + +static const struct dropmsgstrings dropmsgstringarray[] = { + {RELKIND_RELATION, + ERRCODE_UNDEFINED_TABLE, + gettext_noop("table \"%s\" does not exist"), + gettext_noop("table \"%s\" does not exist, skipping"), + gettext_noop("\"%s\" is not a table"), + gettext_noop("Use DROP TABLE to remove a table.")}, + {RELKIND_SEQUENCE, + ERRCODE_UNDEFINED_TABLE, + gettext_noop("sequence \"%s\" does not exist"), + gettext_noop("sequence \"%s\" does not exist, skipping"), + gettext_noop("\"%s\" is not a sequence"), + gettext_noop("Use DROP SEQUENCE to remove a sequence.")}, + {RELKIND_VIEW, + ERRCODE_UNDEFINED_TABLE, + gettext_noop("view \"%s\" does not exist"), + gettext_noop("view \"%s\" does not exist, skipping"), + gettext_noop("\"%s\" is not a view"), + gettext_noop("Use DROP VIEW to remove a view.")}, + {RELKIND_INDEX, + ERRCODE_UNDEFINED_OBJECT, + gettext_noop("index \"%s\" does not exist"), + gettext_noop("index \"%s\" does not exist, skipping"), + gettext_noop("\"%s\" is not an index"), + gettext_noop("Use DROP INDEX to remove an index.")}, + {RELKIND_COMPOSITE_TYPE, + ERRCODE_UNDEFINED_OBJECT, + gettext_noop("type \"%s\" does not exist"), + gettext_noop("type \"%s\" does not exist, skipping"), + gettext_noop("\"%s\" is not a type"), + gettext_noop("Use DROP TYPE to remove a type.")}, + {'\0', 0, NULL, NULL, NULL, NULL} +}; + static void truncate_check_rel(Relation rel); static List *MergeAttributes(List *schema, List *supers, bool istemp, @@ -266,7 +320,8 @@ static void ATExecEnableDisableRule(Relation rel, char *rulename, char fires_when); static void ATExecAddInherit(Relation rel, RangeVar *parent); static void ATExecDropInherit(Relation rel, RangeVar *parent); -static void copy_relation_data(Relation rel, SMgrRelation dst); +static void copy_relation_data(SMgrRelation rel, SMgrRelation dst, + ForkNumber forkNum, bool istemp); /* ---------------------------------------------------------------- @@ -494,22 +549,197 @@ DefineRelation(CreateStmt *stmt, char relkind) } /* - * RemoveRelation - * Deletes a relation. + * Emit the right error or warning message for a "DROP" command issued on a + * non-existent relation + */ +static void +DropErrorMsgNonExistent(const char *relname, char rightkind, bool missing_ok) +{ + const struct dropmsgstrings *rentry; + + for (rentry = dropmsgstringarray; rentry->kind != '\0'; rentry++) + { + if (rentry->kind == rightkind) + { + if (!missing_ok) + { + ereport(ERROR, + (errcode(rentry->nonexistent_code), + errmsg(rentry->nonexistent_msg, relname))); + } + else + { + ereport(NOTICE, (errmsg(rentry->skipping_msg, relname))); + break; + } + } + } + + Assert(rentry->kind != '\0'); /* Should be impossible */ +} + +/* + * Emit the right error message for a "DROP" command issued on a + * relation of the wrong type + */ +static void +DropErrorMsgWrongType(const char *relname, char wrongkind, char rightkind) +{ + const struct dropmsgstrings *rentry; + const struct dropmsgstrings *wentry; + + for (rentry = dropmsgstringarray; rentry->kind != '\0'; rentry++) + if (rentry->kind == rightkind) + break; + Assert(rentry->kind != '\0'); + + for (wentry = dropmsgstringarray; wentry->kind != '\0'; wentry++) + if (wentry->kind == wrongkind) + break; + /* wrongkind could be something we don't have in our table... */ + + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg(rentry->nota_msg, relname), + (wentry->kind != '\0') ? errhint(wentry->drophint_msg) : 0)); +} + +/* + * RemoveRelations + * Implements DROP TABLE, DROP INDEX, DROP SEQUENCE, DROP VIEW */ void -RemoveRelation(const RangeVar *relation, DropBehavior behavior) +RemoveRelations(DropStmt *drop) { - Oid relOid; - ObjectAddress object; + ObjectAddresses *objects; + char relkind; + ListCell *cell; - relOid = RangeVarGetRelid(relation, false); + /* + * First we identify all the relations, then we delete them in a single + * performMultipleDeletions() call. This is to avoid unwanted + * DROP RESTRICT errors if one of the relations depends on another. + */ - object.classId = RelationRelationId; - object.objectId = relOid; - object.objectSubId = 0; + /* Determine required relkind */ + switch (drop->removeType) + { + case OBJECT_TABLE: + relkind = RELKIND_RELATION; + break; - performDeletion(&object, behavior); + case OBJECT_INDEX: + relkind = RELKIND_INDEX; + break; + + case OBJECT_SEQUENCE: + relkind = RELKIND_SEQUENCE; + break; + + case OBJECT_VIEW: + relkind = RELKIND_VIEW; + break; + + default: + elog(ERROR, "unrecognized drop object type: %d", + (int) drop->removeType); + relkind = 0; /* keep compiler quiet */ + break; + } + + /* Lock and validate each relation; build a list of object addresses */ + objects = new_object_addresses(); + + foreach(cell, drop->objects) + { + RangeVar *rel = makeRangeVarFromNameList((List *) lfirst(cell)); + Oid relOid; + HeapTuple tuple; + Form_pg_class classform; + ObjectAddress obj; + + /* + * These next few steps are a great deal like relation_openrv, but we + * don't bother building a relcache entry since we don't need it. + * + * Check for shared-cache-inval messages before trying to access the + * relation. This is needed to cover the case where the name + * identifies a rel that has been dropped and recreated since the + * start of our transaction: if we don't flush the old syscache entry, + * then we'll latch onto that entry and suffer an error later. + */ + AcceptInvalidationMessages(); + + /* Look up the appropriate relation using namespace search */ + relOid = RangeVarGetRelid(rel, true); + + /* Not there? */ + if (!OidIsValid(relOid)) + { + DropErrorMsgNonExistent(rel->relname, relkind, drop->missing_ok); + continue; + } + + /* + * In DROP INDEX, attempt to acquire lock on the parent table before + * locking the index. index_drop() will need this anyway, and since + * regular queries lock tables before their indexes, we risk deadlock + * if we do it the other way around. No error if we don't find a + * pg_index entry, though --- that most likely means it isn't an + * index, and we'll fail below. + */ + if (relkind == RELKIND_INDEX) + { + tuple = SearchSysCache(INDEXRELID, + ObjectIdGetDatum(relOid), + 0, 0, 0); + if (HeapTupleIsValid(tuple)) + { + Form_pg_index index = (Form_pg_index) GETSTRUCT(tuple); + + LockRelationOid(index->indrelid, AccessExclusiveLock); + ReleaseSysCache(tuple); + } + } + + /* Get the lock before trying to fetch the syscache entry */ + LockRelationOid(relOid, AccessExclusiveLock); + + tuple = SearchSysCache(RELOID, + ObjectIdGetDatum(relOid), + 0, 0, 0); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for relation %u", relOid); + classform = (Form_pg_class) GETSTRUCT(tuple); + + if (classform->relkind != relkind) + DropErrorMsgWrongType(rel->relname, classform->relkind, relkind); + + /* Allow DROP to either table owner or schema owner */ + if (!pg_class_ownercheck(relOid, GetUserId()) && + !pg_namespace_ownercheck(classform->relnamespace, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, + rel->relname); + + if (!allowSystemTableMods && IsSystemClass(classform)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied: \"%s\" is a system catalog", + rel->relname))); + + /* OK, we're ready to delete this one */ + obj.classId = RelationRelationId; + obj.objectId = relOid; + obj.objectSubId = 0; + + add_exact_object_address(&obj, objects); + + ReleaseSysCache(tuple); + } + + performMultipleDeletions(objects, drop->behavior); + + free_object_addresses(objects); } /* @@ -529,6 +759,7 @@ ExecuteTruncate(TruncateStmt *stmt) { List *rels = NIL; List *relids = NIL; + List *seq_relids = NIL; EState *estate; ResultRelInfo *resultRelInfos; ResultRelInfo *resultRelInfo; @@ -543,6 +774,12 @@ ExecuteTruncate(TruncateStmt *stmt) Relation rel; rel = heap_openrv(rv, AccessExclusiveLock); + /* don't throw error for "TRUNCATE foo, foo" */ + if (list_member_oid(relids, RelationGetRelid(rel))) + { + heap_close(rel, AccessExclusiveLock); + continue; + } truncate_check_rel(rel); rels = lappend(rels, rel); relids = lappend_oid(relids, RelationGetRelid(rel)); @@ -594,6 +831,40 @@ ExecuteTruncate(TruncateStmt *stmt) heap_truncate_check_FKs(rels, false); #endif + /* + * If we are asked to restart sequences, find all the sequences, + * lock them (we only need AccessShareLock because that's all that + * ALTER SEQUENCE takes), and check permissions. We want to do this + * early since it's pointless to do all the truncation work only to fail + * on sequence permissions. + */ + if (stmt->restart_seqs) + { + foreach(cell, rels) + { + Relation rel = (Relation) lfirst(cell); + List *seqlist = getOwnedSequences(RelationGetRelid(rel)); + ListCell *seqcell; + + foreach(seqcell, seqlist) + { + Oid seq_relid = lfirst_oid(seqcell); + Relation seq_rel; + + seq_rel = relation_open(seq_relid, AccessShareLock); + + /* This check must match AlterSequence! */ + if (!pg_class_ownercheck(seq_relid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, + RelationGetRelationName(seq_rel)); + + seq_relids = lappend_oid(seq_relids, seq_relid); + + relation_close(seq_rel, NoLock); + } + } + } + /* Prepare to catch AFTER triggers. */ AfterTriggerBeginQuery(); @@ -692,6 +963,25 @@ ExecuteTruncate(TruncateStmt *stmt) heap_close(rel, NoLock); } + + /* + * Lastly, restart any owned sequences if we were asked to. This is done + * last because it's nontransactional: restarts will not roll back if + * we abort later. Hence it's important to postpone them as long as + * possible. (This is also a big reason why we locked and + * permission-checked the sequences beforehand.) + */ + if (stmt->restart_seqs) + { + List *options = list_make1(makeDefElem("restart", NULL)); + + foreach(cell, seq_relids) + { + Oid seq_relid = lfirst_oid(cell); + + AlterSequenceInternal(seq_relid, options); + } + } } /* @@ -700,6 +990,8 @@ ExecuteTruncate(TruncateStmt *stmt) static void truncate_check_rel(Relation rel) { + AclResult aclresult; + /* Only allow truncate on regular tables */ if (rel->rd_rel->relkind != RELKIND_RELATION) ereport(ERROR, @@ -708,8 +1000,10 @@ truncate_check_rel(Relation rel) RelationGetRelationName(rel)))); /* Permissions checks */ - if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, + aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(), + ACL_TRUNCATE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_CLASS, RelationGetRelationName(rel)); if (!allowSystemTableMods && IsSystemRelation(rel)) @@ -1377,7 +1671,7 @@ StoreCatalogInheritance1(Oid relationId, Oid parentOid, { TupleDesc desc = RelationGetDescr(inhRelation); Datum datum[Natts_pg_inherits]; - char nullarr[Natts_pg_inherits]; + bool nullarr[Natts_pg_inherits]; ObjectAddress childobject, parentobject; HeapTuple tuple; @@ -1389,11 +1683,11 @@ StoreCatalogInheritance1(Oid relationId, Oid parentOid, datum[1] = ObjectIdGetDatum(parentOid); /* inhparent */ datum[2] = Int16GetDatum(seqNumber); /* inhseqno */ - nullarr[0] = ' '; - nullarr[1] = ' '; - nullarr[2] = ' '; + nullarr[0] = false; + nullarr[1] = false; + nullarr[2] = false; - tuple = heap_formtuple(desc, datum, nullarr); + tuple = heap_form_tuple(desc, datum, nullarr); simple_heap_insert(inhRelation, tuple); @@ -1917,6 +2211,44 @@ AlterTable(AlterTableStmt *stmt) CheckTableNotInUse(rel, "ALTER TABLE"); + /* Check relation type against type specified in the ALTER command */ + switch (stmt->relkind) + { + case OBJECT_TABLE: + /* + * For mostly-historical reasons, we allow ALTER TABLE to apply + * to all relation types. + */ + break; + + case OBJECT_INDEX: + if (rel->rd_rel->relkind != RELKIND_INDEX) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not an index", + RelationGetRelationName(rel)))); + break; + + case OBJECT_SEQUENCE: + if (rel->rd_rel->relkind != RELKIND_SEQUENCE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a sequence", + RelationGetRelationName(rel)))); + break; + + case OBJECT_VIEW: + if (rel->rd_rel->relkind != RELKIND_VIEW) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a view", + RelationGetRelationName(rel)))); + break; + + default: + elog(ERROR, "unrecognized object type: %d", (int) stmt->relkind); + } + ATController(rel, stmt->cmds, interpretInhOption(stmt->relation->inhOpt)); } @@ -2002,6 +2334,12 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, ATPrepAddColumn(wqueue, rel, recurse, cmd); pass = AT_PASS_ADD_COL; break; + case AT_AddColumnToView: /* add column via CREATE OR REPLACE VIEW */ + ATSimplePermissions(rel, true); + /* Performs own recursion */ + ATPrepAddColumn(wqueue, rel, recurse, cmd); + pass = AT_PASS_ADD_COL; + break; case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */ /* @@ -2223,6 +2561,7 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, switch (cmd->subtype) { case AT_AddColumn: /* ADD COLUMN */ + case AT_AddColumnToView: /* add column via CREATE OR REPLACE VIEW */ ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def); break; case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */ @@ -3119,12 +3458,11 @@ ATExecAddColumn(AlteredTableInfo *tab, Relation rel, Relation pgclass, attrdesc; HeapTuple reltup; - HeapTuple attributeTuple; - Form_pg_attribute attribute; - FormData_pg_attribute attributeD; + FormData_pg_attribute attribute; int i; int minattnum, maxatts; + char relkind; HeapTuple typeTuple; Oid typeOid; int32 typmod; @@ -3197,6 +3535,7 @@ ATExecAddColumn(AlteredTableInfo *tab, Relation rel, colDef->colname, RelationGetRelationName(rel)))); minattnum = ((Form_pg_class) GETSTRUCT(reltup))->relnatts; + relkind = ((Form_pg_class) GETSTRUCT(reltup))->relkind; maxatts = minattnum + 1; if (maxatts > MaxHeapAttributeNumber) ereport(ERROR, @@ -3212,37 +3551,27 @@ ATExecAddColumn(AlteredTableInfo *tab, Relation rel, /* make sure datatype is legal for a column */ CheckAttributeType(colDef->colname, typeOid); - attributeTuple = heap_addheader(Natts_pg_attribute, - false, - ATTRIBUTE_TUPLE_SIZE, - (void *) &attributeD); - - attribute = (Form_pg_attribute) GETSTRUCT(attributeTuple); - - attribute->attrelid = myrelid; - namestrcpy(&(attribute->attname), colDef->colname); - attribute->atttypid = typeOid; - attribute->attstattarget = -1; - attribute->attlen = tform->typlen; - attribute->attcacheoff = -1; - attribute->atttypmod = typmod; - attribute->attnum = i; - attribute->attbyval = tform->typbyval; - attribute->attndims = list_length(colDef->typename->arrayBounds); - attribute->attstorage = tform->typstorage; - attribute->attalign = tform->typalign; - attribute->attnotnull = colDef->is_not_null; - attribute->atthasdef = false; - attribute->attisdropped = false; - attribute->attislocal = colDef->is_local; - attribute->attinhcount = colDef->inhcount; + attribute.attrelid = myrelid; + namestrcpy(&(attribute.attname), colDef->colname); + attribute.atttypid = typeOid; + attribute.attstattarget = -1; + attribute.attlen = tform->typlen; + attribute.attcacheoff = -1; + attribute.atttypmod = typmod; + attribute.attnum = i; + attribute.attbyval = tform->typbyval; + attribute.attndims = list_length(colDef->typename->arrayBounds); + attribute.attstorage = tform->typstorage; + attribute.attalign = tform->typalign; + attribute.attnotnull = colDef->is_not_null; + attribute.atthasdef = false; + attribute.attisdropped = false; + attribute.attislocal = colDef->is_local; + attribute.attinhcount = colDef->inhcount; ReleaseSysCache(typeTuple); - simple_heap_insert(attrdesc, attributeTuple); - - /* Update indexes on pg_attribute */ - CatalogUpdateIndexes(attrdesc, attributeTuple); + InsertPgAttributeTuple(attrdesc, &attribute, NULL); heap_close(attrdesc, RowExclusiveLock); @@ -3271,7 +3600,7 @@ ATExecAddColumn(AlteredTableInfo *tab, Relation rel, RawColumnDefault *rawEnt; rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault)); - rawEnt->attnum = attribute->attnum; + rawEnt->attnum = attribute.attnum; rawEnt->raw_default = copyObject(colDef->raw_default); /* @@ -3305,48 +3634,53 @@ ATExecAddColumn(AlteredTableInfo *tab, Relation rel, * Note: we use build_column_default, and not just the cooked default * returned by AddRelationNewConstraints, so that the right thing happens * when a datatype's default applies. + * + * We skip this logic completely for views. */ - defval = (Expr *) build_column_default(rel, attribute->attnum); + if (relkind != RELKIND_VIEW) { + defval = (Expr *) build_column_default(rel, attribute.attnum); - if (!defval && GetDomainConstraints(typeOid) != NIL) - { - Oid baseTypeId; - int32 baseTypeMod; - - baseTypeMod = typmod; - baseTypeId = getBaseTypeAndTypmod(typeOid, &baseTypeMod); - defval = (Expr *) makeNullConst(baseTypeId, baseTypeMod); - defval = (Expr *) coerce_to_target_type(NULL, - (Node *) defval, - baseTypeId, - typeOid, - typmod, - COERCION_ASSIGNMENT, - COERCE_IMPLICIT_CAST); - if (defval == NULL) /* should not happen */ - elog(ERROR, "failed to coerce base type to domain"); - } + if (!defval && GetDomainConstraints(typeOid) != NIL) + { + Oid baseTypeId; + int32 baseTypeMod; + + baseTypeMod = typmod; + baseTypeId = getBaseTypeAndTypmod(typeOid, &baseTypeMod); + defval = (Expr *) makeNullConst(baseTypeId, baseTypeMod); + defval = (Expr *) coerce_to_target_type(NULL, + (Node *) defval, + baseTypeId, + typeOid, + typmod, + COERCION_ASSIGNMENT, + COERCE_IMPLICIT_CAST, + -1); + if (defval == NULL) /* should not happen */ + elog(ERROR, "failed to coerce base type to domain"); + } - if (defval) - { - NewColumnValue *newval; + if (defval) + { + NewColumnValue *newval; - newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue)); - newval->attnum = attribute->attnum; - newval->expr = defval; + newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue)); + newval->attnum = attribute.attnum; + newval->expr = defval; - tab->newvals = lappend(tab->newvals, newval); - } + tab->newvals = lappend(tab->newvals, newval); + } - /* - * If the new column is NOT NULL, tell Phase 3 it needs to test that. - */ - tab->new_notnull |= colDef->is_not_null; + /* + * If the new column is NOT NULL, tell Phase 3 it needs to test that. + */ + tab->new_notnull |= colDef->is_not_null; + } /* * Add needed dependency entries for the new column. */ - add_column_datatype_dependency(myrelid, i, attribute->atttypid); + add_column_datatype_dependency(myrelid, i, attribute.atttypid); } /* @@ -3608,9 +3942,9 @@ ATExecSetStatistics(Relation rel, const char *colName, Node *newValue) errmsg("statistics target %d is too low", newtarget))); } - else if (newtarget > 1000) + else if (newtarget > 10000) { - newtarget = 1000; + newtarget = 10000; ereport(WARNING, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("lowering statistics target to %d", @@ -4320,11 +4654,11 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, /* * Otherwise, look for an implicit cast from the FK type to the * opcintype, and if found, use the primary equality operator. - * This is a bit tricky because opcintype might be a generic type - * such as ANYARRAY, and so what we have to test is whether the - * two actual column types can be concurrently cast to that type. - * (Otherwise, we'd fail to reject combinations such as int[] and - * point[].) + * This is a bit tricky because opcintype might be a polymorphic + * type such as ANYARRAY or ANYENUM; so what we have to test is + * whether the two actual column types can be concurrently cast to + * that type. (Otherwise, we'd fail to reject combinations such + * as int[] and point[].) */ Oid input_typeids[2]; Oid target_typeids[2]; @@ -4784,7 +5118,8 @@ createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint, * Reconstruct a RangeVar for my relation (not passed in, unfortunately). */ myRel = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)), - pstrdup(RelationGetRelationName(rel))); + pstrdup(RelationGetRelationName(rel)), + -1); /* Make changes-so-far visible */ CommandCounterIncrement(); @@ -5182,12 +5517,13 @@ ATPrepAlterColumnType(List **wqueue, transform, exprType(transform), targettype, targettypmod, COERCION_ASSIGNMENT, - COERCE_IMPLICIT_CAST); + COERCE_IMPLICIT_CAST, + -1); if (transform == NULL) ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("column \"%s\" cannot be cast to type \"%s\"", - colName, TypeNameToString(typename)))); + errmsg("column \"%s\" cannot be cast to type %s", + colName, format_type_be(targettype)))); /* * Add a work queue item to make ATRewriteTable update the column @@ -5280,12 +5616,13 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, defaultexpr, exprType(defaultexpr), targettype, targettypmod, COERCION_ASSIGNMENT, - COERCE_IMPLICIT_CAST); + COERCE_IMPLICIT_CAST, + -1); if (defaultexpr == NULL) ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("default for column \"%s\" cannot be cast to type \"%s\"", - colName, TypeNameToString(typename)))); + errmsg("default for column \"%s\" cannot be cast to type %s", + colName, format_type_be(targettype)))); } else defaultexpr = NULL; @@ -5807,8 +6144,8 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing) if (tuple_class->relowner != newOwnerId) { Datum repl_val[Natts_pg_class]; - char repl_null[Natts_pg_class]; - char repl_repl[Natts_pg_class]; + bool repl_null[Natts_pg_class]; + bool repl_repl[Natts_pg_class]; Acl *newAcl; Datum aclDatum; bool isNull; @@ -5840,10 +6177,10 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing) } } - memset(repl_null, ' ', sizeof(repl_null)); - memset(repl_repl, ' ', sizeof(repl_repl)); + memset(repl_null, false, sizeof(repl_null)); + memset(repl_repl, false, sizeof(repl_repl)); - repl_repl[Anum_pg_class_relowner - 1] = 'r'; + repl_repl[Anum_pg_class_relowner - 1] = true; repl_val[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(newOwnerId); /* @@ -5857,11 +6194,11 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing) { newAcl = aclnewowner(DatumGetAclP(aclDatum), tuple_class->relowner, newOwnerId); - repl_repl[Anum_pg_class_relacl - 1] = 'r'; + repl_repl[Anum_pg_class_relacl - 1] = true; repl_val[Anum_pg_class_relacl - 1] = PointerGetDatum(newAcl); } - newtuple = heap_modifytuple(tuple, RelationGetDescr(class_rel), repl_val, repl_null, repl_repl); + newtuple = heap_modify_tuple(tuple, RelationGetDescr(class_rel), repl_val, repl_null, repl_repl); simple_heap_update(class_rel, &newtuple->t_self, newtuple); CatalogUpdateIndexes(class_rel, newtuple); @@ -6073,8 +6410,8 @@ ATExecSetRelOptions(Relation rel, List *defList, bool isReset) bool isnull; Datum newOptions; Datum repl_val[Natts_pg_class]; - char repl_null[Natts_pg_class]; - char repl_repl[Natts_pg_class]; + bool repl_null[Natts_pg_class]; + bool repl_repl[Natts_pg_class]; if (defList == NIL) return; /* nothing to do */ @@ -6118,17 +6455,17 @@ ATExecSetRelOptions(Relation rel, List *defList, bool isReset) * propagated into relcaches during post-commit cache inval. */ memset(repl_val, 0, sizeof(repl_val)); - memset(repl_null, ' ', sizeof(repl_null)); - memset(repl_repl, ' ', sizeof(repl_repl)); + memset(repl_null, false, sizeof(repl_null)); + memset(repl_repl, false, sizeof(repl_repl)); if (newOptions != (Datum) 0) repl_val[Anum_pg_class_reloptions - 1] = newOptions; else - repl_null[Anum_pg_class_reloptions - 1] = 'n'; + repl_null[Anum_pg_class_reloptions - 1] = true; - repl_repl[Anum_pg_class_reloptions - 1] = 'r'; + repl_repl[Anum_pg_class_reloptions - 1] = true; - newtuple = heap_modifytuple(tuple, RelationGetDescr(pgclass), + newtuple = heap_modify_tuple(tuple, RelationGetDescr(pgclass), repl_val, repl_null, repl_repl); simple_heap_update(pgclass, &newtuple->t_self, newtuple); @@ -6153,11 +6490,13 @@ ATExecSetTableSpace(Oid tableOid, Oid newTableSpace) Oid oldTableSpace; Oid reltoastrelid; Oid reltoastidxid; + Oid newrelfilenode; RelFileNode newrnode; SMgrRelation dstrel; Relation pg_class; HeapTuple tuple; Form_pg_class rd_rel; + ForkNumber forkNum; /* * Need lock here in case we are recursing to toast table or index @@ -6213,29 +6552,59 @@ ATExecSetTableSpace(Oid tableOid, Oid newTableSpace) elog(ERROR, "cache lookup failed for relation %u", tableOid); rd_rel = (Form_pg_class) GETSTRUCT(tuple); - /* create another storage file. Is it a little ugly ? */ - /* NOTE: any conflict in relfilenode value will be caught here */ + /* + * Since we copy the file directly without looking at the shared buffers, + * we'd better first flush out any pages of the source relation that are + * in shared buffers. We assume no new changes will be made while we are + * holding exclusive lock on the rel. + */ + FlushRelationBuffers(rel); + + /* + * Relfilenodes are not unique across tablespaces, so we need to allocate + * a new one in the new tablespace. + */ + newrelfilenode = GetNewRelFileNode(newTableSpace, + rel->rd_rel->relisshared, + NULL); + + /* Open old and new relation */ newrnode = rel->rd_node; + newrnode.relNode = newrelfilenode; newrnode.spcNode = newTableSpace; - dstrel = smgropen(newrnode); - smgrcreate(dstrel, rel->rd_istemp, false); - /* copy relation data to the new physical file */ - copy_relation_data(rel, dstrel); - - /* schedule unlinking old physical file */ RelationOpenSmgr(rel); - smgrscheduleunlink(rel->rd_smgr, rel->rd_istemp); /* - * Now drop smgr references. The source was already dropped by - * smgrscheduleunlink. + * Create and copy all forks of the relation, and schedule unlinking + * of old physical files. + * + * NOTE: any conflict in relfilenode value will be caught in + * RelationCreateStorage(). */ + RelationCreateStorage(newrnode, rel->rd_istemp); + + /* copy main fork */ + copy_relation_data(rel->rd_smgr, dstrel, MAIN_FORKNUM, rel->rd_istemp); + + /* copy those extra forks that exist */ + for (forkNum = MAIN_FORKNUM + 1; forkNum <= MAX_FORKNUM; forkNum++) + { + if (smgrexists(rel->rd_smgr, forkNum)) + { + smgrcreate(dstrel, forkNum, false); + copy_relation_data(rel->rd_smgr, dstrel, forkNum, rel->rd_istemp); + } + } + + /* drop old relation, and close new one */ + RelationDropStorage(rel); smgrclose(dstrel); /* update the pg_class row */ rd_rel->reltablespace = (newTableSpace == MyDatabaseTableSpace) ? InvalidOid : newTableSpace; + rd_rel->relfilenode = newrelfilenode; simple_heap_update(pg_class, &tuple->t_self, tuple); CatalogUpdateIndexes(pg_class, tuple); @@ -6259,47 +6628,37 @@ ATExecSetTableSpace(Oid tableOid, Oid newTableSpace) * Copy data, block by block */ static void -copy_relation_data(Relation rel, SMgrRelation dst) +copy_relation_data(SMgrRelation src, SMgrRelation dst, + ForkNumber forkNum, bool istemp) { - SMgrRelation src; bool use_wal; BlockNumber nblocks; BlockNumber blkno; char buf[BLCKSZ]; Page page = (Page) buf; - /* - * Since we copy the file directly without looking at the shared buffers, - * we'd better first flush out any pages of the source relation that are - * in shared buffers. We assume no new changes will be made while we are - * holding exclusive lock on the rel. - */ - FlushRelationBuffers(rel); - /* * We need to log the copied data in WAL iff WAL archiving is enabled AND * it's not a temp rel. */ - use_wal = XLogArchivingActive() && !rel->rd_istemp; + use_wal = XLogArchivingActive() && !istemp; - nblocks = RelationGetNumberOfBlocks(rel); - /* RelationGetNumberOfBlocks will certainly have opened rd_smgr */ - src = rel->rd_smgr; + nblocks = smgrnblocks(src, forkNum); for (blkno = 0; blkno < nblocks; blkno++) { - smgrread(src, blkno, buf); + smgrread(src, forkNum, blkno, buf); /* XLOG stuff */ if (use_wal) - log_newpage(&dst->smgr_rnode, blkno, page); + log_newpage(&dst->smgr_rnode, forkNum, blkno, page); /* * Now write the page. We say isTemp = true even if it's not a temp * rel, because there's no need for smgr to schedule an fsync for this * write; we'll do it ourselves below. */ - smgrextend(dst, blkno, buf, true); + smgrextend(dst, forkNum, blkno, buf, true); } /* @@ -6316,8 +6675,8 @@ copy_relation_data(Relation rel, SMgrRelation dst) * wouldn't replay our earlier WAL entries. If we do not fsync those pages * here, they might still not be on disk when the crash occurs. */ - if (!rel->rd_istemp) - smgrimmedsync(dst); + if (!istemp) + smgrimmedsync(dst, forkNum); } /* @@ -6933,7 +7292,8 @@ ATExecDropInherit(Relation rel, RangeVar *parent) * Note: caller must have checked ownership of the relation already */ void -AlterTableNamespace(RangeVar *relation, const char *newschema) +AlterTableNamespace(RangeVar *relation, const char *newschema, + ObjectType stmttype) { Relation rel; Oid relid; @@ -6946,6 +7306,36 @@ AlterTableNamespace(RangeVar *relation, const char *newschema) relid = RelationGetRelid(rel); oldNspOid = RelationGetNamespace(rel); + /* Check relation type against type specified in the ALTER command */ + switch (stmttype) + { + case OBJECT_TABLE: + /* + * For mostly-historical reasons, we allow ALTER TABLE to apply + * to all relation types. + */ + break; + + case OBJECT_SEQUENCE: + if (rel->rd_rel->relkind != RELKIND_SEQUENCE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a sequence", + RelationGetRelationName(rel)))); + break; + + case OBJECT_VIEW: + if (rel->rd_rel->relkind != RELKIND_VIEW) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a view", + RelationGetRelationName(rel)))); + break; + + default: + elog(ERROR, "unrecognized object type: %d", (int) stmttype); + } + /* Can we change the schema of this tuple? */ switch (rel->rd_rel->relkind) {