]> granicus.if.org Git - postgresql/commitdiff
Fix longstanding recursion hazard in sinval message processing.
authorTom Lane <tgl@sss.pgh.pa.us>
Fri, 7 Sep 2018 22:04:37 +0000 (18:04 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Fri, 7 Sep 2018 22:04:54 +0000 (18:04 -0400)
LockRelationOid and sibling routines supposed that, if our session already
holds the lock they were asked to acquire, they could skip calling
AcceptInvalidationMessages on the grounds that we must have already read
any remote sinval messages issued against the relation being locked.
This is normally true, but there's a critical special case where it's not:
processing inside AcceptInvalidationMessages might attempt to access system
relations, resulting in a recursive call to acquire a relation lock.

Hence, if the outer call had acquired that same system catalog lock, we'd
fall through, despite the possibility that there's an as-yet-unread sinval
message for that system catalog.  This could, for example, result in
failure to access a system catalog or index that had just been processed
by VACUUM FULL.  This is the explanation for buildfarm failures we've been
seeing intermittently for the past three months.  The bug is far older
than that, but commits a54e1f158 et al added a new recursion case within
AcceptInvalidationMessages that is apparently easier to hit than any
previous case.

To fix this, we must not skip calling AcceptInvalidationMessages until
we have *finished* a call to it since acquiring a relation lock, not
merely acquired the lock.  (There's already adequate logic inside
AcceptInvalidationMessages to deal with being called recursively.)
Fortunately, we can implement that at trivial cost, by adding a flag
to LOCALLOCK hashtable entries that tracks whether we know we have
completed such a call.

There is an API hazard added by this patch for external callers of
LockAcquire: if anything is testing for LOCKACQUIRE_ALREADY_HELD,
it might be fooled by the new return code LOCKACQUIRE_ALREADY_CLEAR
into thinking the lock wasn't already held.  This should be a fail-soft
condition, though, unless something very bizarre is being done in
response to the test.

Also, I added an additional output argument to LockAcquireExtended,
assuming that that probably isn't called by any outside code given
the very limited usefulness of its additional functionality.

Back-patch to all supported branches.

Discussion: https://postgr.es/m/12259.1532117714@sss.pgh.pa.us

src/backend/storage/ipc/standby.c
src/backend/storage/lmgr/lmgr.c
src/backend/storage/lmgr/lock.c
src/include/storage/lock.h

index 2e077028951282f2daa2b3bc531a5be79f916f3d..f9c12e603b1c65b206bd295f3244b9a7c9f19ada 100644 (file)
@@ -661,7 +661,8 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
 
        SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);
 
-       LockAcquireExtended(&locktag, AccessExclusiveLock, true, false, false);
+       (void) LockAcquireExtended(&locktag, AccessExclusiveLock, true, false,
+                                                          false, NULL);
 }
 
 static void
index 7b2dcb6c600136e326096f258867d34bb9234ee6..dc0a4396388ec7bd6d9bdf3c5a46df577667c9ce 100644 (file)
@@ -105,11 +105,12 @@ void
 LockRelationOid(Oid relid, LOCKMODE lockmode)
 {
        LOCKTAG         tag;
+       LOCALLOCK  *locallock;
        LockAcquireResult res;
 
        SetLocktagRelationOid(&tag, relid);
 
-       res = LockAcquire(&tag, lockmode, false, false);
+       res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
 
        /*
         * Now that we have the lock, check for invalidation messages, so that we
@@ -120,9 +121,18 @@ LockRelationOid(Oid relid, LOCKMODE lockmode)
         * relcache entry in an undesirable way.  (In the case where our own xact
         * modifies the rel, the relcache update happens via
         * CommandCounterIncrement, not here.)
+        *
+        * However, in corner cases where code acts on tables (usually catalogs)
+        * recursively, we might get here while still processing invalidation
+        * messages in some outer execution of this function or a sibling.  The
+        * "cleared" status of the lock tells us whether we really are done
+        * absorbing relevant inval messages.
         */
-       if (res != LOCKACQUIRE_ALREADY_HELD)
+       if (res != LOCKACQUIRE_ALREADY_CLEAR)
+       {
                AcceptInvalidationMessages();
+               MarkLockClear(locallock);
+       }
 }
 
 /*
@@ -138,11 +148,12 @@ bool
 ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
 {
        LOCKTAG         tag;
+       LOCALLOCK  *locallock;
        LockAcquireResult res;
 
        SetLocktagRelationOid(&tag, relid);
 
-       res = LockAcquire(&tag, lockmode, false, true);
+       res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
 
        if (res == LOCKACQUIRE_NOT_AVAIL)
                return false;
@@ -151,8 +162,11 @@ ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
         * Now that we have the lock, check for invalidation messages; see notes
         * in LockRelationOid.
         */
