]> granicus.if.org Git - postgresql/blobdiff - src/backend/parser/parse_utilcmd.c
Prevent adding relations to a concurrently dropped schema.
[postgresql] / src / backend / parser / parse_utilcmd.c
index 0078814905d12ed8638af2d39d18274dba3edbba..99157c534934a5418d52354e6746814bc71cf0c2 100644 (file)
@@ -16,7 +16,7 @@
  * a quick copyObject() call before manipulating the query tree.
  *
  *
- * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *     src/backend/parser/parse_utilcmd.c
@@ -26,8 +26,6 @@
 
 #include "postgres.h"
 
-#include "access/genam.h"
-#include "access/heapam.h"
 #include "access/reloptions.h"
 #include "catalog/dependency.h"
 #include "catalog/heap.h"
 #include "parser/parse_utilcmd.h"
 #include "parser/parser.h"
 #include "rewrite/rewriteManip.h"
-#include "storage/lock.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
-#include "utils/relcache.h"
+#include "utils/rel.h"
 #include "utils/syscache.h"
 #include "utils/typcache.h"
 
@@ -105,8 +102,8 @@ static void transformColumnDefinition(CreateStmtContext *cxt,
                                                  ColumnDef *column);
 static void transformTableConstraint(CreateStmtContext *cxt,
                                                 Constraint *constraint);
-static void transformInhRelation(CreateStmtContext *cxt,
-                                        InhRelation *inhrelation);
+static void transformTableLikeClause(CreateStmtContext *cxt,
+                                        TableLikeClause *table_like_clause);
 static void transformOfType(CreateStmtContext *cxt,
                                TypeName *ofTypename);
 static char *chooseIndexName(const RangeVar *relation, IndexStmt *index_stmt);
@@ -148,6 +145,8 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
        List       *result;
        List       *save_alist;
        ListCell   *elements;
+       Oid                     namespaceid;
+       Oid                     existing_relid;
 
        /*
         * We must not scribble on the passed-in CreateStmt, so copy it.  (This is
@@ -155,6 +154,29 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
         */
        stmt = (CreateStmt *) copyObject(stmt);
 
+       /*
+        * Look up the creation namespace.      This also checks permissions on the
+        * target namespace, locks it against concurrent drops, checks for a
+        * preexisting relation in that namespace with the same name, and updates
+        * stmt->relation->relpersistence if the select namespace is temporary.
+        */
+       namespaceid =
+               RangeVarGetAndCheckCreationNamespace(stmt->relation, NoLock,
+                                                                                        &existing_relid);
+
+       /*
+        * If the relation already exists and the user specified "IF NOT EXISTS",
+        * bail out with a NOTICE.
+        */
+       if (stmt->if_not_exists && OidIsValid(existing_relid))
+       {
+               ereport(NOTICE,
+                               (errcode(ERRCODE_DUPLICATE_TABLE),
+                                errmsg("relation \"%s\" already exists, skipping",
+                                               stmt->relation->relname)));
+               return NIL;
+       }
+
        /*
         * If the target relation name isn't schema-qualified, make it so.  This
         * prevents some corner cases in which added-on rewritten commands might
@@ -164,11 +186,7 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
         */
        if (stmt->relation->schemaname == NULL
                && stmt->relation->relpersistence != RELPERSISTENCE_TEMP)
-       {
-               Oid                     namespaceid = RangeVarGetCreationNamespace(stmt->relation);
-
                stmt->relation->schemaname = get_namespace_name(namespaceid);
-       }
 
        /* Set up pstate and CreateStmtContext */
        pstate = make_parsestate(NULL);
@@ -216,8 +234,8 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
                                transformTableConstraint(&cxt, (Constraint *) element);
                                break;
 
-                       case T_InhRelation:
-                               transformInhRelation(&cxt, (InhRelation *) element);
+                       case T_TableLikeClause:
+                               transformTableLikeClause(&cxt, (TableLikeClause *) element);
                                break;
 
                        default:
