From 68a2e52bbaf98f136a96b3a0d734ca52ca440a95 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 21 Mar 2014 15:06:08 +0100 Subject: [PATCH] Replace the XLogInsert slots with regular LWLocks. The special feature the XLogInsert slots had over regular LWLocks is the insertingAt value that was updated atomically with releasing backends waiting on it. Add new functions to the LWLock API to do that, and replace the slots with LWLocks. This reduces the amount of duplicated code. (There's still some duplication, but at least it's all in lwlock.c now.) Reviewed by Andres Freund. --- src/backend/access/transam/xlog.c | 821 +++++++++--------------------- src/backend/storage/lmgr/lwlock.c | 262 +++++++++- src/backend/utils/misc/guc.c | 6 +- src/include/access/xlog.h | 2 +- src/include/storage/lwlock.h | 6 +- 5 files changed, 502 insertions(+), 595 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index a2577314bc..f9d6bf4ce5 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -89,7 +89,7 @@ int sync_method = DEFAULT_SYNC_METHOD; int wal_level = WAL_LEVEL_MINIMAL; int CommitDelay = 0; /* precommit delay in microseconds */ int CommitSiblings = 5; /* # concurrent xacts needed to sleep */ -int num_xloginsert_slots = 8; +int num_xloginsert_locks = 8; #ifdef WAL_DEBUG bool XLOG_DEBUG = false; @@ -292,7 +292,7 @@ XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr; * (which is almost but not quite the same as a pointer to the most recent * CHECKPOINT record). We update this from the shared-memory copy, * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we - * hold an insertion slot). See XLogInsert for details. We are also allowed + * hold an insertion lock). See XLogInsert for details. We are also allowed * to update from XLogCtl->RedoRecPtr if we hold the info_lck; * see GetRedoRecPtr. A freshly spawned backend obtains the value during * InitXLOGAccess. @@ -364,63 +364,51 @@ typedef struct XLogwrtResult XLogRecPtr Flush; /* last byte + 1 flushed */ } XLogwrtResult; - /* - * A slot for inserting to the WAL. This is similar to an LWLock, the main - * difference is that there is an extra xlogInsertingAt field that is protected - * by the same mutex. Unlike an LWLock, a slot can only be acquired in - * exclusive mode. - * - * The xlogInsertingAt field is used to advertise to other processes how far - * the slot owner has progressed in inserting the record. When a backend - * acquires a slot, it initializes xlogInsertingAt to 1, because it doesn't - * yet know where it's going to insert the record. That's conservative - * but correct; the new insertion is certainly going to go to a byte position - * greater than 1. If another backend needs to flush the WAL, it will have to - * wait for the new insertion. xlogInsertingAt is updated after finishing the - * insert or when crossing a page boundary, which will wake up anyone waiting - * for it, whether the wait was necessary in the first place or not. - * - * A process can wait on a slot in two modes: LW_EXCLUSIVE or - * LW_WAIT_UNTIL_FREE. LW_EXCLUSIVE works like in an lwlock; when the slot is - * released, the first LW_EXCLUSIVE waiter in the queue is woken up. Processes - * waiting in LW_WAIT_UNTIL_FREE mode are woken up whenever the slot is - * released, or xlogInsertingAt is updated. In other words, a process in - * LW_WAIT_UNTIL_FREE mode is woken up whenever the inserter makes any progress - * copying the record in place. LW_WAIT_UNTIL_FREE waiters are always added to - * the front of the queue, while LW_EXCLUSIVE waiters are appended to the end. - * - * To join the wait queue, a process must set MyProc->lwWaitMode to the mode - * it wants to wait in, MyProc->lwWaiting to true, and link MyProc to the head - * or tail of the wait queue. The same mechanism is used to wait on an LWLock, - * see lwlock.c for details. + * Inserting to WAL is protected by a small fixed number of WAL insertion + * locks. To insert to the WAL, you must hold one of the locks - it doesn't + * matter which one. To lock out other concurrent insertions, you must hold + * of them. Each WAL insertion lock consists of a lightweight lock, plus an + * indicator of how far the insertion has progressed (insertingAt). + * + * The insertingAt values are read when a process wants to flush WAL from + * the in-memory buffers to disk, to check that all the insertions to the + * region the process is about to write out have finished. You could simply + * wait for all currently in-progress insertions to finish, but the + * insertingAt indicator allows you to ignore insertions to later in the WAL, + * so that you only wait for the insertions that are modifying the buffers + * you're about to write out. + * + * This isn't just an optimization. If all the WAL buffers are dirty, an + * inserter that's holding a WAL insert lock might need to evict an old WAL + * buffer, which requires flushing the WAL. If it's possible for an inserter + * to block on another inserter unnecessarily, deadlock can arise when two + * inserters holding a WAL insert lock wait for each other to finish their + * insertion. + * + * Small WAL records that don't cross a page boundary never update the value, + * the WAL record is just copied to the page and the lock is released. But + * to avoid the deadlock-scenario explained above, the indicator is always + * updated before sleeping while holding an insertion lock. */ typedef struct { - slock_t mutex; /* protects the below fields */ - XLogRecPtr xlogInsertingAt; /* insert has completed up to this point */ - - PGPROC *owner; /* for debugging purposes */ - - bool releaseOK; /* T if ok to release waiters */ - char exclusive; /* # of exclusive holders (0 or 1) */ - PGPROC *head; /* head of list of waiting PGPROCs */ - PGPROC *tail; /* tail of list of waiting PGPROCs */ - /* tail is undefined when head is NULL */ -} XLogInsertSlot; + LWLock lock; + XLogRecPtr insertingAt; +} WALInsertLock; /* - * All the slots are allocated as an array in shared memory. We force the - * array stride to be a power of 2, which saves a few cycles in indexing, but - * more importantly also ensures that individual slots don't cross cache line - * boundaries. (Of course, we have to also ensure that the array start - * address is suitably aligned.) + * All the WAL insertion locks are allocated as an array in shared memory. We + * force the array stride to be a power of 2, which saves a few cycles in + * indexing, but more importantly also ensures that individual slots don't + * cross cache line boundaries. (Of course, we have to also ensure that the + * array start address is suitably aligned.) */ -typedef union XLogInsertSlotPadded +typedef union WALInsertLockPadded { - XLogInsertSlot slot; + WALInsertLock l; char pad[CACHE_LINE_SIZE]; -} XLogInsertSlotPadded; +} WALInsertLockPadded; /* * Shared state data for XLogInsert. @@ -455,8 +443,8 @@ typedef struct XLogCtlInsert * we must WAL-log it before it actually affects WAL-logging by backends. * Checkpointer sets at startup or after SIGHUP. * - * To read these fields, you must hold an insertion slot. To modify them, - * you must hold ALL the slots. + * To read these fields, you must hold an insertion lock. To modify them, + * you must hold ALL the locks. */ XLogRecPtr RedoRecPtr; /* current redo point for insertions */ bool forcePageWrites; /* forcing full-page writes for PITR? */ @@ -473,8 +461,12 @@ typedef struct XLogCtlInsert int nonExclusiveBackups; XLogRecPtr lastBackupStart; - /* insertion slots, see XLogInsertSlot struct above for details */ - XLogInsertSlotPadded *insertSlots; + /* + * WAL insertion locks. + */ + WALInsertLockPadded *WALInsertLocks; + LWLockTranche WALInsertLockTranche; + int WALInsertLockTrancheId; } XLogCtlInsert; /* @@ -612,6 +604,9 @@ typedef struct XLogCtlData static XLogCtlData *XLogCtl = NULL; +/* a private copy of XLogCtl->Insert.WALInsertLocks, for convenience */ +static WALInsertLockPadded *WALInsertLocks = NULL; + /* * We maintain an image of pg_control in shared memory. */ @@ -735,9 +730,9 @@ static bool InRedo = false; /* Have we launched bgwriter during recovery? */ static bool bgwriterLaunched = false; -/* For WALInsertSlotAcquire/Release functions */ -static int MySlotNo = 0; -static bool holdingAllSlots = false; +/* For WALInsertLockAcquire/Release functions */ +static int MyLockNo = 0; +static bool holdingAllLocks = false; static void readRecoveryCommandFile(void); static void exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo); @@ -811,16 +806,15 @@ static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr); static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto); -static void WakeupWaiters(XLogRecPtr EndPos); static char *GetXLogBuffer(XLogRecPtr ptr); static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos); static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos); static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr); -static void WALInsertSlotAcquire(bool exclusive); -static void WALInsertSlotAcquireOne(int slotno); -static void WALInsertSlotRelease(void); -static void WALInsertSlotReleaseOne(int slotno); +static void WALInsertLockAcquire(void); +static void WALInsertLockAcquireExclusive(void); +static void WALInsertLockRelease(void); +static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt); /* * Insert an XLOG record having the specified RMID and info bytes, @@ -897,7 +891,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) * * We may have to loop back to here if a race condition is detected below. * We could prevent the race by doing all this work while holding an - * insertion slot, but it seems better to avoid doing CRC calculations + * insertion lock, but it seems better to avoid doing CRC calculations * while holding one. * * We add entries for backup blocks to the chain, so that they don't need @@ -915,8 +909,8 @@ begin:; /* * Decide if we need to do full-page writes in this XLOG record: true if * full_page_writes is on or we have a PITR request for it. Since we - * don't yet have an insertion slot, fullPageWrites and forcePageWrites - * could change under us, but we'll recheck them once we have a slot. + * don't yet have an insertion lock, fullPageWrites and forcePageWrites + * could change under us, but we'll recheck them once we have a lock. */ doPageWrites = Insert->fullPageWrites || Insert->forcePageWrites; @@ -1090,16 +1084,15 @@ begin:; * record in place. This can be done concurrently in multiple processes. * * To keep track of which insertions are still in-progress, each concurrent - * inserter allocates an "insertion slot", which tells others how far the - * inserter has progressed. There is a small fixed number of insertion - * slots, determined by the num_xloginsert_slots GUC. When an inserter - * finishes, it updates the xlogInsertingAt of its slot to the end of the - * record it inserted, to let others know that it's done. xlogInsertingAt - * is also updated when crossing over to a new WAL buffer, to allow the - * the previous buffer to be flushed. + * inserter acquires an insertion lock. In addition to just indicating that + * an insertion is in progress, the lock tells others how far the inserter + * has progressed. There is a small fixed number of insertion locks, + * determined by the num_xloginsert_locks GUC. When an inserter crosses a + * page boundary, it updates the value stored in the lock to the how far it + * has inserted, to allow the the previous buffer to be flushed. * - * Holding onto a slot also protects RedoRecPtr and fullPageWrites from - * changing until the insertion is finished. + * Holding onto an insertion lock also protects RedoRecPtr and + * fullPageWrites from changing until the insertion is finished. * * Step 2 can usually be done completely in parallel. If the required WAL * page is not initialized yet, you have to grab WALBufMappingLock to @@ -1109,7 +1102,10 @@ begin:; *---------- */ START_CRIT_SECTION(); - WALInsertSlotAcquire(isLogSwitch); + if (isLogSwitch) + WALInsertLockAcquireExclusive(); + else + WALInsertLockAcquire(); /* * Check to see if my RedoRecPtr is out of date. If so, may have to go @@ -1138,7 +1134,7 @@ begin:; * Oops, this buffer now needs to be backed up, but we * didn't think so above. Start over. */ - WALInsertSlotRelease(); + WALInsertLockRelease(); END_CRIT_SECTION(); rdt_lastnormal->next = NULL; info = info_orig; @@ -1157,7 +1153,7 @@ begin:; if ((Insert->fullPageWrites || Insert->forcePageWrites) && !doPageWrites) { /* Oops, must redo it with full-page data. */ - WALInsertSlotRelease(); + WALInsertLockRelease(); END_CRIT_SECTION(); rdt_lastnormal->next = NULL; info = info_orig; @@ -1205,7 +1201,7 @@ begin:; /* * Done! Let others know that we're finished. */ - WALInsertSlotRelease(); + WALInsertLockRelease(); MarkCurrentTransactionIdLoggedIfAny(); @@ -1366,7 +1362,7 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr) /* * These calculations are a bit heavy-weight to be done while holding a - * spinlock, but since we're holding all the WAL insertion slots, there + * spinlock, but since we're holding all the WAL insertion locks, there * are no other inserters competing for it. GetXLogInsertRecPtr() does * compete for it, but that's not called very frequently. */ @@ -1526,7 +1522,7 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata, while (CurrPos < EndPos) { /* initialize the next page (if not initialized already) */ - WakeupWaiters(CurrPos); + WALInsertLockUpdateInsertingAt(CurrPos); AdvanceXLInsertBuffer(CurrPos, false); CurrPos += XLOG_BLCKSZ; } @@ -1537,452 +1533,123 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata, } /* - * Allocate a slot for insertion. - * - * In exclusive mode, all slots are reserved for the current process. That - * blocks all concurrent insertions. + * Acquire a WAL insertion lock, for inserting to WAL. */ static void -WALInsertSlotAcquire(bool exclusive) +WALInsertLockAcquire(void) { - int i; - - if (exclusive) - { - for (i = 0; i < num_xloginsert_slots; i++) - WALInsertSlotAcquireOne(i); - holdingAllSlots = true; - } - else - WALInsertSlotAcquireOne(-1); -} - -/* - * Workhorse of WALInsertSlotAcquire. Acquires the given slot, or an arbitrary - * one if slotno == -1. The index of the slot that was acquired is stored in - * MySlotNo. - * - * This is more or less equivalent to LWLockAcquire(). - */ -static void -WALInsertSlotAcquireOne(int slotno) -{ - volatile XLogInsertSlot *slot; - PGPROC *proc = MyProc; - bool retry = false; - int extraWaits = 0; - static int slotToTry = -1; + bool immed; /* - * Try to use the slot we used last time. If the system isn't particularly - * busy, it's a good bet that it's available, and it's good to have some - * affinity to a particular slot so that you don't unnecessarily bounce - * cache lines between processes when there is no contention. + * It doesn't matter which of the WAL insertion locks we acquire, so try + * the one we used last time. If the system isn't particularly busy, + * it's a good bet that it's still available, and it's good to have some + * affinity to a particular lock so that you don't unnecessarily bounce + * cache lines between processes when there's no contention. * - * If this is the first time through in this backend, pick a slot - * (semi-)randomly. This allows the slots to be used evenly if you have a - * lot of very short connections. + * If this is the first time through in this backend, pick a lock + * (semi-)randomly. This allows the locks to be used evenly if you have + * a lot of very short connections. */ - if (slotno != -1) - MySlotNo = slotno; - else - { - if (slotToTry == -1) - slotToTry = MyProc->pgprocno % num_xloginsert_slots; - MySlotNo = slotToTry; - } + static int lockToTry = -1; - /* - * We can't wait if we haven't got a PGPROC. This should only occur - * during bootstrap or shared memory initialization. Put an Assert here - * to catch unsafe coding practices. - */ - Assert(MyProc != NULL); - - /* - * Lock out cancel/die interrupts until we exit the code section protected - * by the slot. This ensures that interrupts will not interfere with - * manipulations of data structures in shared memory. There is no cleanup - * mechanism to release the slot if the backend dies while holding one, - * so make this a critical section. - */ - START_CRIT_SECTION(); + if (lockToTry == -1) + lockToTry = MyProc->pgprocno % num_xloginsert_locks; + MyLockNo = lockToTry; /* - * Loop here to try to acquire slot after each time we are signaled by - * WALInsertSlotRelease. + * The insertingAt value is initially set to 0, as we don't know our + * insert location yet. */ - for (;;) + immed = LWLockAcquireWithVar(&WALInsertLocks[MyLockNo].l.lock, + &WALInsertLocks[MyLockNo].l.insertingAt, + 0); + if (!immed) { - bool mustwait; - - slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot; - - /* Acquire mutex. Time spent holding mutex should be short! */ - SpinLockAcquire(&slot->mutex); - - /* If retrying, allow WALInsertSlotRelease to release waiters again */ - if (retry) - slot->releaseOK = true; - - /* If I can get the slot, do so quickly. */ - if (slot->exclusive == 0) - { - slot->exclusive++; - mustwait = false; - } - else - mustwait = true; - - if (!mustwait) - break; /* got the lock */ - - Assert(slot->owner != MyProc); - - /* - * Add myself to wait queue. - */ - proc->lwWaiting = true; - proc->lwWaitMode = LW_EXCLUSIVE; - proc->lwWaitLink = NULL; - if (slot->head == NULL) - slot->head = proc; - else - slot->tail->lwWaitLink = proc; - slot->tail = proc; - - /* Can release the mutex now */ - SpinLockRelease(&slot->mutex); - /* - * Wait until awakened. - * - * Since we share the process wait semaphore with the regular lock - * manager and ProcWaitForSignal, and we may need to acquire a slot - * while one of those is pending, it is possible that we get awakened - * for a reason other than being signaled by WALInsertSlotRelease. If - * so, loop back and wait again. Once we've gotten the slot, - * re-increment the sema by the number of additional signals received, - * so that the lock manager or signal manager will see the received - * signal when it next waits. + * If we couldn't get the lock immediately, try another lock next + * time. On a system with more insertion locks than concurrent + * inserters, this causes all the inserters to eventually migrate + * to a lock that no-one else is using. On a system with more + * inserters than locks, it still helps to distribute the inserters + * evenly across the locks. */ - for (;;) - { - /* "false" means cannot accept cancel/die interrupt here. */ - PGSemaphoreLock(&proc->sem, false); - if (!proc->lwWaiting) - break; - extraWaits++; - } - - /* Now loop back and try to acquire lock again. */ - retry = true; + lockToTry = (lockToTry + 1) % num_xloginsert_locks; } - - slot->owner = proc; - - /* - * Normally, we initialize the xlogInsertingAt value of the slot to 1, - * because we don't yet know where in the WAL we're going to insert. It's - * not critical what it points to right now - leaving it to a too small - * value just means that WaitXlogInsertionsToFinish() might wait on us - * unnecessarily, until we update the value (when we finish the insert or - * move to next page). - * - * If we're grabbing all the slots, however, stamp all but the last one - * with InvalidXLogRecPtr, meaning there is no insert in progress. The last - * slot is the one that we will update as we proceed with the insert, the - * rest are held just to keep off other inserters. - */ - if (slotno != -1 && slotno != num_xloginsert_slots - 1) - slot->xlogInsertingAt = InvalidXLogRecPtr; - else - slot->xlogInsertingAt = 1; - - /* We are done updating shared state of the slot itself. */ - SpinLockRelease(&slot->mutex); - - /* - * Fix the process wait semaphore's count for any absorbed wakeups. - */ - while (extraWaits-- > 0) - PGSemaphoreUnlock(&proc->sem); - - /* - * If we couldn't get the slot immediately, try another slot next time. - * On a system with more insertion slots than concurrent inserters, this - * causes all the inserters to eventually migrate to a slot that no-one - * else is using. On a system with more inserters than slots, it still - * causes the inserters to be distributed quite evenly across the slots. - */ - if (slotno != -1 && retry) - slotToTry = (slotToTry + 1) % num_xloginsert_slots; } /* - * Wait for the given slot to become free, or for its xlogInsertingAt location - * to change to something else than 'waitptr'. In other words, wait for the - * inserter using the given slot to finish its insertion, or to at least make - * some progress. + * Acquire all WAL insertion locks, to prevent other backends from inserting + * to WAL. */ static void -WaitOnSlot(volatile XLogInsertSlot *slot, XLogRecPtr waitptr) +WALInsertLockAcquireExclusive(void) { - PGPROC *proc = MyProc; - int extraWaits = 0; - - /* - * Lock out cancel/die interrupts while we sleep on the slot. There is - * no cleanup mechanism to remove us from the wait queue if we got - * interrupted. - */ - HOLD_INTERRUPTS(); + int i; /* - * Loop here to try to acquire lock after each time we are signaled. + * When holding all the locks, we only update the last lock's insertingAt + * indicator. The others are set to 0xFFFFFFFFFFFFFFFF, which is higher + * than any real XLogRecPtr value, to make sure that no-one blocks + * waiting on those. */ - for (;;) + for (i = 0; i < num_xloginsert_locks - 1; i++) { - bool mustwait; - - /* Acquire mutex. Time spent holding mutex should be short! */ - SpinLockAcquire(&slot->mutex); - - /* If I can get the lock, do so quickly. */ - if (slot->exclusive == 0 || slot->xlogInsertingAt != waitptr) - mustwait = false; - else - mustwait = true; - - if (!mustwait) - break; /* the lock was free */ - - Assert(slot->owner != MyProc); - - /* - * Add myself to wait queue. - */ - proc->lwWaiting = true; - proc->lwWaitMode = LW_WAIT_UNTIL_FREE; - proc->lwWaitLink = NULL; - - /* waiters are added to the front of the queue */ - proc->lwWaitLink = slot->head; - if (slot->head == NULL) - slot->tail = proc; - slot->head = proc; - - /* Can release the mutex now */ - SpinLockRelease(&slot->mutex); - - /* - * Wait until awakened. - * - * Since we share the process wait semaphore with other things, like - * the regular lock manager and ProcWaitForSignal, and we may need to - * acquire an LWLock while one of those is pending, it is possible that - * we get awakened for a reason other than being signaled by - * LWLockRelease. If so, loop back and wait again. Once we've gotten - * the LWLock, re-increment the sema by the number of additional - * signals received, so that the lock manager or signal manager will - * see the received signal when it next waits. - */ - for (;;) - { - /* "false" means cannot accept cancel/die interrupt here. */ - PGSemaphoreLock(&proc->sem, false); - if (!proc->lwWaiting) - break; - extraWaits++; - } - - /* Now loop back and try to acquire lock again. */ + LWLockAcquireWithVar(&WALInsertLocks[i].l.lock, + &WALInsertLocks[i].l.insertingAt, + UINT64CONST(0xFFFFFFFFFFFFFFFF)); } + LWLockAcquireWithVar(&WALInsertLocks[i].l.lock, + &WALInsertLocks[i].l.insertingAt, + 0); - /* We are done updating shared state of the lock itself. */ - SpinLockRelease(&slot->mutex); - - /* - * Fix the process wait semaphore's count for any absorbed wakeups. - */ - while (extraWaits-- > 0) - PGSemaphoreUnlock(&proc->sem); - - /* - * Now okay to allow cancel/die interrupts. - */ - RESUME_INTERRUPTS(); + holdingAllLocks = true; } /* - * Wake up all processes waiting for us with WaitOnSlot(). Sets our - * xlogInsertingAt value to EndPos, without releasing the slot. + * Release our insertion lock (or locks, if we're holding them all). */ static void -WakeupWaiters(XLogRecPtr EndPos) +WALInsertLockRelease(void) { - volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot; - PGPROC *head; - PGPROC *proc; - PGPROC *next; - - /* - * If we have already reported progress up to the same point, do nothing. - * No other process can modify xlogInsertingAt, so we can check this before - * grabbing the spinlock. - */ - if (slot->xlogInsertingAt == EndPos) - return; - /* xlogInsertingAt should not go backwards */ - Assert(slot->xlogInsertingAt < EndPos); - - /* Acquire mutex. Time spent holding mutex should be short! */ - SpinLockAcquire(&slot->mutex); - - /* we should own the slot */ - Assert(slot->exclusive == 1 && slot->owner == MyProc); - - slot->xlogInsertingAt = EndPos; - - /* - * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken - * up. They are always in the front of the queue. - */ - head = slot->head; - - if (head != NULL && head->lwWaitMode == LW_WAIT_UNTIL_FREE) + if (holdingAllLocks) { - proc = head; - next = proc->lwWaitLink; - while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE) - { - proc = next; - next = next->lwWaitLink; - } + int i; + + for (i = 0; i < num_xloginsert_locks; i++) + LWLockRelease(&WALInsertLocks[i].l.lock); - /* proc is now the last PGPROC to be released */ - slot->head = next; - proc->lwWaitLink = NULL; + holdingAllLocks = false; } else - head = NULL; - - /* We are done updating shared state of the lock itself. */ - SpinLockRelease(&slot->mutex); - - /* - * Awaken any waiters I removed from the queue. - */ - while (head != NULL) { - proc = head; - head = proc->lwWaitLink; - proc->lwWaitLink = NULL; - proc->lwWaiting = false; - PGSemaphoreUnlock(&proc->sem); + LWLockRelease(&WALInsertLocks[MyLockNo].l.lock); } } /* - * Release our insertion slot (or slots, if we're holding them all). + * Update our insertingAt value, to let others know that we've finished + * inserting up to that point. */ static void -WALInsertSlotRelease(void) +WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt) { - int i; - - if (holdingAllSlots) + if (holdingAllLocks) { - for (i = 0; i < num_xloginsert_slots; i++) - WALInsertSlotReleaseOne(i); - holdingAllSlots = false; + /* + * We use the last lock to mark our actual position, see comments in + * WALInsertLockAcquireExclusive. + */ + LWLockUpdateVar(&WALInsertLocks[num_xloginsert_locks - 1].l.lock, + &WALInsertLocks[num_xloginsert_locks - 1].l.insertingAt, + insertingAt); } else - WALInsertSlotReleaseOne(MySlotNo); -} - -static void -WALInsertSlotReleaseOne(int slotno) -{ - volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[slotno].slot; - PGPROC *head; - PGPROC *proc; - - /* Acquire mutex. Time spent holding mutex should be short! */ - SpinLockAcquire(&slot->mutex); - - /* we must be holding it */ - Assert(slot->exclusive == 1 && slot->owner == MyProc); - - slot->xlogInsertingAt = InvalidXLogRecPtr; - - /* Release my hold on the slot */ - slot->exclusive = 0; - slot->owner = NULL; - - /* - * See if I need to awaken any waiters.. - */ - head = slot->head; - if (head != NULL) - { - if (slot->releaseOK) - { - /* - * Remove the to-be-awakened PGPROCs from the queue. - */ - bool releaseOK = true; - - proc = head; - - /* - * First wake up any backends that want to be woken up without - * acquiring the lock. These are always in the front of the queue. - */ - while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink) - proc = proc->lwWaitLink; - - /* - * Awaken the first exclusive-waiter, if any. - */ - if (proc->lwWaitLink) - { - Assert(proc->lwWaitLink->lwWaitMode == LW_EXCLUSIVE); - proc = proc->lwWaitLink; - releaseOK = false; - } - /* proc is now the last PGPROC to be released */ - slot->head = proc->lwWaitLink; - proc->lwWaitLink = NULL; - - slot->releaseOK = releaseOK; - } - else - head = NULL; - } - - /* We are done updating shared state of the slot itself. */ - SpinLockRelease(&slot->mutex); - - /* - * Awaken any waiters I removed from the queue. - */ - while (head != NULL) - { - proc = head; - head = proc->lwWaitLink; - proc->lwWaitLink = NULL; - proc->lwWaiting = false; - PGSemaphoreUnlock(&proc->sem); - } - - /* - * Now okay to allow cancel/die interrupts. - */ - END_CRIT_SECTION(); + LWLockUpdateVar(&WALInsertLocks[MyLockNo].l.lock, + &WALInsertLocks[MyLockNo].l.insertingAt, + insertingAt); } - /* * Wait for any WAL insertions < upto to finish. * @@ -2032,79 +1699,49 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto) } /* + * Loop through all the locks, sleeping on any in-progress insert older + * than 'upto'. + * * finishedUpto is our return value, indicating the point upto which * all the WAL insertions have been finished. Initialize it to the head - * of reserved WAL, and as we iterate through the insertion slots, back it + * of reserved WAL, and as we iterate through the insertion locks, back it * out for any insertion that's still in progress. */ finishedUpto = reservedUpto; - - /* - * Loop through all the slots, sleeping on any in-progress insert older - * than 'upto'. - */ - for (i = 0; i < num_xloginsert_slots; i++) + for (i = 0; i < num_xloginsert_locks; i++) { - volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot; - XLogRecPtr insertingat; - - retry: - /* - * We can check if the slot is in use without grabbing the spinlock. - * The spinlock acquisition of insertpos_lck before this loop acts - * as a memory barrier. If someone acquires the slot after that, it - * can't possibly be inserting to anything < reservedUpto. If it was - * acquired before that, an unlocked test will return true. - */ - if (!slot->exclusive) - continue; - - SpinLockAcquire(&slot->mutex); - /* re-check now that we have the lock */ - if (!slot->exclusive) - { - SpinLockRelease(&slot->mutex); - continue; - } - insertingat = slot->xlogInsertingAt; - SpinLockRelease(&slot->mutex); - - if (insertingat == InvalidXLogRecPtr) + XLogRecPtr insertingat = InvalidXLogRecPtr; + do { /* - * slot is reserved just to hold off other inserters, there is no - * actual insert in progress. + * See if this insertion is in progress. LWLockWait will wait for + * the lock to be released, or for the 'value' to be set by a + * LWLockUpdateVar call. When a lock is initially acquired, its + * value is 0 (InvalidXLogRecPtr), which means that we don't know + * where it's inserting yet. We will have to wait for it. If + * it's a small insertion, the record will most likely fit on the + * same page and the inserter will release the lock without ever + * calling LWLockUpdateVar. But if it has to sleep, it will + * advertise the insertion point with LWLockUpdateVar before + * sleeping. */ - continue; - } + if (LWLockWaitForVar(&WALInsertLocks[i].l.lock, + &WALInsertLocks[i].l.insertingAt, + insertingat, &insertingat)) + { + /* the lock was free, so no insertion in progress */ + insertingat = InvalidXLogRecPtr; + break; + } - /* - * This insertion is still in progress. Do we need to wait for it? - * - * When an inserter acquires a slot, it doesn't reset 'insertingat', so - * it will initially point to the old value of some already-finished - * insertion. The inserter will update the value as soon as it finishes - * the insertion, moves to the next page, or has to do I/O to flush an - * old dirty buffer. That means that when we see a slot with - * insertingat value < upto, we don't know if that insertion is still - * truly in progress, or if the slot is reused by a new inserter that - * hasn't updated the insertingat value yet. We have to assume it's the - * latter, and wait. - */ - if (insertingat < upto) - { - WaitOnSlot(slot, insertingat); - goto retry; - } - else - { /* - * We don't need to wait for this insertion, but update the - * return value. + * This insertion is still in progress. Have to wait, unless the + * inserter has proceeded past 'upto'. */ - if (insertingat < finishedUpto) - finishedUpto = insertingat; - } + } while (insertingat < upto); + + if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto) + finishedUpto = insertingat; } return finishedUpto; } @@ -2118,8 +1755,8 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto) * * The caller must ensure that the page containing the requested location * isn't evicted yet, and won't be evicted. The way to ensure that is to - * hold onto an XLogInsertSlot with the xlogInsertingAt position set to - * something <= ptr. GetXLogBuffer() will update xlogInsertingAt if it needs + * hold onto a WAL insertion lock with the insertingAt position set to + * something <= ptr. GetXLogBuffer() will update insertingAt if it needs * to evict an old page from the buffer. (This means that once you call * GetXLogBuffer() with a given 'ptr', you must not access anything before * that point anymore, and must not call GetXLogBuffer() with an older 'ptr' @@ -2179,7 +1816,7 @@ GetXLogBuffer(XLogRecPtr ptr) * Let others know that we're finished inserting the record up * to the page boundary. */ - WakeupWaiters(expectedEndPtr - XLOG_BLCKSZ); + WALInsertLockUpdateInsertingAt(expectedEndPtr - XLOG_BLCKSZ); AdvanceXLInsertBuffer(ptr, false); endptr = XLogCtl->xlblocks[idx]; @@ -5117,8 +4754,8 @@ XLOGShmemSize(void) /* XLogCtl */ size = sizeof(XLogCtlData); - /* xlog insertion slots, plus alignment */ - size = add_size(size, mul_size(sizeof(XLogInsertSlotPadded), num_xloginsert_slots + 1)); + /* WAL insertion locks, plus alignment */ + size = add_size(size, mul_size(sizeof(WALInsertLockPadded), num_xloginsert_locks + 1)); /* xlblocks array */ size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers)); /* extra alignment padding for XLOG I/O buffers */ @@ -5166,11 +4803,27 @@ XLOGShmemInit(void) memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers); allocptr += sizeof(XLogRecPtr) * XLOGbuffers; - /* Xlog insertion slots. Ensure they're aligned to the full padded size */ - allocptr += sizeof(XLogInsertSlotPadded) - - ((uintptr_t) allocptr) % sizeof(XLogInsertSlotPadded); - XLogCtl->Insert.insertSlots = (XLogInsertSlotPadded *) allocptr; - allocptr += sizeof(XLogInsertSlotPadded) * num_xloginsert_slots; + + /* WAL insertion locks. Ensure they're aligned to the full padded size */ + allocptr += sizeof(WALInsertLockPadded) - + ((uintptr_t) allocptr) % sizeof(WALInsertLockPadded); + WALInsertLocks = XLogCtl->Insert.WALInsertLocks = + (WALInsertLockPadded *) allocptr; + allocptr += sizeof(WALInsertLockPadded) * num_xloginsert_locks; + + XLogCtl->Insert.WALInsertLockTrancheId = LWLockNewTrancheId(); + + XLogCtl->Insert.WALInsertLockTranche.name = "WALInsertLocks"; + XLogCtl->Insert.WALInsertLockTranche.array_base = WALInsertLocks; + XLogCtl->Insert.WALInsertLockTranche.array_stride = sizeof(WALInsertLockPadded); + + LWLockRegisterTranche(XLogCtl->Insert.WALInsertLockTrancheId, &XLogCtl->Insert.WALInsertLockTranche); + for (i = 0; i < num_xloginsert_locks; i++) + { + LWLockInitialize(&WALInsertLocks[i].l.lock, + XLogCtl->Insert.WALInsertLockTrancheId); + WALInsertLocks[i].l.insertingAt = InvalidXLogRecPtr; + } /* * Align the start of the page buffers to a full xlog block size boundary. @@ -5190,19 +4843,6 @@ XLOGShmemInit(void) XLogCtl->SharedHotStandbyActive = false; XLogCtl->WalWriterSleeping = false; - for (i = 0; i < num_xloginsert_slots; i++) - { - XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot; - SpinLockInit(&slot->mutex); - slot->xlogInsertingAt = InvalidXLogRecPtr; - slot->owner = NULL; - - slot->releaseOK = true; - slot->exclusive = 0; - slot->head = NULL; - slot->tail = NULL; - } - SpinLockInit(&XLogCtl->Insert.insertpos_lck); SpinLockInit(&XLogCtl->info_lck); SpinLockInit(&XLogCtl->ulsn_lck); @@ -7925,6 +7565,11 @@ InitXLOGAccess(void) ThisTimeLineID = XLogCtl->ThisTimeLineID; Assert(ThisTimeLineID != 0 || IsBootstrapProcessingMode()); + /* Initialize our copy of WALInsertLocks and register the tranche */ + WALInsertLocks = XLogCtl->Insert.WALInsertLocks; + LWLockRegisterTranche(XLogCtl->Insert.WALInsertLockTrancheId, + &XLogCtl->Insert.WALInsertLockTranche); + /* Use GetRedoRecPtr to copy the RedoRecPtr safely */ (void) GetRedoRecPtr(); } @@ -7943,7 +7588,7 @@ GetRedoRecPtr(void) /* * The possibly not up-to-date copy in XlogCtl is enough. Even if we - * grabbed a WAL insertion slot to read the master copy, someone might + * grabbed a WAL insertion lock to read the master copy, someone might * update it just after we've released the lock. */ SpinLockAcquire(&xlogctl->info_lck); @@ -7961,7 +7606,7 @@ GetRedoRecPtr(void) * * NOTE: The value *actually* returned is the position of the last full * xlog page. It lags behind the real insert position by at most 1 page. - * For that, we don't need to scan through WAL insertion slots, and an + * For that, we don't need to scan through WAL insertion locks, and an * approximation is enough for the current usage of this function. */ XLogRecPtr @@ -8322,7 +7967,7 @@ CreateCheckPoint(int flags) * We must block concurrent insertions while examining insert state to * determine the checkpoint REDO pointer. */ - WALInsertSlotAcquire(true); + WALInsertLockAcquireExclusive(); curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos); /* @@ -8347,7 +7992,7 @@ CreateCheckPoint(int flags) MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) && ControlFile->checkPoint == ControlFile->checkPointCopy.redo) { - WALInsertSlotRelease(); + WALInsertLockRelease(); LWLockRelease(CheckpointLock); END_CRIT_SECTION(); return; @@ -8391,7 +8036,7 @@ CreateCheckPoint(int flags) /* * Here we update the shared RedoRecPtr for future XLogInsert calls; this - * must be done while holding the insertion slots. + * must be done while holding all the insertion locks. * * Note: if we fail to complete the checkpoint, RedoRecPtr will be left * pointing past where it really needs to point. This is okay; the only @@ -8403,10 +8048,10 @@ CreateCheckPoint(int flags) RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo; /* - * Now we can release the WAL insertion slots, allowing other xacts to + * Now we can release the WAL insertion locks, allowing other xacts to * proceed while we are flushing disk buffers. */ - WALInsertSlotRelease(); + WALInsertLockRelease(); /* Update the info_lck-protected copy of RedoRecPtr as well */ SpinLockAcquire(&xlogctl->info_lck); @@ -8436,7 +8081,7 @@ CreateCheckPoint(int flags) * we wait till he's out of his commit critical section before proceeding. * See notes in RecordTransactionCommit(). * - * Because we've already released the insertion slots, this test is a bit + * Because we've already released the insertion locks, this test is a bit * fuzzy: it is possible that we will wait for xacts we didn't really need * to wait for. But the delay should be short and it seems better to make * checkpoint take a bit longer than to hold off insertions longer than @@ -8667,10 +8312,10 @@ CreateEndOfRecoveryRecord(void) xlrec.end_time = time(NULL); - WALInsertSlotAcquire(true); + WALInsertLockAcquireExclusive(); xlrec.ThisTimeLineID = ThisTimeLineID; xlrec.PrevTimeLineID = XLogCtl->PrevTimeLineID; - WALInsertSlotRelease(); + WALInsertLockRelease(); LocalSetXLogInsertAllowed(); @@ -8856,9 +8501,9 @@ CreateRestartPoint(int flags) * during recovery this is just pro forma, because no WAL insertions are * happening. */ - WALInsertSlotAcquire(true); + WALInsertLockAcquireExclusive(); xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo; - WALInsertSlotRelease(); + WALInsertLockRelease(); /* Also update the info_lck-protected copy */ SpinLockAcquire(&xlogctl->info_lck); @@ -9318,9 +8963,9 @@ UpdateFullPageWrites(void) */ if (fullPageWrites) { - WALInsertSlotAcquire(true); + WALInsertLockAcquireExclusive(); Insert->fullPageWrites = true; - WALInsertSlotRelease(); + WALInsertLockRelease(); } /* @@ -9341,9 +8986,9 @@ UpdateFullPageWrites(void) if (!fullPageWrites) { - WALInsertSlotAcquire(true); + WALInsertLockAcquireExclusive(); Insert->fullPageWrites = false; - WALInsertSlotRelease(); + WALInsertLockRelease(); } END_CRIT_SECTION(); } @@ -9974,15 +9619,15 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p, * Note that forcePageWrites has no effect during an online backup from * the standby. * - * We must hold all the insertion slots to change the value of + * We must hold all the insertion locks to change the value of * forcePageWrites, to ensure adequate interlocking against XLogInsert(). */ - WALInsertSlotAcquire(true); + WALInsertLockAcquireExclusive(); if (exclusive) { if (XLogCtl->Insert.exclusiveBackup) { - WALInsertSlotRelease(); + WALInsertLockRelease(); ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("a backup is already in progress"), @@ -9993,7 +9638,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p, else XLogCtl->Insert.nonExclusiveBackups++; XLogCtl->Insert.forcePageWrites = true; - WALInsertSlotRelease(); + WALInsertLockRelease(); /* Ensure we release forcePageWrites if fail below */ PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive)); @@ -10108,13 +9753,13 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p, * taking a checkpoint right after another is not that expensive * either because only few buffers have been dirtied yet. */ - WALInsertSlotAcquire(true); + WALInsertLockAcquireExclusive(); if (XLogCtl->Insert.lastBackupStart < startpoint) { XLogCtl->Insert.lastBackupStart = startpoint; gotUniqueStartpoint = true; } - WALInsertSlotRelease(); + WALInsertLockRelease(); } while (!gotUniqueStartpoint); XLByteToSeg(startpoint, _logSegNo); @@ -10204,7 +9849,7 @@ pg_start_backup_callback(int code, Datum arg) bool exclusive = DatumGetBool(arg); /* Update backup counters and forcePageWrites on failure */ - WALInsertSlotAcquire(true); + WALInsertLockAcquireExclusive(); if (exclusive) { Assert(XLogCtl->Insert.exclusiveBackup); @@ -10221,7 +9866,7 @@ pg_start_backup_callback(int code, Datum arg) { XLogCtl->Insert.forcePageWrites = false; } - WALInsertSlotRelease(); + WALInsertLockRelease(); } /* @@ -10290,7 +9935,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p) /* * OK to update backup counters and forcePageWrites */ - WALInsertSlotAcquire(true); + WALInsertLockAcquireExclusive(); if (exclusive) XLogCtl->Insert.exclusiveBackup = false; else @@ -10310,7 +9955,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p) { XLogCtl->Insert.forcePageWrites = false; } - WALInsertSlotRelease(); + WALInsertLockRelease(); if (exclusive) { @@ -10595,7 +10240,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p) void do_pg_abort_backup(void) { - WALInsertSlotAcquire(true); + WALInsertLockAcquireExclusive(); Assert(XLogCtl->Insert.nonExclusiveBackups > 0); XLogCtl->Insert.nonExclusiveBackups--; @@ -10604,7 +10249,7 @@ do_pg_abort_backup(void) { XLogCtl->Insert.forcePageWrites = false; } - WALInsertSlotRelease(); + WALInsertLockRelease(); } /* diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 82ef440949..f9c9bb299f 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -10,6 +10,13 @@ * locking should be done with the full lock manager --- which depends on * LWLocks to protect its shared state. * + * In addition to exclusive and shared modes, lightweight locks can be used + * to wait until a variable changes value. The variable is initially set + * when the lock is acquired with LWLockAcquireWithVar, and can be updated + * without releasing the lock by calling LWLockUpdateVar. LWLockWaitForVar + * waits for the variable to be updated, or until the lock is free. The + * meaning of the variable is up to the caller, the lightweight lock code + * just assigns and compares it. * * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -78,6 +85,9 @@ static LWLock *held_lwlocks[MAX_SIMUL_LWLOCKS]; static int lock_addin_request = 0; static bool lock_addin_request_allowed = true; +static bool LWLockAcquireCommon(LWLock *l, LWLockMode mode, uint64 *valptr, + uint64 val); + #ifdef LWLOCK_STATS typedef struct lwlock_stats_key { @@ -443,16 +453,36 @@ LWLockInitialize(LWLock *lock, int tranche_id) /* * LWLockAcquire - acquire a lightweight lock in the specified mode * - * If the lock is not available, sleep until it is. + * If the lock is not available, sleep until it is. Returns true if the lock + * was available immediately, false if we had to sleep. * * Side effect: cancel/die interrupts are held off until lock release. */ -void +bool LWLockAcquire(LWLock *l, LWLockMode mode) +{ + return LWLockAcquireCommon(l, mode, NULL, 0); +} + +/* + * LWLockAcquireWithVar - like LWLockAcquire, but also sets *valptr = val + * + * The lock is always acquired in exclusive mode with this function. + */ +bool +LWLockAcquireWithVar(LWLock *l, uint64 *valptr, uint64 val) +{ + return LWLockAcquireCommon(l, LW_EXCLUSIVE, valptr, val); +} + +/* internal function to implement LWLockAcquire and LWLockAcquireWithVar */ +static bool +LWLockAcquireCommon(LWLock *l, LWLockMode mode, uint64 *valptr, uint64 val) { volatile LWLock *lock = l; PGPROC *proc = MyProc; bool retry = false; + bool result = true; int extraWaits = 0; #ifdef LWLOCK_STATS lwlock_stats *lwstats; @@ -601,8 +631,13 @@ LWLockAcquire(LWLock *l, LWLockMode mode) /* Now loop back and try to acquire lock again. */ retry = true; + result = false; } + /* If there's a variable associated with this lock, initialize it */ + if (valptr) + *valptr = val; + /* We are done updating shared state of the lock itself. */ SpinLockRelease(&lock->mutex); @@ -616,6 +651,8 @@ LWLockAcquire(LWLock *l, LWLockMode mode) */ while (extraWaits-- > 0) PGSemaphoreUnlock(&proc->sem); + + return result; } /* @@ -834,6 +871,227 @@ LWLockAcquireOrWait(LWLock *l, LWLockMode mode) return !mustwait; } +/* + * LWLockWaitForVar - Wait until lock is free, or a variable is updated. + * + * If the lock is held and *valptr equals oldval, waits until the lock is + * either freed, or the lock holder updates *valptr by calling + * LWLockUpdateVar. If the lock is free on exit (immediately or after + * waiting), returns true. If the lock is still held, but *valptr no longer + * matches oldval, returns false and sets *newval to the current value in + * *valptr. + * + * It's possible that the lock holder releases the lock, but another backend + * acquires it again before we get a chance to observe that the lock was + * momentarily released. We wouldn't need to wait for the new lock holder, + * but we cannot distinguish that case, so we will have to wait. + * + * Note: this function ignores shared lock holders; if the lock is held + * in shared mode, returns 'true'. + */ +bool +LWLockWaitForVar(LWLock *l, uint64 *valptr, uint64 oldval, uint64 *newval) +{ + volatile LWLock *lock = l; + volatile uint64 *valp = valptr; + PGPROC *proc = MyProc; + int extraWaits = 0; + bool result = false; + + /* + * Quick test first to see if it the slot is free right now. + * + * XXX: the caller uses a spinlock before this, so we don't need a memory + * barrier here as far as the current usage is concerned. But that might + * not be safe in general. + */ + if (lock->exclusive == 0) + return true; + + /* + * Lock out cancel/die interrupts while we sleep on the lock. There is + * no cleanup mechanism to remove us from the wait queue if we got + * interrupted. + */ + HOLD_INTERRUPTS(); + + /* + * Loop here to check the lock's status after each time we are signaled. + */ + for (;;) + { + bool mustwait; + uint64 value; + + /* Acquire mutex. Time spent holding mutex should be short! */ +#ifdef LWLOCK_STATS + lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex); +#else + SpinLockAcquire(&lock->mutex); +#endif + + /* Is the lock now free, and if not, does the value match? */ + if (lock->exclusive == 0) + { + result = true; + mustwait = false; + } + else + { + value = *valp; + if (value != oldval) + { + result = false; + mustwait = false; + *newval = value; + } + else + mustwait = true; + } + + if (!mustwait) + break; /* the lock was free or value didn't match */ + + /* + * Add myself to wait queue. + */ + proc->lwWaiting = true; + proc->lwWaitMode = LW_WAIT_UNTIL_FREE; + proc->lwWaitLink = NULL; + + /* waiters are added to the front of the queue */ + proc->lwWaitLink = lock->head; + if (lock->head == NULL) + lock->tail = proc; + lock->head = proc; + + /* Can release the mutex now */ + SpinLockRelease(&lock->mutex); + + /* + * Wait until awakened. + * + * Since we share the process wait semaphore with the regular lock + * manager and ProcWaitForSignal, and we may need to acquire an LWLock + * while one of those is pending, it is possible that we get awakened + * for a reason other than being signaled by LWLockRelease. If so, + * loop back and wait again. Once we've gotten the LWLock, + * re-increment the sema by the number of additional signals received, + * so that the lock manager or signal manager will see the received + * signal when it next waits. + */ + LOG_LWDEBUG("LWLockWaitForVar", T_NAME(l), T_ID(l), "waiting"); + +#ifdef LWLOCK_STATS + lwstats->block_count++; +#endif + + TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(l), T_ID(l), mode); + + for (;;) + { + /* "false" means cannot accept cancel/die interrupt here. */ + PGSemaphoreLock(&proc->sem, false); + if (!proc->lwWaiting) + break; + extraWaits++; + } + + TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(l), T_ID(l), mode); + + LOG_LWDEBUG("LWLockWaitForVar", T_NAME(l), T_ID(l), "awakened"); + + /* Now loop back and check the status of the lock again. */ + } + + /* We are done updating shared state of the lock itself. */ + SpinLockRelease(&lock->mutex); + + TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(l), T_ID(l), mode); + + /* + * Fix the process wait semaphore's count for any absorbed wakeups. + */ + while (extraWaits-- > 0) + PGSemaphoreUnlock(&proc->sem); + + /* + * Now okay to allow cancel/die interrupts. + */ + RESUME_INTERRUPTS(); + + return result; +} + + +/* + * LWLockUpdateVar - Update a variable and wake up waiters atomically + * + * Sets *valptr to 'val', and wakes up all processes waiting for us with + * LWLockWaitForVar(). Setting the value and waking up the processes happen + * atomically so that any process calling LWLockWaitForVar() on the same lock + * is guaranteed to see the new value, and act accordingly. + * + * The caller must be holding the lock in exclusive mode. + */ +void +LWLockUpdateVar(LWLock *l, uint64 *valptr, uint64 val) +{ + volatile LWLock *lock = l; + volatile uint64 *valp = valptr; + PGPROC *head; + PGPROC *proc; + PGPROC *next; + + /* Acquire mutex. Time spent holding mutex should be short! */ + SpinLockAcquire(&lock->mutex); + + /* we should hold the lock */ + Assert(lock->exclusive == 1); + + /* Update the lock's value */ + *valp = val; + + /* + * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken + * up. They are always in the front of the queue. + */ + head = lock->head; + + if (head != NULL && head->lwWaitMode == LW_WAIT_UNTIL_FREE) + { + proc = head; + next = proc->lwWaitLink; + while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE) + { + proc = next; + next = next->lwWaitLink; + } + + /* proc is now the last PGPROC to be released */ + lock->head = next; + proc->lwWaitLink = NULL; + } + else + head = NULL; + + /* We are done updating shared state of the lock itself. */ + SpinLockRelease(&lock->mutex); + + /* + * Awaken any waiters I removed from the queue. + */ + while (head != NULL) + { + proc = head; + head = proc->lwWaitLink; + proc->lwWaitLink = NULL; + proc->lwWaiting = false; + PGSemaphoreUnlock(&proc->sem); + } +} + + /* * LWLockRelease - release a previously acquired lock */ diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index da882b22dc..2181a39853 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -2120,12 +2120,12 @@ static struct config_int ConfigureNamesInt[] = }, { - {"xloginsert_slots", PGC_POSTMASTER, WAL_SETTINGS, - gettext_noop("Sets the number of slots for concurrent xlog insertions."), + {"xloginsert_locks", PGC_POSTMASTER, WAL_SETTINGS, + gettext_noop("Sets the number of locks used for concurrent xlog insertions."), NULL, GUC_NOT_IN_SAMPLE }, - &num_xloginsert_slots, + &num_xloginsert_locks, 8, 1, 1000, NULL, NULL, NULL }, diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 3509228466..56cfe63d8c 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -192,7 +192,7 @@ extern bool EnableHotStandby; extern bool fullPageWrites; extern bool wal_log_hints; extern bool log_checkpoints; -extern int num_xloginsert_slots; +extern int num_xloginsert_locks; /* WAL levels */ typedef enum WalLevel diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index 8840c791dd..3a1953383e 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -169,13 +169,17 @@ typedef enum LWLockMode extern bool Trace_lwlocks; #endif -extern void LWLockAcquire(LWLock *lock, LWLockMode mode); +extern bool LWLockAcquire(LWLock *lock, LWLockMode mode); extern bool LWLockConditionalAcquire(LWLock *lock, LWLockMode mode); extern bool LWLockAcquireOrWait(LWLock *lock, LWLockMode mode); extern void LWLockRelease(LWLock *lock); extern void LWLockReleaseAll(void); extern bool LWLockHeldByMe(LWLock *lock); +extern bool LWLockAcquireWithVar(LWLock *lock, uint64 *valptr, uint64 val); +extern bool LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval); +extern void LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 value); + extern Size LWLockShmemSize(void); extern void CreateLWLocks(void); -- 2.40.0