*
*
* To avoid - as we used to - requiring an array with NBuffers entries to keep
- * track of local buffers we use a small sequentially searched array
+ * track of local buffers, we use a small sequentially searched array
* (PrivateRefCountArray) and a overflow hash table (PrivateRefCountHash) to
* keep track of backend local pins.
*
*
* Note that in most scenarios the number of pinned buffers will not exceed
* REFCOUNT_ARRAY_ENTRIES.
+ *
+ *
+ * To enter a buffer into the refcount tracking mechanism, first reserve a
+ * free entry using ReservePrivateRefCountEntry() and then later, if
+ * necessary, fill it with NewPrivateRefCountEntry(). That split lets us
+ * avoid doing memory allocations in NewPrivateRefCountEntry(), which can be
+ * important because in some scenarios it's called with a spinlock held.
*/
static struct PrivateRefCountEntry PrivateRefCountArray[REFCOUNT_ARRAY_ENTRIES];
static HTAB *PrivateRefCountHash = NULL;
static int32 PrivateRefCountOverflowed = 0;
static uint32 PrivateRefCountClock = 0;
+static PrivateRefCountEntry *ReservedRefCountEntry = NULL;
-static PrivateRefCountEntry* GetPrivateRefCountEntry(Buffer buffer, bool create, bool do_move);
+static void ReservePrivateRefCountEntry(void);
+static PrivateRefCountEntry* NewPrivateRefCountEntry(Buffer buffer);
+static PrivateRefCountEntry* GetPrivateRefCountEntry(Buffer buffer, bool do_move);
static inline int32 GetPrivateRefCount(Buffer buffer);
static void ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref);
+/*
+ * Ensure that the PrivateRefCountArray has sufficient space to store one
+ * more entry. This has to be called before using NewPrivateRefCountEntry() to
+ * fill a new entry - but it's perfectly fine to not use a reserved entry.
+ */
+static void
+ReservePrivateRefCountEntry(void)
+{
+ /* Already reserved (or freed), nothing to do */
+ if (ReservedRefCountEntry != NULL)
+ return;
+
+ /*
+ * First search for a free entry in the array, that'll be sufficient in the
+ * majority of cases.
+ */
+ {
+ int i;
+
+ for (i = 0; i < REFCOUNT_ARRAY_ENTRIES; i++)
+ {
+ PrivateRefCountEntry *res;
+
+ res = &PrivateRefCountArray[i];
+
+ if (res->buffer == InvalidBuffer)
+ {
+ ReservedRefCountEntry = res;
+ return;
+ }
+ }
+ }
+
+ /*
+ * No luck. All array entries are full. Move one array entry into the hash
+ * table.
+ */
+ {
+ /*
+ * Move entry from the current clock position in the array into the
+ * hashtable. Use that slot.
+ */
+ PrivateRefCountEntry *hashent;
+ bool found;
+
+ /* select victim slot */
+ ReservedRefCountEntry =
+ &PrivateRefCountArray[PrivateRefCountClock++ % REFCOUNT_ARRAY_ENTRIES];
+
+ /* Better be used, otherwise we shouldn't get here. */
+ Assert(ReservedRefCountEntry->buffer != InvalidBuffer);
+
+ /* enter victim array entry into hashtable */
+ hashent = hash_search(PrivateRefCountHash,
+ (void *) &(ReservedRefCountEntry->buffer),
+ HASH_ENTER,
+ &found);
+ Assert(!found);
+ hashent->refcount = ReservedRefCountEntry->refcount;
+
+ /* clear the now free array slot */
+ ReservedRefCountEntry->buffer = InvalidBuffer;
+ ReservedRefCountEntry->refcount = 0;
+
+ PrivateRefCountOverflowed++;
+ }
+}
+
+/*
+ * Fill a previously reserved refcount entry.
+ *
+ * Consumes the reservation made by ReservePrivateRefCountEntry() and returns
+ * the array entry initialized for 'buffer' with a refcount of zero.
+ */
+static PrivateRefCountEntry*
+NewPrivateRefCountEntry(Buffer buffer)
+{
+ PrivateRefCountEntry *res;
+
+ /* only allowed to be called when a reservation has been made */
+ Assert(ReservedRefCountEntry != NULL);
+
+ /* use up the reserved entry */
+ res = ReservedRefCountEntry;
+ ReservedRefCountEntry = NULL;
+
+ /* and fill it */
+ res->buffer = buffer;
+ res->refcount = 0;
+
+ return res;
+}
+
/*
* Return the PrivateRefCount entry for the passed buffer.
*
- * Returns NULL if create = false is passed and the buffer doesn't have a
- * PrivateRefCount entry; allocates a new PrivateRefCountEntry if currently
- * none exists and create = true is passed.
- *
- * If do_move is true - only allowed for create = false - the entry is
- * optimized for frequent access.
- *
- * When a returned refcount entry isn't used anymore it has to be forgotten,
- * using ForgetPrivateRefCountEntry().
- *
- * Only works for shared buffers.
+ * Returns NULL if a buffer doesn't have a refcount entry. Otherwise, if
+ * do_move is true and the entry resides in the hashtable, the entry is
+ * optimized for frequent access by moving it to the array.
*/
static PrivateRefCountEntry*
-GetPrivateRefCountEntry(Buffer buffer, bool create, bool do_move)
+GetPrivateRefCountEntry(Buffer buffer, bool do_move)
{
PrivateRefCountEntry *res;
- PrivateRefCountEntry *free = NULL;
- bool found = false;
int i;
- Assert(!create || do_move);
Assert(BufferIsValid(buffer));
Assert(!BufferIsLocal(buffer));
if (res->buffer == buffer)
return res;
-
- /* Remember where to put a new refcount, should it become necessary. */
- if (free == NULL && res->buffer == InvalidBuffer)
- free = res;
}
/*
* By here we know that the buffer, if already pinned, isn't residing in
* the array.
+ *
+ * Only look up the buffer in the hashtable if we've previously overflowed
+ * into it.
*/
- res = NULL;
- found = false;
+ if (PrivateRefCountOverflowed == 0)
+ return NULL;
- /*
- * Look up the buffer in the hashtable if we've previously overflowed into
- * it.
- */
- if (PrivateRefCountOverflowed > 0)
+ res = hash_search(PrivateRefCountHash,
+ (void *) &buffer,
+ HASH_FIND,
+ NULL);
+
+ if (res == NULL)
+ return NULL;
+ else if (!do_move)
{
- res = hash_search(PrivateRefCountHash,
- (void *) &buffer,
- HASH_FIND,
- &found);
+ /* caller doesn't want us to move the hash entry into the array */
+ return res;
}
-
- if (!found)
+ else
{
- if (!create)
- {
- /* Neither array nor hash have an entry and no new entry is needed */
- return NULL;
- }
- else if (free != NULL)
- {
- /* add entry into the free array slot */
- free->buffer = buffer;
- free->refcount = 0;
-
- return free;
- }
- else
- {
- /*
- * Move entry from the current clock position in the array into the
- * hashtable. Use that slot.
- */
- PrivateRefCountEntry *arrayent;
- PrivateRefCountEntry *hashent;
-
- /* select victim slot */
- arrayent = &PrivateRefCountArray[
- PrivateRefCountClock++ % REFCOUNT_ARRAY_ENTRIES];
- Assert(arrayent->buffer != InvalidBuffer);
+ /* move buffer from hashtable into the free array slot */
+ bool found;
+ PrivateRefCountEntry *free;
- /* enter victim array entry into hashtable */
- hashent = hash_search(PrivateRefCountHash,
- (void *) &arrayent->buffer,
- HASH_ENTER,
- &found);
- Assert(!found);
- hashent->refcount = arrayent->refcount;
+ /* Ensure there's a free array slot */
+ ReservePrivateRefCountEntry();
- /* fill the now free array slot */
- arrayent->buffer = buffer;
- arrayent->refcount = 0;
+ /* Use up the reserved slot */
+ Assert(ReservedRefCountEntry != NULL);
+ free = ReservedRefCountEntry;
+ ReservedRefCountEntry = NULL;
+ Assert(free->buffer == InvalidBuffer);
- PrivateRefCountOverflowed++;
+ /* and fill it */
+ free->buffer = buffer;
+ free->refcount = res->refcount;
- return arrayent;
+ /* delete from hashtable */
+ hash_search(PrivateRefCountHash,
+ (void *) &buffer,
+ HASH_REMOVE,
+ &found);
+ Assert(found);
+ Assert(PrivateRefCountOverflowed > 0);
+ PrivateRefCountOverflowed--;
- }
+ return free;
}
- else
- {
- if (!do_move)
- {
- return res;
- }
- else if (found && free != NULL)
- {
- /* move buffer from hashtable into the free array slot */
-
- /* fill array slot */
- free->buffer = buffer;
- free->refcount = res->refcount;
-
- /* delete from hashtable */
- hash_search(PrivateRefCountHash,
- (void *) &buffer,
- HASH_REMOVE,
- &found);
- Assert(found);
- Assert(PrivateRefCountOverflowed > 0);
- PrivateRefCountOverflowed--;
-
- return free;
- }
- else
- {
- /*
- * Swap the entry in the hash table with the one in the array at the
- * current clock position.
- */
- PrivateRefCountEntry *arrayent;
- PrivateRefCountEntry *hashent;
-
- /* select victim slot */
- arrayent = &PrivateRefCountArray[
- PrivateRefCountClock++ % REFCOUNT_ARRAY_ENTRIES];
- Assert(arrayent->buffer != InvalidBuffer);
-
- /* enter victim entry into the hashtable */
- hashent = hash_search(PrivateRefCountHash,
- (void *) &arrayent->buffer,
- HASH_ENTER,
- &found);
- Assert(!found);
- hashent->refcount = arrayent->refcount;
-
- /* fill now free array entry with previously searched entry */
- arrayent->buffer = res->buffer;
- arrayent->refcount = res->refcount;
-
- /* and remove the old entry */
- hash_search(PrivateRefCountHash,
- (void *) &arrayent->buffer,
- HASH_REMOVE,
- &found);
- Assert(found);
-
- /* PrivateRefCountOverflowed stays the same -1 + +1 = 0*/
-
- return arrayent;
- }
- }
-
- Assert(false); /* unreachable */
- return NULL;
}
/*
Assert(BufferIsValid(buffer));
Assert(!BufferIsLocal(buffer));
- ref = GetPrivateRefCountEntry(buffer, false, false);
+ /*
+ * Not moving the entry - that's ok for the current users, but we might
+ * want to change this one day.
+ */
+ ref = GetPrivateRefCountEntry(buffer, false);
if (ref == NULL)
return 0;
ref < &PrivateRefCountArray[REFCOUNT_ARRAY_ENTRIES])
{
ref->buffer = InvalidBuffer;
+ /*
+ * Mark the just used entry as reserved - in many scenarios that
+ * allows us to avoid ever having to search the array/hash for free
+ * entries.
+ */
+ ReservedRefCountEntry = ref;
}
else
{
/* Loop here in case we have to try another victim buffer */
for (;;)
{
+ /*
+ * Ensure, while the spinlock's not yet held, that there's a free refcount
+ * entry.
+ */
+ ReservePrivateRefCountEntry();
+
/*
* Select a victim buffer. The buffer is returned with its header
* spinlock still held!
bool result;
PrivateRefCountEntry *ref;
- ref = GetPrivateRefCountEntry(b + 1, true, true);
+ ref = GetPrivateRefCountEntry(b + 1, true);
- if (ref->refcount == 0)
+ if (ref == NULL)
{
+ ReservePrivateRefCountEntry();
+ ref = NewPrivateRefCountEntry(b + 1);
+
LockBufHdr(buf);
buf->refcount++;
if (strategy == NULL)
* PinBuffer_Locked -- as above, but caller already locked the buffer header.
* The spinlock is released before return.
*
+ * As this function is called with the spinlock held, the caller has to
+ * have previously called ReservePrivateRefCountEntry().
+ *
* Currently, no callers of this function want to modify the buffer's
* usage_count at all, so there's no need for a strategy parameter.
* Also we don't bother with a BM_VALID test (the caller could check that for
* itself).
*
+ * Also, all callers only ever use this function when it's known that the
+ * buffer can't have a preexisting pin by this backend. That allows us to skip
+ * searching the private refcount array & hash, which is a boon, because the
+ * spinlock is still held.
+ *
* Note: use of this routine is frequently mandatory, not just an optimization
* to save a spin lock/unlock cycle, because we need to pin a buffer before
* its state can change under us.
int b = buf->buf_id;
PrivateRefCountEntry *ref;
- ref = GetPrivateRefCountEntry(b + 1, true, true);
+ /*
+ * As explained, we don't expect any preexisting pins. That allows us to
+ * manipulate the PrivateRefCount entry after releasing the spinlock.
+ */
+ Assert(GetPrivateRefCountEntry(b + 1, false) == NULL);
- if (ref->refcount == 0)
- buf->refcount++;
+ buf->refcount++;
UnlockBufHdr(buf);
+
+ ref = NewPrivateRefCountEntry(b + 1);
ref->refcount++;
- Assert(ref->refcount > 0);
+
ResourceOwnerRememberBuffer(CurrentResourceOwner,
BufferDescriptorGetBuffer(buf));
}
UnpinBuffer(volatile BufferDesc *buf, bool fixOwner)
{
PrivateRefCountEntry *ref;
- int b = buf->buf_id;
- ref = GetPrivateRefCountEntry(b + 1, false, false);
+ /* not moving as we're likely deleting it soon anyway */
+ ref = GetPrivateRefCountEntry(buf->buf_id + 1, false);
Assert(ref != NULL);
if (fixOwner)
volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];
int result = 0;
+ ReservePrivateRefCountEntry();
+
/*
* Check whether buffer needs writing.
*
if (!RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
continue;
+ ReservePrivateRefCountEntry();
+
LockBufHdr(bufHdr);
if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
(bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
if (bufHdr->tag.rnode.dbNode != dbid)
continue;
+ ReservePrivateRefCountEntry();
+
LockBufHdr(bufHdr);
if (bufHdr->tag.rnode.dbNode == dbid &&
(bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
void
ReleaseBuffer(Buffer buffer)
{
- volatile BufferDesc *bufHdr;
- PrivateRefCountEntry *ref;
-
if (!BufferIsValid(buffer))
elog(ERROR, "bad buffer ID: %d", buffer);
- ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
-
if (BufferIsLocal(buffer))
{
+ ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
+
Assert(LocalRefCount[-buffer - 1] > 0);
LocalRefCount[-buffer - 1]--;
return;
}
- bufHdr = &BufferDescriptors[buffer - 1];
-
- ref = GetPrivateRefCountEntry(buffer, false, false);
- Assert(ref != NULL);
- Assert(ref->refcount > 0);
-
- if (ref->refcount > 1)
- ref->refcount--;
- else
- UnpinBuffer(bufHdr, false);
+ UnpinBuffer(&BufferDescriptors[buffer - 1], true);
}
/*
else
{
PrivateRefCountEntry *ref;
- ref = GetPrivateRefCountEntry(buffer, false, true);
+ ref = GetPrivateRefCountEntry(buffer, true);
Assert(ref != NULL);
ref->refcount++;
}