-       if (res != LOCKACQUIRE_ALREADY_HELD)
+       if (res != LOCKACQUIRE_ALREADY_CLEAR)
+       {
                AcceptInvalidationMessages();
+               MarkLockClear(locallock);
+       }
 
        return true;
 }
@@ -199,20 +213,24 @@ void
 LockRelation(Relation relation, LOCKMODE lockmode)
 {
        LOCKTAG         tag;
+       LOCALLOCK  *locallock;
        LockAcquireResult res;
 
        SET_LOCKTAG_RELATION(tag,
                                                 relation->rd_lockInfo.lockRelId.dbId,
                                                 relation->rd_lockInfo.lockRelId.relId);
 
-       res = LockAcquire(&tag, lockmode, false, false);
+       res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
 
        /*
         * Now that we have the lock, check for invalidation messages; see notes
         * in LockRelationOid.
         */
-       if (res != LOCKACQUIRE_ALREADY_HELD)
+       if (res != LOCKACQUIRE_ALREADY_CLEAR)
+       {
                AcceptInvalidationMessages();
+               MarkLockClear(locallock);
+       }
 }
 
 /*
@@ -226,13 +244,14 @@ bool
 ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
 {
        LOCKTAG         tag;
+       LOCALLOCK  *locallock;
        LockAcquireResult res;
 
        SET_LOCKTAG_RELATION(tag,
                                                 relation->rd_lockInfo.lockRelId.dbId,
                                                 relation->rd_lockInfo.lockRelId.relId);
 
-       res = LockAcquire(&tag, lockmode, false, true);
+       res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
 
        if (res == LOCKACQUIRE_NOT_AVAIL)
                return false;
@@ -241,8 +260,11 @@ ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
         * Now that we have the lock, check for invalidation messages; see notes
         * in LockRelationOid.
         */
-       if (res != LOCKACQUIRE_ALREADY_HELD)
+       if (res != LOCKACQUIRE_ALREADY_CLEAR)
+       {
                AcceptInvalidationMessages();
+               MarkLockClear(locallock);
+       }
 
        return true;
 }
index dc3d8d981794828a0ba5ade904028249d1390de2..58d1f32cf36076bffeaf6efb5f859605eda0b484 100644 (file)
@@ -669,6 +669,7 @@ LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
  *             LOCKACQUIRE_NOT_AVAIL           lock not available, and dontWait=true
  *             LOCKACQUIRE_OK                          lock successfully acquired
  *             LOCKACQUIRE_ALREADY_HELD        incremented count for lock already held
+ *             LOCKACQUIRE_ALREADY_CLEAR       incremented count for lock already clear
  *
  * In the normal case where dontWait=false and the caller doesn't need to
  * distinguish a freshly acquired lock from one already taken earlier in
