]> granicus.if.org Git - postgresql/commitdiff
Fix LOCK TABLE to eliminate the race condition that could make it give weird
authorTom Lane <tgl@sss.pgh.pa.us>
Tue, 12 May 2009 16:43:32 +0000 (16:43 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Tue, 12 May 2009 16:43:32 +0000 (16:43 +0000)
errors when tables are concurrently dropped.  To do this we must take lock
on each relation before we check its privileges.  The old code was trying
to do that the other way around, which is a bit pointless when there are lots
of other commands that lock relations before checking privileges.  I did keep
it checking each relation's privilege before locking the next relation, which
is a detail that ALTER TABLE isn't too picky about.

src/backend/access/heap/heapam.c
src/backend/commands/lockcmds.c
src/include/access/heapam.h

index 43af0c7e04875c02a8f8fb70da0864c2a12e9be2..272fb32c58f232456878e20b0a53ed6779b95a21 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.274 2009/01/20 18:59:36 heikki Exp $
+ *       $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.275 2009/05/12 16:43:32 tgl Exp $
  *
  *
  * INTERFACE ROUTINES
@@ -947,56 +947,6 @@ try_relation_open(Oid relationId, LOCKMODE lockmode)
        return r;
 }
 
-/* ----------------
- *             relation_open_nowait - open but don't wait for lock
- *
- *             Same as relation_open, except throw an error instead of waiting
- *             when the requested lock is not immediately obtainable.
- * ----------------
- */
-Relation
-relation_open_nowait(Oid relationId, LOCKMODE lockmode)
-{
-       Relation        r;
-
-       Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);
-
-       /* Get the lock before trying to open the relcache entry */
-       if (lockmode != NoLock)
-       {
-               if (!ConditionalLockRelationOid(relationId, lockmode))
-               {
-                       /* try to throw error by name; relation could be deleted... */
-                       char       *relname = get_rel_name(relationId);
-
-                       if (relname)
-                               ereport(ERROR,
-                                               (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
-                                                errmsg("could not obtain lock on relation \"%s\"",
-                                                               relname)));
-                       else
-                               ereport(ERROR,
-                                               (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
-                                         errmsg("could not obtain lock on relation with OID %u",
-                                                        relationId)));
-               }
-       }
-
-       /* The relcache does all the real work... */
-       r = RelationIdGetRelation(relationId);
-
-       if (!RelationIsValid(r))
-               elog(ERROR, "could not open relation with OID %u", relationId);
-
-       /* Make note that we've accessed a temporary relation */
-       if (r->rd_istemp)
-               MyXactAccessedTempRel = true;
-
-       pgstat_initstats(r);
-
-       return r;
-}
-
 /* ----------------
  *             relation_openrv - open any relation specified by a RangeVar
  *
index 84b152a20075851e212b61b7608bcc13d7db6d5b..1e9b5dd961a110e271289487292f63b6acaa34a0 100644 (file)
@@ -1,14 +1,14 @@
 /*-------------------------------------------------------------------------
  *
  * lockcmds.c
- *       Lock command support code
+ *       LOCK command support code
  *
  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/commands/lockcmds.c,v 1.23 2009/05/12 03:11:01 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/commands/lockcmds.c,v 1.24 2009/05/12 16:43:32 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "commands/lockcmds.h"
 #include "miscadmin.h"
 #include "parser/parse_clause.h"
+#include "storage/lmgr.h"
 #include "utils/acl.h"
 #include "utils/lsyscache.h"
-#include "utils/rel.h"
+
+static void LockTableRecurse(Oid reloid, RangeVar *rv,
+                                                        LOCKMODE lockmode, bool nowait, bool recurse);
 
 
 /*
@@ -34,57 +37,127 @@ LockTableCommand(LockStmt *lockstmt)
        ListCell   *p;
 
        /*
-        * Iterate over the list and open, lock, and close the relations one at a
-        * time
+        * Iterate over the list and process the named relations one at a time
         */
-
        foreach(p, lockstmt->relations)
        {
-               RangeVar   *relation = lfirst(p);
-               Oid                     reloid;
+               RangeVar   *relation = (RangeVar *) lfirst(p);
                bool            recurse = interpretInhOption(relation->inhOpt);
-               List       *children_and_self;
-               ListCell   *child;
+               Oid                     reloid;
 
                reloid = RangeVarGetRelid(relation, false);
 
-               /* XXX NoLock here is not really a good idea */
-               if (recurse)
-                       children_and_self = find_all_inheritors(reloid, NoLock);
-               else
-                       children_and_self = list_make1_oid(reloid);
+               LockTableRecurse(reloid, relation,
+                                                lockstmt->mode, lockstmt->nowait, recurse);
+       }
+}
 
