int wal_level = WAL_LEVEL_MINIMAL;
int CommitDelay = 0; /* precommit delay in microseconds */
int CommitSiblings = 5; /* # concurrent xacts needed to sleep */
-int num_xloginsert_slots = 8;
+int num_xloginsert_locks = 8;
#ifdef WAL_DEBUG
bool XLOG_DEBUG = false;
* (which is almost but not quite the same as a pointer to the most recent
* CHECKPOINT record). We update this from the shared-memory copy,
* XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
- * hold an insertion slot). See XLogInsert for details. We are also allowed
+ * hold an insertion lock). See XLogInsert for details. We are also allowed
* to update from XLogCtl->RedoRecPtr if we hold the info_lck;
* see GetRedoRecPtr. A freshly spawned backend obtains the value during
* InitXLOGAccess.
XLogRecPtr Flush; /* last byte + 1 flushed */
} XLogwrtResult;
-
/*
- * A slot for inserting to the WAL. This is similar to an LWLock, the main
- * difference is that there is an extra xlogInsertingAt field that is protected
- * by the same mutex. Unlike an LWLock, a slot can only be acquired in
- * exclusive mode.
- *
- * The xlogInsertingAt field is used to advertise to other processes how far
- * the slot owner has progressed in inserting the record. When a backend
- * acquires a slot, it initializes xlogInsertingAt to 1, because it doesn't
- * yet know where it's going to insert the record. That's conservative
- * but correct; the new insertion is certainly going to go to a byte position
- * greater than 1. If another backend needs to flush the WAL, it will have to
- * wait for the new insertion. xlogInsertingAt is updated after finishing the
- * insert or when crossing a page boundary, which will wake up anyone waiting
- * for it, whether the wait was necessary in the first place or not.
- *
- * A process can wait on a slot in two modes: LW_EXCLUSIVE or
- * LW_WAIT_UNTIL_FREE. LW_EXCLUSIVE works like in an lwlock; when the slot is
- * released, the first LW_EXCLUSIVE waiter in the queue is woken up. Processes
- * waiting in LW_WAIT_UNTIL_FREE mode are woken up whenever the slot is
- * released, or xlogInsertingAt is updated. In other words, a process in
- * LW_WAIT_UNTIL_FREE mode is woken up whenever the inserter makes any progress
- * copying the record in place. LW_WAIT_UNTIL_FREE waiters are always added to
- * the front of the queue, while LW_EXCLUSIVE waiters are appended to the end.
- *
- * To join the wait queue, a process must set MyProc->lwWaitMode to the mode
- * it wants to wait in, MyProc->lwWaiting to true, and link MyProc to the head
- * or tail of the wait queue. The same mechanism is used to wait on an LWLock,
- * see lwlock.c for details.
+ * Inserting to WAL is protected by a small fixed number of WAL insertion
+ * locks. To insert to the WAL, you must hold one of the locks - it doesn't
+ * matter which one. To lock out other concurrent insertions, you must hold
+ * all of them. Each WAL insertion lock consists of a lightweight lock, plus
+ * indicator of how far the insertion has progressed (insertingAt).
+ *
+ * The insertingAt values are read when a process wants to flush WAL from
+ * the in-memory buffers to disk, to check that all the insertions to the
+ * region the process is about to write out have finished. You could simply
+ * wait for all currently in-progress insertions to finish, but the
+ * insertingAt indicator allows you to ignore insertions to later in the WAL,
+ * so that you only wait for the insertions that are modifying the buffers
+ * you're about to write out.
+ *
+ * This isn't just an optimization. If all the WAL buffers are dirty, an
+ * inserter that's holding a WAL insert lock might need to evict an old WAL
+ * buffer, which requires flushing the WAL. If it's possible for an inserter
+ * to block on another inserter unnecessarily, deadlock can arise when two
+ * inserters holding a WAL insert lock wait for each other to finish their
+ * insertion.
+ *
+ * Small WAL records that don't cross a page boundary never update the value,
+ * the WAL record is just copied to the page and the lock is released. But
+ * to avoid the deadlock-scenario explained above, the indicator is always
+ * updated before sleeping while holding an insertion lock.
*/
typedef struct
{
- slock_t mutex; /* protects the below fields */
- XLogRecPtr xlogInsertingAt; /* insert has completed up to this point */
-
- PGPROC *owner; /* for debugging purposes */
-
- bool releaseOK; /* T if ok to release waiters */
- char exclusive; /* # of exclusive holders (0 or 1) */
- PGPROC *head; /* head of list of waiting PGPROCs */
- PGPROC *tail; /* tail of list of waiting PGPROCs */
- /* tail is undefined when head is NULL */
-} XLogInsertSlot;
+ LWLock lock;
+ XLogRecPtr insertingAt;
+} WALInsertLock;
/*
- * All the slots are allocated as an array in shared memory. We force the
- * array stride to be a power of 2, which saves a few cycles in indexing, but
- * more importantly also ensures that individual slots don't cross cache line
- * boundaries. (Of course, we have to also ensure that the array start
- * address is suitably aligned.)
+ * All the WAL insertion locks are allocated as an array in shared memory. We
+ * force the array stride to be a power of 2, which saves a few cycles in
+ * indexing, but more importantly also ensures that individual locks don't
+ * cross cache line boundaries. (Of course, we have to also ensure that the
+ * array start address is suitably aligned.)
*/
-typedef union XLogInsertSlotPadded
+typedef union WALInsertLockPadded
{
- XLogInsertSlot slot;
+ WALInsertLock l;
char pad[CACHE_LINE_SIZE];
-} XLogInsertSlotPadded;
+} WALInsertLockPadded;
/*
* Shared state data for XLogInsert.
* we must WAL-log it before it actually affects WAL-logging by backends.
* Checkpointer sets at startup or after SIGHUP.
*
- * To read these fields, you must hold an insertion slot. To modify them,
- * you must hold ALL the slots.
+ * To read these fields, you must hold an insertion lock. To modify them,
+ * you must hold ALL the locks.
*/
XLogRecPtr RedoRecPtr; /* current redo point for insertions */
bool forcePageWrites; /* forcing full-page writes for PITR? */
int nonExclusiveBackups;
XLogRecPtr lastBackupStart;
- /* insertion slots, see XLogInsertSlot struct above for details */
- XLogInsertSlotPadded *insertSlots;
+ /*
+ * WAL insertion locks.
+ */
+ WALInsertLockPadded *WALInsertLocks;
+ LWLockTranche WALInsertLockTranche;
+ int WALInsertLockTrancheId;
} XLogCtlInsert;
/*
static XLogCtlData *XLogCtl = NULL;
+/* a private copy of XLogCtl->Insert.WALInsertLocks, for convenience */
+static WALInsertLockPadded *WALInsertLocks = NULL;
+
/*
* We maintain an image of pg_control in shared memory.
*/
/* Have we launched bgwriter during recovery? */
static bool bgwriterLaunched = false;
-/* For WALInsertSlotAcquire/Release functions */
-static int MySlotNo = 0;
-static bool holdingAllSlots = false;
+/* For WALInsertLockAcquire/Release functions */
+static int MyLockNo = 0;
+static bool holdingAllLocks = false;
static void readRecoveryCommandFile(void);
static void exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo);
static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
XLogRecPtr *PrevPtr);
static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
-static void WakeupWaiters(XLogRecPtr EndPos);
static char *GetXLogBuffer(XLogRecPtr ptr);
static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr);
-static void WALInsertSlotAcquire(bool exclusive);
-static void WALInsertSlotAcquireOne(int slotno);
-static void WALInsertSlotRelease(void);
-static void WALInsertSlotReleaseOne(int slotno);
+static void WALInsertLockAcquire(void);
+static void WALInsertLockAcquireExclusive(void);
+static void WALInsertLockRelease(void);
+static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
/*
* Insert an XLOG record having the specified RMID and info bytes,
*
* We may have to loop back to here if a race condition is detected below.
* We could prevent the race by doing all this work while holding an
- * insertion slot, but it seems better to avoid doing CRC calculations
+ * insertion lock, but it seems better to avoid doing CRC calculations
* while holding one.
*
* We add entries for backup blocks to the chain, so that they don't need
/*
* Decide if we need to do full-page writes in this XLOG record: true if
* full_page_writes is on or we have a PITR request for it. Since we
- * don't yet have an insertion slot, fullPageWrites and forcePageWrites
- * could change under us, but we'll recheck them once we have a slot.
+ * don't yet have an insertion lock, fullPageWrites and forcePageWrites
+ * could change under us, but we'll recheck them once we have a lock.
*/
doPageWrites = Insert->fullPageWrites || Insert->forcePageWrites;
* record in place. This can be done concurrently in multiple processes.
*
* To keep track of which insertions are still in-progress, each concurrent
- * inserter allocates an "insertion slot", which tells others how far the
- * inserter has progressed. There is a small fixed number of insertion
- * slots, determined by the num_xloginsert_slots GUC. When an inserter
- * finishes, it updates the xlogInsertingAt of its slot to the end of the
- * record it inserted, to let others know that it's done. xlogInsertingAt
- * is also updated when crossing over to a new WAL buffer, to allow the
- * the previous buffer to be flushed.
+ * inserter acquires an insertion lock. In addition to just indicating that
+ * an insertion is in progress, the lock tells others how far the inserter
+ * has progressed. There is a small fixed number of insertion locks,
+ * determined by the num_xloginsert_locks GUC. When an inserter crosses a
+ * page boundary, it updates the value stored in the lock to how far it
+ * has inserted, to allow the previous buffer to be flushed.
*
- * Holding onto a slot also protects RedoRecPtr and fullPageWrites from
- * changing until the insertion is finished.
+ * Holding onto an insertion lock also protects RedoRecPtr and
+ * fullPageWrites from changing until the insertion is finished.
*
* Step 2 can usually be done completely in parallel. If the required WAL
* page is not initialized yet, you have to grab WALBufMappingLock to
*----------
*/
START_CRIT_SECTION();
- WALInsertSlotAcquire(isLogSwitch);
+ if (isLogSwitch)
+ WALInsertLockAcquireExclusive();
+ else
+ WALInsertLockAcquire();
/*
* Check to see if my RedoRecPtr is out of date. If so, may have to go
* Oops, this buffer now needs to be backed up, but we
* didn't think so above. Start over.
*/
- WALInsertSlotRelease();
+ WALInsertLockRelease();
END_CRIT_SECTION();
rdt_lastnormal->next = NULL;
info = info_orig;
if ((Insert->fullPageWrites || Insert->forcePageWrites) && !doPageWrites)
{
/* Oops, must redo it with full-page data. */
- WALInsertSlotRelease();
+ WALInsertLockRelease();
END_CRIT_SECTION();
rdt_lastnormal->next = NULL;
info = info_orig;
/*
* Done! Let others know that we're finished.
*/
- WALInsertSlotRelease();
+ WALInsertLockRelease();
MarkCurrentTransactionIdLoggedIfAny();
/*
* These calculations are a bit heavy-weight to be done while holding a
- * spinlock, but since we're holding all the WAL insertion slots, there
+ * spinlock, but since we're holding all the WAL insertion locks, there
* are no other inserters competing for it. GetXLogInsertRecPtr() does
* compete for it, but that's not called very frequently.
*/
while (CurrPos < EndPos)
{
/* initialize the next page (if not initialized already) */
- WakeupWaiters(CurrPos);
+ WALInsertLockUpdateInsertingAt(CurrPos);
AdvanceXLInsertBuffer(CurrPos, false);
CurrPos += XLOG_BLCKSZ;
}
}
/*
- * Allocate a slot for insertion.
- *
- * In exclusive mode, all slots are reserved for the current process. That
- * blocks all concurrent insertions.
+ * Acquire a WAL insertion lock, for inserting to WAL.
*/
static void
-WALInsertSlotAcquire(bool exclusive)
+WALInsertLockAcquire(void)
{
- int i;
-
- if (exclusive)
- {
- for (i = 0; i < num_xloginsert_slots; i++)
- WALInsertSlotAcquireOne(i);
- holdingAllSlots = true;
- }
- else
- WALInsertSlotAcquireOne(-1);
-}
-
-/*
- * Workhorse of WALInsertSlotAcquire. Acquires the given slot, or an arbitrary
- * one if slotno == -1. The index of the slot that was acquired is stored in
- * MySlotNo.
- *
- * This is more or less equivalent to LWLockAcquire().
- */
-static void
-WALInsertSlotAcquireOne(int slotno)
-{
- volatile XLogInsertSlot *slot;
- PGPROC *proc = MyProc;
- bool retry = false;
- int extraWaits = 0;
- static int slotToTry = -1;
+ bool immed;
/*
- * Try to use the slot we used last time. If the system isn't particularly
- * busy, it's a good bet that it's available, and it's good to have some
- * affinity to a particular slot so that you don't unnecessarily bounce
- * cache lines between processes when there is no contention.
+ * It doesn't matter which of the WAL insertion locks we acquire, so try
+ * the one we used last time. If the system isn't particularly busy,
+ * it's a good bet that it's still available, and it's good to have some
+ * affinity to a particular lock so that you don't unnecessarily bounce
+ * cache lines between processes when there's no contention.
*
- * If this is the first time through in this backend, pick a slot
- * (semi-)randomly. This allows the slots to be used evenly if you have a
- * lot of very short connections.
+ * If this is the first time through in this backend, pick a lock
+ * (semi-)randomly. This allows the locks to be used evenly if you have
+ * a lot of very short connections.
*/
- if (slotno != -1)
- MySlotNo = slotno;
- else
- {
- if (slotToTry == -1)
- slotToTry = MyProc->pgprocno % num_xloginsert_slots;
- MySlotNo = slotToTry;
- }
+ static int lockToTry = -1;
- /*
- * We can't wait if we haven't got a PGPROC. This should only occur
- * during bootstrap or shared memory initialization. Put an Assert here
- * to catch unsafe coding practices.
- */
- Assert(MyProc != NULL);
-
- /*
- * Lock out cancel/die interrupts until we exit the code section protected
- * by the slot. This ensures that interrupts will not interfere with
- * manipulations of data structures in shared memory. There is no cleanup
- * mechanism to release the slot if the backend dies while holding one,
- * so make this a critical section.
- */
- START_CRIT_SECTION();
+ if (lockToTry == -1)
+ lockToTry = MyProc->pgprocno % num_xloginsert_locks;
+ MyLockNo = lockToTry;
/*
- * Loop here to try to acquire slot after each time we are signaled by
- * WALInsertSlotRelease.
+ * The insertingAt value is initially set to 0, as we don't know our
+ * insert location yet.
*/
- for (;;)
+ immed = LWLockAcquireWithVar(&WALInsertLocks[MyLockNo].l.lock,
+ &WALInsertLocks[MyLockNo].l.insertingAt,
+ 0);
+ if (!immed)
{
- bool mustwait;
-
- slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot;
-
- /* Acquire mutex. Time spent holding mutex should be short! */
- SpinLockAcquire(&slot->mutex);
-
- /* If retrying, allow WALInsertSlotRelease to release waiters again */
- if (retry)
- slot->releaseOK = true;
-
- /* If I can get the slot, do so quickly. */
- if (slot->exclusive == 0)
- {
- slot->exclusive++;
- mustwait = false;
- }
- else
- mustwait = true;
-
- if (!mustwait)
- break; /* got the lock */
-
- Assert(slot->owner != MyProc);
-
- /*
- * Add myself to wait queue.
- */
- proc->lwWaiting = true;
- proc->lwWaitMode = LW_EXCLUSIVE;
- proc->lwWaitLink = NULL;
- if (slot->head == NULL)
- slot->head = proc;
- else
- slot->tail->lwWaitLink = proc;
- slot->tail = proc;
-
- /* Can release the mutex now */
- SpinLockRelease(&slot->mutex);
-
/*
- * Wait until awakened.
- *
- * Since we share the process wait semaphore with the regular lock
- * manager and ProcWaitForSignal, and we may need to acquire a slot
- * while one of those is pending, it is possible that we get awakened
- * for a reason other than being signaled by WALInsertSlotRelease. If
- * so, loop back and wait again. Once we've gotten the slot,
- * re-increment the sema by the number of additional signals received,
- * so that the lock manager or signal manager will see the received
- * signal when it next waits.
+ * If we couldn't get the lock immediately, try another lock next
+ * time. On a system with more insertion locks than concurrent
+ * inserters, this causes all the inserters to eventually migrate
+ * to a lock that no-one else is using. On a system with more
+ * inserters than locks, it still helps to distribute the inserters
+ * evenly across the locks.
*/
- for (;;)
- {
- /* "false" means cannot accept cancel/die interrupt here. */
- PGSemaphoreLock(&proc->sem, false);
- if (!proc->lwWaiting)
- break;
- extraWaits++;
- }
-
- /* Now loop back and try to acquire lock again. */
- retry = true;
+ lockToTry = (lockToTry + 1) % num_xloginsert_locks;
}
-
- slot->owner = proc;
-
- /*
- * Normally, we initialize the xlogInsertingAt value of the slot to 1,
- * because we don't yet know where in the WAL we're going to insert. It's
- * not critical what it points to right now - leaving it to a too small
- * value just means that WaitXlogInsertionsToFinish() might wait on us
- * unnecessarily, until we update the value (when we finish the insert or
- * move to next page).
- *
- * If we're grabbing all the slots, however, stamp all but the last one
- * with InvalidXLogRecPtr, meaning there is no insert in progress. The last
- * slot is the one that we will update as we proceed with the insert, the
- * rest are held just to keep off other inserters.
- */
- if (slotno != -1 && slotno != num_xloginsert_slots - 1)
- slot->xlogInsertingAt = InvalidXLogRecPtr;
- else
- slot->xlogInsertingAt = 1;
-
- /* We are done updating shared state of the slot itself. */
- SpinLockRelease(&slot->mutex);
-
- /*
- * Fix the process wait semaphore's count for any absorbed wakeups.
- */
- while (extraWaits-- > 0)
- PGSemaphoreUnlock(&proc->sem);
-
- /*
- * If we couldn't get the slot immediately, try another slot next time.
- * On a system with more insertion slots than concurrent inserters, this
- * causes all the inserters to eventually migrate to a slot that no-one
- * else is using. On a system with more inserters than slots, it still
- * causes the inserters to be distributed quite evenly across the slots.
- */
- if (slotno != -1 && retry)
- slotToTry = (slotToTry + 1) % num_xloginsert_slots;
}
/*
- * Wait for the given slot to become free, or for its xlogInsertingAt location
- * to change to something else than 'waitptr'. In other words, wait for the
- * inserter using the given slot to finish its insertion, or to at least make
- * some progress.
+ * Acquire all WAL insertion locks, to prevent other backends from inserting
+ * to WAL.
*/
static void
-WaitOnSlot(volatile XLogInsertSlot *slot, XLogRecPtr waitptr)
+WALInsertLockAcquireExclusive(void)
{
- PGPROC *proc = MyProc;
- int extraWaits = 0;
-
- /*
- * Lock out cancel/die interrupts while we sleep on the slot. There is
- * no cleanup mechanism to remove us from the wait queue if we got
- * interrupted.
- */
- HOLD_INTERRUPTS();
+ int i;
/*
- * Loop here to try to acquire lock after each time we are signaled.
+ * When holding all the locks, we only update the last lock's insertingAt
+ * indicator. The others are set to 0xFFFFFFFFFFFFFFFF, which is higher
+ * than any real XLogRecPtr value, to make sure that no-one blocks
+ * waiting on those.
*/
- for (;;)
+ for (i = 0; i < num_xloginsert_locks - 1; i++)
{
- bool mustwait;
-
- /* Acquire mutex. Time spent holding mutex should be short! */
- SpinLockAcquire(&slot->mutex);
-
- /* If I can get the lock, do so quickly. */
- if (slot->exclusive == 0 || slot->xlogInsertingAt != waitptr)
- mustwait = false;
- else
- mustwait = true;
-
- if (!mustwait)
- break; /* the lock was free */
-
- Assert(slot->owner != MyProc);
-
- /*
- * Add myself to wait queue.
- */
- proc->lwWaiting = true;
- proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
- proc->lwWaitLink = NULL;
-
- /* waiters are added to the front of the queue */
- proc->lwWaitLink = slot->head;
- if (slot->head == NULL)
- slot->tail = proc;
- slot->head = proc;
-
- /* Can release the mutex now */
- SpinLockRelease(&slot->mutex);
-
- /*
- * Wait until awakened.
- *
- * Since we share the process wait semaphore with other things, like
- * the regular lock manager and ProcWaitForSignal, and we may need to
- * acquire an LWLock while one of those is pending, it is possible that
- * we get awakened for a reason other than being signaled by
- * LWLockRelease. If so, loop back and wait again. Once we've gotten
- * the LWLock, re-increment the sema by the number of additional
- * signals received, so that the lock manager or signal manager will
- * see the received signal when it next waits.
- */
- for (;;)
- {
- /* "false" means cannot accept cancel/die interrupt here. */
- PGSemaphoreLock(&proc->sem, false);
- if (!proc->lwWaiting)
- break;
- extraWaits++;
- }
-
- /* Now loop back and try to acquire lock again. */
+ LWLockAcquireWithVar(&WALInsertLocks[i].l.lock,
+ &WALInsertLocks[i].l.insertingAt,
+ UINT64CONST(0xFFFFFFFFFFFFFFFF));
}
+ LWLockAcquireWithVar(&WALInsertLocks[i].l.lock,
+ &WALInsertLocks[i].l.insertingAt,
+ 0);
- /* We are done updating shared state of the lock itself. */
- SpinLockRelease(&slot->mutex);
-
- /*
- * Fix the process wait semaphore's count for any absorbed wakeups.
- */
- while (extraWaits-- > 0)
- PGSemaphoreUnlock(&proc->sem);
-
- /*
- * Now okay to allow cancel/die interrupts.
- */
- RESUME_INTERRUPTS();
+ holdingAllLocks = true;
}
/*
- * Wake up all processes waiting for us with WaitOnSlot(). Sets our
- * xlogInsertingAt value to EndPos, without releasing the slot.
+ * Release our insertion lock (or locks, if we're holding them all).
*/
static void
-WakeupWaiters(XLogRecPtr EndPos)
+WALInsertLockRelease(void)
{
- volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot;
- PGPROC *head;
- PGPROC *proc;
- PGPROC *next;
-
- /*
- * If we have already reported progress up to the same point, do nothing.
- * No other process can modify xlogInsertingAt, so we can check this before
- * grabbing the spinlock.
- */
- if (slot->xlogInsertingAt == EndPos)
- return;
- /* xlogInsertingAt should not go backwards */
- Assert(slot->xlogInsertingAt < EndPos);
-
- /* Acquire mutex. Time spent holding mutex should be short! */
- SpinLockAcquire(&slot->mutex);
-
- /* we should own the slot */
- Assert(slot->exclusive == 1 && slot->owner == MyProc);
-
- slot->xlogInsertingAt = EndPos;
-
- /*
- * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
- * up. They are always in the front of the queue.
- */
- head = slot->head;
-
- if (head != NULL && head->lwWaitMode == LW_WAIT_UNTIL_FREE)
+ if (holdingAllLocks)
{
- proc = head;
- next = proc->lwWaitLink;
- while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE)
- {
- proc = next;
- next = next->lwWaitLink;
- }
+ int i;
+
+ for (i = 0; i < num_xloginsert_locks; i++)
+ LWLockRelease(&WALInsertLocks[i].l.lock);
- /* proc is now the last PGPROC to be released */
- slot->head = next;
- proc->lwWaitLink = NULL;
+ holdingAllLocks = false;
}
else
- head = NULL;
-
- /* We are done updating shared state of the lock itself. */
- SpinLockRelease(&slot->mutex);
-
- /*
- * Awaken any waiters I removed from the queue.
- */
- while (head != NULL)
{
- proc = head;
- head = proc->lwWaitLink;
- proc->lwWaitLink = NULL;
- proc->lwWaiting = false;
- PGSemaphoreUnlock(&proc->sem);
+ LWLockRelease(&WALInsertLocks[MyLockNo].l.lock);
}
}
/*
- * Release our insertion slot (or slots, if we're holding them all).
+ * Update our insertingAt value, to let others know that we've finished
+ * inserting up to that point.
*/
static void
-WALInsertSlotRelease(void)
+WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt)
{
- int i;
-
- if (holdingAllSlots)
+ if (holdingAllLocks)
{
- for (i = 0; i < num_xloginsert_slots; i++)
- WALInsertSlotReleaseOne(i);
- holdingAllSlots = false;
+ /*
+ * We use the last lock to mark our actual position, see comments in
+ * WALInsertLockAcquireExclusive.
+ */
+ LWLockUpdateVar(&WALInsertLocks[num_xloginsert_locks - 1].l.lock,
+ &WALInsertLocks[num_xloginsert_locks - 1].l.insertingAt,
+ insertingAt);
}
else
- WALInsertSlotReleaseOne(MySlotNo);
-}
-
-static void
-WALInsertSlotReleaseOne(int slotno)
-{
- volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[slotno].slot;
- PGPROC *head;
- PGPROC *proc;
-
- /* Acquire mutex. Time spent holding mutex should be short! */
- SpinLockAcquire(&slot->mutex);
-
- /* we must be holding it */
- Assert(slot->exclusive == 1 && slot->owner == MyProc);
-
- slot->xlogInsertingAt = InvalidXLogRecPtr;
-
- /* Release my hold on the slot */
- slot->exclusive = 0;
- slot->owner = NULL;
-
- /*
- * See if I need to awaken any waiters..
- */
- head = slot->head;
- if (head != NULL)
- {
- if (slot->releaseOK)
- {
- /*
- * Remove the to-be-awakened PGPROCs from the queue.
- */
- bool releaseOK = true;
-
- proc = head;
-
- /*
- * First wake up any backends that want to be woken up without
- * acquiring the lock. These are always in the front of the queue.
- */
- while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink)
- proc = proc->lwWaitLink;
-
- /*
- * Awaken the first exclusive-waiter, if any.
- */
- if (proc->lwWaitLink)
- {
- Assert(proc->lwWaitLink->lwWaitMode == LW_EXCLUSIVE);
- proc = proc->lwWaitLink;
- releaseOK = false;
- }
- /* proc is now the last PGPROC to be released */
- slot->head = proc->lwWaitLink;
- proc->lwWaitLink = NULL;
-
- slot->releaseOK = releaseOK;
- }
- else
- head = NULL;
- }
-
- /* We are done updating shared state of the slot itself. */
- SpinLockRelease(&slot->mutex);
-
- /*
- * Awaken any waiters I removed from the queue.
- */
- while (head != NULL)
- {
- proc = head;
- head = proc->lwWaitLink;
- proc->lwWaitLink = NULL;
- proc->lwWaiting = false;
- PGSemaphoreUnlock(&proc->sem);
- }
-
- /*
- * Now okay to allow cancel/die interrupts.
- */
- END_CRIT_SECTION();
+ LWLockUpdateVar(&WALInsertLocks[MyLockNo].l.lock,
+ &WALInsertLocks[MyLockNo].l.insertingAt,
+ insertingAt);
}
-
/*
* Wait for any WAL insertions < upto to finish.
*
}
/*
+ * Loop through all the locks, sleeping on any in-progress insert older
+ * than 'upto'.
+ *
* finishedUpto is our return value, indicating the point upto which
* all the WAL insertions have been finished. Initialize it to the head
- * of reserved WAL, and as we iterate through the insertion slots, back it
+ * of reserved WAL, and as we iterate through the insertion locks, back it
* out for any insertion that's still in progress.
*/
finishedUpto = reservedUpto;
-
- /*
- * Loop through all the slots, sleeping on any in-progress insert older
- * than 'upto'.
- */
- for (i = 0; i < num_xloginsert_slots; i++)
+ for (i = 0; i < num_xloginsert_locks; i++)
{
- volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot;
- XLogRecPtr insertingat;
-
- retry:
- /*
- * We can check if the slot is in use without grabbing the spinlock.
- * The spinlock acquisition of insertpos_lck before this loop acts
- * as a memory barrier. If someone acquires the slot after that, it
- * can't possibly be inserting to anything < reservedUpto. If it was
- * acquired before that, an unlocked test will return true.
- */
- if (!slot->exclusive)
- continue;
-
- SpinLockAcquire(&slot->mutex);
- /* re-check now that we have the lock */
- if (!slot->exclusive)
- {
- SpinLockRelease(&slot->mutex);
- continue;
- }
- insertingat = slot->xlogInsertingAt;
- SpinLockRelease(&slot->mutex);
-
- if (insertingat == InvalidXLogRecPtr)
+ XLogRecPtr insertingat = InvalidXLogRecPtr;
+ do
{
/*
- * slot is reserved just to hold off other inserters, there is no
- * actual insert in progress.
+ * See if this insertion is in progress. LWLockWait will wait for
+ * the lock to be released, or for the 'value' to be set by a
+ * LWLockUpdateVar call. When a lock is initially acquired, its
+ * value is 0 (InvalidXLogRecPtr), which means that we don't know
+ * where it's inserting yet. We will have to wait for it. If
+ * it's a small insertion, the record will most likely fit on the
+ * same page and the inserter will release the lock without ever
+ * calling LWLockUpdateVar. But if it has to sleep, it will
+ * advertise the insertion point with LWLockUpdateVar before
+ * sleeping.
*/
- continue;
- }
+ if (LWLockWaitForVar(&WALInsertLocks[i].l.lock,
+ &WALInsertLocks[i].l.insertingAt,
+ insertingat, &insertingat))
+ {
+ /* the lock was free, so no insertion in progress */
+ insertingat = InvalidXLogRecPtr;
+ break;
+ }
- /*
- * This insertion is still in progress. Do we need to wait for it?
- *
- * When an inserter acquires a slot, it doesn't reset 'insertingat', so
- * it will initially point to the old value of some already-finished
- * insertion. The inserter will update the value as soon as it finishes
- * the insertion, moves to the next page, or has to do I/O to flush an
- * old dirty buffer. That means that when we see a slot with
- * insertingat value < upto, we don't know if that insertion is still
- * truly in progress, or if the slot is reused by a new inserter that
- * hasn't updated the insertingat value yet. We have to assume it's the
- * latter, and wait.
- */
- if (insertingat < upto)
- {
- WaitOnSlot(slot, insertingat);
- goto retry;
- }
- else
- {
/*
- * We don't need to wait for this insertion, but update the
- * return value.
+ * This insertion is still in progress. Have to wait, unless the
+ * inserter has proceeded past 'upto'.
*/
- if (insertingat < finishedUpto)
- finishedUpto = insertingat;
- }
+ } while (insertingat < upto);
+
+ if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto)
+ finishedUpto = insertingat;
}
return finishedUpto;
}
*
* The caller must ensure that the page containing the requested location
* isn't evicted yet, and won't be evicted. The way to ensure that is to
- * hold onto an XLogInsertSlot with the xlogInsertingAt position set to
- * something <= ptr. GetXLogBuffer() will update xlogInsertingAt if it needs
+ * hold onto a WAL insertion lock with the insertingAt position set to
+ * something <= ptr. GetXLogBuffer() will update insertingAt if it needs
* to evict an old page from the buffer. (This means that once you call
* GetXLogBuffer() with a given 'ptr', you must not access anything before
* that point anymore, and must not call GetXLogBuffer() with an older 'ptr'
* Let others know that we're finished inserting the record up
* to the page boundary.
*/
- WakeupWaiters(expectedEndPtr - XLOG_BLCKSZ);
+ WALInsertLockUpdateInsertingAt(expectedEndPtr - XLOG_BLCKSZ);
AdvanceXLInsertBuffer(ptr, false);
endptr = XLogCtl->xlblocks[idx];
/* XLogCtl */
size = sizeof(XLogCtlData);
- /* xlog insertion slots, plus alignment */
- size = add_size(size, mul_size(sizeof(XLogInsertSlotPadded), num_xloginsert_slots + 1));
+ /* WAL insertion locks, plus alignment */
+ size = add_size(size, mul_size(sizeof(WALInsertLockPadded), num_xloginsert_locks + 1));
/* xlblocks array */
size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
/* extra alignment padding for XLOG I/O buffers */
memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
- /* Xlog insertion slots. Ensure they're aligned to the full padded size */
- allocptr += sizeof(XLogInsertSlotPadded) -
- ((uintptr_t) allocptr) % sizeof(XLogInsertSlotPadded);
- XLogCtl->Insert.insertSlots = (XLogInsertSlotPadded *) allocptr;
- allocptr += sizeof(XLogInsertSlotPadded) * num_xloginsert_slots;
+
+ /* WAL insertion locks. Ensure they're aligned to the full padded size */
+ allocptr += sizeof(WALInsertLockPadded) -
+ ((uintptr_t) allocptr) % sizeof(WALInsertLockPadded);
+ WALInsertLocks = XLogCtl->Insert.WALInsertLocks =
+ (WALInsertLockPadded *) allocptr;
+ allocptr += sizeof(WALInsertLockPadded) * num_xloginsert_locks;
+
+ XLogCtl->Insert.WALInsertLockTrancheId = LWLockNewTrancheId();
+
+ XLogCtl->Insert.WALInsertLockTranche.name = "WALInsertLocks";
+ XLogCtl->Insert.WALInsertLockTranche.array_base = WALInsertLocks;
+ XLogCtl->Insert.WALInsertLockTranche.array_stride = sizeof(WALInsertLockPadded);
+
+ LWLockRegisterTranche(XLogCtl->Insert.WALInsertLockTrancheId, &XLogCtl->Insert.WALInsertLockTranche);
+ for (i = 0; i < num_xloginsert_locks; i++)
+ {
+ LWLockInitialize(&WALInsertLocks[i].l.lock,
+ XLogCtl->Insert.WALInsertLockTrancheId);
+ WALInsertLocks[i].l.insertingAt = InvalidXLogRecPtr;
+ }
/*
* Align the start of the page buffers to a full xlog block size boundary.
XLogCtl->SharedHotStandbyActive = false;
XLogCtl->WalWriterSleeping = false;
- for (i = 0; i < num_xloginsert_slots; i++)
- {
- XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot;
- SpinLockInit(&slot->mutex);
- slot->xlogInsertingAt = InvalidXLogRecPtr;
- slot->owner = NULL;
-
- slot->releaseOK = true;
- slot->exclusive = 0;
- slot->head = NULL;
- slot->tail = NULL;
- }
-
SpinLockInit(&XLogCtl->Insert.insertpos_lck);
SpinLockInit(&XLogCtl->info_lck);
SpinLockInit(&XLogCtl->ulsn_lck);
ThisTimeLineID = XLogCtl->ThisTimeLineID;
Assert(ThisTimeLineID != 0 || IsBootstrapProcessingMode());
+ /* Initialize our copy of WALInsertLocks and register the tranche */
+ WALInsertLocks = XLogCtl->Insert.WALInsertLocks;
+ LWLockRegisterTranche(XLogCtl->Insert.WALInsertLockTrancheId,
+ &XLogCtl->Insert.WALInsertLockTranche);
+
/* Use GetRedoRecPtr to copy the RedoRecPtr safely */
(void) GetRedoRecPtr();
}
/*
* The possibly not up-to-date copy in XlogCtl is enough. Even if we
- * grabbed a WAL insertion slot to read the master copy, someone might
+ * grabbed a WAL insertion lock to read the master copy, someone might
* update it just after we've released the lock.
*/
SpinLockAcquire(&xlogctl->info_lck);
*
* NOTE: The value *actually* returned is the position of the last full
* xlog page. It lags behind the real insert position by at most 1 page.
- * For that, we don't need to scan through WAL insertion slots, and an
+ * For that, we don't need to scan through WAL insertion locks, and an
* approximation is enough for the current usage of this function.
*/
XLogRecPtr
* We must block concurrent insertions while examining insert state to
* determine the checkpoint REDO pointer.
*/
- WALInsertSlotAcquire(true);
+ WALInsertLockAcquireExclusive();
curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos);
/*
MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
ControlFile->checkPoint == ControlFile->checkPointCopy.redo)
{
- WALInsertSlotRelease();
+ WALInsertLockRelease();
LWLockRelease(CheckpointLock);
END_CRIT_SECTION();
return;
/*
* Here we update the shared RedoRecPtr for future XLogInsert calls; this
- * must be done while holding the insertion slots.
+ * must be done while holding all the insertion locks.
*
* Note: if we fail to complete the checkpoint, RedoRecPtr will be left
* pointing past where it really needs to point. This is okay; the only
RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
/*
- * Now we can release the WAL insertion slots, allowing other xacts to
+ * Now we can release the WAL insertion locks, allowing other xacts to
* proceed while we are flushing disk buffers.
*/
- WALInsertSlotRelease();
+ WALInsertLockRelease();
/* Update the info_lck-protected copy of RedoRecPtr as well */
SpinLockAcquire(&xlogctl->info_lck);
* we wait till he's out of his commit critical section before proceeding.
* See notes in RecordTransactionCommit().
*
- * Because we've already released the insertion slots, this test is a bit
+ * Because we've already released the insertion locks, this test is a bit
* fuzzy: it is possible that we will wait for xacts we didn't really need
* to wait for. But the delay should be short and it seems better to make
* checkpoint take a bit longer than to hold off insertions longer than
xlrec.end_time = time(NULL);
- WALInsertSlotAcquire(true);
+ WALInsertLockAcquireExclusive();
xlrec.ThisTimeLineID = ThisTimeLineID;
xlrec.PrevTimeLineID = XLogCtl->PrevTimeLineID;
- WALInsertSlotRelease();
+ WALInsertLockRelease();
LocalSetXLogInsertAllowed();
* during recovery this is just pro forma, because no WAL insertions are
* happening.
*/
- WALInsertSlotAcquire(true);
+ WALInsertLockAcquireExclusive();
xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo;
- WALInsertSlotRelease();
+ WALInsertLockRelease();
/* Also update the info_lck-protected copy */
SpinLockAcquire(&xlogctl->info_lck);
*/
if (fullPageWrites)
{
- WALInsertSlotAcquire(true);
+ WALInsertLockAcquireExclusive();
Insert->fullPageWrites = true;
- WALInsertSlotRelease();
+ WALInsertLockRelease();
}
/*
if (!fullPageWrites)
{
- WALInsertSlotAcquire(true);
+ WALInsertLockAcquireExclusive();
Insert->fullPageWrites = false;
- WALInsertSlotRelease();
+ WALInsertLockRelease();
}
END_CRIT_SECTION();
}
* Note that forcePageWrites has no effect during an online backup from
* the standby.
*
- * We must hold all the insertion slots to change the value of
+ * We must hold all the insertion locks to change the value of
* forcePageWrites, to ensure adequate interlocking against XLogInsert().
*/
- WALInsertSlotAcquire(true);
+ WALInsertLockAcquireExclusive();
if (exclusive)
{
if (XLogCtl->Insert.exclusiveBackup)
{
- WALInsertSlotRelease();
+ WALInsertLockRelease();
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("a backup is already in progress"),
else
XLogCtl->Insert.nonExclusiveBackups++;
XLogCtl->Insert.forcePageWrites = true;
- WALInsertSlotRelease();
+ WALInsertLockRelease();
/* Ensure we release forcePageWrites if fail below */
PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive));
* taking a checkpoint right after another is not that expensive
* either because only few buffers have been dirtied yet.
*/
- WALInsertSlotAcquire(true);
+ WALInsertLockAcquireExclusive();
if (XLogCtl->Insert.lastBackupStart < startpoint)
{
XLogCtl->Insert.lastBackupStart = startpoint;
gotUniqueStartpoint = true;
}
- WALInsertSlotRelease();
+ WALInsertLockRelease();
} while (!gotUniqueStartpoint);
XLByteToSeg(startpoint, _logSegNo);
bool exclusive = DatumGetBool(arg);
/* Update backup counters and forcePageWrites on failure */
- WALInsertSlotAcquire(true);
+ WALInsertLockAcquireExclusive();
if (exclusive)
{
Assert(XLogCtl->Insert.exclusiveBackup);
{
XLogCtl->Insert.forcePageWrites = false;
}
- WALInsertSlotRelease();
+ WALInsertLockRelease();
}
/*
/*
* OK to update backup counters and forcePageWrites
*/
- WALInsertSlotAcquire(true);
+ WALInsertLockAcquireExclusive();
if (exclusive)
XLogCtl->Insert.exclusiveBackup = false;
else
{
XLogCtl->Insert.forcePageWrites = false;
}
- WALInsertSlotRelease();
+ WALInsertLockRelease();
if (exclusive)
{
void
do_pg_abort_backup(void)
{
- WALInsertSlotAcquire(true);
+ WALInsertLockAcquireExclusive();
Assert(XLogCtl->Insert.nonExclusiveBackups > 0);
XLogCtl->Insert.nonExclusiveBackups--;
{
XLogCtl->Insert.forcePageWrites = false;
}
- WALInsertSlotRelease();
+ WALInsertLockRelease();
}
/*
* locking should be done with the full lock manager --- which depends on
* LWLocks to protect its shared state.
*
+ * In addition to exclusive and shared modes, lightweight locks can be used
+ * to wait until a variable changes value. The variable is initially set
+ * when the lock is acquired with LWLockAcquireWithVar, and can be updated
+ * without releasing the lock by calling LWLockUpdateVar. LWLockWaitForVar
+ * waits for the variable to be updated, or until the lock is free. The
+ * meaning of the variable is up to the caller; the lightweight lock code
+ * just assigns and compares it.
*
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
static int lock_addin_request = 0;
static bool lock_addin_request_allowed = true;
+static bool LWLockAcquireCommon(LWLock *l, LWLockMode mode, uint64 *valptr,
+ uint64 val);
+
#ifdef LWLOCK_STATS
typedef struct lwlock_stats_key
{
/*
* LWLockAcquire - acquire a lightweight lock in the specified mode
*
- * If the lock is not available, sleep until it is.
+ * If the lock is not available, sleep until it is. Returns true if the lock
+ * was available immediately, false if we had to sleep.
*
* Side effect: cancel/die interrupts are held off until lock release.
*/
-void
+bool
LWLockAcquire(LWLock *l, LWLockMode mode)
+{
+ return LWLockAcquireCommon(l, mode, NULL, 0);
+}
+
+/*
+ * LWLockAcquireWithVar - like LWLockAcquire, but also sets *valptr = val
+ *
+ * The lock is always acquired in exclusive mode with this function.
+ *
+ * The assignment to *valptr is performed inside LWLockAcquireCommon while
+ * the lock's internal mutex is still held, so a concurrent
+ * LWLockWaitForVar() on the same lock sees either the new value or the
+ * lock as free -- never a stale value under a held lock.
+ *
+ * Returns true if the lock was available immediately, false if we had to
+ * sleep (same convention as LWLockAcquire).
+ */
+bool
+LWLockAcquireWithVar(LWLock *l, uint64 *valptr, uint64 val)
+{
+	return LWLockAcquireCommon(l, LW_EXCLUSIVE, valptr, val);
+}
+
+/* internal function to implement LWLockAcquire and LWLockAcquireWithVar */
+static bool
+LWLockAcquireCommon(LWLock *l, LWLockMode mode, uint64 *valptr, uint64 val)
{
volatile LWLock *lock = l;
PGPROC *proc = MyProc;
bool retry = false;
+ bool result = true;
int extraWaits = 0;
#ifdef LWLOCK_STATS
lwlock_stats *lwstats;
/* Now loop back and try to acquire lock again. */
retry = true;
+ result = false;
}
+ /* If there's a variable associated with this lock, initialize it */
+ if (valptr)
+ *valptr = val;
+
/* We are done updating shared state of the lock itself. */
SpinLockRelease(&lock->mutex);
*/
while (extraWaits-- > 0)
PGSemaphoreUnlock(&proc->sem);
+
+ return result;
}
/*
return !mustwait;
}
+/*
+ * LWLockWaitForVar - Wait until lock is free, or a variable is updated.
+ *
+ * If the lock is held and *valptr equals oldval, waits until the lock is
+ * either freed, or the lock holder updates *valptr by calling
+ * LWLockUpdateVar. If the lock is free on exit (immediately or after
+ * waiting), returns true. If the lock is still held, but *valptr no longer
+ * matches oldval, returns false and sets *newval to the current value in
+ * *valptr.
+ *
+ * It's possible that the lock holder releases the lock, but another backend
+ * acquires it again before we get a chance to observe that the lock was
+ * momentarily released. We wouldn't need to wait for the new lock holder,
+ * but we cannot distinguish that case, so we will have to wait.
+ *
+ * Note: this function ignores shared lock holders; if the lock is held
+ * in shared mode, returns 'true'.
+ */
+bool
+LWLockWaitForVar(LWLock *l, uint64 *valptr, uint64 oldval, uint64 *newval)
+{
+	volatile LWLock *lock = l;
+	volatile uint64 *valp = valptr;
+	PGPROC	   *proc = MyProc;
+	int			extraWaits = 0;
+	bool		result = false;
+#ifdef LWLOCK_STATS
+	lwlock_stats *lwstats;
+
+	/*
+	 * lwstats is dereferenced below under LWLOCK_STATS; it must be set up
+	 * here or the stats build does not compile.
+	 */
+	lwstats = get_lwlock_stats_entry(l);
+#endif
+
+	/*
+	 * Quick test first to see if the lock is free right now.
+	 *
+	 * XXX: the caller uses a spinlock before this, so we don't need a memory
+	 * barrier here as far as the current usage is concerned. But that might
+	 * not be safe in general.
+	 */
+	if (lock->exclusive == 0)
+		return true;
+
+	/*
+	 * Lock out cancel/die interrupts while we sleep on the lock. There is
+	 * no cleanup mechanism to remove us from the wait queue if we got
+	 * interrupted.
+	 */
+	HOLD_INTERRUPTS();
+
+	/*
+	 * Loop here to check the lock's status after each time we are signaled.
+	 */
+	for (;;)
+	{
+		bool		mustwait;
+		uint64		value;
+
+		/* Acquire mutex. Time spent holding mutex should be short! */
+#ifdef LWLOCK_STATS
+		lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
+#else
+		SpinLockAcquire(&lock->mutex);
+#endif
+
+		/* Is the lock now free, and if not, does the value match? */
+		if (lock->exclusive == 0)
+		{
+			result = true;
+			mustwait = false;
+		}
+		else
+		{
+			value = *valp;
+			if (value != oldval)
+			{
+				result = false;
+				mustwait = false;
+				*newval = value;
+			}
+			else
+				mustwait = true;
+		}
+
+		if (!mustwait)
+			break;				/* the lock was free or value didn't match */
+
+		/*
+		 * Add myself to wait queue.
+		 */
+		proc->lwWaiting = true;
+		proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
+		proc->lwWaitLink = NULL;
+
+		/* waiters are added to the front of the queue */
+		proc->lwWaitLink = lock->head;
+		if (lock->head == NULL)
+			lock->tail = proc;
+		lock->head = proc;
+
+		/* Can release the mutex now */
+		SpinLockRelease(&lock->mutex);
+
+		/*
+		 * Wait until awakened.
+		 *
+		 * Since we share the process wait semaphore with the regular lock
+		 * manager and ProcWaitForSignal, and we may need to acquire an LWLock
+		 * while one of those is pending, it is possible that we get awakened
+		 * for a reason other than being signaled by LWLockRelease. If so,
+		 * loop back and wait again. Once we've gotten the LWLock,
+		 * re-increment the sema by the number of additional signals received,
+		 * so that the lock manager or signal manager will see the received
+		 * signal when it next waits.
+		 */
+		LOG_LWDEBUG("LWLockWaitForVar", T_NAME(l), T_ID(l), "waiting");
+
+#ifdef LWLOCK_STATS
+		lwstats->block_count++;
+#endif
+
+		/*
+		 * 'mode' is not defined in this function; the only holders we wait
+		 * for are exclusive, so report LW_EXCLUSIVE to the probes.
+		 */
+		TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(l), T_ID(l), LW_EXCLUSIVE);
+
+		for (;;)
+		{
+			/* "false" means cannot accept cancel/die interrupt here. */
+			PGSemaphoreLock(&proc->sem, false);
+			if (!proc->lwWaiting)
+				break;
+			extraWaits++;
+		}
+
+		TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(l), T_ID(l), LW_EXCLUSIVE);
+
+		LOG_LWDEBUG("LWLockWaitForVar", T_NAME(l), T_ID(l), "awakened");
+
+		/* Now loop back and check the status of the lock again. */
+	}
+
+	/* We are done updating shared state of the lock itself. */
+	SpinLockRelease(&lock->mutex);
+
+	/*
+	 * NOTE(review): we never acquire the lock in this function, so firing
+	 * the ACQUIRE probe is arguably misleading -- kept for now, but with
+	 * LW_EXCLUSIVE since 'mode' does not exist here. Confirm against the
+	 * probe documentation whether this call should simply be removed.
+	 */
+	TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(l), T_ID(l), LW_EXCLUSIVE);
+
+	/*
+	 * Fix the process wait semaphore's count for any absorbed wakeups.
+	 */
+	while (extraWaits-- > 0)
+		PGSemaphoreUnlock(&proc->sem);
+
+	/*
+	 * Now okay to allow cancel/die interrupts.
+	 */
+	RESUME_INTERRUPTS();
+
+	return result;
+}
+
+
+/*
+ * LWLockUpdateVar - Update a variable and wake up waiters atomically
+ *
+ * Sets *valptr to 'val', and wakes up all processes waiting for us with
+ * LWLockWaitForVar(). Setting the value and waking up the processes happen
+ * atomically so that any process calling LWLockWaitForVar() on the same lock
+ * is guaranteed to see the new value, and act accordingly.
+ *
+ * The caller must be holding the lock in exclusive mode.
+ */
+void
+LWLockUpdateVar(LWLock *l, uint64 *valptr, uint64 val)
+{
+	volatile LWLock *lock = l;
+	volatile uint64 *valp = valptr;
+	PGPROC	   *head;
+	PGPROC	   *proc;
+	PGPROC	   *next;
+
+	/* Acquire mutex. Time spent holding mutex should be short! */
+	SpinLockAcquire(&lock->mutex);
+
+	/* we should hold the lock */
+	Assert(lock->exclusive == 1);
+
+	/* Update the lock's value (under the mutex, so waiters can't miss it) */
+	*valp = val;
+
+	/*
+	 * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
+	 * up. They are always in the front of the queue.
+	 */
+	head = lock->head;
+
+	if (head != NULL && head->lwWaitMode == LW_WAIT_UNTIL_FREE)
+	{
+		/* Walk the leading run of LW_WAIT_UNTIL_FREE waiters */
+		proc = head;
+		next = proc->lwWaitLink;
+		while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE)
+		{
+			proc = next;
+			next = next->lwWaitLink;
+		}
+
+		/* proc is now the last PGPROC to be released */
+		lock->head = next;
+		proc->lwWaitLink = NULL;
+		/* 'head' now points at the detached list of procs to wake */
+	}
+	else
+		head = NULL;			/* no LW_WAIT_UNTIL_FREE waiters to wake */
+
+	/* We are done updating shared state of the lock itself. */
+	SpinLockRelease(&lock->mutex);
+
+	/*
+	 * Awaken any waiters I removed from the queue.
+	 */
+	while (head != NULL)
+	{
+		proc = head;
+		head = proc->lwWaitLink;
+		proc->lwWaitLink = NULL;
+		proc->lwWaiting = false;
+		PGSemaphoreUnlock(&proc->sem);
+	}
+
+	/*
+	 * Note: the lock itself is NOT released here; the caller still holds it
+	 * in exclusive mode and must eventually call LWLockRelease.
+	 */
+}
+
+
/*
* LWLockRelease - release a previously acquired lock
*/