*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.71 2008/06/19 21:32:56 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.72 2008/06/20 00:24:53 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "storage/proc.h"
#include "storage/shmem.h"
#include "storage/sinvaladt.h"
+#include "storage/spin.h"
/*
* can operate in parallel with one or more readers, because the writer
* has no need to touch anyone's ProcState, except in the infrequent cases
* when SICleanupQueue is needed. The only point of overlap is that
- * the writer might change maxMsgNum while readers are looking at it.
- * This should be okay: we are assuming that fetching or storing an int
- * is atomic, an assumption also made elsewhere in Postgres. However
- * readers mustn't assume that maxMsgNum isn't changing under them.
+ * the writer wants to change maxMsgNum while readers need to read it.
+ * We deal with that by having a spinlock that readers must take for just
+ * long enough to read maxMsgNum, while writers take it for just long enough
+ * to write maxMsgNum. (The exact rule is that you need the spinlock to
+ * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
+ * spinlock to write maxMsgNum unless you are holding both locks.)
+ *
+ * Note: since maxMsgNum is an int and hence presumably atomically readable/
+ * writable, the spinlock might seem unnecessary. The reason it is needed
+ * is to provide a memory barrier: we need to be sure that messages written
+ * to the array are actually there before maxMsgNum is increased, and that
+ * readers will see that data after fetching maxMsgNum. Multiprocessors
+ * that have weak memory-ordering guarantees can fail without the memory
+ * barrier instructions that are included in the spinlock sequences.
*/
int lastBackend; /* index of last active procState entry, +1 */
int maxBackends; /* size of procState array */
+ slock_t msgnumLock; /* spinlock protecting maxMsgNum */
+
/*
* Circular buffer holding shared-inval messages
*/
if (found)
return;
- /* Clear message counters, save size of procState array */
+ /* Clear message counters, save size of procState array, init spinlock */
shmInvalBuffer->minMsgNum = 0;
shmInvalBuffer->maxMsgNum = 0;
shmInvalBuffer->nextThreshold = CLEANUP_MIN;
shmInvalBuffer->lastBackend = 0;
shmInvalBuffer->maxBackends = MaxBackends;
+ SpinLockInit(&shmInvalBuffer->msgnumLock);
/* The buffer[] array is initially all unused, so we need not fill it */
{
int nthistime = Min(n, WRITE_QUANTUM);
int numMsgs;
+ int max;
n -= nthistime;
/*
* Insert new message(s) into proper slot of circular buffer
*/
+ max = segP->maxMsgNum;
while (nthistime-- > 0)
{
- segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data++;
- segP->maxMsgNum++;
+ segP->buffer[max % MAXNUMMESSAGES] = *data++;
+ max++;
+ }
+
+ /* Update current value of maxMsgNum using spinlock */
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile SISeg *vsegP = segP;
+
+ SpinLockAcquire(&vsegP->msgnumLock);
+ vsegP->maxMsgNum = max;
+ SpinLockRelease(&vsegP->msgnumLock);
}
LWLockRelease(SInvalWriteLock);
{
SISeg *segP;
ProcState *stateP;
+ int max;
int n;
LWLockAcquire(SInvalReadLock, LW_SHARED);
segP = shmInvalBuffer;
stateP = &segP->procState[MyBackendId - 1];
+ /* Fetch current value of maxMsgNum using spinlock */
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile SISeg *vsegP = segP;
+
+ SpinLockAcquire(&vsegP->msgnumLock);
+ max = vsegP->maxMsgNum;
+ SpinLockRelease(&vsegP->msgnumLock);
+ }
+
if (stateP->resetState)
{
/*
* since the reset, as well; and that means we should clear the
* signaled flag, too.
*/
- stateP->nextMsgNum = segP->maxMsgNum;
+ stateP->nextMsgNum = max;
stateP->resetState = false;
stateP->signaled = false;
LWLockRelease(SInvalReadLock);
* There may be other backends that haven't read the message(s), so we
* cannot delete them here. SICleanupQueue() will eventually remove them
* from the queue.
- *
- * Note: depending on the compiler, we might read maxMsgNum only once
- * here, or each time through the loop. It doesn't matter (as long as
- * each fetch is atomic).
*/
n = 0;
- while (n < datasize && stateP->nextMsgNum < segP->maxMsgNum)
+ while (n < datasize && stateP->nextMsgNum < max)
{
data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
stateP->nextMsgNum++;
/*
* Reset our "signaled" flag whenever we have caught up completely.
*/
- if (stateP->nextMsgNum >= segP->maxMsgNum)
+ if (stateP->nextMsgNum >= max)
stateP->signaled = false;
LWLockRelease(SInvalReadLock);