]> granicus.if.org Git - postgresql/commitdiff
Prevent adding relations to a concurrently dropped schema.
authorRobert Haas <rhaas@postgresql.org>
Mon, 16 Jan 2012 14:34:21 +0000 (09:34 -0500)
committerRobert Haas <rhaas@postgresql.org>
Mon, 16 Jan 2012 14:49:34 +0000 (09:49 -0500)
In the previous coding, it was possible for a relation to be created
via CREATE TABLE, CREATE VIEW, CREATE SEQUENCE, CREATE FOREIGN TABLE,
etc.  in a schema while that schema was meanwhile being concurrently
dropped.  This led to a pg_class entry with an invalid relnamespace
value.  The same problem could occur if a relation was moved using
ALTER .. SET SCHEMA while the target schema was being concurrently
dropped.  This patch prevents both of those scenarios by locking the
schema to which the relation is being added using AccessShareLock,
which conflicts with the AccessExclusiveLock taken by DROP.

As a desirable side effect, this also prevents the use of CREATE OR
REPLACE VIEW to queue for an AccessExclusiveLock on a relation on which
you have no rights: that will now fail immediately with a permissions
error, before trying to obtain a lock.

We need similar protection for all other object types, but as everything
other than relations uses a slightly different set of code paths, I'm
leaving that for a separate commit.

Original complaint (as far as I could find) about CREATE by Nikhil
Sontakke; risk for ALTER .. SET SCHEMA pointed out by Tom Lane;
further details by Dan Farina; patch by me; review by Hitoshi Harada.

src/backend/catalog/namespace.c
src/backend/commands/tablecmds.c
src/backend/commands/typecmds.c
src/backend/commands/view.c
src/backend/executor/execMain.c
src/backend/parser/parse_utilcmd.c
src/include/catalog/namespace.h

index a9a64fe5f21d8141f6e70e6118ee8369ef70d8f4..80d6fc7d0c2c721b43751cfe0d1d7233796f7d31 100644 (file)
@@ -480,31 +480,131 @@ RangeVarGetCreationNamespace(const RangeVar *newRelation)
 
 /*
  * RangeVarGetAndCheckCreationNamespace
- *             As RangeVarGetCreationNamespace, but with a permissions check.
+ *
+ * This function returns the OID of the namespace in which a new relation
+ * with a given name should be created.  If the user does not have CREATE
+ * permission on the target namespace, this function will instead signal
+ * an ERROR.
+ *
+ * If non-NULL, *existing_oid is set to the OID of any existing relation with
+ * the same name which already exists in that namespace, or to InvalidOid if
+ * no such relation exists.
+ *
+ * If lockmode != NoLock, the specified lock mode is acquire on the existing
+ * relation, if any, provided that the current user owns the target relation.
+ * However, if lockmode != NoLock and the user does not own the target
+ * relation, we throw an ERROR, as we must not try to lock relations the
+ * user does not have permissions on.
+ *
+ * As a side effect, this function acquires AccessShareLock on the target
+ * namespace.  Without this, the namespace could be dropped before our
+ * transaction commits, leaving behind relations with relnamespace pointing
+ * to a no-longer-exstant namespace.
+ *
+ * As a further side-effect, if the select namespace is a temporary namespace,
+ * we mark the RangeVar as RELPERSISTENCE_TEMP.
  */
 Oid
-RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation)
+RangeVarGetAndCheckCreationNamespace(RangeVar *relation,
+                                                                        LOCKMODE lockmode,
+                                                                        Oid *existing_relation_id)
 {
-       Oid                     namespaceId;
+       uint64          inval_count;
+       Oid                     relid;
+       Oid                     oldrelid = InvalidOid;
+       Oid                     nspid;
+       Oid                     oldnspid = InvalidOid;
+       bool            retry = false;
 
-       namespaceId = RangeVarGetCreationNamespace(newRelation);
+       /*
+        * We check the catalog name and then ignore it.
+        */
+       if (relation->catalogname)
+       {
+               if (strcmp(relation->catalogname, get_database_name(MyDatabaseId)) != 0)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                        errmsg("cross-database references are not implemented: \"%s.%s.%s\"",
+                                                       relation->catalogname, relation->schemaname,
+                                                       relation->relname)));
+       }
 
        /*
-        * Check we have permission to create there. Skip check if bootstrapping,
-        * since permissions machinery may not be working yet.
+        * As in RangeVarGetRelidExtended(), we guard against concurrent DDL
+        * operations by tracking whether any invalidation messages are processed
+        * while we're doing the name lookups and acquiring locks.  See comments
+        * in that function for a more detailed explanation of this logic.
         */
