]> granicus.if.org Git - postgresql/commitdiff
Fix race condition in multixact code: it's possible to try to read a
authorTom Lane <tgl@sss.pgh.pa.us>
Fri, 28 Oct 2005 17:27:29 +0000 (17:27 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Fri, 28 Oct 2005 17:27:29 +0000 (17:27 +0000)
multixact's starting offset before the offset has been stored into the
SLRU file.  A simple fix would be to hold the MultiXactGenLock until the
offset has been stored, but that looks like a big concurrency hit.  Instead
rely on knowledge that unset offsets will be zero, and loop when we see
a zero.  This requires a little extra hacking to ensure that zero is never
a valid value for the offset.  Problem reported by Matteo Beccati, fix
ideas from Martijn van Oosterhout, Alvaro Herrera, and Tom Lane.

src/backend/access/transam/multixact.c

index ffe14ed6bf1dd53fa960a884c7bdd317311401f5..fcc2546879b03baf86ca28e49af5c441cfdf968d 100644 (file)
@@ -42,7 +42,7 @@
  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/backend/access/transam/multixact.c,v 1.9 2005/10/15 02:49:09 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/multixact.c,v 1.10 2005/10/28 17:27:29 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -631,7 +631,16 @@ CreateMultiXactId(int nxids, TransactionId *xids)
        }
 
        /*
-        * OK, assign the MXID and offsets range to use
+        * Critical section from here until we've written the data; we don't
+        * want to error out with a partly written MultiXact structure.
+        * (In particular, failing to write our start offset after advancing
+        * nextMXact would effectively corrupt the previous MultiXact.)
+        */
+       START_CRIT_SECTION();
+
+       /*
+        * Assign the MXID and offsets range to use, and make sure there is
+        * space in the OFFSETs and MEMBERs files.
         */
        multi = GetNewMultiXactId(nxids, &offset);
 
@@ -668,6 +677,9 @@ CreateMultiXactId(int nxids, TransactionId *xids)
        /* Now enter the information into the OFFSETs and MEMBERs logs */
        RecordNewMultiXact(multi, offset, nxids, xids);
 
+       /* Done with critical section */
+       END_CRIT_SECTION();
+
        /* Store the new MultiXactId in the local cache, too */
        mXactCachePut(multi, nxids, xids);
 
