From 48354581a49c30f5757c203415aa8412d85b0f70 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Sun, 10 Apr 2016 20:12:32 -0700 Subject: [PATCH] Allow Pin/UnpinBuffer to operate in a lockfree manner. Pinning/Unpinning a buffer is a very frequent operation; especially in read-mostly cache resident workloads. Benchmarking shows that in various scenarios the spinlock protecting a buffer header's state becomes a significant bottleneck. The problem can be reproduced with pgbench -S on larger machines, but can be considerably worse for queries which touch the same buffers over and over at a high frequency (e.g. nested loops over a small inner table). To allow atomic operations to be used, cram BufferDesc's flags, usage_count, buf_hdr_lock, refcount into a single 32bit atomic variable; that allows to manipulate them together using 32bit compare-and-swap operations. This requires reducing MAX_BACKENDS to 2^18-1 (which could be lifted by using a 64bit field, but it's not a realistic configuration atm). As not all operations can easily implemented in a lockfree manner, implement the previous buf_hdr_lock via a flag bit in the atomic variable. That way we can continue to lock the header in places where it's needed, but can get away without acquiring it in the more frequent hot-paths. There's some additional operations which can be done without the lock, but aren't in this patch; but the most important places are covered. As bufmgr.c now essentially re-implements spinlocks, abstract the delay logic from s_lock.c into something more generic. It now has already two users, and more are coming up; there's a follupw patch for lwlock.c at least. This patch is based on a proof-of-concept written by me, which Alexander Korotkov made into a fully working patch; the committed version is again revised by me. Benchmarking and testing has, amongst others, been provided by Dilip Kumar, Alexander Korotkov, Robert Haas. On a large x86 system improvements for readonly pgbench, with a high client count, of a factor of 8 have been observed. Author: Alexander Korotkov and Andres Freund Discussion: 2400449.GjM57CE0Yg@dinodell --- contrib/pg_buffercache/pg_buffercache_pages.c | 15 +- src/backend/storage/buffer/buf_init.c | 7 +- src/backend/storage/buffer/bufmgr.c | 508 ++++++++++++------ src/backend/storage/buffer/freelist.c | 44 +- src/backend/storage/buffer/localbuf.c | 64 ++- src/backend/storage/lmgr/s_lock.c | 206 +++---- src/include/postmaster/postmaster.h | 15 +- src/include/storage/buf_internals.h | 101 ++-- src/include/storage/s_lock.h | 18 + src/tools/pgindent/typedefs.list | 1 + 10 files changed, 622 insertions(+), 357 deletions(-) diff --git a/contrib/pg_buffercache/pg_buffercache_pages.c b/contrib/pg_buffercache/pg_buffercache_pages.c index 6622d22f5f..17b4b6fa6f 100644 --- a/contrib/pg_buffercache/pg_buffercache_pages.c +++ b/contrib/pg_buffercache/pg_buffercache_pages.c @@ -148,11 +148,12 @@ pg_buffercache_pages(PG_FUNCTION_ARGS) */ for (i = 0; i < NBuffers; i++) { - volatile BufferDesc *bufHdr; + BufferDesc *bufHdr; + uint32 buf_state; bufHdr = GetBufferDescriptor(i); /* Lock each buffer header before inspecting. */ - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); fctx->record[i].bufferid = BufferDescriptorGetBuffer(bufHdr); fctx->record[i].relfilenode = bufHdr->tag.rnode.relNode; @@ -160,21 +161,21 @@ pg_buffercache_pages(PG_FUNCTION_ARGS) fctx->record[i].reldatabase = bufHdr->tag.rnode.dbNode; fctx->record[i].forknum = bufHdr->tag.forkNum; fctx->record[i].blocknum = bufHdr->tag.blockNum; - fctx->record[i].usagecount = bufHdr->usage_count; - fctx->record[i].pinning_backends = bufHdr->refcount; + fctx->record[i].usagecount = BUF_STATE_GET_USAGECOUNT(buf_state); + fctx->record[i].pinning_backends = BUF_STATE_GET_REFCOUNT(buf_state); - if (bufHdr->flags & BM_DIRTY) + if (buf_state & BM_DIRTY) fctx->record[i].isdirty = true; else fctx->record[i].isdirty = false; /* Note if the buffer is valid, and has storage created */ - if ((bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_TAG_VALID)) + if ((buf_state & BM_VALID) && (buf_state & BM_TAG_VALID)) fctx->record[i].isvalid = true; else fctx->record[i].isvalid = false; - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } /* diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c index bfa37f1c66..a5cffc7896 100644 --- a/src/backend/storage/buffer/buf_init.c +++ b/src/backend/storage/buffer/buf_init.c @@ -135,12 +135,9 @@ InitBufferPool(void) BufferDesc *buf = GetBufferDescriptor(i); CLEAR_BUFFERTAG(buf->tag); - buf->flags = 0; - buf->usage_count = 0; - buf->refcount = 0; - buf->wait_backend_pid = 0; - SpinLockInit(&buf->buf_hdr_lock); + pg_atomic_init_u32(&buf->state, 0); + buf->wait_backend_pid = 0; buf->buf_id = i; diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index c664984d0a..29f10e5956 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -436,11 +436,12 @@ static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy); static void PinBuffer_Locked(BufferDesc *buf); static void UnpinBuffer(BufferDesc *buf, bool fixOwner); static void BufferSync(int flags); +static uint32 WaitBufHdrUnlocked(BufferDesc *buf); static int SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *flush_context); static void WaitIO(BufferDesc *buf); static bool StartBufferIO(BufferDesc *buf, bool forInput); static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, - int set_flag_bits); + uint32 set_flag_bits); static void shared_buffer_write_error_callback(void *arg); static void local_buffer_write_error_callback(void *arg); static BufferDesc *BufferAlloc(SMgrRelation smgr, @@ -816,8 +817,11 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, if (isLocalBuf) { /* Only need to adjust flags */ - Assert(bufHdr->flags & BM_VALID); - bufHdr->flags &= ~BM_VALID; + uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); + + Assert(buf_state & BM_VALID); + buf_state &= ~BM_VALID; + pg_atomic_write_u32(&bufHdr->state, buf_state); } else { @@ -828,10 +832,11 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, */ do { - LockBufHdr(bufHdr); - Assert(bufHdr->flags & BM_VALID); - bufHdr->flags &= ~BM_VALID; - UnlockBufHdr(bufHdr); + uint32 buf_state = LockBufHdr(bufHdr); + + Assert(buf_state & BM_VALID); + buf_state &= ~BM_VALID; + UnlockBufHdr(bufHdr, buf_state); } while (!StartBufferIO(bufHdr, true)); } } @@ -848,7 +853,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * it's not been recycled) but come right back here to try smgrextend * again. */ - Assert(!(bufHdr->flags & BM_VALID)); /* spinlock not needed */ + Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* spinlock not needed */ bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); @@ -933,7 +938,10 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, if (isLocalBuf) { /* Only need to adjust flags */ - bufHdr->flags |= BM_VALID; + uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); + + buf_state |= BM_VALID; + pg_atomic_write_u32(&bufHdr->state, buf_state); } else { @@ -987,10 +995,11 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, BufferTag oldTag; /* previous identity of selected buffer */ uint32 oldHash; /* hash value for oldTag */ LWLock *oldPartitionLock; /* buffer partition lock for it */ - BufFlags oldFlags; + uint32 oldFlags; int buf_id; BufferDesc *buf; bool valid; + uint32 buf_state; /* create a tag so we can lookup the buffer */ INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum); @@ -1059,12 +1068,12 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * Select a victim buffer. The buffer is returned with its header * spinlock still held! */ - buf = StrategyGetBuffer(strategy); + buf = StrategyGetBuffer(strategy, &buf_state); - Assert(buf->refcount == 0); + Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 0); /* Must copy buffer flags while we still hold the spinlock */ - oldFlags = buf->flags; + oldFlags = buf_state & BUF_FLAG_MASK; /* Pin the buffer and then release the buffer spinlock */ PinBuffer_Locked(buf); @@ -1108,9 +1117,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, XLogRecPtr lsn; /* Read the LSN while holding buffer header lock */ - LockBufHdr(buf); + buf_state = LockBufHdr(buf); lsn = BufferGetLSN(buf); - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); if (XLogNeedsFlush(lsn) && StrategyRejectBuffer(strategy, buf)) @@ -1254,7 +1263,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, /* * Need to lock the buffer header too in order to change its tag. */ - LockBufHdr(buf); + buf_state = LockBufHdr(buf); /* * Somebody could have pinned or re-dirtied the buffer while we were @@ -1262,11 +1271,11 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * recycle this buffer; we must undo everything we've done and start * over with a new victim buffer. */ - oldFlags = buf->flags; - if (buf->refcount == 1 && !(oldFlags & BM_DIRTY)) + oldFlags = buf_state & BUF_FLAG_MASK; + if (BUF_STATE_GET_REFCOUNT(buf_state) == 1 && !(oldFlags & BM_DIRTY)) break; - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); BufTableDelete(&newTag, newHash); if ((oldFlags & BM_TAG_VALID) && oldPartitionLock != newPartitionLock) @@ -1284,14 +1293,15 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * 1 so that the buffer can survive one clock-sweep pass.) */ buf->tag = newTag; - buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT); + buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | + BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT | + BUF_USAGECOUNT_MASK); if (relpersistence == RELPERSISTENCE_PERMANENT) - buf->flags |= BM_TAG_VALID | BM_PERMANENT; + buf_state |= BM_TAG_VALID | BM_PERMANENT | BUF_USAGECOUNT_ONE; else - buf->flags |= BM_TAG_VALID; - buf->usage_count = 1; + buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE; - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); if (oldFlags & BM_TAG_VALID) { @@ -1338,12 +1348,15 @@ InvalidateBuffer(BufferDesc *buf) BufferTag oldTag; uint32 oldHash; /* hash value for oldTag */ LWLock *oldPartitionLock; /* buffer partition lock for it */ - BufFlags oldFlags; + uint32 oldFlags; + uint32 buf_state; /* Save the original buffer tag before dropping the spinlock */ oldTag = buf->tag; - UnlockBufHdr(buf); + buf_state = pg_atomic_read_u32(&buf->state); + Assert(buf_state & BM_LOCKED); + UnlockBufHdr(buf, buf_state); /* * Need to compute the old tag's hashcode and partition lock ID. XXX is it @@ -1362,12 +1375,12 @@ retry: LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE); /* Re-lock the buffer header */ - LockBufHdr(buf); + buf_state = LockBufHdr(buf); /* If it's changed while we were waiting for lock, do nothing */ if (!BUFFERTAGS_EQUAL(buf->tag, oldTag)) { - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); LWLockRelease(oldPartitionLock); return; } @@ -1381,9 +1394,9 @@ retry: * yet done StartBufferIO, WaitIO will fall through and we'll effectively * be busy-looping here.) */ - if (buf->refcount != 0) + if (BUF_STATE_GET_REFCOUNT(buf_state) != 0) { - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); LWLockRelease(oldPartitionLock); /* safety check: should definitely not be our *own* pin */ if (GetPrivateRefCount(BufferDescriptorGetBuffer(buf)) > 0) @@ -1396,12 +1409,10 @@ retry: * Clear out the buffer's tag and flags. We must do this to ensure that * linear scans of the buffer array don't think the buffer is valid. */ - oldFlags = buf->flags; + oldFlags = buf_state & BUF_FLAG_MASK; CLEAR_BUFFERTAG(buf->tag); - buf->flags = 0; - buf->usage_count = 0; - - UnlockBufHdr(buf); + buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK); + UnlockBufHdr(buf, buf_state); /* * Remove the buffer from the lookup hashtable, if it was in there. @@ -1433,6 +1444,8 @@ void MarkBufferDirty(Buffer buffer) { BufferDesc *bufHdr; + uint32 buf_state; + uint32 old_buf_state; if (!BufferIsValid(buffer)) elog(ERROR, "bad buffer ID: %d", buffer); @@ -1449,24 +1462,32 @@ MarkBufferDirty(Buffer buffer) /* unfortunately we can't check if the lock is held exclusively */ Assert(LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr))); - LockBufHdr(bufHdr); + old_buf_state = pg_atomic_read_u32(&bufHdr->state); + for (;;) + { + if (old_buf_state & BM_LOCKED) + old_buf_state = WaitBufHdrUnlocked(bufHdr); - Assert(bufHdr->refcount > 0); + buf_state = old_buf_state; + + Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0); + buf_state |= BM_DIRTY | BM_JUST_DIRTIED; + + if (pg_atomic_compare_exchange_u32(&bufHdr->state, &old_buf_state, + buf_state)) + break; + } /* * If the buffer was not dirty already, do vacuum accounting. */ - if (!(bufHdr->flags & BM_DIRTY)) + if (!(old_buf_state & BM_DIRTY)) { VacuumPageDirty++; pgBufferUsage.shared_blks_dirtied++; if (VacuumCostActive) VacuumCostBalance += VacuumCostPageDirty; } - - bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED); - - UnlockBufHdr(bufHdr); } /* @@ -1531,6 +1552,10 @@ ReleaseAndReadBuffer(Buffer buffer, * * This should be applied only to shared buffers, never local ones. * + * Since buffers are pinned/unpinned very frequently, pin buffers without + * taking the buffer header lock; instead update the state variable in loop of + * CAS operations. Hopefully it's just a single CAS. + * * Note that ResourceOwnerEnlargeBuffers must have been done already. * * Returns TRUE if buffer is BM_VALID, else FALSE. This provision allows @@ -1547,23 +1572,34 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) if (ref == NULL) { + uint32 buf_state; + uint32 old_buf_state; + ReservePrivateRefCountEntry(); ref = NewPrivateRefCountEntry(b); - LockBufHdr(buf); - buf->refcount++; - if (strategy == NULL) - { - if (buf->usage_count < BM_MAX_USAGE_COUNT) - buf->usage_count++; - } - else + old_buf_state = pg_atomic_read_u32(&buf->state); + for (;;) { - if (buf->usage_count == 0) - buf->usage_count = 1; + if (old_buf_state & BM_LOCKED) + old_buf_state = WaitBufHdrUnlocked(buf); + + buf_state = old_buf_state; + + /* increase refcount */ + buf_state += BUF_REFCOUNT_ONE; + + /* increase usagecount unless already max */ + if (BUF_STATE_GET_USAGECOUNT(buf_state) != BM_MAX_USAGE_COUNT) + buf_state += BUF_USAGECOUNT_ONE; + + if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state, + buf_state)) + { + result = (buf_state & BM_VALID) != 0; + break; + } } - result = (buf->flags & BM_VALID) != 0; - UnlockBufHdr(buf); } else { @@ -1603,6 +1639,7 @@ PinBuffer_Locked(BufferDesc *buf) { Buffer b; PrivateRefCountEntry *ref; + uint32 buf_state; /* * As explained, We don't expect any preexisting pins. That allows us to @@ -1610,8 +1647,14 @@ PinBuffer_Locked(BufferDesc *buf) */ Assert(GetPrivateRefCountEntry(BufferDescriptorGetBuffer(buf), false) == NULL); - buf->refcount++; - UnlockBufHdr(buf); + /* + * Since we hold the buffer spinlock, we can update the buffer state and + * release the lock in one operation. + */ + buf_state = pg_atomic_read_u32(&buf->state); + Assert(buf_state & BM_LOCKED); + buf_state += BUF_REFCOUNT_ONE; + UnlockBufHdr(buf, buf_state); b = BufferDescriptorGetBuffer(buf); @@ -1646,30 +1689,59 @@ UnpinBuffer(BufferDesc *buf, bool fixOwner) ref->refcount--; if (ref->refcount == 0) { + uint32 buf_state; + uint32 old_buf_state; + /* I'd better not still hold any locks on the buffer */ Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf))); Assert(!LWLockHeldByMe(BufferDescriptorGetIOLock(buf))); - LockBufHdr(buf); + /* + * Decrement the shared reference count. + * + * Since buffer spinlock holder can update status using just write, + * it's not safe to use atomic decrement here; thus use a CAS loop. + */ + old_buf_state = pg_atomic_read_u32(&buf->state); + for (;;) + { + if (old_buf_state & BM_LOCKED) + old_buf_state = WaitBufHdrUnlocked(buf); + + buf_state = old_buf_state; + + buf_state -= BUF_REFCOUNT_ONE; - /* Decrement the shared reference count */ - Assert(buf->refcount > 0); - buf->refcount--; + if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state, + buf_state)) + break; + } /* Support LockBufferForCleanup() */ - if ((buf->flags & BM_PIN_COUNT_WAITER) && - buf->refcount == 1) + if (buf_state & BM_PIN_COUNT_WAITER) { - /* we just released the last pin other than the waiter's */ - int wait_backend_pid = buf->wait_backend_pid; + /* + * Acquire the buffer header lock, re-check that there's a waiter. + * Another backend could have unpinned this buffer, and already + * woken up the waiter. There's no danger of the buffer being + * replaced after we unpinned it above, as it's pinned by the + * waiter. + */ + buf_state = LockBufHdr(buf); - buf->flags &= ~BM_PIN_COUNT_WAITER; - UnlockBufHdr(buf); - ProcSendSignal(wait_backend_pid); - } - else - UnlockBufHdr(buf); + if ((buf_state & BM_PIN_COUNT_WAITER) && + BUF_STATE_GET_REFCOUNT(buf_state) == 1) + { + /* we just released the last pin other than the waiter's */ + int wait_backend_pid = buf->wait_backend_pid; + buf_state &= ~BM_PIN_COUNT_WAITER; + UnlockBufHdr(buf, buf_state); + ProcSendSignal(wait_backend_pid); + } + else + UnlockBufHdr(buf, buf_state); + } ForgetPrivateRefCountEntry(ref); } } @@ -1687,6 +1759,7 @@ UnpinBuffer(BufferDesc *buf, bool fixOwner) static void BufferSync(int flags) { + uint32 buf_state; int buf_id; int num_to_scan; int num_spaces; @@ -1736,13 +1809,13 @@ BufferSync(int flags) * Header spinlock is enough to examine BM_DIRTY, see comment in * SyncOneBuffer. */ - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); - if ((bufHdr->flags & mask) == mask) + if ((buf_state & mask) == mask) { CkptSortItem *item; - bufHdr->flags |= BM_CHECKPOINT_NEEDED; + buf_state |= BM_CHECKPOINT_NEEDED; item = &CkptBufferIds[num_to_scan++]; item->buf_id = buf_id; @@ -1752,7 +1825,7 @@ BufferSync(int flags) item->blockNum = bufHdr->tag.blockNum; } - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } if (num_to_scan == 0) @@ -1888,7 +1961,7 @@ BufferSync(int flags) * write the buffer though we didn't need to. It doesn't seem worth * guarding against this, though. */ - if (bufHdr->flags & BM_CHECKPOINT_NEEDED) + if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED) { if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN) { @@ -2176,8 +2249,8 @@ BgBufferSync(WritebackContext *wb_context) /* Execute the LRU scan */ while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est) { - int buffer_state = SyncOneBuffer(next_to_clean, true, - wb_context); + int sync_state = SyncOneBuffer(next_to_clean, true, + wb_context); if (++next_to_clean >= NBuffers) { @@ -2186,7 +2259,7 @@ BgBufferSync(WritebackContext *wb_context) } num_to_scan--; - if (buffer_state & BUF_WRITTEN) + if (sync_state & BUF_WRITTEN) { reusable_buffers++; if (++num_written >= bgwriter_lru_maxpages) @@ -2195,7 +2268,7 @@ BgBufferSync(WritebackContext *wb_context) break; } } - else if (buffer_state & BUF_REUSABLE) + else if (sync_state & BUF_REUSABLE) reusable_buffers++; } @@ -2258,6 +2331,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) { BufferDesc *bufHdr = GetBufferDescriptor(buf_id); int result = 0; + uint32 buf_state; BufferTag tag; ReservePrivateRefCountEntry(); @@ -2271,21 +2345,24 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) * don't worry because our checkpoint.redo points before log record for * upcoming changes and so we are not required to write such dirty buffer. */ - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); - if (bufHdr->refcount == 0 && bufHdr->usage_count == 0) + if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 && + BUF_STATE_GET_USAGECOUNT(buf_state) == 0) + { result |= BUF_REUSABLE; + } else if (skip_recently_used) { /* Caller told us not to write recently-used buffers */ - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); return result; } - if (!(bufHdr->flags & BM_VALID) || !(bufHdr->flags & BM_DIRTY)) + if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY)) { /* It's clean, so nothing to do */ - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); return result; } @@ -2439,6 +2516,7 @@ PrintBufferLeakWarning(Buffer buffer) int32 loccount; char *path; BackendId backend; + uint32 buf_state; Assert(BufferIsValid(buffer)); if (BufferIsLocal(buffer)) @@ -2456,12 +2534,13 @@ PrintBufferLeakWarning(Buffer buffer) /* theoretically we should lock the bufhdr here */ path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum); + buf_state = pg_atomic_read_u32(&buf->state); elog(WARNING, "buffer refcount leak: [%03d] " "(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)", buffer, path, - buf->tag.blockNum, buf->flags, - buf->refcount, loccount); + buf->tag.blockNum, buf_state & BUF_FLAG_MASK, + BUF_STATE_GET_REFCOUNT(buf_state), loccount); pfree(path); } @@ -2573,6 +2652,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) io_time; Block bufBlock; char *bufToWrite; + uint32 buf_state; /* * Acquire the buffer's io_in_progress lock. If StartBufferIO returns @@ -2598,7 +2678,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) reln->smgr_rnode.node.dbNode, reln->smgr_rnode.node.relNode); - LockBufHdr(buf); + buf_state = LockBufHdr(buf); /* * Run PageGetLSN while holding header lock, since we don't have the @@ -2607,8 +2687,8 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) recptr = BufferGetLSN(buf); /* To check if block content changes while flushing. - vadim 01/17/97 */ - buf->flags &= ~BM_JUST_DIRTIED; - UnlockBufHdr(buf); + buf_state &= ~BM_JUST_DIRTIED; + UnlockBufHdr(buf, buf_state); /* * Force XLOG flush up to buffer's LSN. This implements the basic WAL @@ -2627,7 +2707,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) * disastrous system-wide consequences. To make sure that can't happen, * skip the flush if the buffer isn't permanent. */ - if (buf->flags & BM_PERMANENT) + if (buf_state & BM_PERMANENT) XLogFlush(recptr); /* @@ -2716,12 +2796,12 @@ BufferIsPermanent(Buffer buffer) /* * BM_PERMANENT can't be changed while we hold a pin on the buffer, so we * need not bother with the buffer header spinlock. Even if someone else - * changes the buffer header flags while we're doing this, we assume that - * changing an aligned 2-byte BufFlags value is atomic, so we'll read the - * old value or the new value, but not random garbage. + * changes the buffer header state while we're doing this, the state is + * changed atomically, so we'll read the old value or the new value, but + * not random garbage. */ bufHdr = GetBufferDescriptor(buffer - 1); - return (bufHdr->flags & BM_PERMANENT) != 0; + return (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT) != 0; } /* @@ -2736,6 +2816,7 @@ BufferGetLSNAtomic(Buffer buffer) BufferDesc *bufHdr = GetBufferDescriptor(buffer - 1); char *page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST); XLogRecPtr lsn; + uint32 buf_state; /* * If we don't need locking for correctness, fastpath out. @@ -2747,9 +2828,9 @@ BufferGetLSNAtomic(Buffer buffer) Assert(BufferIsValid(buffer)); Assert(BufferIsPinned(buffer)); - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); lsn = PageGetLSN(page); - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); return lsn; } @@ -2797,6 +2878,7 @@ DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum, for (i = 0; i < NBuffers; i++) { BufferDesc *bufHdr = GetBufferDescriptor(i); + uint32 buf_state; /* * We can make this a tad faster by prechecking the buffer tag before @@ -2817,13 +2899,13 @@ DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum, if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node)) continue; - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node) && bufHdr->tag.forkNum == forkNum && bufHdr->tag.blockNum >= firstDelBlock) InvalidateBuffer(bufHdr); /* releases spinlock */ else - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } } @@ -2887,6 +2969,7 @@ DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes) { RelFileNode *rnode = NULL; BufferDesc *bufHdr = GetBufferDescriptor(i); + uint32 buf_state; /* * As in DropRelFileNodeBuffers, an unlocked precheck should be safe @@ -2917,11 +3000,11 @@ DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes) if (rnode == NULL) continue; - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); if (RelFileNodeEquals(bufHdr->tag.rnode, (*rnode))) InvalidateBuffer(bufHdr); /* releases spinlock */ else - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } pfree(nodes); @@ -2951,6 +3034,7 @@ DropDatabaseBuffers(Oid dbid) for (i = 0; i < NBuffers; i++) { BufferDesc *bufHdr = GetBufferDescriptor(i); + uint32 buf_state; /* * As in DropRelFileNodeBuffers, an unlocked precheck should be safe @@ -2959,11 +3043,11 @@ DropDatabaseBuffers(Oid dbid) if (bufHdr->tag.rnode.dbNode != dbid) continue; - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); if (bufHdr->tag.rnode.dbNode == dbid) InvalidateBuffer(bufHdr); /* releases spinlock */ else - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } } @@ -3055,9 +3139,12 @@ FlushRelationBuffers(Relation rel) { for (i = 0; i < NLocBuffer; i++) { + uint32 buf_state; + bufHdr = GetLocalBufferDescriptor(i); if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) && - (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) + ((buf_state = pg_atomic_read_u32(&bufHdr->state)) & + (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY)) { ErrorContextCallback errcallback; Page localpage; @@ -3078,7 +3165,8 @@ FlushRelationBuffers(Relation rel) localpage, false); - bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED); + buf_state &= ~(BM_DIRTY | BM_JUST_DIRTIED); + pg_atomic_write_u32(&bufHdr->state, buf_state); /* Pop the error context stack */ error_context_stack = errcallback.previous; @@ -3093,6 +3181,8 @@ FlushRelationBuffers(Relation rel) for (i = 0; i < NBuffers; i++) { + uint32 buf_state; + bufHdr = GetBufferDescriptor(i); /* @@ -3104,9 +3194,9 @@ FlushRelationBuffers(Relation rel) ReservePrivateRefCountEntry(); - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) && - (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) + (buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY)) { PinBuffer_Locked(bufHdr); LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED); @@ -3115,7 +3205,7 @@ FlushRelationBuffers(Relation rel) UnpinBuffer(bufHdr, true); } else - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } } @@ -3145,6 +3235,8 @@ FlushDatabaseBuffers(Oid dbid) for (i = 0; i < NBuffers; i++) { + uint32 buf_state; + bufHdr = GetBufferDescriptor(i); /* @@ -3156,9 +3248,9 @@ FlushDatabaseBuffers(Oid dbid) ReservePrivateRefCountEntry(); - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); if (bufHdr->tag.rnode.dbNode == dbid && - (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) + (buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY)) { PinBuffer_Locked(bufHdr); LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED); @@ -3167,7 +3259,7 @@ FlushDatabaseBuffers(Oid dbid) UnpinBuffer(bufHdr, true); } else - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } } @@ -3297,12 +3389,13 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) * is only intended to be used in cases where failing to write out the * data would be harmless anyway, it doesn't really matter. */ - if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) != + if ((pg_atomic_read_u32(&bufHdr->state) & (BM_DIRTY | BM_JUST_DIRTIED)) != (BM_DIRTY | BM_JUST_DIRTIED)) { XLogRecPtr lsn = InvalidXLogRecPtr; bool dirtied = false; bool delayChkpt = false; + uint32 buf_state; /* * If we need to protect hint bit updates from torn writes, WAL-log a @@ -3313,7 +3406,8 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) * We don't check full_page_writes here because that logic is included * when we call XLogInsert() since the value changes dynamically. */ - if (XLogHintBitIsNeeded() && (bufHdr->flags & BM_PERMANENT)) + if (XLogHintBitIsNeeded() && + (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT)) { /* * If we're in recovery we cannot dirty a page because of a hint. @@ -3352,9 +3446,11 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) lsn = XLogSaveBufferForHint(buffer, buffer_std); } - LockBufHdr(bufHdr); - Assert(bufHdr->refcount > 0); - if (!(bufHdr->flags & BM_DIRTY)) + buf_state = LockBufHdr(bufHdr); + + Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0); + + if (!(buf_state & BM_DIRTY)) { dirtied = true; /* Means "will be dirtied by this action" */ @@ -3374,8 +3470,9 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) if (!XLogRecPtrIsInvalid(lsn)) PageSetLSN(page, lsn); } - bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED); - UnlockBufHdr(bufHdr); + + buf_state |= BM_DIRTY | BM_JUST_DIRTIED; + UnlockBufHdr(bufHdr, buf_state); if (delayChkpt) MyPgXact->delayChkpt = false; @@ -3406,17 +3503,19 @@ UnlockBuffers(void) if (buf) { - LockBufHdr(buf); + uint32 buf_state; + + buf_state = LockBufHdr(buf); /* * Don't complain if flag bit not set; it could have been reset but we * got a cancel/die interrupt before getting the signal. */ - if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 && + if ((buf_state & BM_PIN_COUNT_WAITER) != 0 && buf->wait_backend_pid == MyProcPid) - buf->flags &= ~BM_PIN_COUNT_WAITER; + buf_state &= ~BM_PIN_COUNT_WAITER; - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); PinCountWaitBuf = NULL; } @@ -3509,27 +3608,30 @@ LockBufferForCleanup(Buffer buffer) for (;;) { + uint32 buf_state; + /* Try to acquire lock */ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); - LockBufHdr(bufHdr); - Assert(bufHdr->refcount > 0); - if (bufHdr->refcount == 1) + buf_state = LockBufHdr(bufHdr); + + Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0); + if (BUF_STATE_GET_REFCOUNT(buf_state) == 1) { /* Successfully acquired exclusive lock with pincount 1 */ - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); return; } /* Failed, so mark myself as waiting for pincount 1 */ - if (bufHdr->flags & BM_PIN_COUNT_WAITER) + if (buf_state & BM_PIN_COUNT_WAITER) { - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); elog(ERROR, "multiple backends attempting to wait for pincount 1"); } bufHdr->wait_backend_pid = MyProcPid; - bufHdr->flags |= BM_PIN_COUNT_WAITER; PinCountWaitBuf = bufHdr; - UnlockBufHdr(bufHdr); + buf_state |= BM_PIN_COUNT_WAITER; + UnlockBufHdr(bufHdr, buf_state); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); /* Report the wait */ @@ -3558,11 +3660,11 @@ LockBufferForCleanup(Buffer buffer) * impossible with the current usages due to table level locking, but * better be safe. */ - LockBufHdr(bufHdr); - if ((bufHdr->flags & BM_PIN_COUNT_WAITER) != 0 && + buf_state = LockBufHdr(bufHdr); + if ((buf_state & BM_PIN_COUNT_WAITER) != 0 && bufHdr->wait_backend_pid == MyProcPid) - bufHdr->flags &= ~BM_PIN_COUNT_WAITER; - UnlockBufHdr(bufHdr); + buf_state &= ~BM_PIN_COUNT_WAITER; + UnlockBufHdr(bufHdr, buf_state); PinCountWaitBuf = NULL; /* Loop back and try again */ @@ -3603,22 +3705,26 @@ bool ConditionalLockBufferForCleanup(Buffer buffer) { BufferDesc *bufHdr; + uint32 buf_state, + refcount; Assert(BufferIsValid(buffer)); if (BufferIsLocal(buffer)) { + refcount = LocalRefCount[-buffer - 1]; /* There should be exactly one pin */ - Assert(LocalRefCount[-buffer - 1] > 0); - if (LocalRefCount[-buffer - 1] != 1) + Assert(refcount > 0); + if (refcount != 1) return false; /* Nobody else to wait for */ return true; } /* There should be exactly one local pin */ - Assert(GetPrivateRefCount(buffer) > 0); - if (GetPrivateRefCount(buffer) != 1) + refcount = GetPrivateRefCount(buffer); + Assert(refcount); + if (refcount != 1) return false; /* Try to acquire lock */ @@ -3626,17 +3732,19 @@ ConditionalLockBufferForCleanup(Buffer buffer) return false; bufHdr = GetBufferDescriptor(buffer - 1); - LockBufHdr(bufHdr); - Assert(bufHdr->refcount > 0); - if (bufHdr->refcount == 1) + buf_state = LockBufHdr(bufHdr); + refcount = BUF_STATE_GET_REFCOUNT(buf_state); + + Assert(refcount > 0); + if (refcount == 1) { /* Successfully acquired exclusive lock with pincount 1 */ - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); return true; } /* Failed, so release the lock */ - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); return false; } @@ -3666,17 +3774,17 @@ WaitIO(BufferDesc *buf) */ for (;;) { - BufFlags sv_flags; + uint32 buf_state; /* * It may not be necessary to acquire the spinlock to check the flag * here, but since this test is essential for correctness, we'd better * play it safe. */ - LockBufHdr(buf); - sv_flags = buf->flags; - UnlockBufHdr(buf); - if (!(sv_flags & BM_IO_IN_PROGRESS)) + buf_state = LockBufHdr(buf); + UnlockBufHdr(buf, buf_state); + + if (!(buf_state & BM_IO_IN_PROGRESS)) break; LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_SHARED); LWLockRelease(BufferDescriptorGetIOLock(buf)); @@ -3704,6 +3812,8 @@ WaitIO(BufferDesc *buf) static bool StartBufferIO(BufferDesc *buf, bool forInput) { + uint32 buf_state; + Assert(!InProgressBuf); for (;;) @@ -3714,9 +3824,9 @@ StartBufferIO(BufferDesc *buf, bool forInput) */ LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE); - LockBufHdr(buf); + buf_state = LockBufHdr(buf); - if (!(buf->flags & BM_IO_IN_PROGRESS)) + if (!(buf_state & BM_IO_IN_PROGRESS)) break; /* @@ -3725,24 +3835,23 @@ StartBufferIO(BufferDesc *buf, bool forInput) * an error (see AbortBufferIO). If that's the case, we must wait for * him to get unwedged. */ - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); LWLockRelease(BufferDescriptorGetIOLock(buf)); WaitIO(buf); } /* Once we get here, there is definitely no I/O active on this buffer */ - if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY)) + if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY)) { /* someone else already did the I/O */ - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); LWLockRelease(BufferDescriptorGetIOLock(buf)); return false; } - buf->flags |= BM_IO_IN_PROGRESS; - - UnlockBufHdr(buf); + buf_state |= BM_IO_IN_PROGRESS; + UnlockBufHdr(buf, buf_state); InProgressBuf = buf; IsForInput = forInput; @@ -3768,19 +3877,22 @@ StartBufferIO(BufferDesc *buf, bool forInput) * be 0, or BM_VALID if we just finished reading in the page. */ static void -TerminateBufferIO(BufferDesc *buf, bool clear_dirty, int set_flag_bits) +TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits) { + uint32 buf_state; + Assert(buf == InProgressBuf); - LockBufHdr(buf); + buf_state = LockBufHdr(buf); + + Assert(buf_state & BM_IO_IN_PROGRESS); - Assert(buf->flags & BM_IO_IN_PROGRESS); - buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR); - if (clear_dirty && !(buf->flags & BM_JUST_DIRTIED)) - buf->flags &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED); - buf->flags |= set_flag_bits; + buf_state &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR); + if (clear_dirty && !(buf_state & BM_JUST_DIRTIED)) + buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED); - UnlockBufHdr(buf); + buf_state |= set_flag_bits; + UnlockBufHdr(buf, buf_state); InProgressBuf = NULL; @@ -3803,6 +3915,8 @@ AbortBufferIO(void) if (buf) { + uint32 buf_state; + /* * Since LWLockReleaseAll has already been called, we're not holding * the buffer's io_in_progress_lock. We have to re-acquire it so that @@ -3811,24 +3925,22 @@ AbortBufferIO(void) */ LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE); - LockBufHdr(buf); - Assert(buf->flags & BM_IO_IN_PROGRESS); + buf_state = LockBufHdr(buf); + Assert(buf_state & BM_IO_IN_PROGRESS); if (IsForInput) { - Assert(!(buf->flags & BM_DIRTY)); + Assert(!(buf_state & BM_DIRTY)); + /* We'd better not think buffer is valid yet */ - Assert(!(buf->flags & BM_VALID)); - UnlockBufHdr(buf); + Assert(!(buf_state & BM_VALID)); + UnlockBufHdr(buf, buf_state); } else { - BufFlags sv_flags; - - sv_flags = buf->flags; - Assert(sv_flags & BM_DIRTY); - UnlockBufHdr(buf); + Assert(buf_state & BM_DIRTY); + UnlockBufHdr(buf, buf_state); /* Issue notice if this is not the first failure... */ - if (sv_flags & BM_IO_ERROR) + if (buf_state & BM_IO_ERROR) { /* Buffer is pinned, so we can read tag without spinlock */ char *path; @@ -3911,6 +4023,54 @@ rnode_comparator(const void *p1, const void *p2) return 0; } +/* + * Lock buffer header - set BM_LOCKED in buffer state. + */ +uint32 +LockBufHdr(BufferDesc *desc) +{ + SpinDelayStatus delayStatus = init_spin_delay(desc); + uint32 old_buf_state; + + while (true) + { + /* set BM_LOCKED flag */ + old_buf_state = pg_atomic_fetch_or_u32(&desc->state, BM_LOCKED); + /* if it wasn't set before we're OK */ + if (!(old_buf_state & BM_LOCKED)) + break; + perform_spin_delay(&delayStatus); + } + finish_spin_delay(&delayStatus); + return old_buf_state | BM_LOCKED; +} + +/* + * Wait until the BM_LOCKED flag isn't set anymore and return the buffer's + * state at that point. + * + * Obviously the buffer could be locked by the time the value is returned, so + * this is primarily useful in CAS style loops. + */ +static uint32 +WaitBufHdrUnlocked(BufferDesc *buf) +{ + SpinDelayStatus delayStatus = init_spin_delay(buf); + uint32 buf_state; + + buf_state = pg_atomic_read_u32(&buf->state); + + while (buf_state & BM_LOCKED) + { + perform_spin_delay(&delayStatus); + buf_state = pg_atomic_read_u32(&buf->state); + } + + finish_spin_delay(&delayStatus); + + return buf_state; +} + /* * BufferTag comparator. */ diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c index 551d15205c..88b90dc527 100644 --- a/src/backend/storage/buffer/freelist.c +++ b/src/backend/storage/buffer/freelist.c @@ -98,7 +98,8 @@ typedef struct BufferAccessStrategyData /* Prototypes for internal functions */ -static BufferDesc *GetBufferFromRing(BufferAccessStrategy strategy); +static BufferDesc *GetBufferFromRing(BufferAccessStrategy strategy, + uint32 *buf_state); static void AddBufferToRing(BufferAccessStrategy strategy, BufferDesc *buf); @@ -180,11 +181,12 @@ ClockSweepTick(void) * return the buffer with the buffer header spinlock still held. */ BufferDesc * -StrategyGetBuffer(BufferAccessStrategy strategy) +StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state) { BufferDesc *buf; int bgwprocno; int trycounter; + uint32 local_buf_state; /* to avoid repeated (de-)referencing */ /* * If given a strategy object, see whether it can select a buffer. We @@ -192,7 +194,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy) */ if (strategy != NULL) { - buf = GetBufferFromRing(strategy); + buf = GetBufferFromRing(strategy, buf_state); if (buf != NULL) return buf; } @@ -279,14 +281,16 @@ StrategyGetBuffer(BufferAccessStrategy strategy) * it before we got to it. It's probably impossible altogether as * of 8.3, but we'd better check anyway.) */ - LockBufHdr(buf); - if (buf->refcount == 0 && buf->usage_count == 0) + local_buf_state = LockBufHdr(buf); + if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0 + && BUF_STATE_GET_USAGECOUNT(local_buf_state) == 0) { if (strategy != NULL) AddBufferToRing(strategy, buf); + *buf_state = local_buf_state; return buf; } - UnlockBufHdr(buf); + UnlockBufHdr(buf, local_buf_state); } } @@ -295,19 +299,20 @@ StrategyGetBuffer(BufferAccessStrategy strategy) trycounter = NBuffers; for (;;) { - buf = GetBufferDescriptor(ClockSweepTick()); /* * If the buffer is pinned or has a nonzero usage_count, we cannot use * it; decrement the usage_count (unless pinned) and keep scanning. */ - LockBufHdr(buf); - if (buf->refcount == 0) + local_buf_state = LockBufHdr(buf); + + if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0) { - if (buf->usage_count > 0) + if (BUF_STATE_GET_USAGECOUNT(local_buf_state) != 0) { - buf->usage_count--; + local_buf_state -= BUF_USAGECOUNT_ONE; + trycounter = NBuffers; } else @@ -315,6 +320,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy) /* Found a usable buffer */ if (strategy != NULL) AddBufferToRing(strategy, buf); + *buf_state = local_buf_state; return buf; } } @@ -327,10 +333,10 @@ StrategyGetBuffer(BufferAccessStrategy strategy) * probably better to fail than to risk getting stuck in an * infinite loop. */ - UnlockBufHdr(buf); + UnlockBufHdr(buf, local_buf_state); elog(ERROR, "no unpinned buffers available"); } - UnlockBufHdr(buf); + UnlockBufHdr(buf, local_buf_state); } } @@ -585,10 +591,12 @@ FreeAccessStrategy(BufferAccessStrategy strategy) * The bufhdr spin lock is held on the returned buffer. */ static BufferDesc * -GetBufferFromRing(BufferAccessStrategy strategy) +GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state) { BufferDesc *buf; Buffer bufnum; + uint32 local_buf_state; /* to avoid repeated (de-)referencing */ + /* Advance to next ring slot */ if (++strategy->current >= strategy->ring_size) @@ -616,13 +624,15 @@ GetBufferFromRing(BufferAccessStrategy strategy) * shouldn't re-use it. */ buf = GetBufferDescriptor(bufnum - 1); - LockBufHdr(buf); - if (buf->refcount == 0 && buf->usage_count <= 1) + local_buf_state = LockBufHdr(buf); + if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0 + && BUF_STATE_GET_USAGECOUNT(local_buf_state) <= 1) { strategy->current_was_in_ring = true; + *buf_state = local_buf_state; return buf; } - UnlockBufHdr(buf); + UnlockBufHdr(buf, local_buf_state); /* * Tell caller to allocate a new buffer with the normal allocation diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index 17640cfe2a..68b402023a 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -108,6 +108,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, int b; int trycounter; bool found; + uint32 buf_state; INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum); @@ -128,16 +129,21 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, fprintf(stderr, "LB ALLOC (%u,%d,%d) %d\n", smgr->smgr_rnode.node.relNode, forkNum, blockNum, -b - 1); #endif + buf_state = pg_atomic_read_u32(&bufHdr->state); + /* this part is equivalent to PinBuffer for a shared buffer */ if (LocalRefCount[b] == 0) { - if (bufHdr->usage_count < BM_MAX_USAGE_COUNT) - bufHdr->usage_count++; + if (BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT) + { + buf_state += BUF_USAGECOUNT_ONE; + pg_atomic_write_u32(&bufHdr->state, buf_state); + } } LocalRefCount[b]++; ResourceOwnerRememberBuffer(CurrentResourceOwner, BufferDescriptorGetBuffer(bufHdr)); - if (bufHdr->flags & BM_VALID) + if (buf_state & BM_VALID) *foundPtr = TRUE; else { @@ -169,9 +175,12 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, if (LocalRefCount[b] == 0) { - if (bufHdr->usage_count > 0) + buf_state = pg_atomic_read_u32(&bufHdr->state); + + if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0) { - bufHdr->usage_count--; + buf_state -= BUF_USAGECOUNT_ONE; + pg_atomic_write_u32(&bufHdr->state, buf_state); trycounter = NLocBuffer; } else @@ -193,7 +202,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, * this buffer is not referenced but it might still be dirty. if that's * the case, write it out before reusing it! */ - if (bufHdr->flags & BM_DIRTY) + if (buf_state & BM_DIRTY) { SMgrRelation oreln; Page localpage = (char *) LocalBufHdrGetBlock(bufHdr); @@ -211,7 +220,8 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, false); /* Mark not-dirty now in case we error out below */ - bufHdr->flags &= ~BM_DIRTY; + buf_state &= ~BM_DIRTY; + pg_atomic_write_u32(&bufHdr->state, buf_state); pgBufferUsage.local_blks_written++; } @@ -228,7 +238,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, /* * Update the hash table: remove old entry, if any, and make new one. */ - if (bufHdr->flags & BM_TAG_VALID) + if (buf_state & BM_TAG_VALID) { hresult = (LocalBufferLookupEnt *) hash_search(LocalBufHash, (void *) &bufHdr->tag, @@ -237,7 +247,8 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, elog(ERROR, "local buffer hash table corrupted"); /* mark buffer invalid just in case hash insert fails */ CLEAR_BUFFERTAG(bufHdr->tag); - bufHdr->flags &= ~(BM_VALID | BM_TAG_VALID); + buf_state &= ~(BM_VALID | BM_TAG_VALID); + pg_atomic_write_u32(&bufHdr->state, buf_state); } hresult = (LocalBufferLookupEnt *) @@ -250,9 +261,11 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, * it's all ours now. */ bufHdr->tag = newTag; - bufHdr->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR); - bufHdr->flags |= BM_TAG_VALID; - bufHdr->usage_count = 1; + buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR); + buf_state |= BM_TAG_VALID; + buf_state &= ~BUF_USAGECOUNT_MASK; + buf_state += BUF_USAGECOUNT_ONE; + pg_atomic_write_u32(&bufHdr->state, buf_state); *foundPtr = FALSE; return bufHdr; @@ -267,6 +280,7 @@ MarkLocalBufferDirty(Buffer buffer) { int bufid; BufferDesc *bufHdr; + uint32 buf_state; Assert(BufferIsLocal(buffer)); @@ -280,10 +294,10 @@ MarkLocalBufferDirty(Buffer buffer) bufHdr = GetLocalBufferDescriptor(bufid); - if (!(bufHdr->flags & BM_DIRTY)) - pgBufferUsage.local_blks_dirtied++; + buf_state = pg_atomic_fetch_or_u32(&bufHdr->state, BM_DIRTY); - bufHdr->flags |= BM_DIRTY; + if (!(buf_state & BM_DIRTY)) + pgBufferUsage.local_blks_dirtied++; } /* @@ -307,8 +321,11 @@ DropRelFileNodeLocalBuffers(RelFileNode rnode, ForkNumber forkNum, { BufferDesc *bufHdr = GetLocalBufferDescriptor(i); LocalBufferLookupEnt *hresult; + uint32 buf_state; - if ((bufHdr->flags & BM_TAG_VALID) && + buf_state = pg_atomic_read_u32(&bufHdr->state); + + if ((buf_state & BM_TAG_VALID) && RelFileNodeEquals(bufHdr->tag.rnode, rnode) && bufHdr->tag.forkNum == forkNum && bufHdr->tag.blockNum >= firstDelBlock) @@ -327,8 +344,9 @@ DropRelFileNodeLocalBuffers(RelFileNode rnode, ForkNumber forkNum, elog(ERROR, "local buffer hash table corrupted"); /* Mark buffer invalid */ CLEAR_BUFFERTAG(bufHdr->tag); - bufHdr->flags = 0; - bufHdr->usage_count = 0; + buf_state &= ~BUF_FLAG_MASK; + buf_state &= ~BUF_USAGECOUNT_MASK; + pg_atomic_write_u32(&bufHdr->state, buf_state); } } } @@ -349,8 +367,11 @@ DropRelFileNodeAllLocalBuffers(RelFileNode rnode) { BufferDesc *bufHdr = GetLocalBufferDescriptor(i); LocalBufferLookupEnt *hresult; + uint32 buf_state; + + buf_state = pg_atomic_read_u32(&bufHdr->state); - if ((bufHdr->flags & BM_TAG_VALID) && + if ((buf_state & BM_TAG_VALID) && RelFileNodeEquals(bufHdr->tag.rnode, rnode)) { if (LocalRefCount[i] != 0) @@ -367,8 +388,9 @@ DropRelFileNodeAllLocalBuffers(RelFileNode rnode) elog(ERROR, "local buffer hash table corrupted"); /* Mark buffer invalid */ CLEAR_BUFFERTAG(bufHdr->tag); - bufHdr->flags = 0; - bufHdr->usage_count = 0; + buf_state &= ~BUF_FLAG_MASK; + buf_state &= ~BUF_USAGECOUNT_MASK; + pg_atomic_write_u32(&bufHdr->state, buf_state); } } } diff --git a/src/backend/storage/lmgr/s_lock.c b/src/backend/storage/lmgr/s_lock.c index cc0bf5e01f..4a6ffb4f89 100644 --- a/src/backend/storage/lmgr/s_lock.c +++ b/src/backend/storage/lmgr/s_lock.c @@ -3,6 +3,38 @@ * s_lock.c * Hardware-dependent implementation of spinlocks. * + * When waiting for a contended spinlock we loop tightly for awhile, then + * delay using pg_usleep() and try again. Preferably, "awhile" should be a + * small multiple of the maximum time we expect a spinlock to be held. 100 + * iterations seems about right as an initial guess. However, on a + * uniprocessor the loop is a waste of cycles, while in a multi-CPU scenario + * it's usually better to spin a bit longer than to call the kernel, so we try + * to adapt the spin loop count depending on whether we seem to be in a + * uniprocessor or multiprocessor. + * + * Note: you might think MIN_SPINS_PER_DELAY should be just 1, but you'd + * be wrong; there are platforms where that can result in a "stuck + * spinlock" failure. This has been seen particularly on Alphas; it seems + * that the first TAS after returning from kernel space will always fail + * on that hardware. + * + * Once we do decide to block, we use randomly increasing pg_usleep() + * delays. The first delay is 1 msec, then the delay randomly increases to + * about one second, after which we reset to 1 msec and start again. The + * idea here is that in the presence of heavy contention we need to + * increase the delay, else the spinlock holder may never get to run and + * release the lock. (Consider situation where spinlock holder has been + * nice'd down in priority by the scheduler --- it will not get scheduled + * until all would-be acquirers are sleeping, so if we always use a 1-msec + * sleep, there is a real possibility of starvation.) But we can't just + * clamp the delay to an upper bound, else it would take a long time to + * make a reasonable number of tries. + * + * We time out and declare error after NUM_DELAYS delays (thus, exactly + * that many tries). With the given settings, this will usually take 2 or + * so minutes. It seems better to fix the total number of tries (and thus + * the probability of unintended failure) than to fix the total time + * spent. * * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -21,6 +53,14 @@ #include "storage/s_lock.h" #include "storage/barrier.h" + +#define MIN_SPINS_PER_DELAY 10 +#define MAX_SPINS_PER_DELAY 1000 +#define NUM_DELAYS 1000 +#define MIN_DELAY_USEC 1000L +#define MAX_DELAY_USEC 1000000L + + slock_t dummy_spinlock; static int spins_per_delay = DEFAULT_SPINS_PER_DELAY; @@ -30,117 +70,107 @@ static int spins_per_delay = DEFAULT_SPINS_PER_DELAY; * s_lock_stuck() - complain about a stuck spinlock */ static void -s_lock_stuck(volatile slock_t *lock, const char *file, int line) +s_lock_stuck(void *p, const char *file, int line) { #if defined(S_LOCK_TEST) fprintf(stderr, "\nStuck spinlock (%p) detected at %s:%d.\n", - lock, file, line); + p, file, line); exit(1); #else elog(PANIC, "stuck spinlock (%p) detected at %s:%d", - lock, file, line); + p, file, line); #endif } - /* * s_lock(lock) - platform-independent portion of waiting for a spinlock. */ int s_lock(volatile slock_t *lock, const char *file, int line) { - /* - * We loop tightly for awhile, then delay using pg_usleep() and try again. - * Preferably, "awhile" should be a small multiple of the maximum time we - * expect a spinlock to be held. 100 iterations seems about right as an - * initial guess. However, on a uniprocessor the loop is a waste of - * cycles, while in a multi-CPU scenario it's usually better to spin a bit - * longer than to call the kernel, so we try to adapt the spin loop count - * depending on whether we seem to be in a uniprocessor or multiprocessor. - * - * Note: you might think MIN_SPINS_PER_DELAY should be just 1, but you'd - * be wrong; there are platforms where that can result in a "stuck - * spinlock" failure. This has been seen particularly on Alphas; it seems - * that the first TAS after returning from kernel space will always fail - * on that hardware. - * - * Once we do decide to block, we use randomly increasing pg_usleep() - * delays. The first delay is 1 msec, then the delay randomly increases to - * about one second, after which we reset to 1 msec and start again. The - * idea here is that in the presence of heavy contention we need to - * increase the delay, else the spinlock holder may never get to run and - * release the lock. (Consider situation where spinlock holder has been - * nice'd down in priority by the scheduler --- it will not get scheduled - * until all would-be acquirers are sleeping, so if we always use a 1-msec - * sleep, there is a real possibility of starvation.) But we can't just - * clamp the delay to an upper bound, else it would take a long time to - * make a reasonable number of tries. - * - * We time out and declare error after NUM_DELAYS delays (thus, exactly - * that many tries). With the given settings, this will usually take 2 or - * so minutes. It seems better to fix the total number of tries (and thus - * the probability of unintended failure) than to fix the total time - * spent. - */ -#define MIN_SPINS_PER_DELAY 10 -#define MAX_SPINS_PER_DELAY 1000 -#define NUM_DELAYS 1000 -#define MIN_DELAY_USEC 1000L -#define MAX_DELAY_USEC 1000000L - - int spins = 0; - int delays = 0; - int cur_delay = 0; + SpinDelayStatus delayStatus = init_spin_delay((void *) lock); while (TAS_SPIN(lock)) { - /* CPU-specific delay each time through the loop */ - SPIN_DELAY(); + perform_spin_delay(&delayStatus); + } - /* Block the process every spins_per_delay tries */ - if (++spins >= spins_per_delay) - { - if (++delays > NUM_DELAYS) - s_lock_stuck(lock, file, line); + finish_spin_delay(&delayStatus); - if (cur_delay == 0) /* first time to delay? */ - cur_delay = MIN_DELAY_USEC; + return delayStatus.delays; +} - pg_usleep(cur_delay); +#ifdef USE_DEFAULT_S_UNLOCK +void +s_unlock(volatile slock_t *lock) +{ +#ifdef TAS_ACTIVE_WORD + /* HP's PA-RISC */ + *TAS_ACTIVE_WORD(lock) = -1; +#else + *lock = 0; +#endif +} +#endif + +/* + * Wait while spinning on a contended spinlock. + */ +void +perform_spin_delay(SpinDelayStatus *status) +{ + /* CPU-specific delay each time through the loop */ + SPIN_DELAY(); + + /* Block the process every spins_per_delay tries */ + if (++(status->spins) >= spins_per_delay) + { + if (++(status->delays) > NUM_DELAYS) + s_lock_stuck(status->ptr, status->file, status->line); + + if (status->cur_delay == 0) /* first time to delay? */ + status->cur_delay = MIN_DELAY_USEC; + + pg_usleep(status->cur_delay); #if defined(S_LOCK_TEST) - fprintf(stdout, "*"); - fflush(stdout); + fprintf(stdout, "*"); + fflush(stdout); #endif - /* increase delay by a random fraction between 1X and 2X */ - cur_delay += (int) (cur_delay * + /* increase delay by a random fraction between 1X and 2X */ + status->cur_delay += (int) (status->cur_delay * ((double) random() / (double) MAX_RANDOM_VALUE) + 0.5); - /* wrap back to minimum delay when max is exceeded */ - if (cur_delay > MAX_DELAY_USEC) - cur_delay = MIN_DELAY_USEC; + /* wrap back to minimum delay when max is exceeded */ + if (status->cur_delay > MAX_DELAY_USEC) + status->cur_delay = MIN_DELAY_USEC; - spins = 0; - } + status->spins = 0; } +} - /* - * If we were able to acquire the lock without delaying, it's a good - * indication we are in a multiprocessor. If we had to delay, it's a sign - * (but not a sure thing) that we are in a uniprocessor. Hence, we - * decrement spins_per_delay slowly when we had to delay, and increase it - * rapidly when we didn't. It's expected that spins_per_delay will - * converge to the minimum value on a uniprocessor and to the maximum - * value on a multiprocessor. - * - * Note: spins_per_delay is local within our current process. We want to - * average these observations across multiple backends, since it's - * relatively rare for this function to even get entered, and so a single - * backend might not live long enough to converge on a good value. That - * is handled by the two routines below. - */ - if (cur_delay == 0) +/* + * After acquiring a spinlock, update estimates about how long to loop. + * + * If we were able to acquire the lock without delaying, it's a good + * indication we are in a multiprocessor. If we had to delay, it's a sign + * (but not a sure thing) that we are in a uniprocessor. Hence, we + * decrement spins_per_delay slowly when we had to delay, and increase it + * rapidly when we didn't. It's expected that spins_per_delay will + * converge to the minimum value on a uniprocessor and to the maximum + * value on a multiprocessor. + * + * Note: spins_per_delay is local within our current process. We want to + * average these observations across multiple backends, since it's + * relatively rare for this function to even get entered, and so a single + * backend might not live long enough to converge on a good value. That + * is handled by the two routines below. + */ +void +finish_spin_delay(SpinDelayStatus *status) +{ + if (status->cur_delay == 0) { /* we never had to delay */ if (spins_per_delay < MAX_SPINS_PER_DELAY) @@ -151,22 +181,8 @@ s_lock(volatile slock_t *lock, const char *file, int line) if (spins_per_delay > MIN_SPINS_PER_DELAY) spins_per_delay = Max(spins_per_delay - 1, MIN_SPINS_PER_DELAY); } - return delays; } -#ifdef USE_DEFAULT_S_UNLOCK -void -s_unlock(volatile slock_t *lock) -{ -#ifdef TAS_ACTIVE_WORD - /* HP's PA-RISC */ - *TAS_ACTIVE_WORD(lock) = -1; -#else - *lock = 0; -#endif -} -#endif - /* * Set local copy of spins_per_delay during backend startup. * diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h index 61e3060c9f..b2d7776f2a 100644 --- a/src/include/postmaster/postmaster.h +++ b/src/include/postmaster/postmaster.h @@ -63,12 +63,15 @@ extern void ShmemBackendArrayAllocation(void); #endif /* - * Note: MAX_BACKENDS is limited to 2^23-1 because inval.c stores the - * backend ID as a 3-byte signed integer. Even if that limitation were - * removed, we still could not exceed INT_MAX/4 because some places compute - * 4*MaxBackends without any overflow check. This is rechecked in the relevant - * GUC check hooks and in RegisterBackgroundWorker(). + * Note: MAX_BACKENDS is limited to 2^18-1 because that's the width reserved + * for buffer references in buf_internals.h. This limitation could be lifted + * by using a 64bit state; but it's unlikely to be worthwhile as 2^18-1 + * backends exceed currently realistic configurations. Even if that limitation + * were removed, we still could not a) exceed 2^23-1 because inval.c stores + * the backend ID as a 3-byte signed integer, b) INT_MAX/4 because some places + * compute 4*MaxBackends without any overflow check. This is rechecked in the + * relevant GUC check hooks and in RegisterBackgroundWorker(). */ -#define MAX_BACKENDS 0x7fffff +#define MAX_BACKENDS 0x3FFFF #endif /* _POSTMASTER_H */ diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index d04363b166..f8f71255fd 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -21,29 +21,51 @@ #include "storage/lwlock.h" #include "storage/shmem.h" #include "storage/smgr.h" +#include "port/atomics.h" #include "storage/spin.h" #include "utils/relcache.h" +/* + * Buffer state is a single 32-bit variable where following data is combined. + * + * - 18 bits refcount + * - 4 bits usage count + * - 10 bits of flags + * + * Combining these values allows to perform some operations without locking + * the buffer header, by modifying them together with a CAS loop. + * + * The definition of buffer state components is below. + */ +#define BUF_REFCOUNT_ONE 1 +#define BUF_REFCOUNT_MASK ((1U << 18) - 1) +#define BUF_USAGECOUNT_MASK 0x003C0000U +#define BUF_USAGECOUNT_ONE (1U << 18) +#define BUF_USAGECOUNT_SHIFT 18 +#define BUF_FLAG_MASK 0xFFC00000U + +/* Get refcount and usagecount from buffer state */ +#define BUF_STATE_GET_REFCOUNT(state) ((state) & BUF_REFCOUNT_MASK) +#define BUF_STATE_GET_USAGECOUNT(state) (((state) & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT) + /* * Flags for buffer descriptors * * Note: TAG_VALID essentially means that there is a buffer hashtable * entry associated with the buffer's tag. */ -#define BM_DIRTY (1 << 0) /* data needs writing */ -#define BM_VALID (1 << 1) /* data is valid */ -#define BM_TAG_VALID (1 << 2) /* tag is assigned */ -#define BM_IO_IN_PROGRESS (1 << 3) /* read or write in progress */ -#define BM_IO_ERROR (1 << 4) /* previous I/O failed */ -#define BM_JUST_DIRTIED (1 << 5) /* dirtied since write started */ -#define BM_PIN_COUNT_WAITER (1 << 6) /* have waiter for sole pin */ -#define BM_CHECKPOINT_NEEDED (1 << 7) /* must write for checkpoint */ -#define BM_PERMANENT (1 << 8) /* permanent relation (not +#define BM_LOCKED (1U << 22) /* buffer header is locked */ +#define BM_DIRTY (1U << 23) /* data needs writing */ +#define BM_VALID (1U << 24) /* data is valid */ +#define BM_TAG_VALID (1U << 25) /* tag is assigned */ +#define BM_IO_IN_PROGRESS (1U << 26) /* read or write in progress */ +#define BM_IO_ERROR (1U << 27) /* previous I/O failed */ +#define BM_JUST_DIRTIED (1U << 28) /* dirtied since write started */ +#define BM_PIN_COUNT_WAITER (1U << 29) /* have waiter for sole pin */ +#define BM_CHECKPOINT_NEEDED (1U << 30) /* must write for checkpoint */ +#define BM_PERMANENT (1U << 31) /* permanent relation (not * unlogged) */ - -typedef bits16 BufFlags; - /* * The maximum allowed value of usage_count represents a tradeoff between * accuracy and speed of the clock-sweep buffer management algorithm. A @@ -113,18 +135,29 @@ typedef struct buftag /* * BufferDesc -- shared descriptor/state data for a single shared buffer. * - * Note: buf_hdr_lock must be held to examine or change the tag, flags, - * usage_count, refcount, or wait_backend_pid fields. buf_id field never - * changes after initialization, so does not need locking. freeNext is - * protected by the buffer_strategy_lock not buf_hdr_lock. The LWLock can - * take care of itself. The buf_hdr_lock is *not* used to control access to - * the data in the buffer! + * Note: Buffer header lock (BM_LOCKED flag) must be held to examine or change + * the tag, state or wait_backend_pid fields. In general, buffer header lock + * is a spinlock which is combined with flags, refcount and usagecount into + * single atomic variable. This layout allow us to do some operations in a + * single atomic operation, without actually acquiring and releasing spinlock; + * for instance, increase or decrease refcount. buf_id field never changes + * after initialization, so does not need locking. freeNext is protected by + * the buffer_strategy_lock not buffer header lock. The LWLock can take care + * of itself. The buffer header lock is *not* used to control access to the + * data in the buffer! + * + * It's assumed that nobody changes the state field while buffer header lock + * is held. Thus buffer header lock holder can do complex updates of the + * state variable in single write, simultaneously with lock release (cleaning + * BM_LOCKED flag). On the other hand, updating of state without holding + * buffer header lock is restricted to CAS, which insure that BM_LOCKED flag + * is not set. Atomic increment/decrement, OR/AND etc. are not allowed. * * An exception is that if we have the buffer pinned, its tag can't change - * underneath us, so we can examine the tag without locking the spinlock. + * underneath us, so we can examine the tag without locking the buffer header. * Also, in places we do one-time reads of the flags without bothering to - * lock the spinlock; this is generally for situations where we don't expect - * the flag bit being tested to be changing. + * lock the buffer header; this is generally for situations where we don't + * expect the flag bit being tested to be changing. * * We can't physically remove items from a disk page if another backend has * the buffer pinned. Hence, a backend may need to wait for all other pins @@ -142,13 +175,12 @@ typedef struct buftag typedef struct BufferDesc { BufferTag tag; /* ID of page contained in buffer */ - BufFlags flags; /* see bit definitions above */ - uint8 usage_count; /* usage counter for clock sweep code */ - slock_t buf_hdr_lock; /* protects a subset of fields, see above */ - unsigned refcount; /* # of backends holding pins on buffer */ - int wait_backend_pid; /* backend PID of pin-count waiter */ - int buf_id; /* buffer's index number (from 0) */ + + /* state of the tag, containing flags, refcount and usagecount */ + pg_atomic_uint32 state; + + int wait_backend_pid; /* backend PID of pin-count waiter */ int freeNext; /* link in freelist chain */ LWLock content_lock; /* to lock access to buffer contents */ @@ -202,11 +234,15 @@ extern PGDLLIMPORT LWLockMinimallyPadded *BufferIOLWLockArray; #define FREENEXT_NOT_IN_LIST (-2) /* - * Macros for acquiring/releasing a shared buffer header's spinlock. - * Do not apply these to local buffers! + * Functions for acquiring/releasing a shared buffer header's spinlock. Do + * not apply these to local buffers! */ -#define LockBufHdr(bufHdr) SpinLockAcquire(&(bufHdr)->buf_hdr_lock) -#define UnlockBufHdr(bufHdr) SpinLockRelease(&(bufHdr)->buf_hdr_lock) +extern uint32 LockBufHdr(BufferDesc *desc); +#define UnlockBufHdr(desc, s) \ + do { \ + pg_atomic_write_u32(&(desc)->state, (s) & (~BM_LOCKED)); \ + pg_write_barrier(); \ + } while (0) /* @@ -267,7 +303,8 @@ extern void IssuePendingWritebacks(WritebackContext *context); extern void ScheduleBufferTagForWriteback(WritebackContext *context, BufferTag *tag); /* freelist.c */ -extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy); +extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy, + uint32 *buf_state); extern void StrategyFreeBuffer(BufferDesc *buf); extern bool StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc *buf); diff --git a/src/include/storage/s_lock.h b/src/include/storage/s_lock.h index 8b240cd647..11410e221b 100644 --- a/src/include/storage/s_lock.h +++ b/src/include/storage/s_lock.h @@ -991,4 +991,22 @@ extern int s_lock(volatile slock_t *lock, const char *file, int line); extern void set_spins_per_delay(int shared_spins_per_delay); extern int update_spins_per_delay(int shared_spins_per_delay); +/* + * Support for spin delay which is useful in various places where + * spinlock-like procedures take place. + */ +typedef struct +{ + int spins; + int delays; + int cur_delay; + void *ptr; + const char *file; + int line; +} SpinDelayStatus; + +#define init_spin_delay(ptr) {0, 0, 0, (ptr), __FILE__, __LINE__} +void perform_spin_delay(SpinDelayStatus *status); +void finish_spin_delay(SpinDelayStatus *status); + #endif /* S_LOCK_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index e293fc0bc8..cdaad2089a 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1859,6 +1859,7 @@ SpGistScanOpaqueData SpGistState SpGistTypeDesc SpecialJoinInfo +SpinDelayStatus SplitInterval SplitLR SplitVar -- 2.40.0