int nextMsgNum; /* next message number to read */
bool resetState; /* backend needs to reset its state */
bool signaled; /* backend has been sent catchup signal */
+ bool hasMessages; /* backend has unread messages */
/*
* Backend only sends invalidations, never receives them. This only makes
shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
shmInvalBuffer->procState[i].resetState = false;
shmInvalBuffer->procState[i].signaled = false;
+ shmInvalBuffer->procState[i].hasMessages = false;
shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
}
}
SISeg *segP = shmInvalBuffer;
/*
- * This can run in parallel with read operations, and for that matter with
- * write operations; but not in parallel with additions and removals of
- * backends, nor in parallel with SICleanupQueue. It doesn't seem worth
- * having a third lock, so we choose to use SInvalWriteLock to serialize
- * additions/removals.
+ * This can run in parallel with read operations, but not with write
+ * operations, since SIInsertDataEntries relies on lastBackend to set
+ * hasMessages appropriately.
*/
LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
stateP->nextMsgNum = segP->maxMsgNum;
stateP->resetState = false;
stateP->signaled = false;
+ stateP->hasMessages = false;
stateP->sendOnly = sendOnly;
LWLockRelease(SInvalWriteLock);
int nthistime = Min(n, WRITE_QUANTUM);
int numMsgs;
int max;
+ int i;
n -= nthistime;
SpinLockRelease(&vsegP->msgnumLock);
}
+ /*
+ * Now that the maxMsgNum change is globally visible, we give
+ * everyone a swift kick to make sure they read the newly added
+ * messages. Releasing SInvalWriteLock will enforce a full memory
+ * barrier, so these (unlocked) changes will be committed to memory
+ * before we exit the function.
+ */
+ for (i = 0; i < segP->lastBackend; i++)
+ {
+ ProcState *stateP = &segP->procState[i];
+ stateP->hasMessages = TRUE;
+ }
+
LWLockRelease(SInvalWriteLock);
}
}
int max;
int n;
- LWLockAcquire(SInvalReadLock, LW_SHARED);
-
segP = shmInvalBuffer;
stateP = &segP->procState[MyBackendId - 1];
+ /*
+ * Before starting to take locks, do a quick, unlocked test to see whether
+ * there can possibly be anything to read. On a multiprocessor system,
+ * it's possible that this load could migrate backwards and occur before we
+ * actually enter this function, so we might miss a sinval message that
+ * was just added by some other processor. But they can't migrate
+ * backwards over a preceding lock acquisition, so it should be OK. If
+ * we haven't acquired a lock preventing against further relevant
+ * invalidations, any such occurrence is not much different than if the
+ * invalidation had arrived slightly later in the first place.
+ */
+ if (!stateP->hasMessages)
+ return 0;
+
+ LWLockAcquire(SInvalReadLock, LW_SHARED);
+
+ /*
+ * We must reset hasMessages before determining how many messages we're
+ * going to read. That way, if new messages arrive after we have
+ * determined how many we're reading, the flag will get reset and we'll
+ * notice those messages part-way through.
+ *
+ * Note that, if we don't end up reading all of the messages, we had
+ * better be certain to reset this flag before exiting!
+ */
+ stateP->hasMessages = FALSE;
+
/* Fetch current value of maxMsgNum using spinlock */
{
/* use volatile pointer to prevent code rearrangement */
}
/*
- * Reset our "signaled" flag whenever we have caught up completely.
+ * If we have caught up completely, reset our "signaled" flag so that
+ * we'll get another signal if we fall behind again.
+ *
+ * If we haven't catch up completely, reset the hasMessages flag so that
+ * we see the remaining messages next time.
*/
if (stateP->nextMsgNum >= max)
stateP->signaled = false;
+ else
+ stateP->hasMessages = TRUE;
LWLockRelease(SInvalReadLock);
return n;