]> granicus.if.org Git - postgresql/commitdiff
Improve TRUNCATE by avoiding early lock queue
authorMichael Paquier <michael@paquier.xyz>
Fri, 10 Aug 2018 16:26:59 +0000 (18:26 +0200)
committerMichael Paquier <michael@paquier.xyz>
Fri, 10 Aug 2018 16:26:59 +0000 (18:26 +0200)
A caller of TRUNCATE could previously queue for an access exclusive lock
on a relation it may not have permission to truncate, potentially
interfering with users authorized to work on it.  This can be very
intrusive depending on the lock attempted to be taken.  For example,
pg_authid could be blocked, preventing any authentication attempt to
happen on a PostgreSQL instance.

This commit fixes the case of TRUNCATE so as RangeVarGetRelidExtended is
used with a callback doing the necessary ACL checks at an earlier stage,
avoiding lock queuing issues, so as an immediate failure happens for
unprivileged users instead of waiting on a lock that would not be
taken.

This is rather similar to the type of work done in cbe24a6 for CLUSTER,
and the code of TRUNCATE is this time refactored so as there is no
user-facing changes.  As the commit for CLUSTER, no back-patch is done.

Reported-by: Lloyd Albin, Jeremy Schneider
Author: Michael Paquier
Reviewed by: Nathan Bossart, Kyotaro Horiguchi
Discussion: https://postgr.es/m/152512087100.19803.12733865831237526317@wrigleys.postgresql.org
Discussion: https://postgr.es/m/20180806165816.GA19883@paquier.xyz

src/backend/commands/tablecmds.c
src/test/isolation/expected/truncate-conflict.out [new file with mode: 0644]
src/test/isolation/isolation_schedule
src/test/isolation/specs/truncate-conflict.spec [new file with mode: 0644]

index eb2d33dd86dcd0137bb65f9071273c9ad123e8b2..cef663284078fb0d47b5383a087336c452a9aca0 100644 (file)
@@ -300,7 +300,10 @@ struct DropRelationCallbackState
 #define child_dependency_type(child_is_partition)      \
        ((child_is_partition) ? DEPENDENCY_AUTO : DEPENDENCY_NORMAL)
 
-static void truncate_check_rel(Relation rel);
+static void truncate_check_rel(Oid relid, Form_pg_class reltuple);
+static void truncate_check_activity(Relation rel);
+static void RangeVarCallbackForTruncate(const RangeVar *relation,
+                                                       Oid relId, Oid oldRelId, void *arg);
 static List *MergeAttributes(List *schema, List *supers, char relpersistence,
                                bool is_partition, List **supOids, List **supconstr,
                                int *supOidCount);
@@ -1336,15 +1339,26 @@ ExecuteTruncate(TruncateStmt *stmt)
                bool            recurse = rv->inh;
                Oid                     myrelid;
 
-               rel = heap_openrv(rv, AccessExclusiveLock);
-               myrelid = RelationGetRelid(rel);
+               myrelid = RangeVarGetRelidExtended(rv, AccessExclusiveLock,
+                                                                                  0, RangeVarCallbackForTruncate,
+                                                                                  NULL);
+
+               /* open the relation, we already hold a lock on it */
+               rel = heap_open(myrelid, NoLock);
+
                /* don't throw error for "TRUNCATE foo, foo" */
                if (list_member_oid(relids, myrelid))
                {
                        heap_close(rel, AccessExclusiveLock);
                        continue;
                }
-               truncate_check_rel(rel);
+
+               /*
+                * RangeVarGetRelidExtended() has done most checks with its callback,
+                * but other checks with the now-opened Relation remain.
+                */
+               truncate_check_activity(rel);
+
                rels = lappend(rels, rel);
                relids = lappend_oid(relids, myrelid);
                /* Log this relation only if needed for logical decoding */
@@ -1367,7 +1381,9 @@ ExecuteTruncate(TruncateStmt *stmt)
 
                                /* find_all_inheritors already got lock */
                                rel = heap_open(childrelid, NoLock);
