* The insertingAt value is initially set to 0, as we don't know our
* insert location yet.
*/
- immed = LWLockAcquireWithVar(&WALInsertLocks[MyLockNo].l.lock,
- &WALInsertLocks[MyLockNo].l.insertingAt,
- 0);
+ immed = LWLockAcquire(&WALInsertLocks[MyLockNo].l.lock, LW_EXCLUSIVE);
if (!immed)
{
/* If we couldn't get the lock immediately, try another lock next time */
lockToTry = (lockToTry + 1) % NUM_XLOGINSERT_LOCKS;
}
}

static void
WALInsertLockAcquireExclusive(void)
{
int i;

/*
- * When holding all the locks, we only update the last lock's insertingAt
- * indicator. The others are set to 0xFFFFFFFFFFFFFFFF, which is higher
- * than any real XLogRecPtr value, to make sure that no-one blocks waiting
- * on those.
+ * When holding all the locks, the insertingAt indicators of all but the
+ * last lock are set to 0xFFFFFFFFFFFFFFFF, which is higher than any real
+ * XLogRecPtr value, to make sure that no-one blocks waiting on those.
*/
for (i = 0; i < NUM_XLOGINSERT_LOCKS - 1; i++)
{
- LWLockAcquireWithVar(&WALInsertLocks[i].l.lock,
- &WALInsertLocks[i].l.insertingAt,
- PG_UINT64_MAX);
+ LWLockAcquire(&WALInsertLocks[i].l.lock, LW_EXCLUSIVE);
+ LWLockUpdateVar(&WALInsertLocks[i].l.lock,
+ &WALInsertLocks[i].l.insertingAt,
+ PG_UINT64_MAX);
}
- LWLockAcquireWithVar(&WALInsertLocks[i].l.lock,
- &WALInsertLocks[i].l.insertingAt,
- 0);
+ /* Variable value reset to 0 at release */
+ LWLockAcquire(&WALInsertLocks[i].l.lock, LW_EXCLUSIVE);
holdingAllLocks = true;
}
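
To spell out why the 0xFFFFFFFFFFFFFFFF sentinel can never make a waiter block (illustrative reasoning, not part of the patch): LWLockWaitForVar() only sleeps while the lock is held and the variable still equals the caller's oldval, and real callers pass actual WAL positions, which are always below the sentinel. Here 'lastSeenPos' is a hypothetical placeholder for such a position:

	uint64		seen;

	/*
	 * Lock i is one of the first NUM_XLOGINSERT_LOCKS - 1 locks, held with
	 * insertingAt advertised as PG_UINT64_MAX.  Any real WAL position
	 * differs from the sentinel, so this returns immediately with
	 * seen == PG_UINT64_MAX rather than sleeping.
	 */
	LWLockWaitForVar(&WALInsertLocks[i].l.lock,
					 &WALInsertLocks[i].l.insertingAt,
					 lastSeenPos, &seen);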
/*
* Release our insertion lock (or locks, if we're holding them all).
+ *
+ * NB: Reset all variables to 0, so they cause LWLockWaitForVar to block the
+ * next time the lock is acquired.
*/
static void
WALInsertLockRelease(void)
{
if (holdingAllLocks)
{
int i;
for (i = 0; i < NUM_XLOGINSERT_LOCKS; i++)
- LWLockRelease(&WALInsertLocks[i].l.lock);
+ LWLockReleaseClearVar(&WALInsertLocks[i].l.lock,
+ &WALInsertLocks[i].l.insertingAt,
+ 0);
holdingAllLocks = false;
}
else
{
- LWLockRelease(&WALInsertLocks[MyLockNo].l.lock);
+ LWLockReleaseClearVar(&WALInsertLocks[MyLockNo].l.lock,
+ &WALInsertLocks[MyLockNo].l.insertingAt,
+ 0);
}
}
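
For reference, the update side of the protocol is unchanged by this patch: a WAL inserter publishes its progress through LWLockUpdateVar(). A condensed sketch of xlog.c's WALInsertLockUpdateInsertingAt() (not verbatim) shows how the entry points now divide the work between acquire, update, and release:

	static void
	WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt)
	{
		if (holdingAllLocks)
			/* only the last lock's variable is tracked; the rest hold the sentinel */
			LWLockUpdateVar(&WALInsertLocks[NUM_XLOGINSERT_LOCKS - 1].l.lock,
							&WALInsertLocks[NUM_XLOGINSERT_LOCKS - 1].l.insertingAt,
							insertingAt);
		else
			LWLockUpdateVar(&WALInsertLocks[MyLockNo].l.lock,
							&WALInsertLocks[MyLockNo].l.insertingAt,
							insertingAt);
	}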
* locking should be done with the full lock manager --- which depends on
* LWLocks to protect its shared state.
*
- * In addition to exclusive and shared modes, lightweight locks can be used
- * to wait until a variable changes value. The variable is initially set
- * when the lock is acquired with LWLockAcquireWithVar, and can be updated
+ * In addition to exclusive and shared modes, lightweight locks can be used to
+ * wait until a variable changes value.  The variable is not set at acquire
+ * time: LWLockAcquire leaves it at whatever value it held when the lock was
+ * last released.  It can be updated
* without releasing the lock by calling LWLockUpdateVar. LWLockWaitForVar
- * waits for the variable to be updated, or until the lock is free. The
- * meaning of the variable is up to the caller, the lightweight lock code
- * just assigns and compares it.
+ * waits for the variable to be updated, or until the lock is free.  When
+ * releasing the lock with LWLockReleaseClearVar() the variable can be reset
+ * to a value appropriate for a free lock.  The meaning of the variable is up
+ * to the caller; the lightweight lock code just assigns and compares it.
*
* Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
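
To make the revised contract concrete, here is a minimal sketch of the two sides of the protocol; 'myLock' and 'myVar' are hypothetical placeholders for an LWLock pointer and its associated uint64 (for the real pattern, see the WAL insertion locks in xlog.c):

	/* Holder: acquire, publish progress, then release and reset. */
	LWLockAcquire(myLock, LW_EXCLUSIVE);
	LWLockUpdateVar(myLock, &myVar, 42);		/* wakes LWLockWaitForVar() callers */
	LWLockReleaseClearVar(myLock, &myVar, 0);	/* leave 0 behind for the next holder */

	/* Waiter: sleep until the lock is free, or myVar no longer equals 0. */
	uint64		newval;

	if (LWLockWaitForVar(myLock, &myVar, 0, &newval))
	{
		/* lock was free (or was released while we waited) */
	}
	else
	{
		/* still held, but myVar changed; newval holds the current value */
	}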
static int lock_addin_request = 0;
static bool lock_addin_request_allowed = true;
-static inline bool LWLockAcquireCommon(LWLock *l, LWLockMode mode,
- uint64 *valptr, uint64 val);
-
#ifdef LWLOCK_STATS
typedef struct lwlock_stats_key
{
* Side effect: cancel/die interrupts are held off until lock release.
*/
bool
-LWLockAcquire(LWLock *l, LWLockMode mode)
-{
- return LWLockAcquireCommon(l, mode, NULL, 0);
-}
-
-/*
- * LWLockAcquireWithVar - like LWLockAcquire, but also sets *valptr = val
- *
- * The lock is always acquired in exclusive mode with this function.
- */
-bool
-LWLockAcquireWithVar(LWLock *l, uint64 *valptr, uint64 val)
-{
- return LWLockAcquireCommon(l, LW_EXCLUSIVE, valptr, val);
-}
-
-/* internal function to implement LWLockAcquire and LWLockAcquireWithVar */
-static inline bool
-LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val)
+LWLockAcquire(LWLock *lock, LWLockMode mode)
{
PGPROC *proc = MyProc;
bool result = true;
result = false;
}
- /* If there's a variable associated with this lock, initialize it */
- if (valptr)
- *valptr = val;
-
TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), T_ID(lock), mode);
/* Add lock to list of locks held by this backend */
return !mustwait;
}
+/*
+ * Does the lwlock in its current state need to wait for the variable value to
+ * change?
+ *
+ * If we don't need to wait, and it's because the value of the variable has
+ * changed, store the current value in *newval.
+ *
+ * *result is set to true if the lock was free, and false otherwise.
+ */
+static bool
+LWLockConflictsWithVar(LWLock *lock,
+ uint64 *valptr, uint64 oldval, uint64 *newval,
+ bool *result)
+{
+ bool mustwait;
+ uint64 value;
+#ifdef LWLOCK_STATS
+ lwlock_stats *lwstats;
+
+ lwstats = get_lwlock_stats_entry(lock);
+#endif
+
+ /*
+ * Test first to see if the lock is free right now.
+ *
+ * XXX: the caller uses a spinlock before this, so we don't need a memory
+ * barrier here as far as the current usage is concerned. But that might
+ * not be safe in general.
+ */
+ mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
+
+ if (!mustwait)
+ {
+ *result = true;
+ return false;
+ }
+
+ *result = false;
+
+ /*
+ * Read the value under the spinlock, as we can't rely on 64-bit reads and
+ * stores being atomic.  TODO: on platforms with atomic 64-bit reads/writes
+ * the spinlock could be optimized away.
+ */
+#ifdef LWLOCK_STATS
+ lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
+#else
+ SpinLockAcquire(&lock->mutex);
+#endif
+ value = *valptr;
+ SpinLockRelease(&lock->mutex);
+
+ if (value != oldval)
+ {
+ mustwait = false;
+ *newval = value;
+ }
+ else
+ {
+ mustwait = true;
+ }
+
+ return mustwait;
+}
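
Summarizing the contract of LWLockConflictsWithVar() as a table (a reading aid, not part of the patch):

	state seen by the caller            returns    *result   *newval
	--------------------------------    -------    -------   -------------
	lock free                           false      true      untouched
	held, *valptr != oldval             false      false     current value
	held, *valptr == oldval             true       false     untouched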
+
/*
* LWLockWaitForVar - Wait until lock is free, or a variable is updated.
*
* matches oldval, returns false and sets *newval to the current value in
* *valptr.
*
- * It's possible that the lock holder releases the lock, but another backend
- * acquires it again before we get a chance to observe that the lock was
- * momentarily released. We wouldn't need to wait for the new lock holder,
- * but we cannot distinguish that case, so we will have to wait.
- *
* Note: this function ignores shared lock holders; if the lock is held
* in shared mode, returns 'true'.
*/
PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);
- /*
- * Quick test first to see if it the slot is free right now.
- *
- * XXX: the caller uses a spinlock before this, so we don't need a memory
- * barrier here as far as the current usage is concerned. But that might
- * not be safe in general.
- */
- if ((pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) == 0)
- return true;
-
/*
* Lock out cancel/die interrupts while we sleep on the lock. There is no
* cleanup mechanism to remove us from the wait queue if we got
for (;;)
{
bool mustwait;
- uint64 value;
-
- mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
-
- if (mustwait)
- {
- /*
- * Perform comparison using spinlock as we can't rely on atomic 64
- * bit reads/stores.
- */
-#ifdef LWLOCK_STATS
- lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
-#else
- SpinLockAcquire(&lock->mutex);
-#endif
- /*
- * XXX: We can significantly optimize this on platforms with 64bit
- * atomics.
- */
- value = *valptr;
- if (value != oldval)
- {
- result = false;
- mustwait = false;
- *newval = value;
- }
- else
- mustwait = true;
- SpinLockRelease(&lock->mutex);
- }
- else
- mustwait = false;
+ mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
+ &result);
if (!mustwait)
break; /* the lock was free or value didn't match */
* Add myself to wait queue. Note that this is racy, somebody else
* could wakeup before we're finished queuing. NB: We're using nearly
* the same twice-in-a-row lock acquisition protocol as
- * LWLockAcquire(). Check its comments for details.
+ * LWLockAcquire().  Check its comments for details.  The only
+ * difference is that we also have to check the variable's value when
+ * checking the state of the lock.
*/
LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
/*
- * We're now guaranteed to be woken up if necessary. Recheck the
- * lock's state.
+ * We're now guaranteed to be woken up if necessary.  Recheck the lock's
+ * and the variable's state.
*/
- mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
+ mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
+ &result);
- /* Ok, lock is free after we queued ourselves. Undo queueing. */
+ /* Ok, no conflict after we queued ourselves. Undo queueing. */
if (!mustwait)
{
LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");
RESUME_INTERRUPTS();
}
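
As a usage sketch, this is roughly how xlog.c's WaitXLogInsertionsToFinish() drives LWLockWaitForVar() (condensed; 'i' and 'upto' come from that caller's loop over all insertion locks):

	XLogRecPtr	insertingat = InvalidXLogRecPtr;

	do
	{
		/*
		 * Sleep until the inserter releases the lock, or advertises (via
		 * LWLockUpdateVar) a position different from 'insertingat'.
		 */
		if (!LWLockWaitForVar(&WALInsertLocks[i].l.lock,
							  &WALInsertLocks[i].l.insertingAt,
							  insertingat, &insertingat))
			break;				/* lock was free: no insertion in progress */
	} while (insertingat < upto);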
+/*
+ * LWLockReleaseClearVar - release a previously acquired lock, reset variable
+ */
+void
+LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
+{
+#ifdef LWLOCK_STATS
+ lwlock_stats *lwstats;
+
+ lwstats = get_lwlock_stats_entry(lock);
+ lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
+#else
+ SpinLockAcquire(&lock->mutex);
+#endif
+ /*
+ * Set the variable's value before releasing the lock.  That prevents a
+ * race condition wherein a new locker acquires the lock, but hasn't yet
+ * set the variable's value.
+ */
+ *valptr = val;
+ SpinLockRelease(&lock->mutex);
+
+ LWLockRelease(lock);
+}
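
To spell out the race the comment above refers to, here is a hypothetical interleaving, assuming the store were instead done after LWLockRelease():

	/*
	 * backend A: LWLockRelease(lock)
	 * backend B: LWLockAcquire(lock)
	 * backend B: LWLockUpdateVar(lock, valptr, progress)
	 * backend A: *valptr = val			<-- would clobber B's update
	 *
	 * Storing *valptr under lock->mutex before releasing also orders the
	 * write against LWLockConflictsWithVar()'s spinlocked read.
	 */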
+
/*
* LWLockReleaseAll - release all currently-held locks