]> granicus.if.org Git - postgresql/blob - src/backend/commands/lockcmds.c
Fix LOCK TABLE to eliminate the race condition that could make it give weird
[postgresql] / src / backend / commands / lockcmds.c
1 /*-------------------------------------------------------------------------
2  *
3  * lockcmds.c
4  *        LOCK command support code
5  *
6  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/commands/lockcmds.c,v 1.24 2009/05/12 16:43:32 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/heapam.h"
18 #include "catalog/namespace.h"
19 #include "catalog/pg_inherits_fn.h"
20 #include "commands/lockcmds.h"
21 #include "miscadmin.h"
22 #include "parser/parse_clause.h"
23 #include "storage/lmgr.h"
24 #include "utils/acl.h"
25 #include "utils/lsyscache.h"
26
27 static void LockTableRecurse(Oid reloid, RangeVar *rv,
28                                                          LOCKMODE lockmode, bool nowait, bool recurse);
29
30
31 /*
32  * LOCK TABLE
33  */
34 void
35 LockTableCommand(LockStmt *lockstmt)
36 {
37         ListCell   *p;
38
39         /*
40          * Iterate over the list and process the named relations one at a time
41          */
42         foreach(p, lockstmt->relations)
43         {
44                 RangeVar   *relation = (RangeVar *) lfirst(p);
45                 bool            recurse = interpretInhOption(relation->inhOpt);
46                 Oid                     reloid;
47
48                 reloid = RangeVarGetRelid(relation, false);
49
50                 LockTableRecurse(reloid, relation,
51                                                  lockstmt->mode, lockstmt->nowait, recurse);
52         }
53 }
54
55 /*
56  * Apply LOCK TABLE recursively over an inheritance tree
57  *
58  * At top level, "rv" is the original command argument; we use it to throw
59  * an appropriate error message if the relation isn't there.  Below top level,
60  * "rv" is NULL and we should just silently ignore any dropped child rel.
61  */
62 static void
63 LockTableRecurse(Oid reloid, RangeVar *rv,
64                                  LOCKMODE lockmode, bool nowait, bool recurse)
65 {
66         Relation        rel;
67         AclResult       aclresult;
68
69         /*
70          * Acquire the lock.  We must do this first to protect against
71          * concurrent drops.  Note that a lock against an already-dropped
72          * relation's OID won't fail.
73          */
74         if (nowait)
75         {
76                 if (!ConditionalLockRelationOid(reloid, lockmode))
77                 {
78                         /* try to throw error by name; relation could be deleted... */
79                         char       *relname = rv ? rv->relname : get_rel_name(reloid);
80
81                         if (relname)
82                                 ereport(ERROR,
83                                                 (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
84                                                  errmsg("could not obtain lock on relation \"%s\"",
85                                                                 relname)));
86                         else
87                                 ereport(ERROR,
88                                                 (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
89                                           errmsg("could not obtain lock on relation with OID %u",
90                                                          reloid)));
91                 }
92         }
93         else
94                 LockRelationOid(reloid, lockmode);
95
96         /*
97          * Now that we have the lock, check to see if the relation really exists
98          * or not.
99          */
100         rel = try_relation_open(reloid, NoLock);
101
102         if (!rel)
103         {
104                 /* Release useless lock */
105                 UnlockRelationOid(reloid, lockmode);
106
107                 /* At top level, throw error; otherwise, ignore this child rel */
108                 if (rv)
109                 {
110                         if (rv->schemaname)
111                                 ereport(ERROR,
112                                                 (errcode(ERRCODE_UNDEFINED_TABLE),
113                                                  errmsg("relation \"%s.%s\" does not exist",
114                                                                 rv->schemaname, rv->relname)));
115                         else
116                                 ereport(ERROR,
117                                                 (errcode(ERRCODE_UNDEFINED_TABLE),
118                                                  errmsg("relation \"%s\" does not exist",
119                                                                 rv->relname)));
120                 }
121
122                 return;
123         }
124
125         /* Verify adequate privilege */
126         if (lockmode == AccessShareLock)
127                 aclresult = pg_class_aclcheck(reloid, GetUserId(),
128                                                                           ACL_SELECT);
129         else
130                 aclresult = pg_class_aclcheck(reloid, GetUserId(),
131                                                                           ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE);
132         if (aclresult != ACLCHECK_OK)
133                 aclcheck_error(aclresult, ACL_KIND_CLASS,
134                                            RelationGetRelationName(rel));
135
136         /* Currently, we only allow plain tables to be locked */
137         if (rel->rd_rel->relkind != RELKIND_RELATION)
138                 ereport(ERROR,
139                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
140                                  errmsg("\"%s\" is not a table",
141                                                 RelationGetRelationName(rel))));
142
143         /*
144          * If requested, recurse to children.  We use find_inheritance_children
145          * not find_all_inheritors to avoid taking locks far in advance of
146          * checking privileges.  This means we'll visit multiply-inheriting
147          * children more than once, but that's no problem.
148          */
149         if (recurse)
150         {
151                 List   *children = find_inheritance_children(reloid, NoLock);
152                 ListCell *lc;
153
154                 foreach(lc, children)
155                 {
156                         Oid                     childreloid = lfirst_oid(lc);
157
158                         LockTableRecurse(childreloid, NULL, lockmode, nowait, recurse);
159                 }
160         }
161
162         relation_close(rel, NoLock);    /* close rel, keep lock */
163 }