-                               truncate_check_rel(rel);
+                               truncate_check_rel(RelationGetRelid(rel), rel->rd_rel);
+                               truncate_check_activity(rel);
+
                                rels = lappend(rels, rel);
                                relids = lappend_oid(relids, childrelid);
                                /* Log this relation only if needed for logical decoding */
@@ -1450,7 +1466,8 @@ ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged,
                                ereport(NOTICE,
                                                (errmsg("truncate cascades to table \"%s\"",
                                                                RelationGetRelationName(rel))));
-                               truncate_check_rel(rel);
+                               truncate_check_rel(relid, rel->rd_rel);
+                               truncate_check_activity(rel);
                                rels = lappend(rels, rel);
                                relids = lappend_oid(relids, relid);
                                /* Log this relation only if needed for logical decoding */
@@ -1700,38 +1717,47 @@ ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged,
 }
 
 /*
- * Check that a given rel is safe to truncate.  Subroutine for ExecuteTruncate
+ * Check that a given relation is safe to truncate.  Subroutine for
+ * ExecuteTruncate() and RangeVarCallbackForTruncate().
  */
 static void
-truncate_check_rel(Relation rel)
+truncate_check_rel(Oid relid, Form_pg_class reltuple)
 {
        AclResult       aclresult;
+       char       *relname = NameStr(reltuple->relname);
 
        /*
         * Only allow truncate on regular tables and partitioned tables (although,
         * the latter are only being included here for the following checks; no
         * physical truncation will occur in their case.)
         */
-       if (rel->rd_rel->relkind != RELKIND_RELATION &&
-               rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+       if (reltuple->relkind != RELKIND_RELATION &&
+               reltuple->relkind != RELKIND_PARTITIONED_TABLE)
                ereport(ERROR,
                                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                                errmsg("\"%s\" is not a table",
-                                               RelationGetRelationName(rel))));
+                                errmsg("\"%s\" is not a table", relname)));
 
        /* Permissions checks */
-       aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
-                                                                 ACL_TRUNCATE);
+       aclresult = pg_class_aclcheck(relid, GetUserId(), ACL_TRUNCATE);
        if (aclresult != ACLCHECK_OK)
-               aclcheck_error(aclresult, get_relkind_objtype(rel->rd_rel->relkind),
-                                          RelationGetRelationName(rel));
+               aclcheck_error(aclresult, get_relkind_objtype(reltuple->relkind),
+                                          relname);
 
-       if (!allowSystemTableMods && IsSystemRelation(rel))
+       if (!allowSystemTableMods && IsSystemClass(relid, reltuple))
                ereport(ERROR,
                                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                 errmsg("permission denied: \"%s\" is a system catalog",
-                                               RelationGetRelationName(rel))));
+                                               relname)));
+}
 