@@ -283,7 +301,14 @@ transformColumnDefinition(CreateStmtContext *cxt, ColumnDef *column)
        {
                char       *typname = strVal(linitial(column->typeName->names));
 
-               if (strcmp(typname, "serial") == 0 ||
+               if (strcmp(typname, "smallserial") == 0 ||
+                       strcmp(typname, "serial2") == 0)
+               {
+                       is_serial = true;
+                       column->typeName->names = NIL;
+                       column->typeName->typeOid = INT2OID;
+               }
+               else if (strcmp(typname, "serial") == 0 ||
                        strcmp(typname, "serial4") == 0)
                {
                        is_serial = true;
@@ -343,7 +368,10 @@ transformColumnDefinition(CreateStmtContext *cxt, ColumnDef *column)
                if (cxt->rel)
                        snamespaceid = RelationGetNamespace(cxt->rel);
                else
+               {
                        snamespaceid = RangeVarGetCreationNamespace(cxt->relation);
+                       RangeVarAdjustRelationPersistence(cxt->relation, snamespaceid);
+               }
                snamespace = get_namespace_name(snamespaceid);
                sname = ChooseRelationName(cxt->relation->relname,
                                                                   column->colname,
@@ -524,6 +552,31 @@ transformColumnDefinition(CreateStmtContext *cxt, ColumnDef *column)
                                break;
                }
        }
+
+       /*
+        * Generate ALTER FOREIGN TABLE ALTER COLUMN statement which adds 
+        * per-column foreign data wrapper options for this column.
+        */
+       if (column->fdwoptions != NIL)
+       {
+               AlterTableStmt *stmt;
+               AlterTableCmd  *cmd;
+
+               cmd = makeNode(AlterTableCmd);
+               cmd->subtype = AT_AlterColumnGenericOptions;
+               cmd->name = column->colname;
+               cmd->def = (Node *) column->fdwoptions;
+               cmd->behavior = DROP_RESTRICT;
+               cmd->missing_ok = false;
+
+               stmt = makeNode(AlterTableStmt);
+               stmt->relation = cxt->relation;
+               stmt->cmds = NIL;
+               stmt->relkind = OBJECT_FOREIGN_TABLE;
+               stmt->cmds = lappend(stmt->cmds, cmd);
+
+               cxt->alist = lappend(cxt->alist, stmt);
+       }
 }
 
 /*
@@ -568,14 +621,14 @@ transformTableConstraint(CreateStmtContext *cxt, Constraint *constraint)
 }
 
 /*
- * transformInhRelation
+ * transformTableLikeClause
  *
- * Change the LIKE <subtable> portion of a CREATE TABLE statement into
+ * Change the LIKE <srctable> portion of a CREATE TABLE statement into
  * column definitions which recreate the user defined column portions of
- * <subtable>.
+ * <srctable>.
  */
 static void
-transformInhRelation(CreateStmtContext *cxt, InhRelation *inhRelation)
+transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_clause)
 {
        AttrNumber      parent_attno;
        Relation        relation;
@@ -584,17 +637,19 @@ transformInhRelation(CreateStmtContext *cxt, InhRelation *inhRelation)
        AclResult       aclresult;
        char       *comment;
 
-       relation = parserOpenTable(cxt->pstate, inhRelation->relation,
+       relation = parserOpenTable(cxt->pstate, table_like_clause->relation,
                                                           AccessShareLock);
 
-       if (relation->rd_rel->relkind != RELKIND_RELATION)
+       if (relation->rd_rel->relkind != RELKIND_RELATION
+               && relation->rd_rel->relkind != RELKIND_VIEW
+               && relation->rd_rel->relkind != RELKIND_FOREIGN_TABLE)
                ereport(ERROR,
                                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                                errmsg("inherited relation \"%s\" is not a table",
-                                               inhRelation->relation->relname)));
+                                errmsg("LIKE source relation \"%s\" is not a table, view, or foreign table",
+                                               table_like_clause->relation->relname)));
 
        /*
-        * Check for SELECT privilages
+        * Check for SELECT privileges
         */
        aclresult = pg_class_aclcheck(RelationGetRelid(relation), GetUserId(),
                                                                  ACL_SELECT);
