Fix issues around the "variable" support in the lwlock infrastructure.
author	Andres Freund <andres@anarazel.de>	Fri, 31 Jul 2015 18:20:43 +0000 (20:20 +0200)
committer	Andres Freund <andres@anarazel.de>	Sun, 2 Aug 2015 16:41:23 +0000 (18:41 +0200)
The lwlock scalability work introduced two race conditions into the
lwlock variable support provided for xlog.c. First, and harmlessly on
most platforms, it set/read the variable without the spinlock in some
places. Secondly, due to the removal of the spinlock, a backend could
miss changes to the variable's state if they happened at the wrong
moment, because checking the lock's state, checking the variable's state
and queuing are no longer protected by a single spinlock acquisition.

To fix, first move resetting the variable's value from LWLockAcquireWithVar
to WALInsertLockRelease, via a new function LWLockReleaseClearVar. That
prevents issues around waiting for a variable's value to change when a
new locker has acquired the lock but not yet set the value. Secondly,
re-check that the variable hasn't changed after enqueueing; that prevents
the problem that the lock has already been released and re-acquired by the
time the woken-up backend checks the lock's state. (A condensed sketch of
the new calling convention follows the trailers below.)

Reported-By: Jeff Janes
Analyzed-By: Heikki Linnakangas
Reviewed-By: Heikki Linnakangas
Discussion: 5592DB35.2060401@iki.fi
Backpatch: 9.5, where the lwlock scalability went in
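
For reference, a condensed sketch of the holder-side calling convention after
this patch, as it ends up in xlog.c (assumes the backend environment; CurrPos
stands in for whichever XLogRecPtr the caller wants to advertise and is not a
variable taken from the patch):

	/* Acquire the insert lock; insertingAt is no longer initialized here. */
	LWLockAcquire(&WALInsertLocks[MyLockNo].l.lock, LW_EXCLUSIVE);

	/* While holding the lock, advertise the insert position when needed. */
	LWLockUpdateVar(&WALInsertLocks[MyLockNo].l.lock,
					&WALInsertLocks[MyLockNo].l.insertingAt,
					CurrPos);

	/* Release and, under the lock's spinlock, reset insertingAt to 0. */
	LWLockReleaseClearVar(&WALInsertLocks[MyLockNo].l.lock,
						  &WALInsertLocks[MyLockNo].l.insertingAt,
						  0);

Because the reset now happens at release time, a waiter can no longer observe a
freshly acquired lock whose variable still carries a stale value.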

src/backend/access/transam/xlog.c
src/backend/storage/lmgr/lwlock.c
src/include/storage/lwlock.h

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 1dd31b37ffe06e67bd3756489f2b139cea6dfce7..939813e7b7177ad022dd2344a6ecec1530a68a56 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1408,9 +1408,7 @@ WALInsertLockAcquire(void)
         * The insertingAt value is initially set to 0, as we don't know our
         * insert location yet.
         */
-       immed = LWLockAcquireWithVar(&WALInsertLocks[MyLockNo].l.lock,
-                                                                &WALInsertLocks[MyLockNo].l.insertingAt,
-                                                                0);
+       immed = LWLockAcquire(&WALInsertLocks[MyLockNo].l.lock, LW_EXCLUSIVE);
        if (!immed)
        {
                /*
@@ -1435,26 +1433,28 @@ WALInsertLockAcquireExclusive(void)
        int                     i;
 
        /*
-        * When holding all the locks, we only update the last lock's insertingAt
-        * indicator.  The others are set to 0xFFFFFFFFFFFFFFFF, which is higher
-        * than any real XLogRecPtr value, to make sure that no-one blocks waiting
-        * on those.
+        * When holding all the locks, all but the last lock's insertingAt
+        * indicator is set to 0xFFFFFFFFFFFFFFFF, which is higher than any real
+        * XLogRecPtr value, to make sure that no-one blocks waiting on those.
         */
        for (i = 0; i < NUM_XLOGINSERT_LOCKS - 1; i++)
        {
-               LWLockAcquireWithVar(&WALInsertLocks[i].l.lock,
-                                                        &WALInsertLocks[i].l.insertingAt,
-                                                        PG_UINT64_MAX);
+               LWLockAcquire(&WALInsertLocks[i].l.lock, LW_EXCLUSIVE);
+               LWLockUpdateVar(&WALInsertLocks[i].l.lock,
+                                               &WALInsertLocks[i].l.insertingAt,
+                                               PG_UINT64_MAX);
        }
-       LWLockAcquireWithVar(&WALInsertLocks[i].l.lock,
-                                                &WALInsertLocks[i].l.insertingAt,
-                                                0);
+       /* Variable value reset to 0 at release */
+       LWLockAcquire(&WALInsertLocks[i].l.lock, LW_EXCLUSIVE);
 
        holdingAllLocks = true;
 }
 
 /*
  * Release our insertion lock (or locks, if we're holding them all).
+ *
+ * NB: Reset all variables to 0, so they cause LWLockWaitForVar to block the
+ * next time the lock is acquired.
  */
 static void
 WALInsertLockRelease(void)
@@ -1464,13 +1464,17 @@ WALInsertLockRelease(void)
                int                     i;
 
                for (i = 0; i < NUM_XLOGINSERT_LOCKS; i++)
-                       LWLockRelease(&WALInsertLocks[i].l.lock);
+                       LWLockReleaseClearVar(&WALInsertLocks[i].l.lock,
+                                                                 &WALInsertLocks[i].l.insertingAt,
+                                                                 0);
 
                holdingAllLocks = false;
        }
        else
        {
-               LWLockRelease(&WALInsertLocks[MyLockNo].l.lock);
+               LWLockReleaseClearVar(&WALInsertLocks[MyLockNo].l.lock,
+                                                         &WALInsertLocks[MyLockNo].l.insertingAt,
+                                                         0);
        }
 }
 
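
The waiter side is untouched by this file's changes; for context, a sketch of
the loop that consumes insertingAt, modelled on WaitXLogInsertionsToFinish()
(the variables i and upto are assumed to come from the surrounding function and
are simplified here):

	XLogRecPtr	insertingat = InvalidXLogRecPtr;

	do
	{
		/*
		 * Sleep until the lock is released, or until insertingAt moves past
		 * the value we last saw.  A variable still at 0 (the value it is
		 * reset to at release) forces us to keep waiting.
		 */
		if (LWLockWaitForVar(&WALInsertLocks[i].l.lock,
							 &WALInsertLocks[i].l.insertingAt,
							 insertingat, &insertingat))
		{
			/* the lock was free, so no insertion is in progress */
			insertingat = InvalidXLogRecPtr;
			break;
		}
	} while (insertingat < upto);
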
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index e5566d1b60969bc1fb03eb7503e7e35b363d1e5c..ae03eb14196c80fec7a0d042da94726a4a2e2b65 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
  * locking should be done with the full lock manager --- which depends on
  * LWLocks to protect its shared state.
  *
- * In addition to exclusive and shared modes, lightweight locks can be used
- * to wait until a variable changes value.  The variable is initially set
- * when the lock is acquired with LWLockAcquireWithVar, and can be updated
+ * In addition to exclusive and shared modes, lightweight locks can be used to
+ * wait until a variable changes value.  The variable is initially not set
+ * when the lock is acquired with LWLockAcquire, i.e. it remains set to the
+ * value it was set to when the lock was released last, and can be updated
  * without releasing the lock by calling LWLockUpdateVar.  LWLockWaitForVar
- * waits for the variable to be updated, or until the lock is free.  The
- * meaning of the variable is up to the caller, the lightweight lock code
- * just assigns and compares it.
+ * waits for the variable to be updated, or until the lock is free.  When
+ * releasing the lock with LWLockReleaseClearVar() the value can be set to an
+ * appropriate value for a free lock.  The meaning of the variable is up to
+ * the caller, the lightweight lock code just assigns and compares it.
  *
  * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -150,9 +152,6 @@ static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];
 static int     lock_addin_request = 0;
 static bool lock_addin_request_allowed = true;
 
-static inline bool LWLockAcquireCommon(LWLock *l, LWLockMode mode,
-                                       uint64 *valptr, uint64 val);
-
 #ifdef LWLOCK_STATS
 typedef struct lwlock_stats_key
 {
@@ -899,25 +898,7 @@ LWLockDequeueSelf(LWLock *lock)
  * Side effect: cancel/die interrupts are held off until lock release.
  */
 bool
-LWLockAcquire(LWLock *l, LWLockMode mode)
-{
-       return LWLockAcquireCommon(l, mode, NULL, 0);
-}
-
-/*
- * LWLockAcquireWithVar - like LWLockAcquire, but also sets *valptr = val
- *
- * The lock is always acquired in exclusive mode with this function.
- */
-bool
-LWLockAcquireWithVar(LWLock *l, uint64 *valptr, uint64 val)
-{
-       return LWLockAcquireCommon(l, LW_EXCLUSIVE, valptr, val);
-}
-
-/* internal function to implement LWLockAcquire and LWLockAcquireWithVar */
-static inline bool
-LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val)
+LWLockAcquire(LWLock *lock, LWLockMode mode)
 {
        PGPROC     *proc = MyProc;
        bool            result = true;
@@ -1064,10 +1045,6 @@ LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val)
                result = false;
        }
 
-       /* If there's a variable associated with this lock, initialize it */
-       if (valptr)
-               *valptr = val;
-
        TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), T_ID(lock), mode);
 
        /* Add lock to list of locks held by this backend */
@@ -1258,6 +1235,71 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
        return !mustwait;
 }
 
+/*
+ * Does the lwlock in its current state need to wait for the variable value to
+ * change?
+ *
+ * If we don't need to wait, and it's because the value of the variable has
+ * changed, store the current value in newval.
+ *
+ * *result is set to true if the lock was free, and false otherwise.
+ */
+static bool
+LWLockConflictsWithVar(LWLock *lock,
+                                          uint64 *valptr, uint64 oldval, uint64 *newval,
+                                          bool *result)
+{
+       bool            mustwait;
+       uint64          value;
+#ifdef LWLOCK_STATS
+       lwlock_stats *lwstats;
+
+       lwstats = get_lwlock_stats_entry(lock);
+#endif
+
+       /*
+        * Test first to see if the slot is free right now.
+        *
+        * XXX: the caller uses a spinlock before this, so we don't need a memory
+        * barrier here as far as the current usage is concerned.  But that might
+        * not be safe in general.
+        */
+       mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
+
+       if (!mustwait)
+       {
+               *result = true;
+               return false;
+       }
+
+       *result = false;
+
+       /*
+        * Read value using spinlock as we can't rely on atomic 64 bit
+        * reads/stores.  TODO: On platforms with a way to do atomic 64 bit
+        * reads/writes the spinlock could be optimized away.
+        */
+#ifdef LWLOCK_STATS
+       lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
+#else
+       SpinLockAcquire(&lock->mutex);
+#endif
+       value = *valptr;
+       SpinLockRelease(&lock->mutex);
+
+       if (value != oldval)
+       {
+               mustwait = false;
+               *newval = value;
+       }
+       else
+       {
+               mustwait = true;
+       }
+
+       return mustwait;
+}
+
 /*
  * LWLockWaitForVar - Wait until lock is free, or a variable is updated.
  *
@@ -1268,11 +1310,6 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
  * matches oldval, returns false and sets *newval to the current value in
  * *valptr.
  *
- * It's possible that the lock holder releases the lock, but another backend
- * acquires it again before we get a chance to observe that the lock was
- * momentarily released.  We wouldn't need to wait for the new lock holder,
- * but we cannot distinguish that case, so we will have to wait.
- *
  * Note: this function ignores shared lock holders; if the lock is held
  * in shared mode, returns 'true'.
  */
@@ -1290,16 +1327,6 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
 
        PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);
 
-       /*
-        * Quick test first to see if it the slot is free right now.
-        *
-        * XXX: the caller uses a spinlock before this, so we don't need a memory
-        * barrier here as far as the current usage is concerned.  But that might
-        * not be safe in general.
-        */
-       if ((pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) == 0)
-               return true;
-
        /*
         * Lock out cancel/die interrupts while we sleep on the lock.  There is no
         * cleanup mechanism to remove us from the wait queue if we got
@@ -1313,39 +1340,9 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
        for (;;)
        {
                bool            mustwait;
-               uint64          value;
-
-               mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
-
-               if (mustwait)
-               {
-                       /*
-                        * Perform comparison using spinlock as we can't rely on atomic 64
-                        * bit reads/stores.
-                        */
-#ifdef LWLOCK_STATS
-                       lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
-#else
-                       SpinLockAcquire(&lock->mutex);
-#endif
 
-                       /*
-                        * XXX: We can significantly optimize this on platforms with 64bit
-                        * atomics.
-                        */
-                       value = *valptr;
-                       if (value != oldval)
-                       {
-                               result = false;
-                               mustwait = false;
-                               *newval = value;
-                       }
-                       else
-                               mustwait = true;
-                       SpinLockRelease(&lock->mutex);
-               }
-               else
-                       mustwait = false;
+               mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
+                                                                                 &result);
 
                if (!mustwait)
                        break;                          /* the lock was free or value didn't match */
@@ -1354,7 +1351,9 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
                 * Add myself to wait queue. Note that this is racy, somebody else
                 * could wakeup before we're finished queuing. NB: We're using nearly
                 * the same twice-in-a-row lock acquisition protocol as
-                * LWLockAcquire(). Check its comments for details.
+                * LWLockAcquire(). Check its comments for details. The only
+                * difference is that we also have to check the variable's value when
+                * checking the state of the lock.
                 */
                LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
 
@@ -1365,12 +1364,13 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
                pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
 
                /*
-                * We're now guaranteed to be woken up if necessary. Recheck the
-                * lock's state.
+                * We're now guaranteed to be woken up if necessary. Recheck the lock's
+                * and variable's state.
                 */
-               mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
+               mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
+                                                                                 &result);
 
-               /* Ok, lock is free after we queued ourselves. Undo queueing. */
+               /* Ok, no conflict after we queued ourselves. Undo queueing. */
                if (!mustwait)
                {
                        LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");
@@ -1587,6 +1587,31 @@ LWLockRelease(LWLock *lock)
        RESUME_INTERRUPTS();
 }
 
+/*
+ * LWLockReleaseClearVar - release a previously acquired lock, reset variable
+ */
+void
+LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
+{
+#ifdef LWLOCK_STATS
+       lwlock_stats *lwstats;
+
+       lwstats = get_lwlock_stats_entry(lock);
+       lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
+#else
+       SpinLockAcquire(&lock->mutex);
+#endif
+       /*
+        * Set the variable's value before releasing the lock.  That prevents a
+        * race condition wherein a new locker acquires the lock, but hasn't yet
+        * set the variable's value.
+        */
+       *valptr = val;
+       SpinLockRelease(&lock->mutex);
+
+       LWLockRelease(lock);
+}
+
 
 /*
  * LWLockReleaseAll - release all currently-held locks
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index cff3b9992218e79f3f168798a1b42bf534a10254..cbd63184b9b4c131afd91a7e3187da3f89a92c87 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -182,10 +182,10 @@ extern bool LWLockAcquire(LWLock *lock, LWLockMode mode);
 extern bool LWLockConditionalAcquire(LWLock *lock, LWLockMode mode);
 extern bool LWLockAcquireOrWait(LWLock *lock, LWLockMode mode);
 extern void LWLockRelease(LWLock *lock);
+extern void LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val);
 extern void LWLockReleaseAll(void);
 extern bool LWLockHeldByMe(LWLock *lock);
 
-extern bool LWLockAcquireWithVar(LWLock *lock, uint64 *valptr, uint64 val);
 extern bool LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval);
 extern void LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 value);
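
Putting the exported interface together, the variable protocol after this commit
reads roughly as follows; a generic sketch with a hypothetical LWLock *my_lock
and a uint64 my_val living next to it in shared memory, not code from the tree:

	/* Holder: acquire exclusively; my_val keeps the value set at last release (0). */
	LWLockAcquire(my_lock, LW_EXCLUSIVE);

	/* Holder: publish progress without releasing; wakes LWLockWaitForVar() waiters. */
	LWLockUpdateVar(my_lock, &my_val, 42);

	/* Waiter, in another backend: block until the lock is free or my_val != 40. */
	uint64		newval;
	bool		lock_was_free = LWLockWaitForVar(my_lock, &my_val, 40, &newval);

	/* Holder: release and reset my_val to its "free lock" value under the spinlock. */
	LWLockReleaseClearVar(my_lock, &my_val, 0);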