From f868a8143a98c4172e3faaf415d9352179d5760b Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Fri, 7 Sep 2018 18:04:37 -0400 Subject: [PATCH] Fix longstanding recursion hazard in sinval message processing. LockRelationOid and sibling routines supposed that, if our session already holds the lock they were asked to acquire, they could skip calling AcceptInvalidationMessages on the grounds that we must have already read any remote sinval messages issued against the relation being locked. This is normally true, but there's a critical special case where it's not: processing inside AcceptInvalidationMessages might attempt to access system relations, resulting in a recursive call to acquire a relation lock. Hence, if the outer call had acquired that same system catalog lock, we'd fall through, despite the possibility that there's an as-yet-unread sinval message for that system catalog. This could, for example, result in failure to access a system catalog or index that had just been processed by VACUUM FULL. This is the explanation for buildfarm failures we've been seeing intermittently for the past three months. The bug is far older than that, but commits a54e1f158 et al added a new recursion case within AcceptInvalidationMessages that is apparently easier to hit than any previous case. To fix this, we must not skip calling AcceptInvalidationMessages until we have *finished* a call to it since acquiring a relation lock, not merely acquired the lock. (There's already adequate logic inside AcceptInvalidationMessages to deal with being called recursively.) Fortunately, we can implement that at trivial cost, by adding a flag to LOCALLOCK hashtable entries that tracks whether we know we have completed such a call. There is an API hazard added by this patch for external callers of LockAcquire: if anything is testing for LOCKACQUIRE_ALREADY_HELD, it might be fooled by the new return code LOCKACQUIRE_ALREADY_CLEAR into thinking the lock wasn't already held. This should be a fail-soft condition, though, unless something very bizarre is being done in response to the test. Also, I added an additional output argument to LockAcquireExtended, assuming that that probably isn't called by any outside code given the very limited usefulness of its additional functionality. Back-patch to all supported branches. Discussion: https://postgr.es/m/12259.1532117714@sss.pgh.pa.us --- src/backend/storage/ipc/standby.c | 3 +- src/backend/storage/lmgr/lmgr.c | 38 ++++++++++++++++----- src/backend/storage/lmgr/lock.c | 57 ++++++++++++++++++++++++++++--- src/include/storage/lock.h | 10 ++++-- 4 files changed, 92 insertions(+), 16 deletions(-) diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 2e07702895..f9c12e603b 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -661,7 +661,8 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid) SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid); - LockAcquireExtended(&locktag, AccessExclusiveLock, true, false, false); + (void) LockAcquireExtended(&locktag, AccessExclusiveLock, true, false, + false, NULL); } static void diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c index 7b2dcb6c60..dc0a439638 100644 --- a/src/backend/storage/lmgr/lmgr.c +++ b/src/backend/storage/lmgr/lmgr.c @@ -105,11 +105,12 @@ void LockRelationOid(Oid relid, LOCKMODE lockmode) { LOCKTAG tag; + LOCALLOCK *locallock; LockAcquireResult res; SetLocktagRelationOid(&tag, relid); - res = LockAcquire(&tag, lockmode, false, false); + res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock); /* * Now that we have the lock, check for invalidation messages, so that we @@ -120,9 +121,18 @@ LockRelationOid(Oid relid, LOCKMODE lockmode) * relcache entry in an undesirable way. (In the case where our own xact * modifies the rel, the relcache update happens via * CommandCounterIncrement, not here.) + * + * However, in corner cases where code acts on tables (usually catalogs) + * recursively, we might get here while still processing invalidation + * messages in some outer execution of this function or a sibling. The + * "cleared" status of the lock tells us whether we really are done + * absorbing relevant inval messages. */ - if (res != LOCKACQUIRE_ALREADY_HELD) + if (res != LOCKACQUIRE_ALREADY_CLEAR) + { AcceptInvalidationMessages(); + MarkLockClear(locallock); + } } /* @@ -138,11 +148,12 @@ bool ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode) { LOCKTAG tag; + LOCALLOCK *locallock; LockAcquireResult res; SetLocktagRelationOid(&tag, relid); - res = LockAcquire(&tag, lockmode, false, true); + res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock); if (res == LOCKACQUIRE_NOT_AVAIL) return false; @@ -151,8 +162,11 @@ ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode) * Now that we have the lock, check for invalidation messages; see notes * in LockRelationOid. */ - if (res != LOCKACQUIRE_ALREADY_HELD) + if (res != LOCKACQUIRE_ALREADY_CLEAR) + { AcceptInvalidationMessages(); + MarkLockClear(locallock); + } return true; } @@ -199,20 +213,24 @@ void LockRelation(Relation relation, LOCKMODE lockmode) { LOCKTAG tag; + LOCALLOCK *locallock; LockAcquireResult res; SET_LOCKTAG_RELATION(tag, relation->rd_lockInfo.lockRelId.dbId, relation->rd_lockInfo.lockRelId.relId); - res = LockAcquire(&tag, lockmode, false, false); + res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock); /* * Now that we have the lock, check for invalidation messages; see notes * in LockRelationOid. */ - if (res != LOCKACQUIRE_ALREADY_HELD) + if (res != LOCKACQUIRE_ALREADY_CLEAR) + { AcceptInvalidationMessages(); + MarkLockClear(locallock); + } } /* @@ -226,13 +244,14 @@ bool ConditionalLockRelation(Relation relation, LOCKMODE lockmode) { LOCKTAG tag; + LOCALLOCK *locallock; LockAcquireResult res; SET_LOCKTAG_RELATION(tag, relation->rd_lockInfo.lockRelId.dbId, relation->rd_lockInfo.lockRelId.relId); - res = LockAcquire(&tag, lockmode, false, true); + res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock); if (res == LOCKACQUIRE_NOT_AVAIL) return false; @@ -241,8 +260,11 @@ ConditionalLockRelation(Relation relation, LOCKMODE lockmode) * Now that we have the lock, check for invalidation messages; see notes * in LockRelationOid. */ - if (res != LOCKACQUIRE_ALREADY_HELD) + if (res != LOCKACQUIRE_ALREADY_CLEAR) + { AcceptInvalidationMessages(); + MarkLockClear(locallock); + } return true; } diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index dc3d8d9817..58d1f32cf3 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -669,6 +669,7 @@ LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock) * LOCKACQUIRE_NOT_AVAIL lock not available, and dontWait=true * LOCKACQUIRE_OK lock successfully acquired * LOCKACQUIRE_ALREADY_HELD incremented count for lock already held + * LOCKACQUIRE_ALREADY_CLEAR incremented count for lock already clear * * In the normal case where dontWait=false and the caller doesn't need to * distinguish a freshly acquired lock from one already taken earlier in @@ -685,7 +686,8 @@ LockAcquire(const LOCKTAG *locktag, bool sessionLock, bool dontWait) { - return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, true); + return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, + true, NULL); } /* @@ -696,13 +698,17 @@ LockAcquire(const LOCKTAG *locktag, * caller to note that the lock table is full and then begin taking * extreme action to reduce the number of other lock holders before * retrying the action. + * + * If locallockp isn't NULL, *locallockp receives a pointer to the LOCALLOCK + * table entry if a lock is successfully acquired, or NULL if not. */ LockAcquireResult LockAcquireExtended(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock, bool dontWait, - bool reportMemoryError) + bool reportMemoryError, + LOCALLOCK **locallockp) { LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid; LockMethod lockMethodTable; @@ -766,9 +772,10 @@ LockAcquireExtended(const LOCKTAG *locktag, locallock->proclock = NULL; locallock->hashcode = LockTagHashCode(&(localtag.lock)); locallock->nLocks = 0; + locallock->holdsStrongLockCount = false; + locallock->lockCleared = false; locallock->numLockOwners = 0; locallock->maxLockOwners = 8; - locallock->holdsStrongLockCount = false; locallock->lockOwners = NULL; /* in case next line fails */ locallock->lockOwners = (LOCALLOCKOWNER *) MemoryContextAlloc(TopMemoryContext, @@ -789,13 +796,22 @@ LockAcquireExtended(const LOCKTAG *locktag, } hashcode = locallock->hashcode; + if (locallockp) + *locallockp = locallock; + /* * If we already hold the lock, we can just increase the count locally. + * + * If lockCleared is already set, caller need not worry about absorbing + * sinval messages related to the lock's object. */ if (locallock->nLocks > 0) { GrantLockLocal(locallock, owner); - return LOCKACQUIRE_ALREADY_HELD; + if (locallock->lockCleared) + return LOCKACQUIRE_ALREADY_CLEAR; + else + return LOCKACQUIRE_ALREADY_HELD; } /* @@ -877,6 +893,10 @@ LockAcquireExtended(const LOCKTAG *locktag, hashcode)) { AbortStrongLockAcquire(); + if (locallock->nLocks == 0) + RemoveLocalLock(locallock); + if (locallockp) + *locallockp = NULL; if (reportMemoryError) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), @@ -911,6 +931,10 @@ LockAcquireExtended(const LOCKTAG *locktag, { AbortStrongLockAcquire(); LWLockRelease(partitionLock); + if (locallock->nLocks == 0) + RemoveLocalLock(locallock); + if (locallockp) + *locallockp = NULL; if (reportMemoryError) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), @@ -976,6 +1000,8 @@ LockAcquireExtended(const LOCKTAG *locktag, LWLockRelease(partitionLock); if (locallock->nLocks == 0) RemoveLocalLock(locallock); + if (locallockp) + *locallockp = NULL; return LOCKACQUIRE_NOT_AVAIL; } @@ -1645,6 +1671,20 @@ GrantAwaitedLock(void) GrantLockLocal(awaitedLock, awaitedOwner); } +/* + * MarkLockClear -- mark an acquired lock as "clear" + * + * This means that we know we have absorbed all sinval messages that other + * sessions generated before we acquired this lock, and so we can confidently + * assume we know about any catalog changes protected by this lock. + */ +void +MarkLockClear(LOCALLOCK *locallock) +{ + Assert(locallock->nLocks > 0); + locallock->lockCleared = true; +} + /* * WaitOnLock -- wait to acquire a lock * @@ -1909,6 +1949,15 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock) if (locallock->nLocks > 0) return true; + /* + * At this point we can no longer suppose we are clear of invalidation + * messages related to this lock. Although we'll delete the LOCALLOCK + * object before any intentional return from this routine, it seems worth + * the trouble to explicitly reset lockCleared right now, just in case + * some error prevents us from deleting the LOCALLOCK. + */ + locallock->lockCleared = false; + /* Attempt fast release of any lock eligible for the fast path. */ if (EligibleForRelationFastPath(locktag, lockmode) && FastPathLocalUseCount > 0) diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index 777da71679..ff4df7fec5 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -408,9 +408,10 @@ typedef struct LOCALLOCK PROCLOCK *proclock; /* associated PROCLOCK object, if any */ uint32 hashcode; /* copy of LOCKTAG's hash value */ int64 nLocks; /* total number of times lock is held */ + bool holdsStrongLockCount; /* bumped FastPathStrongRelationLocks */ + bool lockCleared; /* we read all sinval msgs for lock */ int numLockOwners; /* # of relevant ResourceOwners */ int maxLockOwners; /* allocated size of array */ - bool holdsStrongLockCount; /* bumped FastPathStrongRelationLocks */ LOCALLOCKOWNER *lockOwners; /* dynamically resizable array */ } LOCALLOCK; @@ -472,7 +473,8 @@ typedef enum { LOCKACQUIRE_NOT_AVAIL, /* lock not available, and dontWait=true */ LOCKACQUIRE_OK, /* lock successfully acquired */ - LOCKACQUIRE_ALREADY_HELD /* incremented count for lock already held */ + LOCKACQUIRE_ALREADY_HELD, /* incremented count for lock already held */ + LOCKACQUIRE_ALREADY_CLEAR /* incremented count for lock already clear */ } LockAcquireResult; /* Deadlock states identified by DeadLockCheck() */ @@ -528,8 +530,10 @@ extern LockAcquireResult LockAcquireExtended(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock, bool dontWait, - bool report_memory_error); + bool reportMemoryError, + LOCALLOCK **locallockp); extern void AbortStrongLockAcquire(void); +extern void MarkLockClear(LOCALLOCK *locallock); extern bool LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock); extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks); -- 2.40.0