@@ -651,7 +706,7 @@ transformInhRelation(CreateStmtContext *cxt, InhRelation *inhRelation)
                 * Copy default, if present and the default has been requested
                 */
                if (attribute->atthasdef &&
-                       (inhRelation->options & CREATE_TABLE_LIKE_DEFAULTS))
+                       (table_like_clause->options & CREATE_TABLE_LIKE_DEFAULTS))
                {
                        Node       *this_default = NULL;
                        AttrDefault *attrdef;
@@ -679,13 +734,13 @@ transformInhRelation(CreateStmtContext *cxt, InhRelation *inhRelation)
                }
 
                /* Likewise, copy storage if requested */
-               if (inhRelation->options & CREATE_TABLE_LIKE_STORAGE)
+               if (table_like_clause->options & CREATE_TABLE_LIKE_STORAGE)
                        def->storage = attribute->attstorage;
                else
                        def->storage = 0;
 
                /* Likewise, copy comment if requested */
-               if ((inhRelation->options & CREATE_TABLE_LIKE_COMMENTS) &&
+               if ((table_like_clause->options & CREATE_TABLE_LIKE_COMMENTS) &&
                        (comment = GetComment(attribute->attrelid,
                                                                  RelationRelationId,
                                                                  attribute->attnum)) != NULL)
@@ -707,7 +762,7 @@ transformInhRelation(CreateStmtContext *cxt, InhRelation *inhRelation)
         * Copy CHECK constraints if requested, being careful to adjust attribute
         * numbers
         */
-       if ((inhRelation->options & CREATE_TABLE_LIKE_CONSTRAINTS) &&
+       if ((table_like_clause->options & CREATE_TABLE_LIKE_CONSTRAINTS) &&
                tupleDesc->constr)
        {
                AttrNumber *attmap = varattnos_map_schema(tupleDesc, cxt->columns);
@@ -730,7 +785,7 @@ transformInhRelation(CreateStmtContext *cxt, InhRelation *inhRelation)
                        cxt->ckconstraints = lappend(cxt->ckconstraints, n);
 
                        /* Copy comment on constraint */
-                       if ((inhRelation->options & CREATE_TABLE_LIKE_COMMENTS) &&
+                       if ((table_like_clause->options & CREATE_TABLE_LIKE_COMMENTS) &&
                                (comment = GetComment(get_constraint_oid(RelationGetRelid(relation),
                                                                                                                 n->conname, false),
                                                                          ConstraintRelationId,
@@ -753,7 +808,7 @@ transformInhRelation(CreateStmtContext *cxt, InhRelation *inhRelation)
        /*
         * Likewise, copy indexes if requested
         */
-       if ((inhRelation->options & CREATE_TABLE_LIKE_INDEXES) &&
+       if ((table_like_clause->options & CREATE_TABLE_LIKE_INDEXES) &&
                relation->rd_rel->relhasindex)
        {
                AttrNumber *attmap = varattnos_map_schema(tupleDesc, cxt->columns);
@@ -774,7 +829,7 @@ transformInhRelation(CreateStmtContext *cxt, InhRelation *inhRelation)
                        index_stmt = generateClonedIndexStmt(cxt, parent_index, attmap);
 
                        /* Copy comment on index */
-                       if (inhRelation->options & CREATE_TABLE_LIKE_COMMENTS)
+                       if (table_like_clause->options & CREATE_TABLE_LIKE_COMMENTS)
                        {
                                comment = GetComment(parent_index_oid, RelationRelationId, 0);
 
@@ -821,7 +876,6 @@ static void
 transformOfType(CreateStmtContext *cxt, TypeName *ofTypename)
 {
        HeapTuple       tuple;
-       Form_pg_type typ;
        TupleDesc       tupdesc;
        int                     i;
        Oid                     ofTypeId;
@@ -830,7 +884,6 @@ transformOfType(CreateStmtContext *cxt, TypeName *ofTypename)
 
        tuple = typenameType(NULL, ofTypename, NULL);
        check_of_type(tuple);
-       typ = (Form_pg_type) GETSTRUCT(tuple);
        ofTypeId = HeapTupleGetOid(tuple);
        ofTypename->typeOid = ofTypeId;         /* cached for later */
 
@@ -1459,21 +1512,21 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
                        ereport(ERROR,
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                         errmsg("\"%s\" is not a unique index", index_name),
-                                        errdetail("Cannot create a PRIMARY KEY or UNIQUE constraint using such an index."),
+                                        errdetail("Cannot create a primary key or unique constraint using such an index."),
                                         parser_errposition(cxt->pstate, constraint->location)));
 
                if (RelationGetIndexExpressions(index_rel) != NIL)
                        ereport(ERROR,
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                         errmsg("index \"%s\" contains expressions", index_name),
-                                        errdetail("Cannot create a PRIMARY KEY or UNIQUE constraint using such an index."),
+                                        errdetail("Cannot create a primary key or unique constraint using such an index."),
                                         parser_errposition(cxt->pstate, constraint->location)));
 
                if (RelationGetIndexPredicate(index_rel) != NIL)
                        ereport(ERROR,
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                         errmsg("\"%s\" is a partial index", index_name),
-                                        errdetail("Cannot create a PRIMARY KEY or UNIQUE constraint using such an index."),
+                                        errdetail("Cannot create a primary key or unique constraint using such an index."),
                                         parser_errposition(cxt->pstate, constraint->location)));
 
                /*
@@ -1498,7 +1551,7 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
                if (index_rel->rd_rel->relam != get_am_oid(DEFAULT_INDEX_TYPE, false))
                        ereport(ERROR,
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                                        errmsg("index \"%s\" is not a b-tree", index_name),
+                                        errmsg("index \"%s\" is not a btree", index_name),
                                         parser_errposition(cxt->pstate, constraint->location)));
 
                /* Must get indclass the hard way */
@@ -1543,7 +1596,7 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
                                ereport(ERROR,
                                                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                                 errmsg("index \"%s\" does not have default sorting behavior", index_name),
-                                                errdetail("Cannot create a PRIMARY KEY or UNIQUE constraint using such an index."),
+                                                errdetail("Cannot create a primary key or unique constraint using such an index."),
                                         parser_errposition(cxt->pstate, constraint->location)));
 
                        constraint->keys = lappend(constraint->keys, makeString(attname));
@@ -1729,7 +1782,8 @@ transformFKConstraints(CreateStmtContext *cxt,
 
        /*
         * If CREATE TABLE or adding a column with NULL default, we can safely
-        * skip validation of the constraint.
+        * skip validation of FK constraints, and nonetheless mark them valid.
+        * (This will override any user-supplied NOT VALID flag.)
         */
        if (skipValidation)
        {
@@ -2388,8 +2442,8 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString)
  * and detect inconsistent/misplaced constraint attributes.
  *
  * NOTE: currently, attributes are only supported for FOREIGN KEY, UNIQUE,
- * and PRIMARY KEY constraints, but someday they ought to be supported
- * for other constraint types.
+ * EXCLUSION, and PRIMARY KEY constraints, but someday they ought to be
+ * supported for other constraint types.
  */
 static void
 transformConstraintAttrs(CreateStmtContext *cxt, List *constraintList)
@@ -2520,8 +2574,8 @@ transformColumnType(CreateStmtContext *cxt, ColumnDef *column)
                Form_pg_type typtup = (Form_pg_type) GETSTRUCT(ctype);
 
                LookupCollation(cxt->pstate,
-                                                                 column->collClause->collname,
-                                                                 column->collClause->location);
+                                               column->collClause->collname,
+                                               column->collClause->location);
                /* Complain if COLLATE is applied to an uncollatable type */
                if (!OidIsValid(typtup->typcollation))
                        ereport(ERROR,