+/*
+ * Set of extra sanity checks to check if a given relation is safe to
+ * truncate.  This is split with truncate_check_rel() as
+ * RangeVarCallbackForTruncate() cannot open a Relation yet.
+ */
+static void
+truncate_check_activity(Relation rel)
+{
        /*
         * Don't allow truncate on temp tables of other backends ... their local
         * buffer manager is not going to cope.
@@ -13419,6 +13445,28 @@ RangeVarCallbackOwnsTable(const RangeVar *relation,
                aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(relId)), relation->relname);
 }
 
+/*
+ * Callback to RangeVarGetRelidExtended() for TRUNCATE processing.
+ */
+static void
+RangeVarCallbackForTruncate(const RangeVar *relation,
+                                                       Oid relId, Oid oldRelId, void *arg)
+{
+       HeapTuple       tuple;
+
+       /* Nothing to do if the relation was not found. */
+       if (!OidIsValid(relId))
+               return;
+
+       tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relId));
+       if (!HeapTupleIsValid(tuple))   /* should not happen */
+               elog(ERROR, "cache lookup failed for relation %u", relId);
+
+       truncate_check_rel(relId, (Form_pg_class) GETSTRUCT(tuple));
+
+       ReleaseSysCache(tuple);
+}
+
 /*
  * Callback to RangeVarGetRelidExtended(), similar to
  * RangeVarCallbackOwnsTable() but without checks on the type of the relation.
diff --git a/src/test/isolation/expected/truncate-conflict.out b/src/test/isolation/expected/truncate-conflict.out
new file mode 100644 (file)
index 0000000..2c10f8d
--- /dev/null
@@ -0,0 +1,99 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1_begin s1_tab_lookup s2_auth s2_truncate s1_commit s2_reset
+step s1_begin: BEGIN;
+step s1_tab_lookup: SELECT count(*) >= 0 FROM truncate_tab;
+?column?       
+
+t              
+step s2_auth: SET ROLE regress_truncate_conflict;
+step s2_truncate: TRUNCATE truncate_tab;
+ERROR:  permission denied for table truncate_tab
+step s1_commit: COMMIT;
+step s2_reset: RESET ROLE;
+
+starting permutation: s1_begin s2_auth s2_truncate s1_tab_lookup s1_commit s2_reset
+step s1_begin: BEGIN;
+step s2_auth: SET ROLE regress_truncate_conflict;
+step s2_truncate: TRUNCATE truncate_tab;
+ERROR:  permission denied for table truncate_tab
+step s1_tab_lookup: SELECT count(*) >= 0 FROM truncate_tab;
+?column?       
+
+t              
+step s1_commit: COMMIT;
+step s2_reset: RESET ROLE;
+
+starting permutation: s1_begin s2_auth s1_tab_lookup s2_truncate s1_commit s2_reset
+step s1_begin: BEGIN;
+step s2_auth: SET ROLE regress_truncate_conflict;
+step s1_tab_lookup: SELECT count(*) >= 0 FROM truncate_tab;
+?column?       
+
+t              
+step s2_truncate: TRUNCATE truncate_tab;
+ERROR:  permission denied for table truncate_tab
+step s1_commit: COMMIT;
+step s2_reset: RESET ROLE;
+
+starting permutation: s2_auth s2_truncate s1_begin s1_tab_lookup s1_commit s2_reset
+step s2_auth: SET ROLE regress_truncate_conflict;
+step s2_truncate: TRUNCATE truncate_tab;
+ERROR:  permission denied for table truncate_tab
+step s1_begin: BEGIN;
+step s1_tab_lookup: SELECT count(*) >= 0 FROM truncate_tab;
+?column?       
+
+t              
+step s1_commit: COMMIT;
+step s2_reset: RESET ROLE;
+
+starting permutation: s1_begin s1_tab_lookup s2_grant s2_auth s2_truncate s1_commit s2_reset
+step s1_begin: BEGIN;
+step s1_tab_lookup: SELECT count(*) >= 0 FROM truncate_tab;
+?column?       
+
+t              
+step s2_grant: GRANT TRUNCATE ON truncate_tab TO regress_truncate_conflict;
+step s2_auth: SET ROLE regress_truncate_conflict;
+step s2_truncate: TRUNCATE truncate_tab; <waiting ...>
+step s1_commit: COMMIT;
+step s2_truncate: <... completed>
+step s2_reset: RESET ROLE;
+
+starting permutation: s1_begin s2_grant s2_auth s2_truncate s1_tab_lookup s1_commit s2_reset
+step s1_begin: BEGIN;
+step s2_grant: GRANT TRUNCATE ON truncate_tab TO regress_truncate_conflict;
+step s2_auth: SET ROLE regress_truncate_conflict;
+step s2_truncate: TRUNCATE truncate_tab;
+step s1_tab_lookup: SELECT count(*) >= 0 FROM truncate_tab;
+?column?       
+
+t              
+step s1_commit: COMMIT;
+step s2_reset: RESET ROLE;
+
+starting permutation: s1_begin s2_grant s2_auth s1_tab_lookup s2_truncate s1_commit s2_reset
+step s1_begin: BEGIN;
+step s2_grant: GRANT TRUNCATE ON truncate_tab TO regress_truncate_conflict;
+step s2_auth: SET ROLE regress_truncate_conflict;
+step s1_tab_lookup: SELECT count(*) >= 0 FROM truncate_tab;
+?column?       
+
+t              
+step s2_truncate: TRUNCATE truncate_tab; <waiting ...>
+step s1_commit: COMMIT;
+step s2_truncate: <... completed>
+step s2_reset: RESET ROLE;
+
+starting permutation: s2_grant s2_auth s2_truncate s1_begin s1_tab_lookup s1_commit s2_reset
+step s2_grant: GRANT TRUNCATE ON truncate_tab TO regress_truncate_conflict;
+step s2_auth: SET ROLE regress_truncate_conflict;
+step s2_truncate: TRUNCATE truncate_tab;
+step s1_begin: BEGIN;
+step s1_tab_lookup: SELECT count(*) >= 0 FROM truncate_tab;
+?column?       
+
+t              
+step s1_commit: COMMIT;
+step s2_reset: RESET ROLE;
index d5594e80e2312b860522fb7e4abc01d5108566d2..48ae74073991847280943d20b5141e66683c3abc 100644 (file)
@@ -76,3 +76,4 @@ test: partition-key-update-2
 test: partition-key-update-3
 test: partition-key-update-4
 test: plpgsql-toast
+test: truncate-conflict
diff --git a/src/test/isolation/specs/truncate-conflict.spec b/src/test/isolation/specs/truncate-conflict.spec
new file mode 100644 (file)
index 0000000..3c1b1d1
--- /dev/null
@@ -0,0 +1,38 @@
+# Tests for locking conflicts with TRUNCATE commands.
+
+setup
+{
+       CREATE ROLE regress_truncate_conflict;
+       CREATE TABLE truncate_tab (a int);
+}
+
+teardown
+{
+       DROP TABLE truncate_tab;
+       DROP ROLE regress_truncate_conflict;
+}
+
+session "s1"
+step "s1_begin"          { BEGIN; }
+step "s1_tab_lookup"     { SELECT count(*) >= 0 FROM truncate_tab; }
+step "s1_commit"         { COMMIT; }
+
+session "s2"
+step "s2_grant"          { GRANT TRUNCATE ON truncate_tab TO regress_truncate_conflict; }
+step "s2_auth"           { SET ROLE regress_truncate_conflict; }
+step "s2_truncate"       { TRUNCATE truncate_tab; }
+step "s2_reset"          { RESET ROLE; }
+
+# The role doesn't have privileges to truncate the table, so TRUNCATE should
+# immediately fail without waiting for a lock.
+permutation "s1_begin" "s1_tab_lookup" "s2_auth" "s2_truncate" "s1_commit" "s2_reset"
+permutation "s1_begin" "s2_auth" "s2_truncate" "s1_tab_lookup" "s1_commit" "s2_reset"
+permutation "s1_begin" "s2_auth" "s1_tab_lookup" "s2_truncate" "s1_commit" "s2_reset"
+permutation "s2_auth" "s2_truncate" "s1_begin" "s1_tab_lookup" "s1_commit" "s2_reset"
+
+# The role has privileges to truncate the table, TRUNCATE will block if
+# another session holds a lock on the table and succeed in all cases.
+permutation "s1_begin" "s1_tab_lookup" "s2_grant" "s2_auth" "s2_truncate" "s1_commit" "s2_reset"
+permutation "s1_begin" "s2_grant" "s2_auth" "s2_truncate" "s1_tab_lookup" "s1_commit" "s2_reset"
+permutation "s1_begin" "s2_grant" "s2_auth" "s1_tab_lookup" "s2_truncate" "s1_commit" "s2_reset"
+permutation "s2_grant" "s2_auth" "s2_truncate" "s1_begin" "s1_tab_lookup" "s1_commit" "s2_reset"