-       if (!IsBootstrapProcessingMode())
+       for (;;)
        {
                AclResult       aclresult;
 
-               aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
-                                                                                 ACL_CREATE);
+               inval_count = SharedInvalidMessageCounter;
+
+               /* Look up creation namespace and check for existing relation. */
+               nspid = RangeVarGetCreationNamespace(relation);
+               Assert(OidIsValid(nspid));
+               if (existing_relation_id != NULL)
+                       relid = get_relname_relid(relation->relname, nspid);
+               else
+                       relid = InvalidOid;
+
+               /*
+                * In bootstrap processing mode, we don't bother with permissions
+                * or locking.  Permissions might not be working yet, and locking is
+                * unnecessary.
+                */
+               if (IsBootstrapProcessingMode())
+                       break;
+
+               /* Check namespace permissions. */
+               aclresult = pg_namespace_aclcheck(nspid, GetUserId(), ACL_CREATE);
                if (aclresult != ACLCHECK_OK)
                        aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
-                                                  get_namespace_name(namespaceId));
+                                                  get_namespace_name(nspid));
+
+               if (retry)
+               {
+                       /* If nothing changed, we're done. */
+                       if (relid == oldrelid && nspid == oldnspid)
+                               break;
+                       /* If creation namespace has changed, give up old lock. */
+                       if (nspid != oldnspid)
+                               UnlockDatabaseObject(NamespaceRelationId, oldnspid, 0,
+                                                                        AccessShareLock);
+                       /* If name points to something different, give up old lock. */
+                       if (relid != oldrelid && OidIsValid(oldrelid) && lockmode != NoLock)
+                               UnlockRelationOid(oldrelid, lockmode);
+               }
+
+               /* Lock namespace. */
+               if (nspid != oldnspid)
+                       LockDatabaseObject(NamespaceRelationId, nspid, 0, AccessShareLock);
+
+               /* Lock relation, if required if and we have permission. */
+               if (lockmode != NoLock && OidIsValid(relid))
+               {
+                       if (!pg_class_ownercheck(relid, GetUserId()))
+                               aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
+                                                          relation->relname);
+                       if (relid != oldrelid)
+                               LockRelationOid(relid, lockmode);
+               }
+
+               /* If no invalidation message were processed, we're done! */
+               if (inval_count == SharedInvalidMessageCounter)
+                       break;
+
+               /* Something may have changed, so recheck our work. */
+               retry = true;
+               oldrelid = relid;
+               oldnspid = nspid;
        }
 
-       return namespaceId;
+       RangeVarAdjustRelationPersistence(relation, nspid);
+       if (existing_relation_id != NULL)
+               *existing_relation_id = relid;
+       return nspid;
 }
 
 /*
index c37301671e4a8f45f58d15ac54c224a71f580eb3..d0843b2f5889f5319b70b08b1853a1203c5b0111 100644 (file)
@@ -451,10 +451,12 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId)
 
        /*
         * Look up the namespace in which we are supposed to create the relation,
-        * and check we have permission to create there.
+        * check we have permission to create there, lock it against concurrent
+        * drop, and mark stmt->relation as RELPERSISTENCE_TEMP if a temporary
+        * namespace is selected.
         */
-       namespaceId = RangeVarGetAndCheckCreationNamespace(stmt->relation);
-       RangeVarAdjustRelationPersistence(stmt->relation, namespaceId);
+       namespaceId =
+               RangeVarGetAndCheckCreationNamespace(stmt->relation, NoLock, NULL);
 
        /*
         * Security check: disallow creating temp tables from security-restricted
@@ -9417,6 +9419,7 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt)
        Oid                     oldNspOid;
        Oid                     nspOid;
        Relation        classRel;
+       RangeVar   *newrv;
 
        relid = RangeVarGetRelidExtended(stmt->relation, AccessExclusiveLock,
                                                                         false, false,
@@ -9441,8 +9444,9 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt)
                                                get_rel_name(tableId))));
        }
 
-       /* get schema OID and check its permissions */
-       nspOid = LookupCreationNamespace(stmt->newschema);
+       /* Get and lock schema OID and check its permissions. */
+       newrv = makeRangeVar(stmt->newschema, RelationGetRelationName(rel), -1);
+       nspOid = RangeVarGetAndCheckCreationNamespace(newrv, NoLock, NULL);
 
        /* common checks on switching namespaces */
        CheckSetNamespace(oldNspOid, nspOid, RelationRelationId, relid);
