* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lwlock.c,v 1.7 2001/12/29 21:30:32 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lwlock.c,v 1.8 2002/01/07 16:33:00 tgl Exp $
*
*-------------------------------------------------------------------------
*/
typedef struct LWLock
{
slock_t mutex; /* Protects LWLock and queue of PROCs */
+ bool releaseOK; /* T if ok to release waiters */
char exclusive; /* # of exclusive holders (0 or 1) */
int shared; /* # of shared holders (0..MaxBackends) */
PROC *head; /* head of list of waiting PROCs */
PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock)
{
if (Trace_lwlocks)
- elog(DEBUG, "%s(%d): excl %d shared %d head %p",
+ elog(DEBUG, "%s(%d): excl %d shared %d head %p rOK %d",
where, (int) lockid,
- (int) lock->exclusive, lock->shared, lock->head);
+ (int) lock->exclusive, lock->shared, lock->head,
+ (int) lock->releaseOK);
}
inline static void
for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++)
{
SpinLockInit(&lock->mutex);
+ lock->releaseOK = true;
lock->exclusive = 0;
lock->shared = 0;
lock->head = NULL;
LWLockAcquire(LWLockId lockid, LWLockMode mode)
{
volatile LWLock *lock = LWLockArray + lockid;
- bool mustwait;
+ PROC *proc = MyProc;
+ bool retry = false;
+ int extraWaits = 0;
PRINT_LWDEBUG("LWLockAcquire", lockid, lock);
*/
HOLD_INTERRUPTS();
- /* Acquire mutex. Time spent holding mutex should be short! */
- SpinLockAcquire_NoHoldoff(&lock->mutex);
-
- /* If I can get the lock, do so quickly. */
- if (mode == LW_EXCLUSIVE)
+ /*
+ * Loop here to try to acquire lock after each time we are signaled
+ * by LWLockRelease.
+ *
+ * NOTE: it might seem better to have LWLockRelease actually grant us
+ * the lock, rather than retrying and possibly having to go back to
+ * sleep. But in practice that is no good because it means a process
+ * swap for every lock acquisition when two or more processes are
+ * contending for the same lock. Since LWLocks are normally used to
+ * protect not-very-long sections of computation, a process needs to
+ * be able to acquire and release the same lock many times during a
+ * single CPU time slice, even in the presence of contention. The
+ * efficiency of being able to do that outweighs the inefficiency of
+ * sometimes wasting a process dispatch cycle because the lock is not
+ * free when a released waiter finally gets to run. See pgsql-hackers
+ * archives for 29-Dec-01.
+ */
+ for (;;)
{
- if (lock->exclusive == 0 && lock->shared == 0)
+ bool mustwait;
+
+ /* Acquire mutex. Time spent holding mutex should be short! */
+ SpinLockAcquire_NoHoldoff(&lock->mutex);
+
+ /* If retrying, allow LWLockRelease to release waiters again */
+ if (retry)
+ lock->releaseOK = true;
+
+ /* If I can get the lock, do so quickly. */
+ if (mode == LW_EXCLUSIVE)
{
- lock->exclusive++;
- mustwait = false;
+ if (lock->exclusive == 0 && lock->shared == 0)
+ {
+ lock->exclusive++;
+ mustwait = false;
+ }
+ else
+ mustwait = true;
}
else
- mustwait = true;
- }
- else
- {
- /*
- * If there is someone waiting (presumably for exclusive access),
- * queue up behind him even though I could get the lock. This
- * prevents a stream of read locks from starving a writer.
- */
- if (lock->exclusive == 0 && lock->head == NULL)
{
- lock->shared++;
- mustwait = false;
+ if (lock->exclusive == 0)
+ {
+ lock->shared++;
+ mustwait = false;
+ }
+ else
+ mustwait = true;
}
- else
- mustwait = true;
- }
- if (mustwait)
- {
- /* Add myself to wait queue */
- PROC *proc = MyProc;
- int extraWaits = 0;
+ if (!mustwait)
+ break; /* got the lock */
/*
+ * Add myself to wait queue.
+ *
* If we don't have a PROC structure, there's no way to wait. This
* should never occur, since MyProc should only be null during
* shared memory initialization.
*
* Since we share the process wait semaphore with the regular lock
* manager and ProcWaitForSignal, and we may need to acquire an
- * LWLock while one of those is pending, it is possible that we
- * get awakened for a reason other than being granted the LWLock.
- * If so, loop back and wait again. Once we've gotten the lock,
+ * LWLock while one of those is pending, it is possible that we get
+ * awakened for a reason other than being signaled by LWLockRelease.
+ * If so, loop back and wait again. Once we've gotten the LWLock,
* re-increment the sema by the number of additional signals
* received, so that the lock manager or signal manager will see
* the received signal when it next waits.
LOG_LWDEBUG("LWLockAcquire", lockid, "awakened");
- /*
- * The awakener already updated the lock struct's state, so we
- * don't need to do anything more to it. Just need to fix the
- * semaphore count.
- */
- while (extraWaits-- > 0)
- IpcSemaphoreUnlock(proc->sem.semId, proc->sem.semNum);
- }
- else
- {
- /* Got the lock without waiting */
- SpinLockRelease_NoHoldoff(&lock->mutex);
+ /* Now loop back and try to acquire lock again. */
+ retry = true;
}
+ /* We are done updating shared state of the lock itself. */
+ SpinLockRelease_NoHoldoff(&lock->mutex);
+
/* Add lock to list of locks held by this backend */
Assert(num_held_lwlocks < MAX_SIMUL_LWLOCKS);
held_lwlocks[num_held_lwlocks++] = lockid;
+
+ /*
+ * Fix the process wait semaphore's count for any absorbed wakeups.
+ */
+ while (extraWaits-- > 0)
+ IpcSemaphoreUnlock(proc->sem.semId, proc->sem.semNum);
}
/*
}
else
{
- /*
- * If there is someone waiting (presumably for exclusive access),
- * queue up behind him even though I could get the lock. This
- * prevents a stream of read locks from starving a writer.
- */
- if (lock->exclusive == 0 && lock->head == NULL)
+ if (lock->exclusive == 0)
{
lock->shared++;
mustwait = false;
/*
* See if I need to awaken any waiters. If I released a non-last
- * shared hold, there cannot be anything to do.
+ * shared hold, there cannot be anything to do. Also, do not awaken
+ * any waiters if someone has already awakened waiters that haven't
+ * yet acquired the lock.
*/
head = lock->head;
if (head != NULL)
{
- if (lock->exclusive == 0 && lock->shared == 0)
+ if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
{
/*
- * Remove the to-be-awakened PROCs from the queue, and update
- * the lock state to show them as holding the lock.
+ * Remove the to-be-awakened PROCs from the queue. If the
+ * front waiter wants exclusive lock, awaken him only.
+ * Otherwise awaken as many waiters as want shared access.
*/
proc = head;
- if (proc->lwExclusive)
- lock->exclusive++;
- else
+ if (!proc->lwExclusive)
{
- lock->shared++;
while (proc->lwWaitLink != NULL &&
!proc->lwWaitLink->lwExclusive)
{
proc = proc->lwWaitLink;
- lock->shared++;
}
}
/* proc is now the last PROC to be released */
lock->head = proc->lwWaitLink;
proc->lwWaitLink = NULL;
+ /* prevent additional wakeups until retryer gets to run */
+ lock->releaseOK = false;
}
else
{