From: Tom Lane Date: Mon, 1 Oct 2018 16:43:21 +0000 (-0400) Subject: Add assertions that we hold some relevant lock during relation open. X-Git-Tag: REL_12_BETA1~1471 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=b04aeb0a053e7cf7faad89f7d47844d8ba0dc839;p=postgresql Add assertions that we hold some relevant lock during relation open. Opening a relation with no lock at all is unsafe; there's no guarantee that we'll see a consistent state of the relevant catalog entries. While use of MVCC scans to read the catalogs partially addresses that complaint, it's still possible to switch to a new catalog snapshot partway through loading the relcache entry. Moreover, whether or not you trust the reasoning behind sometimes using less than AccessExclusiveLock for ALTER TABLE, that reasoning is certainly not valid if concurrent users of the table don't hold a lock corresponding to the operation they want to perform. Hence, add some assertion-build-only checks that require any caller of relation_open(x, NoLock) to hold at least AccessShareLock. This isn't a full solution, since we can't verify that the lock level is semantically appropriate for the action --- but it's definitely of some use, because it's already caught two bugs. We can also assert that callers of addRangeTableEntryForRelation() hold at least the lock level specified for the new RTE. Amit Langote and Tom Lane Discussion: https://postgr.es/m/16565.1538327894@sss.pgh.pa.us --- diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 3395445a9a..5f1a69ca53 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -1137,6 +1137,14 @@ relation_open(Oid relationId, LOCKMODE lockmode) if (!RelationIsValid(r)) elog(ERROR, "could not open relation with OID %u", relationId); + /* + * If we didn't get the lock ourselves, assert that caller holds one, + * except in bootstrap mode where no locks are used. + */ + Assert(lockmode != NoLock || + IsBootstrapProcessingMode() || + CheckRelationLockedByMe(r, AccessShareLock, true)); + /* Make note that we've accessed a temporary relation */ if (RelationUsesLocalBuffers(r)) MyXactFlags |= XACT_FLAGS_ACCESSEDTEMPREL; @@ -1183,6 +1191,10 @@ try_relation_open(Oid relationId, LOCKMODE lockmode) if (!RelationIsValid(r)) elog(ERROR, "could not open relation with OID %u", relationId); + /* If we didn't get the lock ourselves, assert that caller holds one */ + Assert(lockmode != NoLock || + CheckRelationLockedByMe(r, AccessShareLock, true)); + /* Make note that we've accessed a temporary relation */ if (RelationUsesLocalBuffers(r)) MyXactFlags |= XACT_FLAGS_ACCESSEDTEMPREL; @@ -8084,6 +8096,8 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool * idx_rel = RelationIdGetRelation(replidindex); + Assert(CheckRelationLockedByMe(idx_rel, AccessShareLock, true)); + /* deform tuple, so we have fast access to columns */ heap_deform_tuple(tp, desc, values, nulls); diff --git a/src/backend/parser/parse_relation.c b/src/backend/parser/parse_relation.c index da600dc0ff..f115ed863a 100644 --- a/src/backend/parser/parse_relation.c +++ b/src/backend/parser/parse_relation.c @@ -28,6 +28,7 @@ #include "parser/parse_enr.h" #include "parser/parse_relation.h" #include "parser/parse_type.h" +#include "storage/lmgr.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/rel.h" @@ -1272,7 +1273,7 @@ addRangeTableEntry(ParseState *pstate, * * lockmode is the lock type required for query execution; it must be one * of AccessShareLock, RowShareLock, or RowExclusiveLock depending on the - * RTE's role within the query. The caller should always hold that lock mode + * RTE's role within the query. The caller must hold that lock mode * or a stronger one. * * Note: properly, lockmode should be declared LOCKMODE not int, but that @@ -1295,6 +1296,7 @@ addRangeTableEntryForRelation(ParseState *pstate, Assert(lockmode == AccessShareLock || lockmode == RowShareLock || lockmode == RowExclusiveLock); + Assert(CheckRelationLockedByMe(rel, lockmode, true)); rte->rtekind = RTE_RELATION; rte->alias = alias; diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c index dc0a439638..3f57507bce 100644 --- a/src/backend/storage/lmgr/lmgr.c +++ b/src/backend/storage/lmgr/lmgr.c @@ -287,6 +287,51 @@ UnlockRelation(Relation relation, LOCKMODE lockmode) LockRelease(&tag, lockmode, false); } +/* + * CheckRelationLockedByMe + * + * Returns true if current transaction holds a lock on 'relation' of mode + * 'lockmode'. If 'orstronger' is true, a stronger lockmode is also OK. + * ("Stronger" is defined as "numerically higher", which is a bit + * semantically dubious but is OK for the purposes we use this for.) + */ +bool +CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode, bool orstronger) +{ + LOCKTAG tag; + + SET_LOCKTAG_RELATION(tag, + relation->rd_lockInfo.lockRelId.dbId, + relation->rd_lockInfo.lockRelId.relId); + + if (LockHeldByMe(&tag, lockmode)) + return true; + + if (orstronger) + { + LOCKMODE slockmode; + + for (slockmode = lockmode + 1; + slockmode <= MaxLockMode; + slockmode++) + { + if (LockHeldByMe(&tag, slockmode)) + { +#ifdef NOT_USED + /* Sometimes this might be useful for debugging purposes */ + elog(WARNING, "lock mode %s substituted for %s on relation %s", + GetLockmodeName(tag.locktag_lockmethodid, slockmode), + GetLockmodeName(tag.locktag_lockmethodid, lockmode), + RelationGetRelationName(relation)); +#endif + return true; + } + } + } + + return false; +} + /* * LockHasWaitersRelation * diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 831ae62525..10f6f60f1e 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -563,6 +563,30 @@ DoLockModesConflict(LOCKMODE mode1, LOCKMODE mode2) return false; } +/* + * LockHeldByMe -- test whether lock 'locktag' is held with mode 'lockmode' + * by the current transaction + */ +bool +LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode) +{ + LOCALLOCKTAG localtag; + LOCALLOCK *locallock; + + /* + * See if there is a LOCALLOCK entry for this lock and lockmode + */ + MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */ + localtag.lock = *locktag; + localtag.mode = lockmode; + + locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash, + (void *) &localtag, + HASH_FIND, NULL); + + return (locallock && locallock->nLocks > 0); +} + /* * LockHasWaiters -- look up 'locktag' and check if releasing this * lock would wake up other processes waiting for it. diff --git a/src/include/storage/lmgr.h b/src/include/storage/lmgr.h index a217de9716..e5356b7d54 100644 --- a/src/include/storage/lmgr.h +++ b/src/include/storage/lmgr.h @@ -45,6 +45,8 @@ extern void UnlockRelationOid(Oid relid, LOCKMODE lockmode); extern void LockRelation(Relation relation, LOCKMODE lockmode); extern bool ConditionalLockRelation(Relation relation, LOCKMODE lockmode); extern void UnlockRelation(Relation relation, LOCKMODE lockmode); +extern bool CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode, + bool orstronger); extern bool LockHasWaitersRelation(Relation relation, LOCKMODE lockmode); extern void LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode); diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index ff4df7fec5..a37fda7b63 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -540,6 +540,7 @@ extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks); extern void LockReleaseSession(LOCKMETHODID lockmethodid); extern void LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks); extern void LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks); +extern bool LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode); extern bool LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock); extern VirtualTransactionId *GetLockConflicts(const LOCKTAG *locktag, diff --git a/src/include/storage/lockdefs.h b/src/include/storage/lockdefs.h index 72eca39f17..9c933fab3f 100644 --- a/src/include/storage/lockdefs.h +++ b/src/include/storage/lockdefs.h @@ -45,11 +45,15 @@ typedef int LOCKMODE; #define AccessExclusiveLock 8 /* ALTER TABLE, DROP TABLE, VACUUM FULL, * and unqualified LOCK TABLE */ +#define MaxLockMode 8 + + +/* WAL representation of an AccessExclusiveLock on a table */ typedef struct xl_standby_lock { TransactionId xid; /* xid of holder of AccessExclusiveLock */ - Oid dbOid; - Oid relOid; + Oid dbOid; /* DB containing table */ + Oid relOid; /* OID of table */ } xl_standby_lock; #endif /* LOCKDEF_H_ */