index 0f8af31feeff0953bbd7783b2e53fc7e6ac4b8a4..0043bf1fee29ec66829951199ebf55f82838b63a 100644 (file)
@@ -2005,7 +2005,8 @@ DefineCompositeType(const RangeVar *typevar, List *coldeflist)
         * check is here mainly to get a better error message about a "type"
         * instead of below about a "relation".
         */
-       typeNamespace = RangeVarGetCreationNamespace(createStmt->relation);
+       typeNamespace = RangeVarGetAndCheckCreationNamespace(createStmt->relation,
+                                                                                                                NoLock, NULL);
        RangeVarAdjustRelationPersistence(createStmt->relation, typeNamespace);
        old_type_oid =
                GetSysCacheOid2(TYPENAMENSP,
index ff9c44908a3af8d9e07d901a1c0b62eda36c6533..c3520ae03c677b323e2a9cb4821a082f2bbff21d 100644 (file)
@@ -98,10 +98,12 @@ isViewOnTempTable_walker(Node *node, void *context)
  *---------------------------------------------------------------------
  */
 static Oid
-DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace,
-                                         Oid namespaceId, List *options)
+DefineVirtualRelation(RangeVar *relation, List *tlist, bool replace,
+                                         List *options)
 {
        Oid                     viewOid;
+       Oid                     namespaceId;
+       LOCKMODE        lockmode;
        CreateStmt *createStmt = makeNode(CreateStmt);
        List       *attrList;
        ListCell   *t;
@@ -159,9 +161,14 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace,
                                 errmsg("view must have at least one column")));
 
        /*
-        * Check to see if we want to replace an existing view.
+        * Look up, check permissions on, and lock the creation namespace; also
+        * check for a preexisting view with the same name.  This will also set
+        * relation->relpersistence to RELPERSISTENCE_TEMP if the selected
+        * namespace is temporary.
         */
-       viewOid = get_relname_relid(relation->relname, namespaceId);
+       lockmode = replace ? AccessExclusiveLock : NoLock;
+       namespaceId =
+               RangeVarGetAndCheckCreationNamespace(relation, lockmode, &viewOid);
 
        if (OidIsValid(viewOid) && replace)
        {
@@ -170,24 +177,16 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace,
                List       *atcmds = NIL;
                AlterTableCmd *atcmd;
 
-               /*
-                * Yes.  Get exclusive lock on the existing view ...
-                */
-               rel = relation_open(viewOid, AccessExclusiveLock);
+               /* Relation is already locked, but we must build a relcache entry. */
+               rel = relation_open(viewOid, NoLock);
 
-               /*
-                * Make sure it *is* a view, and do permissions checks.
-                */
+               /* Make sure it *is* a view. */
                if (rel->rd_rel->relkind != RELKIND_VIEW)
                        ereport(ERROR,
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                         errmsg("\"%s\" is not a view",
                                                        RelationGetRelationName(rel))));
 
-               if (!pg_class_ownercheck(viewOid, GetUserId()))
-                       aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
-                                                  RelationGetRelationName(rel));
-
                /* Also check it's not in use already */
                CheckTableNotInUse(rel, "CREATE OR REPLACE VIEW");
 
