/* initialize to given target; may increase below */
WriteRqstPtr = record;
- /* read LogwrtResult and update local state */
+ /*
+ * Now wait until we get the write lock, or someone else does the
+ * flush for us.
+ */
+ for (;;)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
+ /* read LogwrtResult and update local state */
SpinLockAcquire(&xlogctl->info_lck);
if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
WriteRqstPtr = xlogctl->LogwrtRqst.Write;
LogwrtResult = xlogctl->LogwrtResult;
SpinLockRelease(&xlogctl->info_lck);
- }
- /* done already? */
- if (!XLByteLE(record, LogwrtResult.Flush))
- {
- /* now wait for the write lock */
- LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+ /* done already? */
+ if (XLByteLE(record, LogwrtResult.Flush))
+ break;
+
+ /*
+ * Try to get the write lock. If we can't get it immediately, wait
+ * until it's released, and recheck if we still need to do the flush
+ * or if the backend that held the lock did it for us already. This
+ * helps to maintain a good rate of group committing when the system
+ * is bottlenecked by the speed of fsyncing.
+ */
+ if (!LWLockWaitUntilFree(WALWriteLock, LW_EXCLUSIVE))
+ {
+ /*
+ * The lock is now free, but we didn't acquire it yet. Before we
+ * do, loop back to check if someone else flushed the record for
+ * us already.
+ */
+ continue;
+ }
+ /* Got the lock */
LogwrtResult = XLogCtl->Write.LogwrtResult;
if (!XLByteLE(record, LogwrtResult.Flush))
{
XLogWrite(WriteRqst, false, false);
}
LWLockRelease(WALWriteLock);
+ /* done */
+ break;
}
END_CRIT_SECTION();
elog(PANIC, "cannot wait without a PGPROC structure");
proc->lwWaiting = true;
- proc->lwExclusive = (mode == LW_EXCLUSIVE);
+ proc->lwWaitMode = mode;
proc->lwWaitLink = NULL;
if (lock->head == NULL)
lock->head = proc;
return !mustwait;
}
+/*
+ * LWLockWaitUntilFree - Wait until a lock is free
+ *
+ * The semantics of this function are a bit funky. If the lock is currently
+ * free, it is acquired in the given mode, and the function returns true. If
+ * the lock isn't immediately free, the function waits until it is released
+ * and returns false, but does not acquire the lock.
+ *
+ * This is currently used for WALWriteLock: when a backend flushes the WAL,
+ * holding WALWriteLock, it can flush the commit records of many other
+ * backends as a side-effect. Those other backends need to wait until the
+ * flush finishes, but don't need to acquire the lock anymore. They can just
+ * wake up, observe that their records have already been flushed, and return.
+ */
+bool
+LWLockWaitUntilFree(LWLockId lockid, LWLockMode mode)
+{
+ volatile LWLock *lock = &(LWLockArray[lockid].lock);
+ PGPROC *proc = MyProc;
+ bool mustwait;
+ int extraWaits = 0;
+
+ PRINT_LWDEBUG("LWLockWaitUntilFree", lockid, lock);
+
+ /* Ensure we will have room to remember the lock */
+ if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
+ elog(ERROR, "too many LWLocks taken");
+
+ /*
+ * Lock out cancel/die interrupts until we exit the code section protected
+ * by the LWLock. This ensures that interrupts will not interfere with
+ * manipulations of data structures in shared memory.
+ */
+ HOLD_INTERRUPTS();
+
+ /* Acquire mutex. Time spent holding mutex should be short! */
+ SpinLockAcquire(&lock->mutex);
+
+ /* If I can get the lock, do so quickly. */
+ if (mode == LW_EXCLUSIVE)
+ {
+ if (lock->exclusive == 0 && lock->shared == 0)
+ {
+ lock->exclusive++;
+ mustwait = false;
+ }
+ else
+ mustwait = true;
+ }
+ else
+ {
+ if (lock->exclusive == 0)
+ {
+ lock->shared++;
+ mustwait = false;
+ }
+ else
+ mustwait = true;
+ }
+
+ if (mustwait)
+ {
+ /*
+ * Add myself to wait queue.
+ *
+ * If we don't have a PGPROC structure, there's no way to wait. This
+ * should never occur, since MyProc should only be null during shared
+ * memory initialization.
+ */
+ if (proc == NULL)
+ elog(PANIC, "cannot wait without a PGPROC structure");
+
+ proc->lwWaiting = true;
+ proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
+ proc->lwWaitLink = NULL;
+ if (lock->head == NULL)
+ lock->head = proc;
+ else
+ lock->tail->lwWaitLink = proc;
+ lock->tail = proc;
+
+ /* Can release the mutex now */
+ SpinLockRelease(&lock->mutex);
+
+ /*
+ * Wait until awakened. Like in LWLockAcquire, be prepared for bogus
+ * wakups, because we share the semaphore with ProcWaitForSignal.
+ */
+ LOG_LWDEBUG("LWLockWaitUntilFree", lockid, "waiting");
+
+#ifdef LWLOCK_STATS
+ block_counts[lockid]++;
+#endif
+
+ TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode);
+
+ for (;;)
+ {
+ /* "false" means cannot accept cancel/die interrupt here. */
+ PGSemaphoreLock(&proc->sem, false);
+ if (!proc->lwWaiting)
+ break;
+ extraWaits++;
+ }
+
+ TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode);
+
+ LOG_LWDEBUG("LWLockWaitUntilFree", lockid, "awakened");
+ }
+ else
+ {
+ /* We are done updating shared state of the lock itself. */
+ SpinLockRelease(&lock->mutex);
+ }
+
+ /*
+ * Fix the process wait semaphore's count for any absorbed wakeups.
+ */
+ while (extraWaits-- > 0)
+ PGSemaphoreUnlock(&proc->sem);
+
+ if (mustwait)
+ {
+ /* Failed to get lock, so release interrupt holdoff */
+ RESUME_INTERRUPTS();
+ LOG_LWDEBUG("LWLockWaitUntilFree", lockid, "failed");
+ TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE_FAIL(lockid, mode);
+ }
+ else
+ {
+ /* Add lock to list of locks held by this backend */
+ held_lwlocks[num_held_lwlocks++] = lockid;
+ TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE(lockid, mode);
+ }
+
+ return !mustwait;
+}
+
/*
* LWLockRelease - release a previously acquired lock
*/
/*
* Remove the to-be-awakened PGPROCs from the queue. If the front
* waiter wants exclusive lock, awaken him only. Otherwise awaken
- * as many waiters as want shared access.
+ * as many waiters as want shared access (or just want to be
+ * woken up when the lock becomes free without acquiring it,
+ * ie. LWLockWaitUntilFree).
*/
+ bool releaseOK = true;
+
proc = head;
- if (!proc->lwExclusive)
+ if (proc->lwWaitMode != LW_EXCLUSIVE)
{
while (proc->lwWaitLink != NULL &&
- !proc->lwWaitLink->lwExclusive)
+ proc->lwWaitLink->lwWaitMode != LW_EXCLUSIVE)
+ {
proc = proc->lwWaitLink;
+ if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
+ releaseOK = false;
+ }
}
/* proc is now the last PGPROC to be released */
lock->head = proc->lwWaitLink;
proc->lwWaitLink = NULL;
- /* prevent additional wakeups until retryer gets to run */
- lock->releaseOK = false;
+ /*
+ * Prevent additional wakeups until retryer gets to run. Backends
+ * that are just waiting for the lock to become free don't prevent
+ * wakeups, because they might decide that they don't want the
+ * lock, after all.
+ */
+ if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
+ releaseOK = false;
+
+ lock->releaseOK = releaseOK;
}
else
{