*/
#include "postgres.h"
+#include "port/atomics.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
+#include "storage/proc.h"
+
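+/*
+ * Force a single read of an int-sized variable in shared memory.  Going
+ * through a volatile pointer keeps the compiler from caching the value or
+ * rereading it later in the surrounding code.
+ */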
+#define INT_ACCESS_ONCE(var) ((int)(*((volatile int *)&(var))))
/*
/* Spinlock: protects the values below */
slock_t buffer_strategy_lock;
- /* Clock sweep hand: index of next buffer to consider grabbing */
- int nextVictimBuffer;
+ /*
+ * Clock sweep hand: index of next buffer to consider grabbing. Note that
+ * this isn't a concrete buffer - we only ever increase the value. So, to
+ * get an actual buffer, it needs to be used modulo NBuffers.
+ */
+ pg_atomic_uint32 nextVictimBuffer;
int firstFreeBuffer; /* Head of list of unused buffers */
int lastFreeBuffer; /* Tail of list of unused buffers */
* Statistics. These counters should be wide enough that they can't
* overflow during a single bgwriter cycle.
*/
- uint32 completePasses; /* Complete cycles of the clock sweep */
- uint32 numBufferAllocs; /* Buffers allocated since last reset */
+ uint32 completePasses; /* Complete cycles of the clock sweep */
+ pg_atomic_uint32 numBufferAllocs; /* Buffers allocated since last reset */
/*
- * Notification latch, or NULL if none. See StrategyNotifyBgWriter.
+	 * pgprocno of the bgwriter to be notified on buffer-allocation activity,
+	 * or -1 if none.  See StrategyNotifyBgWriter.
*/
- Latch *bgwriterLatch;
+ int bgwprocno;
} BufferStrategyControl;
/* Pointers to shared state */
static void AddBufferToRing(BufferAccessStrategy strategy,
volatile BufferDesc *buf);
+/*
+ * ClockSweepTick - Helper routine for StrategyGetBuffer()
+ *
+ * Move the clock hand one buffer ahead of its current position and return the
+ * id of the buffer now under the hand.
+ */
+static inline uint32
+ClockSweepTick(void)
+{
+ uint32 victim;
+
+ /*
+	 * Atomically move the hand ahead one buffer.  If several processes do
+	 * this concurrently, buffers can be returned slightly out of apparent
+	 * order.
+ */
+ victim =
+ pg_atomic_fetch_add_u32(&StrategyControl->nextVictimBuffer, 1);
+
+ if (victim >= NBuffers)
+ {
+ uint32 originalVictim = victim;
+
+ /* always wrap what we look up in BufferDescriptors */
+ victim = victim % NBuffers;
+
+ /*
+ * If we're the one that just caused a wraparound, force
+ * completePasses to be incremented while holding the spinlock. We
+ * need the spinlock so StrategySyncStart() can return a consistent
+ * value consisting of nextVictimBuffer and completePasses.
+ */
+ if (victim == 0)
+ {
+ uint32 expected;
+ uint32 wrapped;
+ bool success = false;
+
+ expected = originalVictim + 1;
+
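+			/*
+			 * 'expected' starts out as the value nextVictimBuffer held
+			 * right after our own fetch_add above.  If the CAS below
+			 * fails, it is refreshed with the current counter value and
+			 * we retry, so the counter eventually gets wrapped back into
+			 * [0, NBuffers).
+			 */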
+ while (!success)
+ {
+ /*
+ * Acquire the spinlock while increasing completePasses. That
+ * allows other readers to read nextVictimBuffer and
+ * completePasses in a consistent manner which is required for
+ * StrategySyncStart(). In theory delaying the increment
+				 * could lead to an overflow of nextVictimBuffer, but that's
+ * highly unlikely and wouldn't be particularly harmful.
+ */
+ SpinLockAcquire(&StrategyControl->buffer_strategy_lock);
+
+ wrapped = expected % NBuffers;
+
+ success = pg_atomic_compare_exchange_u32(&StrategyControl->nextVictimBuffer,
+ &expected, wrapped);
+ if (success)
+ StrategyControl->completePasses++;
+ SpinLockRelease(&StrategyControl->buffer_strategy_lock);
+ }
+ }
+ }
+ return victim;
+}
/*
* StrategyGetBuffer
StrategyGetBuffer(BufferAccessStrategy strategy)
{
volatile BufferDesc *buf;
- Latch *bgwriterLatch;
+ int bgwprocno;
int trycounter;
/*
return buf;
}
- /* Nope, so lock the freelist */
- SpinLockAcquire(&StrategyControl->buffer_strategy_lock);
+ /*
+	 * If asked, we need to wake the bgwriter. Since we don't want to rely on
+	 * a spinlock for this, we force a single read from shared memory and
+	 * then set the latch based on that value. We have to go to this length
+	 * because otherwise bgwprocno might be reset while/after we check, since
+	 * the compiler might otherwise just reread the value from memory.
+	 *
+	 * This can possibly set the latch of the wrong process if the bgwriter
+	 * dies at just the wrong moment. But since PGPROC->procLatch is never
+	 * deallocated, the worst consequence is that we set the latch of some
+	 * arbitrary process.
+ */
+ bgwprocno = INT_ACCESS_ONCE(StrategyControl->bgwprocno);
+ if (bgwprocno != -1)
+ {
+ /* reset bgwprocno first, before setting the latch */
+ StrategyControl->bgwprocno = -1;
+ pg_write_barrier();
+
+ /*
+		 * We don't acquire ProcArrayLock here, which is slightly icky. It's
+		 * actually fine because procLatch isn't ever freed, so at worst we
+		 * set the wrong process' (or no process') latch.
+ */
+ SetLatch(&ProcGlobal->allProcs[bgwprocno].procLatch);
+ }
/*
* We count buffer allocation requests so that the bgwriter can estimate
* the rate of buffer consumption. Note that buffers recycled by a
* strategy object are intentionally not counted here.
*/
- StrategyControl->numBufferAllocs++;
+ pg_atomic_fetch_add_u32(&StrategyControl->numBufferAllocs, 1);
/*
- * If bgwriterLatch is set, we need to waken the bgwriter, but we should
- * not do so while holding buffer_strategy_lock; so release and re-grab.
- * This is annoyingly tedious, but it happens at most once per bgwriter
- * cycle, so the performance hit is minimal.
+	 * First check, without acquiring the lock, whether there are buffers on
+	 * the freelist. Since we otherwise don't need the spinlock in every
+	 * StrategyGetBuffer() invocation, it would be a shame to acquire it here,
+	 * uselessly in most cases. That obviously leaves a race where a buffer is
+	 * put on the freelist but we don't see the store yet - but that's pretty
+	 * harmless; the buffer will just get used during the next acquisition.
+	 *
+	 * If there are buffers on the freelist, acquire the spinlock to pop one
+	 * buffer off the freelist. Then check whether that buffer is usable and
+	 * repeat if not.
+	 *
+	 * Note that the freeNext fields are considered to be protected by the
+	 * buffer_strategy_lock, not the individual buffer header spinlocks, so
+	 * it's OK to manipulate them without holding the buffer header spinlock.
*/
- bgwriterLatch = StrategyControl->bgwriterLatch;
- if (bgwriterLatch)
+ if (StrategyControl->firstFreeBuffer >= 0)
{
- StrategyControl->bgwriterLatch = NULL;
- SpinLockRelease(&StrategyControl->buffer_strategy_lock);
- SetLatch(bgwriterLatch);
- SpinLockAcquire(&StrategyControl->buffer_strategy_lock);
- }
+ while (true)
+ {
+ /* Acquire the spinlock to remove element from the freelist */
+ SpinLockAcquire(&StrategyControl->buffer_strategy_lock);
- /*
- * Try to get a buffer from the freelist. Note that the freeNext fields
- * are considered to be protected by the buffer_strategy_lock not the
- * individual buffer spinlocks, so it's OK to manipulate them without
- * holding the spinlock.
- */
- while (StrategyControl->firstFreeBuffer >= 0)
- {
- buf = &BufferDescriptors[StrategyControl->firstFreeBuffer];
- Assert(buf->freeNext != FREENEXT_NOT_IN_LIST);
+ if (StrategyControl->firstFreeBuffer < 0)
+ {
+ SpinLockRelease(&StrategyControl->buffer_strategy_lock);
+ break;
+ }
- /* Unconditionally remove buffer from freelist */
- StrategyControl->firstFreeBuffer = buf->freeNext;
- buf->freeNext = FREENEXT_NOT_IN_LIST;
+ buf = &BufferDescriptors[StrategyControl->firstFreeBuffer];
+ Assert(buf->freeNext != FREENEXT_NOT_IN_LIST);
- /*
- * Release the lock so someone else can access the freelist (or run
- * the clocksweep) while we check out this buffer.
- */
- SpinLockRelease(&StrategyControl->buffer_strategy_lock);
+ /* Unconditionally remove buffer from freelist */
+ StrategyControl->firstFreeBuffer = buf->freeNext;
+ buf->freeNext = FREENEXT_NOT_IN_LIST;
- /*
- * If the buffer is pinned or has a nonzero usage_count, we cannot use
- * it; discard it and retry. (This can only happen if VACUUM put a
- * valid buffer in the freelist and then someone else used it before
- * we got to it. It's probably impossible altogether as of 8.3, but
- * we'd better check anyway.)
- */
- LockBufHdr(buf);
- if (buf->refcount == 0 && buf->usage_count == 0)
- {
- if (strategy != NULL)
- AddBufferToRing(strategy, buf);
- return buf;
- }
- UnlockBufHdr(buf);
+ /*
+ * Release the lock so someone else can access the freelist while
+ * we check out this buffer.
+ */
+ SpinLockRelease(&StrategyControl->buffer_strategy_lock);
- /* Reacquire the lock and go around for another pass. */
- SpinLockAcquire(&StrategyControl->buffer_strategy_lock);
+ /*
+ * If the buffer is pinned or has a nonzero usage_count, we cannot
+ * use it; discard it and retry. (This can only happen if VACUUM
+ * put a valid buffer in the freelist and then someone else used
+ * it before we got to it. It's probably impossible altogether as
+ * of 8.3, but we'd better check anyway.)
+ */
+ LockBufHdr(buf);
+ if (buf->refcount == 0 && buf->usage_count == 0)
+ {
+ if (strategy != NULL)
+ AddBufferToRing(strategy, buf);
+ return buf;
+ }
+ UnlockBufHdr(buf);
+ }
}
/* Nothing on the freelist, so run the "clock sweep" algorithm */
trycounter = NBuffers;
for (;;)
{
- buf = &BufferDescriptors[StrategyControl->nextVictimBuffer];
-
- if (++StrategyControl->nextVictimBuffer >= NBuffers)
- {
- StrategyControl->nextVictimBuffer = 0;
- StrategyControl->completePasses++;
- }
- /* Release the lock before manipulating the candidate buffer. */
- SpinLockRelease(&StrategyControl->buffer_strategy_lock);
+ buf = &BufferDescriptors[ClockSweepTick()];
/*
* If the buffer is pinned or has a nonzero usage_count, we cannot use
elog(ERROR, "no unpinned buffers available");
}
UnlockBufHdr(buf);
-
- /* Reacquire the lock and get a new candidate buffer. */
- SpinLockAcquire(&StrategyControl->buffer_strategy_lock);
}
}
int
StrategySyncStart(uint32 *complete_passes, uint32 *num_buf_alloc)
{
+ uint32 nextVictimBuffer;
int result;
SpinLockAcquire(&StrategyControl->buffer_strategy_lock);
- result = StrategyControl->nextVictimBuffer;
+ nextVictimBuffer = pg_atomic_read_u32(&StrategyControl->nextVictimBuffer);
+ result = nextVictimBuffer % NBuffers;
+
if (complete_passes)
+ {
*complete_passes = StrategyControl->completePasses;
+ /*
+ * Additionally add the number of wraparounds that happened before
+ * completePasses could be incremented. C.f. ClockSweepTick().
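+		 * For instance, assuming NBuffers = 128: reading nextVictimBuffer
+		 * as 130 and completePasses as 7 reports 7 + 130 / 128 = 8
+		 * complete passes, with the clock hand at 130 % 128 = 2.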
+ */
+ *complete_passes += nextVictimBuffer / NBuffers;
+ }
+
if (num_buf_alloc)
{
- *num_buf_alloc = StrategyControl->numBufferAllocs;
- StrategyControl->numBufferAllocs = 0;
+ *num_buf_alloc = pg_atomic_exchange_u32(&StrategyControl->numBufferAllocs, 0);
}
SpinLockRelease(&StrategyControl->buffer_strategy_lock);
return result;
* from hibernation, and is not meant for anybody else to use.
*/
void
-StrategyNotifyBgWriter(Latch *bgwriterLatch)
+StrategyNotifyBgWriter(int bgwprocno)
{
/*
* We acquire buffer_strategy_lock just to ensure that the store appears
* infrequently, so there's no performance penalty from being safe.
*/
SpinLockAcquire(&StrategyControl->buffer_strategy_lock);
- StrategyControl->bgwriterLatch = bgwriterLatch;
+ StrategyControl->bgwprocno = bgwprocno;
SpinLockRelease(&StrategyControl->buffer_strategy_lock);
}
StrategyControl->lastFreeBuffer = NBuffers - 1;
/* Initialize the clock sweep pointer */
- StrategyControl->nextVictimBuffer = 0;
+ pg_atomic_init_u32(&StrategyControl->nextVictimBuffer, 0);
/* Clear statistics */
StrategyControl->completePasses = 0;
- StrategyControl->numBufferAllocs = 0;
+ pg_atomic_init_u32(&StrategyControl->numBufferAllocs, 0);
/* No pending notification */
- StrategyControl->bgwriterLatch = NULL;
+ StrategyControl->bgwprocno = -1;
}
else
Assert(!init);