@@ -428,7 +427,6 @@ DefineView(ViewStmt *stmt, const char *queryString)
 {
        Query      *viewParse;
        Oid                     viewOid;
-       Oid                     namespaceId;
        RangeVar   *view;
 
        /*
@@ -514,10 +512,6 @@ DefineView(ViewStmt *stmt, const char *queryString)
                                                view->relname)));
        }
 
-       /* Might also need to make it temporary if placed in temp schema. */
-       namespaceId = RangeVarGetCreationNamespace(view);
-       RangeVarAdjustRelationPersistence(view, namespaceId);
-
        /*
         * Create the view relation
         *
@@ -525,7 +519,7 @@ DefineView(ViewStmt *stmt, const char *queryString)
         * aborted.
         */
        viewOid = DefineVirtualRelation(view, viewParse->targetList,
-                                                                       stmt->replace, namespaceId, stmt->options);
+                                                                       stmt->replace, stmt->options);
 
        /*
         * The relation we have just created is not visible to any other commands
index 569d0ba7ade1e0cd64c5566f2ecbbd38773f2b25..422f737e82dcb1202d3f9e927fe87f06f6b33c7b 100644 (file)
@@ -2532,11 +2532,13 @@ OpenIntoRel(QueryDesc *queryDesc)
        }
 
        /*
-        * Find namespace to create in, check its permissions
+        * Find namespace to create in, check its permissions, lock it against
+        * concurrent drop, and mark into->rel as RELPERSISTENCE_TEMP if the
+        * selected namespace is temporary.
         */
        intoName = into->rel->relname;
-       namespaceId = RangeVarGetAndCheckCreationNamespace(into->rel);
-       RangeVarAdjustRelationPersistence(into->rel, namespaceId);
+       namespaceId = RangeVarGetAndCheckCreationNamespace(into->rel, NoLock,
+                                                                                                          NULL);
 
        /*
         * Security check: disallow creating temp tables from security-restricted
index 335bdc6b0754aabb0bbcac43dcfeb8c794817c32..99157c534934a5418d52354e6746814bc71cf0c2 100644 (file)
@@ -146,6 +146,7 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
        List       *save_alist;
        ListCell   *elements;
        Oid                     namespaceid;
+       Oid                     existing_relid;
 
        /*
         * We must not scribble on the passed-in CreateStmt, so copy it.  (This is
@@ -155,30 +156,25 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
 
        /*
         * Look up the creation namespace.      This also checks permissions on the
-        * target namespace, so that we throw any permissions error as early as
-        * possible.
+        * target namespace, locks it against concurrent drops, checks for a
+        * preexisting relation in that namespace with the same name, and updates
+        * stmt->relation->relpersistence if the select namespace is temporary.
         */
-       namespaceid = RangeVarGetAndCheckCreationNamespace(stmt->relation);
-       RangeVarAdjustRelationPersistence(stmt->relation, namespaceid);
+       namespaceid =
+               RangeVarGetAndCheckCreationNamespace(stmt->relation, NoLock,
+                                                                                        &existing_relid);
 
        /*
         * If the relation already exists and the user specified "IF NOT EXISTS",
         * bail out with a NOTICE.
         */
-       if (stmt->if_not_exists)
+       if (stmt->if_not_exists && OidIsValid(existing_relid))
        {
-               Oid                     existing_relid;
-
-               existing_relid = get_relname_relid(stmt->relation->relname,
-                                                                                  namespaceid);
-               if (existing_relid != InvalidOid)
-               {
-                       ereport(NOTICE,
-                                       (errcode(ERRCODE_DUPLICATE_TABLE),
-                                        errmsg("relation \"%s\" already exists, skipping",
-                                                       stmt->relation->relname)));
-                       return NIL;
-               }
+               ereport(NOTICE,
+                               (errcode(ERRCODE_DUPLICATE_TABLE),
+                                errmsg("relation \"%s\" already exists, skipping",
+                                               stmt->relation->relname)));
+               return NIL;
        }
 
        /*
index 37b259dc9f4739c9cd0228f9504390382bf5f524..fa3ba5bd102e6320346a9c9cf54ad8366b4cb737 100644 (file)
@@ -58,7 +58,9 @@ extern Oid    RangeVarGetRelidExtended(const RangeVar *relation,
                                                 RangeVarGetRelidCallback callback,
                                                 void *callback_arg);
 extern Oid     RangeVarGetCreationNamespace(const RangeVar *newRelation);
-extern Oid     RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation);
+extern Oid     RangeVarGetAndCheckCreationNamespace(RangeVar *newRelation,
+                                                                        LOCKMODE lockmode,
+                                                                        Oid *existing_relation_id);
 extern void RangeVarAdjustRelationPersistence(RangeVar *newRelation, Oid nspid);
 extern Oid     RelnameGetRelid(const char *relname);
 extern bool RelationIsVisible(Oid relid);