@@ -761,6 +773,7 @@ static MultiXactId
 GetNewMultiXactId(int nxids, MultiXactOffset *offset)
 {
        MultiXactId result;
+       MultiXactOffset nextOffset;
 
        debug_elog3(DEBUG2, "GetNew: for %d xids", nxids);
 
@@ -784,19 +797,28 @@ GetNewMultiXactId(int nxids, MultiXactOffset *offset)
         * Advance counter.  As in GetNewTransactionId(), this must not happen
         * until after ExtendMultiXactOffset has succeeded!
         *
-        * We don't care about MultiXactId wraparound here; it will be handled by the
-        * next iteration.      But note that nextMXact may be InvalidMultiXactId
+        * We don't care about MultiXactId wraparound here; it will be handled by
+        * the next iteration.  But note that nextMXact may be InvalidMultiXactId
         * after this routine exits, so anyone else looking at the variable must
         * be prepared to deal with that.
         */
        (MultiXactState->nextMXact)++;
 
        /*
-        * Reserve the members space.  Same considerations as above.
+        * Reserve the members space.  Same considerations as above.  Also, be
+        * careful not to return zero as the starting offset for any multixact.
+        * See GetMultiXactIdMembers() for motivation.
         */
-       *offset = MultiXactState->nextOffset;
+       nextOffset = MultiXactState->nextOffset;
+       if (nextOffset == 0)
+       {
+               *offset = 1;
+               nxids++;                                /* allocate member slot 0 too */
+       }
+       else
+               *offset = nextOffset;
 
-       ExtendMultiXactMember(*offset, nxids);
+       ExtendMultiXactMember(nextOffset, nxids);
 
        MultiXactState->nextOffset += nxids;
 
@@ -824,6 +846,7 @@ GetMultiXactIdMembers(MultiXactId multi, TransactionId **xids)
        MultiXactOffset *offptr;
        MultiXactOffset offset;
        int                     length;
+       int                     truelength;
        int                     i;
        MultiXactId nextMXact;
        MultiXactId tmpMXact;
@@ -849,13 +872,13 @@ GetMultiXactIdMembers(MultiXactId multi, TransactionId **xids)
        /*
         * We check known limits on MultiXact before resorting to the SLRU area.
         *
-        * An ID older than our OldestVisibleMXactId[] entry can't possibly still be
-        * running, and we'd run the risk of trying to read already-truncated SLRU
-        * data if we did try to examine it.
+        * An ID older than our OldestVisibleMXactId[] entry can't possibly still
+        * be running, and we'd run the risk of trying to read already-truncated
+        * SLRU data if we did try to examine it.
         *
-        * Conversely, an ID >= nextMXact shouldn't ever be seen here; if it is seen,
-        * it implies undetected ID wraparound has occurred.  We just silently
-        * assume that such an ID is no longer running.
+        * Conversely, an ID >= nextMXact shouldn't ever be seen here; if it is
+        * seen, it implies undetected ID wraparound has occurred.  We just
+        * silently assume that such an ID is no longer running.
         *
         * Shared lock is enough here since we aren't modifying any global state.
         * Also, we can examine our own OldestVisibleMXactId without the lock,
@@ -868,27 +891,58 @@ GetMultiXactIdMembers(MultiXactId multi, TransactionId **xids)
                return -1;
        }
 
+       /*
+        * Acquire the shared lock just long enough to grab the current counter
+        * values.  We may need both nextMXact and nextOffset; see below.
+        */
        LWLockAcquire(MultiXactGenLock, LW_SHARED);
 
-       if (!MultiXactIdPrecedes(multi, MultiXactState->nextMXact))
+       nextMXact = MultiXactState->nextMXact;
+       nextOffset = MultiXactState->nextOffset;
+
+       LWLockRelease(MultiXactGenLock);
+
+       if (!MultiXactIdPrecedes(multi, nextMXact))
        {
-               LWLockRelease(MultiXactGenLock);
                debug_elog2(DEBUG2, "GetMembers: it's too new!");
                *xids = NULL;
                return -1;
        }
 
        /*
-        * Before releasing the lock, save the current counter values, because the
-        * target MultiXactId may be just one less than nextMXact.      We will need
-        * to use nextOffset as the endpoint if so.
+        * Find out the offset at which we need to start reading MultiXactMembers
+        * and the number of members in the multixact.  We determine the latter
+        * as the difference between this multixact's starting offset and the
+        * next one's.  However, there are some corner cases to worry about:
+        *
+        * 1. This multixact may be the latest one created, in which case there
+        * is no next one to look at.  In this case the nextOffset value we just
+        * saved is the correct endpoint.
+        *
+        * 2. The next multixact may still be in process of being filled in:
+        * that is, another process may have done GetNewMultiXactId but not yet
+        * written the offset entry for that ID.  In that scenario, it is
+        * guaranteed that the offset entry for that multixact exists (because
+        * GetNewMultiXactId won't release MultiXactGenLock until it does)
+        * but contains zero (because we are careful to pre-zero offset pages).
+        * Because GetNewMultiXactId will never return zero as the starting offset
+        * for a multixact, when we read zero as the next multixact's offset, we
+        * know we have this case.  We sleep for a bit and try again.
+        *
+        * 3. Because GetNewMultiXactId increments offset zero to offset one
+        * to handle case #2, there is an ambiguity near the point of offset
+        * wraparound.  If we see next multixact's offset is one, is that our
+        * multixact's actual endpoint, or did it end at zero with a subsequent
+        * increment?  We handle this using the knowledge that if the zero'th
+        * member slot wasn't filled, it'll contain zero, and zero isn't a valid
+        * transaction ID so it can't be a multixact member.  Therefore, if we
+        * read a zero from the members array, just ignore it.
+        *
+        * This is all pretty messy, but the mess occurs only in infrequent corner
+        * cases, so it seems better than holding the MultiXactGenLock for a long
+        * time on every multixact creation.
         */
-       nextMXact = MultiXactState->nextMXact;
-       nextOffset = MultiXactState->nextOffset;
-
-       LWLockRelease(MultiXactGenLock);
-
-       /* Get the offset at which we need to start reading MultiXactMembers */
+retry:
        LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE);
 
        pageno = MultiXactIdToOffsetPage(multi);