@@ -685,7 +686,8 @@ LockAcquire(const LOCKTAG *locktag,
                        bool sessionLock,
                        bool dontWait)
 {
-       return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, true);
+       return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait,
+                                                          true, NULL);
 }
 
 /*
@@ -696,13 +698,17 @@ LockAcquire(const LOCKTAG *locktag,
  * caller to note that the lock table is full and then begin taking
  * extreme action to reduce the number of other lock holders before
  * retrying the action.
+ *
+ * If locallockp isn't NULL, *locallockp receives a pointer to the LOCALLOCK
+ * table entry if a lock is successfully acquired, or NULL if not.
  */
 LockAcquireResult
 LockAcquireExtended(const LOCKTAG *locktag,
                                        LOCKMODE lockmode,
                                        bool sessionLock,
                                        bool dontWait,
-                                       bool reportMemoryError)
+                                       bool reportMemoryError,
+                                       LOCALLOCK **locallockp)
 {
        LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
        LockMethod      lockMethodTable;
@@ -766,9 +772,10 @@ LockAcquireExtended(const LOCKTAG *locktag,
                locallock->proclock = NULL;
                locallock->hashcode = LockTagHashCode(&(localtag.lock));
                locallock->nLocks = 0;
+               locallock->holdsStrongLockCount = false;
+               locallock->lockCleared = false;
                locallock->numLockOwners = 0;
                locallock->maxLockOwners = 8;
-               locallock->holdsStrongLockCount = false;
                locallock->lockOwners = NULL;   /* in case next line fails */
                locallock->lockOwners = (LOCALLOCKOWNER *)
                        MemoryContextAlloc(TopMemoryContext,
@@ -789,13 +796,22 @@ LockAcquireExtended(const LOCKTAG *locktag,
        }
        hashcode = locallock->hashcode;
 
+       if (locallockp)
+               *locallockp = locallock;
+
        /*
         * If we already hold the lock, we can just increase the count locally.
+        *
+        * If lockCleared is already set, caller need not worry about absorbing
+        * sinval messages related to the lock's object.
         */
        if (locallock->nLocks > 0)
        {
                GrantLockLocal(locallock, owner);
-               return LOCKACQUIRE_ALREADY_HELD;
+               if (locallock->lockCleared)
+                       return LOCKACQUIRE_ALREADY_CLEAR;
+               else
+                       return LOCKACQUIRE_ALREADY_HELD;
        }
 
        /*
@@ -877,6 +893,10 @@ LockAcquireExtended(const LOCKTAG *locktag,
                                                                                   hashcode))
                {
                        AbortStrongLockAcquire();
+                       if (locallock->nLocks == 0)
+                               RemoveLocalLock(locallock);
+                       if (locallockp)
+                               *locallockp = NULL;
                        if (reportMemoryError)
                                ereport(ERROR,
                                                (errcode(ERRCODE_OUT_OF_MEMORY),
@@ -911,6 +931,10 @@ LockAcquireExtended(const LOCKTAG *locktag,
        {
                AbortStrongLockAcquire();
                LWLockRelease(partitionLock);
+               if (locallock->nLocks == 0)
+                       RemoveLocalLock(locallock);
+               if (locallockp)
+                       *locallockp = NULL;
                if (reportMemoryError)
                        ereport(ERROR,
                                        (errcode(ERRCODE_OUT_OF_MEMORY),
@@ -976,6 +1000,8 @@ LockAcquireExtended(const LOCKTAG *locktag,
                        LWLockRelease(partitionLock);
                        if (locallock->nLocks == 0)
                                RemoveLocalLock(locallock);
+                       if (locallockp)
+                               *locallockp = NULL;
                        return LOCKACQUIRE_NOT_AVAIL;
                }
 
@@ -1645,6 +1671,20 @@ GrantAwaitedLock(void)
        GrantLockLocal(awaitedLock, awaitedOwner);
 }
 
+/*
+ * MarkLockClear -- mark an acquired lock as "clear"
+ *
+ * This means that we know we have absorbed all sinval messages that other
+ * sessions generated before we acquired this lock, and so we can confidently
+ * assume we know about any catalog changes protected by this lock.
+ */
+void
+MarkLockClear(LOCALLOCK *locallock)
+{
+       Assert(locallock->nLocks > 0);
+       locallock->lockCleared = true;
+}
+
 /*
  * WaitOnLock -- wait to acquire a lock
  *
@@ -1909,6 +1949,15 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
        if (locallock->nLocks > 0)
                return true;
 
+       /*
+        * At this point we can no longer suppose we are clear of invalidation
+        * messages related to this lock.  Although we'll delete the LOCALLOCK
+        * object before any intentional return from this routine, it seems worth
+        * the trouble to explicitly reset lockCleared right now, just in case
+        * some error prevents us from deleting the LOCALLOCK.
+        */
+       locallock->lockCleared = false;
+
        /* Attempt fast release of any lock eligible for the fast path. */
        if (EligibleForRelationFastPath(locktag, lockmode) &&
                FastPathLocalUseCount > 0)
index 777da71679056b7b99f7309d8e19c49f9066281f..ff4df7fec513c11ff048aa7c977ea040f56284b5 100644 (file)
@@ -408,9 +408,10 @@ typedef struct LOCALLOCK
        PROCLOCK   *proclock;           /* associated PROCLOCK object, if any */
        uint32          hashcode;               /* copy of LOCKTAG's hash value */
        int64           nLocks;                 /* total number of times lock is held */
+       bool            holdsStrongLockCount;   /* bumped FastPathStrongRelationLocks */
+       bool            lockCleared;    /* we read all sinval msgs for lock */
        int                     numLockOwners;  /* # of relevant ResourceOwners */
        int                     maxLockOwners;  /* allocated size of array */
-       bool            holdsStrongLockCount;   /* bumped FastPathStrongRelationLocks */
        LOCALLOCKOWNER *lockOwners; /* dynamically resizable array */
 } LOCALLOCK;
 
@@ -472,7 +473,8 @@ typedef enum
 {
        LOCKACQUIRE_NOT_AVAIL,          /* lock not available, and dontWait=true */
        LOCKACQUIRE_OK,                         /* lock successfully acquired */
-       LOCKACQUIRE_ALREADY_HELD        /* incremented count for lock already held */
+       LOCKACQUIRE_ALREADY_HELD,       /* incremented count for lock already held */
+       LOCKACQUIRE_ALREADY_CLEAR       /* incremented count for lock already clear */
 } LockAcquireResult;
 
 /* Deadlock states identified by DeadLockCheck() */
@@ -528,8 +530,10 @@ extern LockAcquireResult LockAcquireExtended(const LOCKTAG *locktag,
                                        LOCKMODE lockmode,
                                        bool sessionLock,
                                        bool dontWait,
-                                       bool report_memory_error);
+                                       bool reportMemoryError,
+                                       LOCALLOCK **locallockp);
 extern void AbortStrongLockAcquire(void);
+extern void MarkLockClear(LOCALLOCK *locallock);
 extern bool LockRelease(const LOCKTAG *locktag,
                        LOCKMODE lockmode, bool sessionLock);
 extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);