From: Robert Haas Date: Wed, 21 Dec 2011 20:17:28 +0000 (-0500) Subject: Improve behavior of concurrent CLUSTER. X-Git-Tag: REL9_2_BETA1~663 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=cbe24a6dd8fb224b9585f25b882d5ffdb55a0ba5;p=postgresql Improve behavior of concurrent CLUSTER. In the previous coding, a user could queue up for an AccessExclusiveLock on a table they did not have permission to cluster, thus potentially interfering with access by authorized users who got stuck waiting behind the AccessExclusiveLock. This approach avoids that. cluster() has the same permissions-checking requirements as REINDEX TABLE, so this commit moves the now-shared callback to tablecmds.c and renames it, per discussion with Noah Misch. --- diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c index e805e28a39..997c1bde6b 100644 --- a/src/backend/commands/cluster.c +++ b/src/backend/commands/cluster.c @@ -25,6 +25,7 @@ #include "catalog/dependency.h" #include "catalog/heap.h" #include "catalog/index.h" +#include "catalog/namespace.h" #include "catalog/toasting.h" #include "commands/cluster.h" #include "commands/tablecmds.h" @@ -106,15 +107,12 @@ cluster(ClusterStmt *stmt, bool isTopLevel) indexOid = InvalidOid; Relation rel; - /* Find and lock the table */ - rel = heap_openrv(stmt->relation, AccessExclusiveLock); - - tableOid = RelationGetRelid(rel); - - /* Check permissions */ - if (!pg_class_ownercheck(tableOid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - RelationGetRelationName(rel)); + /* Find, lock, and check permissions on the table */ + tableOid = RangeVarGetRelidExtended(stmt->relation, + AccessExclusiveLock, + false, false, + RangeVarCallbackOwnsTable, NULL); + rel = heap_open(tableOid, NoLock); /* * Reject clustering a remote temp table ... their local buffer diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index 386b95bca2..a3c5277f1e 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -25,6 +25,7 @@ #include "catalog/pg_tablespace.h" #include "commands/dbcommands.h" #include "commands/defrem.h" +#include "commands/tablecmds.h" #include "commands/tablespace.h" #include "mb/pg_wchar.h" #include "miscadmin.h" @@ -63,8 +64,6 @@ static void ComputeIndexAttrs(IndexInfo *indexInfo, static Oid GetIndexOpClass(List *opclass, Oid attrType, char *accessMethodName, Oid accessMethodId); static char *ChooseIndexNameAddition(List *colnames); -static void RangeVarCallbackForReindexTable(const RangeVar *relation, - Oid relId, Oid oldRelId, void *arg); static void RangeVarCallbackForReindexIndex(const RangeVar *relation, Oid relId, Oid oldRelId, void *arg); @@ -1809,7 +1808,7 @@ ReindexTable(RangeVar *relation) /* The lock level used here should match reindex_relation(). */ heapOid = RangeVarGetRelidExtended(relation, ShareLock, false, false, - RangeVarCallbackForReindexTable, NULL); + RangeVarCallbackOwnsTable, NULL); if (!reindex_relation(heapOid, REINDEX_REL_PROCESS_TOAST)) ereport(NOTICE, @@ -1817,37 +1816,6 @@ ReindexTable(RangeVar *relation) relation->relname))); } -/* - * Check permissions on table before acquiring relation lock. - */ -static void -RangeVarCallbackForReindexTable(const RangeVar *relation, - Oid relId, Oid oldRelId, void *arg) -{ - char relkind; - - /* Nothing to do if the relation was not found. */ - if (!OidIsValid(relId)) - return; - - /* - * If the relation does exist, check whether it's an index. But note - * that the relation might have been dropped between the time we did the - * name lookup and now. In that case, there's nothing to do. - */ - relkind = get_rel_relkind(relId); - if (!relkind) - return; - if (relkind != RELKIND_RELATION && relkind != RELKIND_TOASTVALUE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a table", relation->relname))); - - /* Check permissions */ - if (!pg_class_ownercheck(relId, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, relation->relname); -} - /* * ReindexDatabase * Recreate indexes of a database. diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 61689b1338..65a28bfb9a 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -9933,3 +9933,38 @@ AtEOSubXact_on_commit_actions(bool isCommit, SubTransactionId mySubid, } } } + +/* + * This is intended as a callback for RangeVarGetRelidExtended(). It allows + * the table to be locked only if (1) it's a plain table or TOAST table and + * (2) the current user is the owner (or the superuser). This meets the + * permission-checking needs of both CLUTER and REINDEX TABLE; we expose it + * here so that it can be used by both. + */ +void +RangeVarCallbackOwnsTable(const RangeVar *relation, + Oid relId, Oid oldRelId, void *arg) +{ + char relkind; + + /* Nothing to do if the relation was not found. */ + if (!OidIsValid(relId)) + return; + + /* + * If the relation does exist, check whether it's an index. But note + * that the relation might have been dropped between the time we did the + * name lookup and now. In that case, there's nothing to do. + */ + relkind = get_rel_relkind(relId); + if (!relkind) + return; + if (relkind != RELKIND_RELATION && relkind != RELKIND_TOASTVALUE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a table", relation->relname))); + + /* Check permissions */ + if (!pg_class_ownercheck(relId, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, relation->relname); +} diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h index 20632eb87b..2a3b4a2e14 100644 --- a/src/include/commands/tablecmds.h +++ b/src/include/commands/tablecmds.h @@ -71,4 +71,7 @@ extern void AtEOSubXact_on_commit_actions(bool isCommit, SubTransactionId mySubid, SubTransactionId parentSubid); +extern void RangeVarCallbackOwnsTable(const RangeVar *relation, + Oid relId, Oid oldRelId, void *arg); + #endif /* TABLECMDS_H */