@@ -899,20 +953,23 @@ GetMultiXactIdMembers(MultiXactId multi, TransactionId **xids)
        offptr += entryno;
        offset = *offptr;
 
+       Assert(offset != 0);
+
        /*
-        * How many members do we need to read?  If we are at the end of the
-        * assigned MultiXactIds, use the offset just saved above.      Else we need
-        * to check the MultiXactId following ours.
-        *
-        * Use the same increment rule as GetNewMultiXactId(), that is, don't handle
-        * wraparound explicitly until needed.
+        * Use the same increment rule as GetNewMultiXactId(), that is, don't
+        * handle wraparound explicitly until needed.
         */
        tmpMXact = multi + 1;
 
        if (nextMXact == tmpMXact)
+       {
+               /* Corner case 1: there is no next multixact */
                length = nextOffset - offset;
+       }
        else
        {
+               MultiXactOffset nextMXOffset;
+
                /* handle wraparound if needed */
                if (tmpMXact < FirstMultiXactId)
                        tmpMXact = FirstMultiXactId;
@@ -927,7 +984,17 @@ GetMultiXactIdMembers(MultiXactId multi, TransactionId **xids)
 
                offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
                offptr += entryno;
-               length = *offptr - offset;
+               nextMXOffset = *offptr;
+
+               if (nextMXOffset == 0)
+               {
+                       /* Corner case 2: next multixact is still being filled in */
+                       LWLockRelease(MultiXactOffsetControlLock);
+                       pg_usleep(1000L);
+                       goto retry;
+               }
+
+               length = nextMXOffset - offset;
        }
 
        LWLockRelease(MultiXactOffsetControlLock);
@@ -938,6 +1005,7 @@ GetMultiXactIdMembers(MultiXactId multi, TransactionId **xids)
        /* Now get the members themselves. */
        LWLockAcquire(MultiXactMemberControlLock, LW_EXCLUSIVE);
 
+       truelength = 0;
        prev_pageno = -1;
        for (i = 0; i < length; i++, offset++)
        {
@@ -956,7 +1024,14 @@ GetMultiXactIdMembers(MultiXactId multi, TransactionId **xids)
                        MultiXactMemberCtl->shared->page_buffer[slotno];
                xactptr += entryno;
 
-               ptr[i] = *xactptr;
+               if (!TransactionIdIsValid(*xactptr))
+               {
+                       /* Corner case 3: we must be looking at unused slot zero */
+                       Assert(offset == 0);
+                       continue;
+               }
+
+               ptr[truelength++] = *xactptr;
        }
 
        LWLockRelease(MultiXactMemberControlLock);
@@ -964,11 +1039,11 @@ GetMultiXactIdMembers(MultiXactId multi, TransactionId **xids)
        /*
         * Copy the result into the local cache.
         */
-       mXactCachePut(multi, length, ptr);
+       mXactCachePut(multi, truelength, ptr);
 
        debug_elog3(DEBUG2, "GetMembers: no cache for %s",
-                               mxid_to_string(multi, length, ptr));
-       return length;
+                               mxid_to_string(multi, truelength, ptr));
+       return truelength;
 }
 
 /*