From 4240e429d0c2d889d0cda23c618f94e12c13ade7 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Fri, 8 Jul 2011 22:19:30 -0400 Subject: [PATCH] Try to acquire relation locks in RangeVarGetRelid. In the previous coding, we would look up a relation in RangeVarGetRelid, lock the resulting OID, and then AcceptInvalidationMessages(). While this was sufficient to ensure that we noticed any changes to the relation definition before building the relcache entry, it didn't handle the possibility that the name we looked up no longer referenced the same OID. This was particularly problematic in the case where a table had been dropped and recreated: we'd latch on to the entry for the old relation and fail later on. Now, we acquire the relation lock inside RangeVarGetRelid, and retry the name lookup if we notice that invalidation messages have been processed meanwhile. Many operations that would previously have failed with an error in the presence of concurrent DDL will now succeed. There is a good deal of work remaining to be done here: many callers of RangeVarGetRelid still pass NoLock for one reason or another. In addition, nothing in this patch guards against the possibility that the meaning of an unqualified name might change due to the creation of a relation in a schema earlier in the user's search path than the one where it was previously found. Furthermore, there's nothing at all here to guard against similar race conditions for non-relations. For all that, it's a start. Noah Misch and Robert Haas --- src/backend/access/heap/heapam.c | 42 ++------ src/backend/catalog/aclchk.c | 7 +- src/backend/catalog/index.c | 3 +- src/backend/catalog/namespace.c | 149 ++++++++++++++++++++++------ src/backend/commands/alter.c | 7 +- src/backend/commands/indexcmds.c | 13 ++- src/backend/commands/lockcmds.c | 136 +++++++++++-------------- src/backend/commands/sequence.c | 15 ++- src/backend/commands/tablecmds.c | 13 ++- src/backend/commands/trigger.c | 18 +++- src/backend/commands/vacuum.c | 11 +- src/backend/parser/parse_relation.c | 8 +- src/backend/parser/parse_type.c | 10 +- src/backend/rewrite/rewriteDefine.c | 12 ++- src/backend/rewrite/rewriteRemove.c | 10 +- src/backend/storage/ipc/sinval.c | 6 ++ src/backend/storage/lmgr/lmgr.c | 9 +- src/backend/tcop/utility.c | 17 ++-- src/backend/utils/adt/acl.c | 3 +- src/backend/utils/adt/regproc.c | 8 +- src/backend/utils/adt/ruleutils.c | 11 +- src/include/catalog/namespace.h | 4 +- src/include/commands/trigger.h | 2 +- src/include/rewrite/rewriteRemove.h | 2 +- src/include/storage/sinval.h | 4 + src/pl/plpgsql/src/pl_comp.c | 12 +-- 26 files changed, 337 insertions(+), 195 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index c9b1d5fd04..9f1bcf1de4 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -975,26 +975,11 @@ relation_openrv(const RangeVar *relation, LOCKMODE lockmode) { Oid relOid; - /* - * Check for shared-cache-inval messages before trying to open the - * relation. This is needed to cover the case where the name identifies a - * rel that has been dropped and recreated since the start of our - * transaction: if we don't flush the old syscache entry then we'll latch - * onto that entry and suffer an error when we do RelationIdGetRelation. - * Note that relation_open does not need to do this, since a relation's - * OID never changes. - * - * We skip this if asked for NoLock, on the assumption that the caller has - * already ensured some appropriate lock is held. - */ - if (lockmode != NoLock) - AcceptInvalidationMessages(); - - /* Look up the appropriate relation using namespace search */ - relOid = RangeVarGetRelid(relation, false); + /* Look up and lock the appropriate relation using namespace search */ + relOid = RangeVarGetRelid(relation, lockmode, false, false); /* Let relation_open do the rest */ - return relation_open(relOid, lockmode); + return relation_open(relOid, NoLock); } /* ---------------- @@ -1012,30 +997,15 @@ relation_openrv_extended(const RangeVar *relation, LOCKMODE lockmode, { Oid relOid; - /* - * Check for shared-cache-inval messages before trying to open the - * relation. This is needed to cover the case where the name identifies a - * rel that has been dropped and recreated since the start of our - * transaction: if we don't flush the old syscache entry then we'll latch - * onto that entry and suffer an error when we do RelationIdGetRelation. - * Note that relation_open does not need to do this, since a relation's - * OID never changes. - * - * We skip this if asked for NoLock, on the assumption that the caller has - * already ensured some appropriate lock is held. - */ - if (lockmode != NoLock) - AcceptInvalidationMessages(); - - /* Look up the appropriate relation using namespace search */ - relOid = RangeVarGetRelid(relation, missing_ok); + /* Look up and lock the appropriate relation using namespace search */ + relOid = RangeVarGetRelid(relation, lockmode, missing_ok, false); /* Return NULL on not-found */ if (!OidIsValid(relOid)) return NULL; /* Let relation_open do the rest */ - return relation_open(relOid, lockmode); + return relation_open(relOid, NoLock); } /* ---------------- diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c index df32731b87..9a125bdac6 100644 --- a/src/backend/catalog/aclchk.c +++ b/src/backend/catalog/aclchk.c @@ -583,6 +583,11 @@ ExecGrantStmt_oids(InternalGrant *istmt) * objectNamesToOids * * Turn a list of object names of a given type into an Oid list. + * + * XXX: This function doesn't take any sort of locks on the objects whose + * names it looks up. In the face of concurrent DDL, we might easily latch + * onto an old version of an object, causing the GRANT or REVOKE statement + * to fail. */ static List * objectNamesToOids(GrantObjectType objtype, List *objnames) @@ -601,7 +606,7 @@ objectNamesToOids(GrantObjectType objtype, List *objnames) RangeVar *relvar = (RangeVar *) lfirst(cell); Oid relOid; - relOid = RangeVarGetRelid(relvar, false); + relOid = RangeVarGetRelid(relvar, NoLock, false, false); objects = lappend_oid(objects, relOid); } break; diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 39ba4869af..83aae7abad 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -2942,7 +2942,8 @@ reindex_relation(Oid relid, int flags) /* * Open and lock the relation. ShareLock is sufficient since we only need - * to prevent schema and data changes in it. + * to prevent schema and data changes in it. The lock level used here + * should match ReindexTable(). */ rel = heap_open(relid, ShareLock); diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c index ce795a61c5..8d426951ba 100644 --- a/src/backend/catalog/namespace.c +++ b/src/backend/catalog/namespace.c @@ -44,6 +44,8 @@ #include "parser/parse_func.h" #include "storage/backendid.h" #include "storage/ipc.h" +#include "storage/lmgr.h" +#include "storage/sinval.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/guc.h" @@ -215,14 +217,20 @@ Datum pg_is_other_temp_schema(PG_FUNCTION_ARGS); * Given a RangeVar describing an existing relation, * select the proper namespace and look up the relation OID. * - * If the relation is not found, return InvalidOid if failOK = true, + * If the relation is not found, return InvalidOid if missing_ok = true, * otherwise raise an error. + * + * If nowait = true, throw an error if we'd have to wait for a lock. */ Oid -RangeVarGetRelid(const RangeVar *relation, bool failOK) +RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode, bool missing_ok, + bool nowait) { + uint64 inval_count; Oid namespaceId; Oid relId; + Oid oldRelId = InvalidOid; + bool retry = false; /* * We check the catalog name and then ignore it. @@ -238,37 +246,120 @@ RangeVarGetRelid(const RangeVar *relation, bool failOK) } /* - * Some non-default relpersistence value may have been specified. The - * parser never generates such a RangeVar in simple DML, but it can happen - * in contexts such as "CREATE TEMP TABLE foo (f1 int PRIMARY KEY)". Such - * a command will generate an added CREATE INDEX operation, which must be - * careful to find the temp table, even when pg_temp is not first in the - * search path. + * DDL operations can change the results of a name lookup. Since all + * such operations will generate invalidation messages, we keep track + * of whether any such messages show up while we're performing the + * operation, and retry until either (1) no more invalidation messages + * show up or (2) the answer doesn't change. + * + * But if lockmode = NoLock, then we assume that either the caller is OK + * with the answer changing under them, or that they already hold some + * appropriate lock, and therefore return the first answer we get without + * checking for invalidation messages. Also, if the requested lock is + * already held, no LockRelationOid will not AcceptInvalidationMessages, + * so we may fail to notice a change. We could protect against that case + * by calling AcceptInvalidationMessages() before beginning this loop, + * but that would add a significant amount overhead, so for now we don't. */ - if (relation->relpersistence == RELPERSISTENCE_TEMP) + for (;;) { - if (relation->schemaname) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("temporary tables cannot specify a schema name"))); - if (OidIsValid(myTempNamespace)) - relId = get_relname_relid(relation->relname, myTempNamespace); - else /* this probably can't happen? */ - relId = InvalidOid; - } - else if (relation->schemaname) - { - /* use exact schema given */ - namespaceId = LookupExplicitNamespace(relation->schemaname); - relId = get_relname_relid(relation->relname, namespaceId); - } - else - { - /* search the namespace path */ - relId = RelnameGetRelid(relation->relname); + /* + * Remember this value, so that, after looking up the relation name + * and locking its OID, we can check whether any invalidation messages + * have been processed that might require a do-over. + */ + inval_count = SharedInvalidMessageCounter; + + /* + * Some non-default relpersistence value may have been specified. The + * parser never generates such a RangeVar in simple DML, but it can + * happen in contexts such as "CREATE TEMP TABLE foo (f1 int PRIMARY + * KEY)". Such a command will generate an added CREATE INDEX + * operation, which must be careful to find the temp table, even when + * pg_temp is not first in the search path. + */ + if (relation->relpersistence == RELPERSISTENCE_TEMP) + { + if (relation->schemaname) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("temporary tables cannot specify a schema name"))); + if (OidIsValid(myTempNamespace)) + relId = get_relname_relid(relation->relname, myTempNamespace); + else /* this probably can't happen? */ + relId = InvalidOid; + } + else if (relation->schemaname) + { + /* use exact schema given */ + namespaceId = LookupExplicitNamespace(relation->schemaname); + relId = get_relname_relid(relation->relname, namespaceId); + } + else + { + /* search the namespace path */ + relId = RelnameGetRelid(relation->relname); + } + + /* + * If no lock requested, we assume the caller knows what they're + * doing. They should have already acquired a heavyweight lock on + * this relation earlier in the processing of this same statement, + * so it wouldn't be appropriate to AcceptInvalidationMessages() + * here, as that might pull the rug out from under them. + */ + if (lockmode == NoLock) + break; + + /* + * If, upon retry, we get back the same OID we did last time, then + * the invalidation messages we processed did not change the final + * answer. So we're done. + */ + if (retry && relId == oldRelId) + break; + + /* + * Lock relation. This will also accept any pending invalidation + * messages. If we got back InvalidOid, indicating not found, then + * there's nothing to lock, but we accept invalidation messages + * anyway, to flush any negative catcache entries that may be + * lingering. + */ + if (!OidIsValid(relId)) + AcceptInvalidationMessages(); + else if (!nowait) + LockRelationOid(relId, lockmode); + else if (!ConditionalLockRelationOid(relId, lockmode)) + { + if (relation->schemaname) + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("could not obtain lock on relation \"%s.%s\"", + relation->schemaname, relation->relname))); + else + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("could not obtain lock on relation \"%s\"", + relation->relname))); + } + + /* + * If no invalidation message were processed, we're done! + */ + if (inval_count == SharedInvalidMessageCounter) + break; + + /* + * Something may have changed. Let's repeat the name lookup, to + * make sure this name still references the same relation it did + * previously. + */ + retry = true; + oldRelId = relId; } - if (!OidIsValid(relId) && !failOK) + if (!OidIsValid(relId) && !missing_ok) { if (relation->schemaname) ereport(ERROR, diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c index 08ee93b8ea..e1be451e35 100644 --- a/src/backend/commands/alter.c +++ b/src/backend/commands/alter.c @@ -108,7 +108,12 @@ ExecRenameStmt(RenameStmt *stmt) CheckRelationOwnership(stmt->relation, true); - relid = RangeVarGetRelid(stmt->relation, false); + /* + * Lock level used here should match what will be taken later, + * in RenameRelation, renameatt, or renametrig. + */ + relid = RangeVarGetRelid(stmt->relation, AccessExclusiveLock, + false, false); switch (stmt->renameType) { diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index b7c021d943..5024854081 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -1531,9 +1531,15 @@ ReindexIndex(RangeVar *indexRelation) Oid indOid; HeapTuple tuple; - indOid = RangeVarGetRelid(indexRelation, false); + /* + * XXX: This is not safe in the presence of concurrent DDL. We should + * take AccessExclusiveLock here, but that would violate the rule that + * indexes should only be locked after their parent tables. For now, + * we live with it. + */ + indOid = RangeVarGetRelid(indexRelation, NoLock, false, false); tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(indOid)); - if (!HeapTupleIsValid(tuple)) /* shouldn't happen */ + if (!HeapTupleIsValid(tuple)) elog(ERROR, "cache lookup failed for relation %u", indOid); if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX) @@ -1562,7 +1568,8 @@ ReindexTable(RangeVar *relation) Oid heapOid; HeapTuple tuple; - heapOid = RangeVarGetRelid(relation, false); + /* The lock level used here should match reindex_relation(). */ + heapOid = RangeVarGetRelid(relation, ShareLock, false, false); tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(heapOid)); if (!HeapTupleIsValid(tuple)) /* shouldn't happen */ elog(ERROR, "cache lookup failed for relation %u", heapOid); diff --git a/src/backend/commands/lockcmds.c b/src/backend/commands/lockcmds.c index b2c6ea5559..875f868b96 100644 --- a/src/backend/commands/lockcmds.c +++ b/src/backend/commands/lockcmds.c @@ -24,8 +24,8 @@ #include "utils/acl.h" #include "utils/lsyscache.h" -static void LockTableRecurse(Oid reloid, RangeVar *rv, - LOCKMODE lockmode, bool nowait, bool recurse); +static void LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait, + bool recurse); /* @@ -36,100 +36,40 @@ LockTableCommand(LockStmt *lockstmt) { ListCell *p; + /* + * During recovery we only accept these variations: LOCK TABLE foo IN + * ACCESS SHARE MODE LOCK TABLE foo IN ROW SHARE MODE LOCK TABLE foo + * IN ROW EXCLUSIVE MODE This test must match the restrictions defined + * in LockAcquire() + */ + if (lockstmt->mode > RowExclusiveLock) + PreventCommandDuringRecovery("LOCK TABLE"); + /* * Iterate over the list and process the named relations one at a time */ foreach(p, lockstmt->relations) { - RangeVar *relation = (RangeVar *) lfirst(p); - bool recurse = interpretInhOption(relation->inhOpt); + RangeVar *rv = (RangeVar *) lfirst(p); + Relation rel; + bool recurse = interpretInhOption(rv->inhOpt); Oid reloid; - reloid = RangeVarGetRelid(relation, false); - - /* - * During recovery we only accept these variations: LOCK TABLE foo IN - * ACCESS SHARE MODE LOCK TABLE foo IN ROW SHARE MODE LOCK TABLE foo - * IN ROW EXCLUSIVE MODE This test must match the restrictions defined - * in LockAcquire() - */ - if (lockstmt->mode > RowExclusiveLock) - PreventCommandDuringRecovery("LOCK TABLE"); + reloid = RangeVarGetRelid(rv, lockstmt->mode, false, lockstmt->nowait); + rel = relation_open(reloid, NoLock); - LockTableRecurse(reloid, relation, - lockstmt->mode, lockstmt->nowait, recurse); + LockTableRecurse(rel, lockstmt->mode, lockstmt->nowait, recurse); } } /* * Apply LOCK TABLE recursively over an inheritance tree - * - * At top level, "rv" is the original command argument; we use it to throw - * an appropriate error message if the relation isn't there. Below top level, - * "rv" is NULL and we should just silently ignore any dropped child rel. */ static void -LockTableRecurse(Oid reloid, RangeVar *rv, - LOCKMODE lockmode, bool nowait, bool recurse) +LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait, bool recurse) { - Relation rel; AclResult aclresult; - - /* - * Acquire the lock. We must do this first to protect against concurrent - * drops. Note that a lock against an already-dropped relation's OID - * won't fail. - */ - if (nowait) - { - if (!ConditionalLockRelationOid(reloid, lockmode)) - { - /* try to throw error by name; relation could be deleted... */ - char *relname = rv ? rv->relname : get_rel_name(reloid); - - if (relname) - ereport(ERROR, - (errcode(ERRCODE_LOCK_NOT_AVAILABLE), - errmsg("could not obtain lock on relation \"%s\"", - relname))); - else - ereport(ERROR, - (errcode(ERRCODE_LOCK_NOT_AVAILABLE), - errmsg("could not obtain lock on relation with OID %u", - reloid))); - } - } - else - LockRelationOid(reloid, lockmode); - - /* - * Now that we have the lock, check to see if the relation really exists - * or not. - */ - rel = try_relation_open(reloid, NoLock); - - if (!rel) - { - /* Release useless lock */ - UnlockRelationOid(reloid, lockmode); - - /* At top level, throw error; otherwise, ignore this child rel */ - if (rv) - { - if (rv->schemaname) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_TABLE), - errmsg("relation \"%s.%s\" does not exist", - rv->schemaname, rv->relname))); - else - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_TABLE), - errmsg("relation \"%s\" does not exist", - rv->relname))); - } - - return; - } + Oid reloid = RelationGetRelid(rel); /* Verify adequate privilege */ if (lockmode == AccessShareLock) @@ -159,12 +99,48 @@ LockTableRecurse(Oid reloid, RangeVar *rv, { List *children = find_inheritance_children(reloid, NoLock); ListCell *lc; + Relation childrel; foreach(lc, children) { Oid childreloid = lfirst_oid(lc); - LockTableRecurse(childreloid, NULL, lockmode, nowait, recurse); + /* + * Acquire the lock, to protect against concurrent drops. Note + * that a lock against an already-dropped relation's OID won't + * fail. + */ + if (!nowait) + LockRelationOid(childreloid, lockmode); + else if (!ConditionalLockRelationOid(childreloid, lockmode)) + { + /* try to throw error by name; relation could be deleted... */ + char *relname = get_rel_name(childreloid); + + if (relname) + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("could not obtain lock on relation \"%s\"", + relname))); + else + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("could not obtain lock on relation with OID %u", + reloid))); + } + + /* + * Now that we have the lock, check to see if the relation really + * exists or not. + */ + childrel = try_relation_open(childreloid, NoLock); + if (!childrel) + { + /* Release useless lock */ + UnlockRelationOid(childreloid, lockmode); + } + + LockTableRecurse(childrel, lockmode, nowait, recurse); } } diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index be04177a2e..de6e2a3e33 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -427,8 +427,8 @@ AlterSequence(AlterSeqStmt *stmt) FormData_pg_sequence new; List *owned_by; - /* open and AccessShareLock sequence */ - relid = RangeVarGetRelid(stmt->sequence, false); + /* Open and lock sequence. */ + relid = RangeVarGetRelid(stmt->sequence, AccessShareLock, false, false); init_sequence(relid, &elm, &seqrel); /* allow ALTER to sequence owner only */ @@ -507,7 +507,16 @@ nextval(PG_FUNCTION_ARGS) Oid relid; sequence = makeRangeVarFromNameList(textToQualifiedNameList(seqin)); - relid = RangeVarGetRelid(sequence, false); + + /* + * XXX: This is not safe in the presence of concurrent DDL, but + * acquiring a lock here is more expensive than letting nextval_internal + * do it, since the latter maintains a cache that keeps us from hitting + * the lock manager more than once per transaction. It's not clear + * whether the performance penalty is material in practice, but for now, + * we do it this way. + */ + relid = RangeVarGetRelid(sequence, NoLock, false, false); PG_RETURN_INT64(nextval_internal(relid)); } diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index c1af12a5a3..295a1ff6e6 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -764,8 +764,15 @@ RemoveRelations(DropStmt *drop) */ AcceptInvalidationMessages(); - /* Look up the appropriate relation using namespace search */ - relOid = RangeVarGetRelid(rel, true); + /* + * Look up the appropriate relation using namespace search. + * + * XXX: Doing this without a lock is unsafe in the presence of + * concurrent DDL, but acquiring a lock here might violate the rule + * that a table must be locked before its corresponding index. + * So, for now, we ignore the hazard. + */ + relOid = RangeVarGetRelid(rel, NoLock, true, false); /* Not there? */ if (!OidIsValid(relOid)) @@ -2234,6 +2241,8 @@ RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype) /* * Grab an exclusive lock on the target table, index, sequence or view, * which we will NOT release until end of transaction. + * + * Lock level used here should match ExecRenameStmt */ targetrelation = relation_open(myrelid, AccessExclusiveLock); diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c index 9cf8372e96..cadf3a1545 100644 --- a/src/backend/commands/trigger.c +++ b/src/backend/commands/trigger.c @@ -194,7 +194,17 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString, RelationGetRelationName(rel)))); if (stmt->isconstraint && stmt->constrrel != NULL) - constrrelid = RangeVarGetRelid(stmt->constrrel, false); + { + /* + * We must take a lock on the target relation to protect against + * concurrent drop. It's not clear that AccessShareLock is strong + * enough, but we certainly need at least that much... otherwise, + * we might end up creating a pg_constraint entry referencing a + * nonexistent table. + */ + constrrelid = RangeVarGetRelid(stmt->constrrel, AccessShareLock, false, + false); + } /* permission checks */ if (!isInternal) @@ -1020,11 +1030,15 @@ ConvertTriggerToFK(CreateTrigStmt *stmt, Oid funcoid) * DropTrigger - drop an individual trigger by name */ void -DropTrigger(Oid relid, const char *trigname, DropBehavior behavior, +DropTrigger(RangeVar *relation, const char *trigname, DropBehavior behavior, bool missing_ok) { + Oid relid; ObjectAddress object; + /* lock level should match RemoveTriggerById */ + relid = RangeVarGetRelid(relation, ShareRowExclusiveLock, false, false); + object.classId = TriggerRelationId; object.objectId = get_trigger_oid(relid, trigname, missing_ok); object.objectSubId = 0; diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 9303967863..889737e26e 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -318,7 +318,16 @@ get_rel_oids(Oid relid, const RangeVar *vacrel) /* Process a specific relation */ Oid relid; - relid = RangeVarGetRelid(vacrel, false); + /* + * Since we don't take a lock here, the relation might be gone, + * or the RangeVar might no longer refer to the OID we look up + * here. In the former case, VACUUM will do nothing; in the + * latter case, it will process the OID we looked up here, rather + * than the new one. Neither is ideal, but there's little practical + * alternative, since we're going to commit this transaction and + * begin a new one between now and then. + */ + relid = RangeVarGetRelid(vacrel, NoLock, false, false); /* Make a relation list entry for this guy */ oldcontext = MemoryContextSwitchTo(vac_context); diff --git a/src/backend/parser/parse_relation.c b/src/backend/parser/parse_relation.c index f2ccf0d745..3840c2f3ed 100644 --- a/src/backend/parser/parse_relation.c +++ b/src/backend/parser/parse_relation.c @@ -273,11 +273,17 @@ searchRangeTable(ParseState *pstate, RangeVar *relation) * If it's an unqualified name, check for possible CTE matches. A CTE * hides any real relation matches. If no CTE, look for a matching * relation. + * + * NB: It's not critical that RangeVarGetRelid return the correct answer + * here in the face of concurrent DDL. If it doesn't, the worst case + * scenario is a less-clear error message. Also, the tables involved in + * the query are already locked, which reduces the number of cases in + * which surprising behavior can occur. So we do the name lookup unlocked. */ if (!relation->schemaname) cte = scanNameSpaceForCTE(pstate, refname, &ctelevelsup); if (!cte) - relId = RangeVarGetRelid(relation, true); + relId = RangeVarGetRelid(relation, NoLock, true, false); /* Now look for RTEs matching either the relation/CTE or the alias */ for (levelsup = 0; diff --git a/src/backend/parser/parse_type.c b/src/backend/parser/parse_type.c index ac62cbc8c3..2122032ce2 100644 --- a/src/backend/parser/parse_type.c +++ b/src/backend/parser/parse_type.c @@ -108,8 +108,14 @@ LookupTypeName(ParseState *pstate, const TypeName *typeName, break; } - /* look up the field */ - relid = RangeVarGetRelid(rel, false); + /* + * Look up the field. + * + * XXX: As no lock is taken here, this might fail in the presence + * of concurrent DDL. But taking a lock would carry a performance + * penalty and would also require a permissions check. + */ + relid = RangeVarGetRelid(rel, NoLock, false, false); attnum = get_attnum(relid, field); if (attnum == InvalidAttrNumber) ereport(ERROR, diff --git a/src/backend/rewrite/rewriteDefine.c b/src/backend/rewrite/rewriteDefine.c index 60b988175b..17db70e74a 100644 --- a/src/backend/rewrite/rewriteDefine.c +++ b/src/backend/rewrite/rewriteDefine.c @@ -196,11 +196,15 @@ DefineRule(RuleStmt *stmt, const char *queryString) Node *whereClause; Oid relId; - /* Parse analysis ... */ + /* Parse analysis. */ transformRuleStmt(stmt, queryString, &actions, &whereClause); - /* ... find the relation ... */ - relId = RangeVarGetRelid(stmt->relation, false); + /* + * Find and lock the relation. Lock level should match + * DefineQueryRewrite. + */ + relId = RangeVarGetRelid(stmt->relation, AccessExclusiveLock, false, + false); /* ... and execute */ DefineQueryRewrite(stmt->rulename, @@ -242,6 +246,8 @@ DefineQueryRewrite(char *rulename, * grab ShareRowExclusiveLock to lock out insert/update/delete actions and * to ensure that we lock out current CREATE RULE statements; but because * of race conditions in access to catalog entries, we can't do that yet. + * + * Note that this lock level should match the one used in DefineRule. */ event_relation = heap_open(event_relid, AccessExclusiveLock); diff --git a/src/backend/rewrite/rewriteRemove.c b/src/backend/rewrite/rewriteRemove.c index b9dc7c5d9f..cec22ac6a8 100644 --- a/src/backend/rewrite/rewriteRemove.c +++ b/src/backend/rewrite/rewriteRemove.c @@ -19,6 +19,7 @@ #include "access/sysattr.h" #include "catalog/dependency.h" #include "catalog/indexing.h" +#include "catalog/namespace.h" #include "catalog/pg_rewrite.h" #include "miscadmin.h" #include "rewrite/rewriteRemove.h" @@ -37,13 +38,18 @@ * Delete a rule given its name. */ void -RemoveRewriteRule(Oid owningRel, const char *ruleName, DropBehavior behavior, - bool missing_ok) +RemoveRewriteRule(RangeVar *relation, const char *ruleName, + DropBehavior behavior, bool missing_ok) { HeapTuple tuple; Oid eventRelationOid; + Oid owningRel; ObjectAddress object; + /* should match RemoveRewriteRuleById */ + owningRel = RangeVarGetRelid(relation, ShareUpdateExclusiveLock, + false, false); + /* * Find the tuple for the target rule. */ diff --git a/src/backend/storage/ipc/sinval.c b/src/backend/storage/ipc/sinval.c index 9ab16b16ed..8499615f14 100644 --- a/src/backend/storage/ipc/sinval.c +++ b/src/backend/storage/ipc/sinval.c @@ -22,6 +22,9 @@ #include "utils/inval.h" +uint64 SharedInvalidMessageCounter; + + /* * Because backends sitting idle will not be reading sinval events, we * need a way to give an idle backend a swift kick in the rear and make @@ -90,6 +93,7 @@ ReceiveSharedInvalidMessages( { SharedInvalidationMessage *msg = &messages[nextmsg++]; + SharedInvalidMessageCounter++; invalFunction(msg); } @@ -106,6 +110,7 @@ ReceiveSharedInvalidMessages( { /* got a reset message */ elog(DEBUG4, "cache state reset"); + SharedInvalidMessageCounter++; resetFunction(); break; /* nothing more to do */ } @@ -118,6 +123,7 @@ ReceiveSharedInvalidMessages( { SharedInvalidationMessage *msg = &messages[nextmsg++]; + SharedInvalidMessageCounter++; invalFunction(msg); } diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c index 859b3852db..3ac098b2a9 100644 --- a/src/backend/storage/lmgr/lmgr.c +++ b/src/backend/storage/lmgr/lmgr.c @@ -81,10 +81,11 @@ LockRelationOid(Oid relid, LOCKMODE lockmode) /* * Now that we have the lock, check for invalidation messages, so that we * will update or flush any stale relcache entry before we try to use it. - * We can skip this in the not-uncommon case that we already had the same - * type of lock being requested, since then no one else could have - * modified the relcache entry in an undesirable way. (In the case where - * our own xact modifies the rel, the relcache update happens via + * RangeVarGetRelid() specifically relies on us for this. We can skip + * this in the not-uncommon case that we already had the same type of lock + * being requested, since then no one else could have modified the + * relcache entry in an undesirable way. (In the case where our own xact + * modifies the rel, the relcache update happens via * CommandCounterIncrement, not here.) */ if (res != LOCKACQUIRE_ALREADY_HELD) diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 224e1f3761..0698cafbce 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -77,7 +77,15 @@ CheckRelationOwnership(RangeVar *rel, bool noCatalogs) Oid relOid; HeapTuple tuple; - relOid = RangeVarGetRelid(rel, false); + /* + * XXX: This is unsafe in the presence of concurrent DDL, since it is + * called before acquiring any lock on the target relation. However, + * locking the target relation (especially using something like + * AccessExclusiveLock) before verifying that the user has permissions + * is not appealing either. + */ + relOid = RangeVarGetRelid(rel, NoLock, false, false); + tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid)); if (!HeapTupleIsValid(tuple)) /* should not happen */ elog(ERROR, "cache lookup failed for relation %u", relOid); @@ -1111,20 +1119,17 @@ standard_ProcessUtility(Node *parsetree, case T_DropPropertyStmt: { DropPropertyStmt *stmt = (DropPropertyStmt *) parsetree; - Oid relId; - - relId = RangeVarGetRelid(stmt->relation, false); switch (stmt->removeType) { case OBJECT_RULE: /* RemoveRewriteRule checks permissions */ - RemoveRewriteRule(relId, stmt->property, + RemoveRewriteRule(stmt->relation, stmt->property, stmt->behavior, stmt->missing_ok); break; case OBJECT_TRIGGER: /* DropTrigger checks permissions */ - DropTrigger(relId, stmt->property, + DropTrigger(stmt->relation, stmt->property, stmt->behavior, stmt->missing_ok); break; default: diff --git a/src/backend/utils/adt/acl.c b/src/backend/utils/adt/acl.c index 4a3e241c41..3fa95e2fd3 100644 --- a/src/backend/utils/adt/acl.c +++ b/src/backend/utils/adt/acl.c @@ -1939,7 +1939,8 @@ convert_table_name(text *tablename) relrv = makeRangeVarFromNameList(textToQualifiedNameList(tablename)); - return RangeVarGetRelid(relrv, false); + /* We might not even have permissions on this relation; don't lock it. */ + return RangeVarGetRelid(relrv, NoLock, false, false); } /* diff --git a/src/backend/utils/adt/regproc.c b/src/backend/utils/adt/regproc.c index 6716c0204c..0d42a39ec4 100644 --- a/src/backend/utils/adt/regproc.c +++ b/src/backend/utils/adt/regproc.c @@ -823,7 +823,9 @@ regclassin(PG_FUNCTION_ARGS) */ names = stringToQualifiedNameList(class_name_or_oid); - result = RangeVarGetRelid(makeRangeVarFromNameList(names), false); + /* We might not even have permissions on this relation; don't lock it. */ + result = RangeVarGetRelid(makeRangeVarFromNameList(names), NoLock, false, + false); PG_RETURN_OID(result); } @@ -1294,7 +1296,9 @@ text_regclass(PG_FUNCTION_ARGS) RangeVar *rv; rv = makeRangeVarFromNameList(textToQualifiedNameList(relname)); - result = RangeVarGetRelid(rv, false); + + /* We might not even have permissions on this relation; don't lock it. */ + result = RangeVarGetRelid(rv, NoLock, false, false); PG_RETURN_OID(result); } diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index 49dc9c88d5..3fd99e00e3 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -385,8 +385,9 @@ pg_get_viewdef_name(PG_FUNCTION_ARGS) RangeVar *viewrel; Oid viewoid; + /* Look up view name. Can't lock it - we might not have privileges. */ viewrel = makeRangeVarFromNameList(textToQualifiedNameList(viewname)); - viewoid = RangeVarGetRelid(viewrel, false); + viewoid = RangeVarGetRelid(viewrel, NoLock, false, false); PG_RETURN_TEXT_P(string_to_text(pg_get_viewdef_worker(viewoid, 0))); } @@ -403,8 +404,10 @@ pg_get_viewdef_name_ext(PG_FUNCTION_ARGS) Oid viewoid; prettyFlags = pretty ? PRETTYFLAG_PAREN | PRETTYFLAG_INDENT : 0; + + /* Look up view name. Can't lock it - we might not have privileges. */ viewrel = makeRangeVarFromNameList(textToQualifiedNameList(viewname)); - viewoid = RangeVarGetRelid(viewrel, false); + viewoid = RangeVarGetRelid(viewrel, NoLock, false, false); PG_RETURN_TEXT_P(string_to_text(pg_get_viewdef_worker(viewoid, prettyFlags))); } @@ -1567,9 +1570,9 @@ pg_get_serial_sequence(PG_FUNCTION_ARGS) SysScanDesc scan; HeapTuple tup; - /* Get the OID of the table */ + /* Look up table name. Can't lock it - we might not have privileges. */ tablerv = makeRangeVarFromNameList(textToQualifiedNameList(tablename)); - tableOid = RangeVarGetRelid(tablerv, false); + tableOid = RangeVarGetRelid(tablerv, NoLock, false, false); /* Get the number of the column */ column = text_to_cstring(columnname); diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h index 7e1e194794..4bcbc20497 100644 --- a/src/include/catalog/namespace.h +++ b/src/include/catalog/namespace.h @@ -15,6 +15,7 @@ #define NAMESPACE_H #include "nodes/primnodes.h" +#include "storage/lock.h" /* @@ -47,7 +48,8 @@ typedef struct OverrideSearchPath } OverrideSearchPath; -extern Oid RangeVarGetRelid(const RangeVar *relation, bool failOK); +extern Oid RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode, + bool missing_ok, bool nowait); extern Oid RangeVarGetCreationNamespace(const RangeVar *newRelation); extern Oid RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation); extern void RangeVarAdjustRelationPersistence(RangeVar *newRelation, Oid nspid); diff --git a/src/include/commands/trigger.h b/src/include/commands/trigger.h index ad97871d98..fe21298b64 100644 --- a/src/include/commands/trigger.h +++ b/src/include/commands/trigger.h @@ -112,7 +112,7 @@ extern Oid CreateTrigger(CreateTrigStmt *stmt, const char *queryString, Oid constraintOid, Oid indexOid, bool isInternal); -extern void DropTrigger(Oid relid, const char *trigname, +extern void DropTrigger(RangeVar *relation, const char *trigname, DropBehavior behavior, bool missing_ok); extern void RemoveTriggerById(Oid trigOid); extern Oid get_trigger_oid(Oid relid, const char *name, bool missing_ok); diff --git a/src/include/rewrite/rewriteRemove.h b/src/include/rewrite/rewriteRemove.h index 90df04591f..b9a63bad7b 100644 --- a/src/include/rewrite/rewriteRemove.h +++ b/src/include/rewrite/rewriteRemove.h @@ -17,7 +17,7 @@ #include "nodes/parsenodes.h" -extern void RemoveRewriteRule(Oid owningRel, const char *ruleName, +extern void RemoveRewriteRule(RangeVar *relation, const char *ruleName, DropBehavior behavior, bool missing_ok); extern void RemoveRewriteRuleById(Oid ruleOid); diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h index e9ce0257ac..aba474d237 100644 --- a/src/include/storage/sinval.h +++ b/src/include/storage/sinval.h @@ -116,6 +116,10 @@ typedef union } SharedInvalidationMessage; +/* Counter of messages processed; don't worry about overflow. */ +extern uint64 SharedInvalidMessageCounter; + + extern void SendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n); extern void ReceiveSharedInvalidMessages( diff --git a/src/pl/plpgsql/src/pl_comp.c b/src/pl/plpgsql/src/pl_comp.c index afd20b4872..d22fa68eb3 100644 --- a/src/pl/plpgsql/src/pl_comp.c +++ b/src/pl/plpgsql/src/pl_comp.c @@ -1708,7 +1708,8 @@ plpgsql_parse_cwordtype(List *idents) relvar = makeRangeVar(strVal(linitial(idents)), strVal(lsecond(idents)), -1); - classOid = RangeVarGetRelid(relvar, true); + /* Can't lock relation - we might not have privileges. */ + classOid = RangeVarGetRelid(relvar, NoLock, true, false); if (!OidIsValid(classOid)) goto done; fldname = strVal(lthird(idents)); @@ -1804,16 +1805,11 @@ plpgsql_parse_cwordrowtype(List *idents) /* Avoid memory leaks in long-term function context */ oldCxt = MemoryContextSwitchTo(compile_tmp_cxt); - /* Lookup the relation */ + /* Look up relation name. Can't lock it - we might not have privileges. */ relvar = makeRangeVar(strVal(linitial(idents)), strVal(lsecond(idents)), -1); - classOid = RangeVarGetRelid(relvar, true); - if (!OidIsValid(classOid)) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_TABLE), - errmsg("relation \"%s.%s\" does not exist", - strVal(linitial(idents)), strVal(lsecond(idents))))); + classOid = RangeVarGetRelid(relvar, NoLock, false, false); MemoryContextSwitchTo(oldCxt); -- 2.40.0