]> granicus.if.org Git - postgresql/commitdiff
Seems I was too optimistic in supposing that sinval's maxMsgNum could be
authorTom Lane <tgl@sss.pgh.pa.us>
Fri, 20 Jun 2008 00:24:53 +0000 (00:24 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Fri, 20 Jun 2008 00:24:53 +0000 (00:24 +0000)
read and written without a lock.  The value itself is atomic, sure, but on
processors with weak memory ordering it's possible for a reader to see the
value change before it sees the associated message written into the buffer
array.  Fix by introducing a spinlock that's used just to read and write
maxMsgNum.  (We could do this with less overhead if we recognized a concept
of "memory access barrier"; is it worth introducing such a thing?  At the
moment probably not --- I can't measure any clear slowdown from adding the
spinlock, so this solution is probably fine.)  Per buildfarm results.

src/backend/storage/ipc/sinvaladt.c

index 0befc4a93419a526d857219c7a80dc60a38e3436..e414e4a507105aef95fdde4e830b915b60490aa9 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.71 2008/06/19 21:32:56 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.72 2008/06/20 00:24:53 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -23,6 +23,7 @@
 #include "storage/proc.h"
 #include "storage/shmem.h"
 #include "storage/sinvaladt.h"
+#include "storage/spin.h"
 
 
 /*
  * can operate in parallel with one or more readers, because the writer
  * has no need to touch anyone's ProcState, except in the infrequent cases
  * when SICleanupQueue is needed.  The only point of overlap is that
- * the writer might change maxMsgNum while readers are looking at it.
- * This should be okay: we are assuming that fetching or storing an int
- * is atomic, an assumption also made elsewhere in Postgres.  However
- * readers mustn't assume that maxMsgNum isn't changing under them.
+ * the writer wants to change maxMsgNum while readers need to read it.
+ * We deal with that by having a spinlock that readers must take for just
+ * long enough to read maxMsgNum, while writers take it for just long enough
+ * to write maxMsgNum.  (The exact rule is that you need the spinlock to
+ * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
+ * spinlock to write maxMsgNum unless you are holding both locks.)
+ *
+ * Note: since maxMsgNum is an int and hence presumably atomically readable/
+ * writable, the spinlock might seem unnecessary.  The reason it is needed
+ * is to provide a memory barrier: we need to be sure that messages written
+ * to the array are actually there before maxMsgNum is increased, and that
+ * readers will see that data after fetching maxMsgNum.  Multiprocessors
+ * that have weak memory-ordering guarantees can fail without the memory
+ * barrier instructions that are included in the spinlock sequences.
  */
 
 
@@ -153,6 +164,8 @@ typedef struct SISeg
        int                     lastBackend;    /* index of last active procState entry, +1 */
        int                     maxBackends;    /* size of procState array */
 
+       slock_t         msgnumLock;             /* spinlock protecting maxMsgNum */
+
        /*
         * Circular buffer holding shared-inval messages
         */
@@ -209,12 +222,13 @@ CreateSharedInvalidationState(void)
        if (found)
                return;
 
-       /* Clear message counters, save size of procState array */
+       /* Clear message counters, save size of procState array, init spinlock */
        shmInvalBuffer->minMsgNum = 0;
        shmInvalBuffer->maxMsgNum = 0;
        shmInvalBuffer->nextThreshold = CLEANUP_MIN;
        shmInvalBuffer->lastBackend = 0;
        shmInvalBuffer->maxBackends = MaxBackends;
+       SpinLockInit(&shmInvalBuffer->msgnumLock);
 
        /* The buffer[] array is initially all unused, so we need not fill it */
 
@@ -365,6 +379,7 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
        {
                int             nthistime = Min(n, WRITE_QUANTUM);
                int             numMsgs;
+               int             max;
 
                n -= nthistime;
 
@@ -388,10 +403,21 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
                /*
                 * Insert new message(s) into proper slot of circular buffer
                 */
+               max = segP->maxMsgNum;
                while (nthistime-- > 0)
                {
-                       segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data++;
-                       segP->maxMsgNum++;
+                       segP->buffer[max % MAXNUMMESSAGES] = *data++;
+                       max++;
+               }
+
+               /* Update current value of maxMsgNum using spinlock */
+               {
+                       /* use volatile pointer to prevent code rearrangement */
+                       volatile SISeg *vsegP = segP;
+
+                       SpinLockAcquire(&vsegP->msgnumLock);
+                       vsegP->maxMsgNum = max;
+                       SpinLockRelease(&vsegP->msgnumLock);
                }
 
                LWLockRelease(SInvalWriteLock);
@@ -431,6 +457,7 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
 {
        SISeg      *segP;
        ProcState  *stateP;
+       int                     max;
        int                     n;
        
        LWLockAcquire(SInvalReadLock, LW_SHARED);
@@ -438,6 +465,16 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
        segP = shmInvalBuffer;
        stateP = &segP->procState[MyBackendId - 1];
 
+       /* Fetch current value of maxMsgNum using spinlock */
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile SISeg *vsegP = segP;
+
+               SpinLockAcquire(&vsegP->msgnumLock);
+               max = vsegP->maxMsgNum;
+               SpinLockRelease(&vsegP->msgnumLock);
+       }
+
        if (stateP->resetState)
        {
                /*
@@ -445,7 +482,7 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
                 * since the reset, as well; and that means we should clear the
                 * signaled flag, too.
                 */
-               stateP->nextMsgNum = segP->maxMsgNum;
+               stateP->nextMsgNum = max;
                stateP->resetState = false;
                stateP->signaled = false;
                LWLockRelease(SInvalReadLock);
@@ -459,13 +496,9 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
         * There may be other backends that haven't read the message(s), so we
         * cannot delete them here.  SICleanupQueue() will eventually remove them
         * from the queue.
-        *
-        * Note: depending on the compiler, we might read maxMsgNum only once
-        * here, or each time through the loop.  It doesn't matter (as long as
-        * each fetch is atomic).
         */
        n = 0;
-       while (n < datasize && stateP->nextMsgNum < segP->maxMsgNum)
+       while (n < datasize && stateP->nextMsgNum < max)
        {
                data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
                stateP->nextMsgNum++;
@@ -474,7 +507,7 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
        /*
         * Reset our "signaled" flag whenever we have caught up completely.
         */
-       if (stateP->nextMsgNum >= segP->maxMsgNum)
+       if (stateP->nextMsgNum >= max)
                stateP->signaled = false;
 
        LWLockRelease(SInvalReadLock);