#include "utils/acl.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
+#include "rewrite/rewriteHandler.h"
+#include "access/heapam.h"
+#include "nodes/nodeFuncs.h"
-static void LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait);
-static AclResult LockTableAclCheck(Oid relid, LOCKMODE lockmode);
+static void LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait, Oid userid);
+static AclResult LockTableAclCheck(Oid relid, LOCKMODE lockmode, Oid userid);
static void RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid,
Oid oldrelid, void *arg);
+static void LockViewRecurse(Oid reloid, Oid root_reloid, LOCKMODE lockmode, bool nowait);
/*
* LOCK TABLE
RangeVarCallbackForLockTable,
(void *) &lockstmt->mode);
- if (recurse)
- LockTableRecurse(reloid, lockstmt->mode, lockstmt->nowait);
+ if (get_rel_relkind(reloid) == RELKIND_VIEW)
+ LockViewRecurse(reloid, reloid, lockstmt->mode, lockstmt->nowait);
+ else if (recurse)
+ LockTableRecurse(reloid, lockstmt->mode, lockstmt->nowait, GetUserId());
}
}
return; /* woops, concurrently dropped; no permissions
* check */
- /* Currently, we only allow plain tables to be locked */
- if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
+
+ /* Currently, we only allow plain tables or views to be locked */
+ if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE &&
+ relkind != RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a table",
+ errmsg("\"%s\" is not a table or a view",
rv->relname)));
/* Check permissions. */
- aclresult = LockTableAclCheck(relid, lockmode);
+ aclresult = LockTableAclCheck(relid, lockmode, GetUserId());
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, get_relkind_objtype(get_rel_relkind(relid)), rv->relname);
}
* multiply-inheriting children more than once, but that's no problem.
*/
static void
-LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait)
+LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait, Oid userid)
{
List *children;
ListCell *lc;
AclResult aclresult;
/* Check permissions before acquiring the lock. */
- aclresult = LockTableAclCheck(childreloid, lockmode);
+ aclresult = LockTableAclCheck(childreloid, lockmode, userid);
if (aclresult != ACLCHECK_OK)
{
char *relname = get_rel_name(childreloid);
continue;
}
- LockTableRecurse(childreloid, lockmode, nowait);
+ LockTableRecurse(childreloid, lockmode, nowait, userid);
}
}
+/*
+ * Apply LOCK TABLE recursively over a view
+ *
+ * All tables and views appearing in the view definition query are locked
+ * recursively with the same lock mode.
+ */
+
+typedef struct
+{
+ Oid root_reloid;
+ LOCKMODE lockmode;
+ bool nowait;
+ Oid viewowner;
+ Oid viewoid;
+} LockViewRecurse_context;
+
+static bool
+LockViewRecurse_walker(Node *node, LockViewRecurse_context *context)
+{
+ if (node == NULL)
+ return false;
+
+ if (IsA(node, Query))
+ {
+ Query *query = (Query *) node;
+ ListCell *rtable;
+
+ foreach(rtable, query->rtable)
+ {
+ RangeTblEntry *rte = lfirst(rtable);
+ AclResult aclresult;
+
+ Oid relid = rte->relid;
+ char relkind = rte->relkind;
+ char *relname = get_rel_name(relid);
+
+ /* The OLD and NEW placeholder entries in the view's rtable are skipped. */
+ if (relid == context->viewoid &&
+ (!strcmp(rte->eref->aliasname, "old") || !strcmp(rte->eref->aliasname, "new")))
+ continue;
+
+ /* Currently, we only allow plain tables or views to be locked. */
+ if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE &&
+ relkind != RELKIND_VIEW)
+ continue;
+
+ /* Check infinite recursion in the view definition. */
+ if (relid == context->root_reloid)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("infinite recursion detected in rules for relation \"%s\"",
+ get_rel_name(context->root_reloid))));
+
+ /* Check permissions with the view owner's privilege. */
+ aclresult = LockTableAclCheck(relid, context->lockmode, context->viewowner);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, get_relkind_objtype(relkind), relname);
+
+ /* We have enough rights to lock the relation; do so. */
+ if (!context->nowait)
+ LockRelationOid(relid, context->lockmode);
+ else if (!ConditionalLockRelationOid(relid, context->lockmode))
+ ereport(ERROR,
+ (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+ errmsg("could not obtain lock on relation \"%s\"",
+ relname)));
+
+ if (relkind == RELKIND_VIEW)
+ LockViewRecurse(relid, context->root_reloid, context->lockmode, context->nowait);
+ else if (rte->inh)
+ LockTableRecurse(relid, context->lockmode, context->nowait, context->viewowner);
+ }
+
+ return query_tree_walker(query,
+ LockViewRecurse_walker,
+ context,
+ QTW_IGNORE_JOINALIASES);
+ }
+
+ return expression_tree_walker(node,
+ LockViewRecurse_walker,
+ context);
+}
+
+static void
+LockViewRecurse(Oid reloid, Oid root_reloid, LOCKMODE lockmode, bool nowait)
+{
+ LockViewRecurse_context context;
+
+ Relation view;
+ Query *viewquery;
+
+ view = heap_open(reloid, NoLock);
+ viewquery = get_view_query(view);
+ heap_close(view, NoLock);
+
+ context.root_reloid = root_reloid;
+ context.lockmode = lockmode;
+ context.nowait = nowait;
+ context.viewowner = view->rd_rel->relowner;
+ context.viewoid = reloid;
+
+ LockViewRecurse_walker((Node *) viewquery, &context);
+}
+
/*
* Check whether the current user is permitted to lock this relation.
*/
static AclResult
-LockTableAclCheck(Oid reloid, LOCKMODE lockmode)
+LockTableAclCheck(Oid reloid, LOCKMODE lockmode, Oid userid)
{
AclResult aclresult;
AclMode aclmask;
else
aclmask = ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE;
- aclresult = pg_class_aclcheck(reloid, GetUserId(), aclmask);
+ aclresult = pg_class_aclcheck(reloid, userid, aclmask);
return aclresult;
}
CREATE SCHEMA lock_schema1;
SET search_path = lock_schema1;
CREATE TABLE lock_tbl1 (a BIGINT);
-CREATE VIEW lock_view1 AS SELECT 1;
+CREATE TABLE lock_tbl1a (a BIGINT);
+CREATE VIEW lock_view1 AS SELECT * FROM lock_tbl1;
+CREATE VIEW lock_view2(a,b) AS SELECT * FROM lock_tbl1, lock_tbl1a;
+CREATE VIEW lock_view3 AS SELECT * from lock_view2;
+CREATE VIEW lock_view4 AS SELECT (select a from lock_tbl1a limit 1) from lock_tbl1;
+CREATE VIEW lock_view5 AS SELECT * from lock_tbl1 where a in (select * from lock_tbl1a);
+CREATE VIEW lock_view6 AS SELECT * from (select * from lock_tbl1) sub;
CREATE ROLE regress_rol_lock1;
ALTER ROLE regress_rol_lock1 SET search_path = lock_schema1;
GRANT USAGE ON SCHEMA lock_schema1 TO regress_rol_lock1;
LOCK TABLE lock_tbl1 IN SHARE ROW EXCLUSIVE MODE NOWAIT;
LOCK TABLE lock_tbl1 IN EXCLUSIVE MODE NOWAIT;
LOCK TABLE lock_tbl1 IN ACCESS EXCLUSIVE MODE NOWAIT;
-LOCK TABLE lock_view1 IN EXCLUSIVE MODE; -- Will fail; can't lock a non-table
-ERROR: "lock_view1" is not a table
+ROLLBACK;
+-- Verify that we can lock views.
+BEGIN TRANSACTION;
+LOCK TABLE lock_view1 IN EXCLUSIVE MODE;
+-- lock_view1 and lock_tbl1 are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and relname like '%lock_%' and mode = 'ExclusiveLock'
+ order by relname;
+ relname
+------------
+ lock_tbl1
+ lock_view1
+(2 rows)
+
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view2 IN EXCLUSIVE MODE;
+-- lock_view1, lock_tbl1, and lock_tbl1a are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and relname like '%lock_%' and mode = 'ExclusiveLock'
+ order by relname;
+ relname
+------------
+ lock_tbl1
+ lock_tbl1a
+ lock_view2
+(3 rows)
+
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view3 IN EXCLUSIVE MODE;
+-- lock_view3, lock_view2, lock_tbl1, and lock_tbl1a are locked recursively.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and relname like '%lock_%' and mode = 'ExclusiveLock'
+ order by relname;
+ relname
+------------
+ lock_tbl1
+ lock_tbl1a
+ lock_view2
+ lock_view3
+(4 rows)
+
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view4 IN EXCLUSIVE MODE;
+-- lock_view4, lock_tbl1, and lock_tbl1a are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and relname like '%lock_%' and mode = 'ExclusiveLock'
+ order by relname;
+ relname
+------------
+ lock_tbl1
+ lock_tbl1a
+ lock_view4
+(3 rows)
+
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view5 IN EXCLUSIVE MODE;
+-- lock_view5, lock_tbl1, and lock_tbl1a are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and relname like '%lock_%' and mode = 'ExclusiveLock'
+ order by relname;
+ relname
+------------
+ lock_tbl1
+ lock_tbl1a
+ lock_view5
+(3 rows)
+
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view6 IN EXCLUSIVE MODE;
+-- lock_view6 an lock_tbl1 are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and relname like '%lock_%' and mode = 'ExclusiveLock'
+ order by relname;
+ relname
+------------
+ lock_tbl1
+ lock_view6
+(2 rows)
+
ROLLBACK;
-- Verify that we can lock a table with inheritance children.
CREATE TABLE lock_tbl2 (b BIGINT) INHERITS (lock_tbl1);
--
-- Clean up
--
+DROP VIEW lock_view6;
+DROP VIEW lock_view5;
+DROP VIEW lock_view4;
+DROP VIEW lock_view3;
+DROP VIEW lock_view2;
DROP VIEW lock_view1;
DROP TABLE lock_tbl3;
DROP TABLE lock_tbl2;
DROP TABLE lock_tbl1;
+DROP TABLE lock_tbl1a;
DROP SCHEMA lock_schema1 CASCADE;
DROP ROLE regress_rol_lock1;
-- atomic ops tests
CREATE SCHEMA lock_schema1;
SET search_path = lock_schema1;
CREATE TABLE lock_tbl1 (a BIGINT);
-CREATE VIEW lock_view1 AS SELECT 1;
+CREATE TABLE lock_tbl1a (a BIGINT);
+CREATE VIEW lock_view1 AS SELECT * FROM lock_tbl1;
+CREATE VIEW lock_view2(a,b) AS SELECT * FROM lock_tbl1, lock_tbl1a;
+CREATE VIEW lock_view3 AS SELECT * from lock_view2;
+CREATE VIEW lock_view4 AS SELECT (select a from lock_tbl1a limit 1) from lock_tbl1;
+CREATE VIEW lock_view5 AS SELECT * from lock_tbl1 where a in (select * from lock_tbl1a);
+CREATE VIEW lock_view6 AS SELECT * from (select * from lock_tbl1) sub;
CREATE ROLE regress_rol_lock1;
ALTER ROLE regress_rol_lock1 SET search_path = lock_schema1;
GRANT USAGE ON SCHEMA lock_schema1 TO regress_rol_lock1;
LOCK TABLE lock_tbl1 IN SHARE ROW EXCLUSIVE MODE NOWAIT;
LOCK TABLE lock_tbl1 IN EXCLUSIVE MODE NOWAIT;
LOCK TABLE lock_tbl1 IN ACCESS EXCLUSIVE MODE NOWAIT;
-LOCK TABLE lock_view1 IN EXCLUSIVE MODE; -- Will fail; can't lock a non-table
+ROLLBACK;
+
+-- Verify that we can lock views.
+BEGIN TRANSACTION;
+LOCK TABLE lock_view1 IN EXCLUSIVE MODE;
+-- lock_view1 and lock_tbl1 are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and relname like '%lock_%' and mode = 'ExclusiveLock'
+ order by relname;
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view2 IN EXCLUSIVE MODE;
+-- lock_view1, lock_tbl1, and lock_tbl1a are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and relname like '%lock_%' and mode = 'ExclusiveLock'
+ order by relname;
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view3 IN EXCLUSIVE MODE;
+-- lock_view3, lock_view2, lock_tbl1, and lock_tbl1a are locked recursively.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and relname like '%lock_%' and mode = 'ExclusiveLock'
+ order by relname;
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view4 IN EXCLUSIVE MODE;
+-- lock_view4, lock_tbl1, and lock_tbl1a are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and relname like '%lock_%' and mode = 'ExclusiveLock'
+ order by relname;
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view5 IN EXCLUSIVE MODE;
+-- lock_view5, lock_tbl1, and lock_tbl1a are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and relname like '%lock_%' and mode = 'ExclusiveLock'
+ order by relname;
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view6 IN EXCLUSIVE MODE;
+-- lock_view6 an lock_tbl1 are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and relname like '%lock_%' and mode = 'ExclusiveLock'
+ order by relname;
ROLLBACK;
-- Verify that we can lock a table with inheritance children.
--
-- Clean up
--
+DROP VIEW lock_view6;
+DROP VIEW lock_view5;
+DROP VIEW lock_view4;
+DROP VIEW lock_view3;
+DROP VIEW lock_view2;
DROP VIEW lock_view1;
DROP TABLE lock_tbl3;
DROP TABLE lock_tbl2;
DROP TABLE lock_tbl1;
+DROP TABLE lock_tbl1a;
DROP SCHEMA lock_schema1 CASCADE;
DROP ROLE regress_rol_lock1;