-               foreach(child, children_and_self)
+/*
+ * Apply LOCK TABLE recursively over an inheritance tree
+ *
+ * At top level, "rv" is the original command argument; we use it to throw
+ * an appropriate error message if the relation isn't there.  Below top level,
+ * "rv" is NULL and we should just silently ignore any dropped child rel.
+ */
+static void
+LockTableRecurse(Oid reloid, RangeVar *rv,
+                                LOCKMODE lockmode, bool nowait, bool recurse)
+{
+       Relation        rel;
+       AclResult       aclresult;
+
+       /*
+        * Acquire the lock.  We must do this first to protect against
+        * concurrent drops.  Note that a lock against an already-dropped
+        * relation's OID won't fail.
+        */
+       if (nowait)
+       {
+               if (!ConditionalLockRelationOid(reloid, lockmode))
                {
-                       Oid                     childreloid = lfirst_oid(child);
-                       Relation        rel;
-                       AclResult       aclresult;
-
-                       /* We don't want to open the relation until we've checked privilege. */
-                       if (lockstmt->mode == AccessShareLock)
-                               aclresult = pg_class_aclcheck(childreloid, GetUserId(),
-                                                                                         ACL_SELECT);
+                       /* try to throw error by name; relation could be deleted... */
+                       char       *relname = rv ? rv->relname : get_rel_name(reloid);
+
+                       if (relname)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+                                                errmsg("could not obtain lock on relation \"%s\"",
+                                                               relname)));
                        else
-                               aclresult = pg_class_aclcheck(childreloid, GetUserId(),
-                                                                                         ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE);
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+                                         errmsg("could not obtain lock on relation with OID %u",
+                                                        reloid)));
+               }
+       }
+       else
+               LockRelationOid(reloid, lockmode);
 
-                       if (aclresult != ACLCHECK_OK)
-                               aclcheck_error(aclresult, ACL_KIND_CLASS,
-                                                          get_rel_name(childreloid));
+       /*
+        * Now that we have the lock, check to see if the relation really exists
+        * or not.
+        */
+       rel = try_relation_open(reloid, NoLock);
 
-                       if (lockstmt->nowait)
-                               rel = relation_open_nowait(childreloid, lockstmt->mode);
-                       else
-                               rel = relation_open(childreloid, lockstmt->mode);
+       if (!rel)
+       {
+               /* Release useless lock */
+               UnlockRelationOid(reloid, lockmode);
 
-                       /* Currently, we only allow plain tables to be locked */
-                       if (rel->rd_rel->relkind != RELKIND_RELATION)
+               /* At top level, throw error; otherwise, ignore this child rel */
+               if (rv)
+               {
+                       if (rv->schemaname)
                                ereport(ERROR,
-                                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                                                errmsg("\"%s\" is not a table",
-                                                               get_rel_name(childreloid))));
+                                               (errcode(ERRCODE_UNDEFINED_TABLE),
+                                                errmsg("relation \"%s.%s\" does not exist",
+                                                               rv->schemaname, rv->relname)));
+                       else
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_UNDEFINED_TABLE),
+                                                errmsg("relation \"%s\" does not exist",
+                                                               rv->relname)));
+               }
+
+               return;
+       }
 
-                       relation_close(rel, NoLock);    /* close rel, keep lock */
+       /* Verify adequate privilege */
+       if (lockmode == AccessShareLock)
+               aclresult = pg_class_aclcheck(reloid, GetUserId(),
+                                                                         ACL_SELECT);
+       else
+               aclresult = pg_class_aclcheck(reloid, GetUserId(),
+                                                                         ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE);
+       if (aclresult != ACLCHECK_OK)
+               aclcheck_error(aclresult, ACL_KIND_CLASS,
+                                          RelationGetRelationName(rel));
+
+       /* Currently, we only allow plain tables to be locked */
+       if (rel->rd_rel->relkind != RELKIND_RELATION)
+               ereport(ERROR,
+                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                errmsg("\"%s\" is not a table",
+                                               RelationGetRelationName(rel))));
+
+       /*
+        * If requested, recurse to children.  We use find_inheritance_children
+        * not find_all_inheritors to avoid taking locks far in advance of
+        * checking privileges.  This means we'll visit multiply-inheriting
+        * children more than once, but that's no problem.
+        */
+       if (recurse)
+       {
+               List   *children = find_inheritance_children(reloid, NoLock);
+               ListCell *lc;
+
+               foreach(lc, children)
+               {
+                       Oid                     childreloid = lfirst_oid(lc);
+
+                       LockTableRecurse(childreloid, NULL, lockmode, nowait, recurse);
                }
        }
+
+       relation_close(rel, NoLock);    /* close rel, keep lock */
 }
index d773652d0d5f60f51f7b5ad8856fada5ef9baa25..ec49192cacc09c7135d501a710bd9b7c1e9df281 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/heapam.h,v 1.141 2009/01/01 17:23:56 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/access/heapam.h,v 1.142 2009/05/12 16:43:32 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -49,7 +49,6 @@ typedef enum
 /* in heap/heapam.c */
 extern Relation relation_open(Oid relationId, LOCKMODE lockmode);
 extern Relation try_relation_open(Oid relationId, LOCKMODE lockmode);
-extern Relation relation_open_nowait(Oid relationId, LOCKMODE lockmode);
 extern Relation relation_openrv(const RangeVar *relation, LOCKMODE lockmode);
 extern Relation try_relation_openrv(const RangeVar *relation, LOCKMODE lockmode);
 extern void relation_close(Relation relation, LOCKMODE lockmode);