]> granicus.if.org Git - postgresql/commitdiff
Rewrite the sinval messaging mechanism to reduce contention and avoid
authorTom Lane <tgl@sss.pgh.pa.us>
Thu, 19 Jun 2008 21:32:56 +0000 (21:32 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Thu, 19 Jun 2008 21:32:56 +0000 (21:32 +0000)
unnecessary cache resets.  The major changes are:

* When the queue overflows, we only issue a cache reset to the specific
backend or backends that still haven't read the oldest message, rather
than resetting everyone as in the original coding.

* When we observe backend(s) falling well behind, we signal SIGUSR1
to only one backend, the one that is furthest behind and doesn't already
have a signal outstanding for it.  When it finishes catching up, it will
in turn signal SIGUSR1 to the next-furthest-back guy, if there is one that
is far enough behind to justify a signal.  The PMSIGNAL_WAKEN_CHILDREN
mechanism is removed.

* We don't attempt to clean out dead messages after every message-receipt
operation; rather, we do it on the insertion side, and only when the queue
fullness passes certain thresholds.

* Split SInvalLock into SInvalReadLock and SInvalWriteLock so that readers
don't block writers nor vice versa (except during the infrequent queue
cleanout operations).

* Transfer multiple sinval messages for each acquisition of a read or
write lock.

src/backend/postmaster/postmaster.c
src/backend/storage/ipc/sinval.c
src/backend/storage/ipc/sinvaladt.c
src/backend/utils/cache/inval.c
src/include/storage/lwlock.h
src/include/storage/pmsignal.h
src/include/storage/sinval.h
src/include/storage/sinvaladt.h

index 751bb8244d383d46ce3dd5366d9906dea395c8ca..73d6dae56d8f7ea9674bc8235c01132e1aee05c3 100644 (file)
@@ -37,7 +37,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/postmaster/postmaster.c,v 1.558 2008/06/06 22:35:22 alvherre Exp $
+ *       $PostgreSQL: pgsql/src/backend/postmaster/postmaster.c,v 1.559 2008/06/19 21:32:56 tgl Exp $
  *
  * NOTES
  *
@@ -3829,16 +3829,6 @@ sigusr1_handler(SIGNAL_ARGS)
                load_role();
        }
 
-       if (CheckPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN))
-       {
-               /*
-                * Send SIGUSR1 to all children (triggers CatchupInterruptHandler).
-                * See storage/ipc/sinval[adt].c for the use of this.
-                */
-               if (Shutdown <= SmartShutdown)
-                       SignalChildren(SIGUSR1);
-       }
-
        if (CheckPostmasterSignal(PMSIGNAL_WAKEN_ARCHIVER) &&
                PgArchPID != 0)
        {
index 4b8a8f1afbdc487131c8738a1648f1b5c2aa340f..e2c6ca2aec9b976105015a6ed34bbe7b09bcd1df 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/storage/ipc/sinval.c,v 1.85 2008/03/17 11:50:26 alvherre Exp $
+ *       $PostgreSQL: pgsql/src/backend/storage/ipc/sinval.c,v 1.86 2008/06/19 21:32:56 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -17,9 +17,7 @@
 #include "access/xact.h"
 #include "commands/async.h"
 #include "miscadmin.h"
-#include "storage/backendid.h"
 #include "storage/ipc.h"
-#include "storage/proc.h"
 #include "storage/sinvaladt.h"
 #include "utils/inval.h"
 
@@ -27,9 +25,9 @@
 /*
  * Because backends sitting idle will not be reading sinval events, we
  * need a way to give an idle backend a swift kick in the rear and make
- * it catch up before the sinval queue overflows and forces everyone
- * through a cache reset exercise.     This is done by broadcasting SIGUSR1
- * to all backends when the queue is threatening to become full.
+ * it catch up before the sinval queue overflows and forces it to go
+ * through a cache reset exercise.     This is done by sending SIGUSR1
+ * to any backend that gets too far behind.
  *
  * State for catchup events consists of two flags: one saying whether
  * the signal handler is currently allowed to call ProcessCatchupEvent
@@ -47,67 +45,101 @@ static void ProcessCatchupEvent(void);
 
 
 /*
- * SendSharedInvalidMessage
- *     Add a shared-cache-invalidation message to the global SI message queue.
+ * SendSharedInvalidMessages
+ *     Add shared-cache-invalidation message(s) to the global SI message queue.
  */
 void
-SendSharedInvalidMessage(SharedInvalidationMessage *msg)
+SendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n)
 {
-       bool            insertOK;
-
-       insertOK = SIInsertDataEntry(msg);
-       if (!insertOK)
-               elog(DEBUG4, "SI buffer overflow");
+       SIInsertDataEntries(msgs, n);
 }
 
 /*
  * ReceiveSharedInvalidMessages
  *             Process shared-cache-invalidation messages waiting for this backend
  *
+ * We guarantee to process all messages that had been queued before the
+ * routine was entered.  It is of course possible for more messages to get
+ * queued right after our last SIGetDataEntries call.
+ *
  * NOTE: it is entirely possible for this routine to be invoked recursively
  * as a consequence of processing inside the invalFunction or resetFunction.
- * Hence, we must be holding no SI resources when we call them.  The only
- * bad side-effect is that SIDelExpiredDataEntries might be called extra
- * times on the way out of a nested call.
+ * Furthermore, such a recursive call must guarantee that all outstanding
+ * inval messages have been processed before it exits.  This is the reason
+ * for the strange-looking choice to use a statically allocated buffer array
+ * and counters; it's so that a recursive call can process messages already
+ * sucked out of sinvaladt.c.
  */
 void
 ReceiveSharedInvalidMessages(
                                          void (*invalFunction) (SharedInvalidationMessage *msg),
                                                         void (*resetFunction) (void))
 {
-       SharedInvalidationMessage data;
-       int                     getResult;
-       bool            gotMessage = false;
+#define MAXINVALMSGS 32
+       static SharedInvalidationMessage messages[MAXINVALMSGS];
+       /*
+        * We use volatile here to prevent bugs if a compiler doesn't realize
+        * that recursion is a possibility ...
+        */
+       static volatile int nextmsg = 0;
+       static volatile int nummsgs = 0;
 
-       for (;;)
+       /* Deal with any messages still pending from an outer recursion */
+       while (nextmsg < nummsgs)
        {
-               /*
-                * We can discard any pending catchup event, since we will not exit
-                * this loop until we're fully caught up.
-                */
-               catchupInterruptOccurred = 0;
+               SharedInvalidationMessage *msg = &messages[nextmsg++];
 
-               getResult = SIGetDataEntry(MyBackendId, &data);
+               invalFunction(msg);
+       }
+
+       do
+       {
+               int                     getResult;
+
+               nextmsg = nummsgs = 0;
+
+               /* Try to get some more messages */
+               getResult = SIGetDataEntries(messages, MAXINVALMSGS);
 
-               if (getResult == 0)
-                       break;                          /* nothing more to do */
                if (getResult < 0)
                {
                        /* got a reset message */
                        elog(DEBUG4, "cache state reset");
                        resetFunction();
+                       break;                          /* nothing more to do */
                }
-               else
+
+               /* Process them, being wary that a recursive call might eat some */
+               nextmsg = 0;
+               nummsgs = getResult;
+
+               while (nextmsg < nummsgs)
                {
-                       /* got a normal data message */
-                       invalFunction(&data);
+                       SharedInvalidationMessage *msg = &messages[nextmsg++];
+
+                       invalFunction(msg);
                }
-               gotMessage = true;
-       }
 
-       /* If we got any messages, try to release dead messages */
-       if (gotMessage)
-               SIDelExpiredDataEntries(false);
+               /*
+                * We only need to loop if the last SIGetDataEntries call (which
+                * might have been within a recursive call) returned a full buffer.
+                */
+       } while (nummsgs == MAXINVALMSGS);
+
+       /*
+        * We are now caught up.  If we received a catchup signal, reset that
+        * flag, and call SICleanupQueue().  This is not so much because we
+        * need to flush dead messages right now, as that we want to pass on
+        * the catchup signal to the next slowest backend.  "Daisy chaining" the
+        * catchup signal this way avoids creating spikes in system load for
+        * what should be just a background maintenance activity.
+        */
+       if (catchupInterruptOccurred)
+       {
+               catchupInterruptOccurred = 0;
+               elog(DEBUG4, "sinval catchup complete, cleaning queue");
+               SICleanupQueue(false, 0);
+       }
 }
 
 
index ddbc08ef55f608eebf6d2146a82c9e30291a4eb2..0befc4a93419a526d857219c7a80dc60a38e3436 100644 (file)
@@ -1,24 +1,25 @@
 /*-------------------------------------------------------------------------
  *
  * sinvaladt.c
- *       POSTGRES shared cache invalidation segment definitions.
+ *       POSTGRES shared cache invalidation data manager.
  *
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.70 2008/06/17 20:07:08 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.71 2008/06/19 21:32:56 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
+#include <signal.h>
+#include <unistd.h>
+
 #include "miscadmin.h"
 #include "storage/backendid.h"
 #include "storage/ipc.h"
-#include "storage/lwlock.h"
-#include "storage/pmsignal.h"
 #include "storage/proc.h"
 #include "storage/shmem.h"
 #include "storage/sinvaladt.h"
 /*
  * Conceptually, the shared cache invalidation messages are stored in an
  * infinite array, where maxMsgNum is the next array subscript to store a
- * submitted message in, minMsgNum is the smallest array subscript containing a
- * message not yet read by all backends, and we always have maxMsgNum >=
+ * submitted message in, minMsgNum is the smallest array subscript containing
+ * message not yet read by all backends, and we always have maxMsgNum >=
  * minMsgNum.  (They are equal when there are no messages pending.)  For each
  * active backend, there is a nextMsgNum pointer indicating the next message it
  * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
  * backend.
  *
+ * (In the current implementation, minMsgNum is a lower bound for the
+ * per-process nextMsgNum values, but it isn't rigorously kept equal to the
+ * smallest nextMsgNum --- it may lag behind.  We only update it when
+ * SICleanupQueue is called, and we try not to do that often.)
+ *
  * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
  * entries.  We translate MsgNum values into circular-buffer indexes by
  * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
  * MAXNUMMESSAGES is a constant and a power of 2).     As long as maxMsgNum
  * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
- * in the buffer.  If the buffer does overflow, we reset it to empty and
- * force each backend to "reset", ie, discard all its invalidatable state.
+ * in the buffer.  If the buffer does overflow, we recover by setting the
+ * "reset" flag for each backend that has fallen too far behind.  A backend
+ * that is in "reset" state is ignored while determining minMsgNum.  When
+ * it does finally attempt to receive inval messages, it must discard all
+ * its invalidatable state, since it won't know what it missed.
+ *
+ * To reduce the probability of needing resets, we send a "catchup" interrupt
+ * to any backend that seems to be falling unreasonably far behind.  The
+ * normal behavior is that at most one such interrupt is in flight at a time;
+ * when a backend completes processing a catchup interrupt, it executes
+ * SICleanupQueue, which will signal the next-furthest-behind backend if
+ * needed.  This avoids undue contention from multiple backends all trying
+ * to catch up at once.  However, the furthest-back backend might be stuck
+ * in a state where it can't catch up.  Eventually it will get reset, so it
+ * won't cause any more problems for anyone but itself.  But we don't want
+ * to find that a bunch of other backends are now too close to the reset
+ * threshold to be saved.  So SICleanupQueue is designed to occasionally
+ * send extra catchup interrupts as the queue gets fuller, to backends that
+ * are far behind and haven't gotten one yet.  As long as there aren't a lot
+ * of "stuck" backends, we won't need a lot of extra interrupts, since ones
+ * that aren't stuck will propagate their interrupts to the next guy.
  *
  * We would have problems if the MsgNum values overflow an integer, so
  * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
  * large so that we don't need to do this often.  It must be a multiple of
  * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
  * to be moved when we do it.
+ *
+ * Access to the shared sinval array is protected by two locks, SInvalReadLock
+ * and SInvalWriteLock.  Readers take SInvalReadLock in shared mode; this
+ * authorizes them to modify their own ProcState but not to modify or even
+ * look at anyone else's.  When we need to perform array-wide updates,
+ * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
+ * lock out all readers.  Writers take SInvalWriteLock (always in exclusive
+ * mode) to serialize adding messages to the queue.  Note that a writer
+ * can operate in parallel with one or more readers, because the writer
+ * has no need to touch anyone's ProcState, except in the infrequent cases
+ * when SICleanupQueue is needed.  The only point of overlap is that
+ * the writer might change maxMsgNum while readers are looking at it.
+ * This should be okay: we are assuming that fetching or storing an int
+ * is atomic, an assumption also made elsewhere in Postgres.  However
+ * readers mustn't assume that maxMsgNum isn't changing under them.
  */
 
 
  *
  * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
  * Must be a multiple of MAXNUMMESSAGES.  Should be large.
+ *
+ * CLEANUP_MIN: the minimum number of messages that must be in the buffer
+ * before we bother to call SICleanupQueue.
+ *
+ * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
+ * we exceed CLEANUP_MIN.  Should be a power of 2 for speed.
+ *
+ * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
+ * behind before we'll send it SIGUSR1.
+ *
+ * WRITE_QUANTUM: the max number of messages to push into the buffer per
+ * iteration of SIInsertDataEntries.  Noncritical but should be less than
+ * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
+ * per iteration.
  */
 
 #define MAXNUMMESSAGES 4096
-#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 4096)
+#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
+#define CLEANUP_MIN (MAXNUMMESSAGES / 2)
+#define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
+#define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
+#define WRITE_QUANTUM 64
 
 /* Per-backend state in shared invalidation structure */
 typedef struct ProcState
 {
-       /* nextMsgNum is -1 in an inactive ProcState array entry. */
-       int                     nextMsgNum;             /* next message number to read, or -1 */
-       bool            resetState;             /* true, if backend has to reset its state */
+       /* procPid is zero in an inactive ProcState array entry. */
+       pid_t           procPid;                /* PID of backend, for signaling */
+       /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
+       int                     nextMsgNum;             /* next message number to read */
+       bool            resetState;             /* backend needs to reset its state */
+       bool            signaled;               /* backend has been sent catchup signal */
+
+       /*
+        * Next LocalTransactionId to use for each idle backend slot.  We keep
+        * this here because it is indexed by BackendId and it is convenient to
+        * copy the value to and from local memory when MyBackendId is set.
+        * It's meaningless in an active ProcState entry.
+        */
+       LocalTransactionId nextLXID;
 } ProcState;
 
 /* Shared cache invalidation memory segment */
@@ -80,16 +149,10 @@ typedef struct SISeg
         */
        int                     minMsgNum;              /* oldest message still needed */
        int                     maxMsgNum;              /* next message number to be assigned */
+       int                     nextThreshold;  /* # of messages to call SICleanupQueue */
        int                     lastBackend;    /* index of last active procState entry, +1 */
        int                     maxBackends;    /* size of procState array */
 
-       /*
-        * Next LocalTransactionId to use for each idle backend slot.  We keep
-        * this here because it is indexed by BackendId and it is convenient to
-        * copy the value to and from local memory when MyBackendId is set.
-        */
-       LocalTransactionId *nextLXID;           /* array of maxBackends entries */
-
        /*
         * Circular buffer holding shared-inval messages
         */
@@ -110,7 +173,6 @@ static SISeg *shmInvalBuffer;       /* pointer to the shared inval buffer */
 static LocalTransactionId nextLocalTransactionId;
 
 static void CleanupInvalidationState(int status, Datum arg);
-static void SISetProcStateInvalid(SISeg *segP);
 
 
 /*
@@ -124,8 +186,6 @@ SInvalShmemSize(void)
        size = offsetof(SISeg, procState);
        size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
 
-       size = add_size(size, mul_size(sizeof(LocalTransactionId), MaxBackends));
-
        return size;
 }
 
@@ -149,11 +209,10 @@ CreateSharedInvalidationState(void)
        if (found)
                return;
 
-       shmInvalBuffer->nextLXID = ShmemAlloc(sizeof(LocalTransactionId) * MaxBackends);
-
        /* Clear message counters, save size of procState array */
        shmInvalBuffer->minMsgNum = 0;
        shmInvalBuffer->maxMsgNum = 0;
+       shmInvalBuffer->nextThreshold = CLEANUP_MIN;
        shmInvalBuffer->lastBackend = 0;
        shmInvalBuffer->maxBackends = MaxBackends;
 
@@ -162,9 +221,11 @@ CreateSharedInvalidationState(void)
        /* Mark all backends inactive, and initialize nextLXID */
        for (i = 0; i < shmInvalBuffer->maxBackends; i++)
        {
-               shmInvalBuffer->procState[i].nextMsgNum = -1;           /* inactive */
+               shmInvalBuffer->procState[i].procPid = 0;                       /* inactive */
+               shmInvalBuffer->procState[i].nextMsgNum = 0;            /* meaningless */
                shmInvalBuffer->procState[i].resetState = false;
-               shmInvalBuffer->nextLXID[i] = InvalidLocalTransactionId;
+               shmInvalBuffer->procState[i].signaled = false;
+               shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
        }
 }
 
@@ -179,12 +240,19 @@ SharedInvalBackendInit(void)
        ProcState  *stateP = NULL;
        SISeg      *segP = shmInvalBuffer;
 
-       LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
+       /*
+        * This can run in parallel with read operations, and for that matter
+        * with write operations; but not in parallel with additions and removals
+        * of backends, nor in parallel with SICleanupQueue.  It doesn't seem
+        * worth having a third lock, so we choose to use SInvalWriteLock to
+        * serialize additions/removals.
+        */
+       LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
 
        /* Look for a free entry in the procState array */
        for (index = 0; index < segP->lastBackend; index++)
        {
-               if (segP->procState[index].nextMsgNum < 0)              /* inactive slot? */
+               if (segP->procState[index].procPid == 0)                /* inactive slot? */
                {
                        stateP = &segP->procState[index];
                        break;
@@ -196,7 +264,7 @@ SharedInvalBackendInit(void)
                if (segP->lastBackend < segP->maxBackends)
                {
                        stateP = &segP->procState[segP->lastBackend];
-                       Assert(stateP->nextMsgNum < 0);
+                       Assert(stateP->procPid == 0);
                        segP->lastBackend++;
                }
                else
@@ -205,7 +273,7 @@ SharedInvalBackendInit(void)
                         * out of procState slots: MaxBackends exceeded -- report normally
                         */
                        MyBackendId = InvalidBackendId;
-                       LWLockRelease(SInvalLock);
+                       LWLockRelease(SInvalWriteLock);
                        ereport(FATAL,
                                        (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
                                         errmsg("sorry, too many clients already")));
@@ -214,21 +282,21 @@ SharedInvalBackendInit(void)
 
        MyBackendId = (stateP - &segP->procState[0]) + 1;
 
-#ifdef INVALIDDEBUG
-       elog(DEBUG2, "my backend id is %d", MyBackendId);
-#endif   /* INVALIDDEBUG */
+       elog(DEBUG4, "my backend id is %d", MyBackendId);
 
        /* Advertise assigned backend ID in MyProc */
        MyProc->backendId = MyBackendId;
 
        /* Fetch next local transaction ID into local memory */
-       nextLocalTransactionId = segP->nextLXID[MyBackendId - 1];
+       nextLocalTransactionId = stateP->nextLXID;
 
        /* mark myself active, with all extant messages already read */
+       stateP->procPid = MyProcPid;
        stateP->nextMsgNum = segP->maxMsgNum;
        stateP->resetState = false;
+       stateP->signaled = false;
 
-       LWLockRelease(SInvalLock);
+       LWLockRelease(SInvalWriteLock);
 
        /* register exit routine to mark my entry inactive at exit */
        on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
@@ -238,8 +306,7 @@ SharedInvalBackendInit(void)
  * CleanupInvalidationState
  *             Mark the current backend as no longer active.
  *
- * This function is called via on_shmem_exit() during backend shutdown,
- * so the caller has NOT acquired the lock for us.
+ * This function is called via on_shmem_exit() during backend shutdown.
  *
  * arg is really of type "SISeg*".
  */
@@ -247,227 +314,247 @@ static void
 CleanupInvalidationState(int status, Datum arg)
 {
        SISeg      *segP = (SISeg *) DatumGetPointer(arg);
+       ProcState  *stateP;
        int                     i;
 
        Assert(PointerIsValid(segP));
 
-       LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
+       LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
+
+       stateP = &segP->procState[MyBackendId - 1];
 
        /* Update next local transaction ID for next holder of this backendID */
-       segP->nextLXID[MyBackendId - 1] = nextLocalTransactionId;
+       stateP->nextLXID = nextLocalTransactionId;
 
        /* Mark myself inactive */
-       segP->procState[MyBackendId - 1].nextMsgNum = -1;
-       segP->procState[MyBackendId - 1].resetState = false;
+       stateP->procPid = 0;
+       stateP->nextMsgNum = 0;
+       stateP->resetState = false;
+       stateP->signaled = false;
 
        /* Recompute index of last active backend */
        for (i = segP->lastBackend; i > 0; i--)
        {
-               if (segP->procState[i - 1].nextMsgNum >= 0)
+               if (segP->procState[i - 1].procPid != 0)
                        break;
        }
        segP->lastBackend = i;
 
-       LWLockRelease(SInvalLock);
+       LWLockRelease(SInvalWriteLock);
 }
 
 /*
- * SIInsertDataEntry
- *             Add a new invalidation message to the buffer.
- *
- * If we are unable to insert the message because the buffer is full,
- * then clear the buffer and assert the "reset" flag to each backend.
- * This will cause all the backends to discard *all* invalidatable state.
- *
- * Returns true for normal successful insertion, false if had to reset.
+ * SIInsertDataEntries
+ *             Add new invalidation message(s) to the buffer.
  */
-bool
-SIInsertDataEntry(SharedInvalidationMessage *data)
+void
+SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
 {
-       int                     numMsgs;
-       bool            signal_postmaster = false;
-       SISeg      *segP;
+       SISeg      *segP = shmInvalBuffer;
+
+       /*
+        * N can be arbitrarily large.  We divide the work into groups of no more
+        * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
+        * an unreasonably long time.  (This is not so much because we care about
+        * letting in other writers, as that some just-caught-up backend might be
+        * trying to do SICleanupQueue to pass on its signal, and we don't want it
+        * to have to wait a long time.)  Also, we need to consider calling
+        * SICleanupQueue every so often.
+        */
+       while (n > 0)
+       {
+               int             nthistime = Min(n, WRITE_QUANTUM);
+               int             numMsgs;
 
-       LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
+               n -= nthistime;
 
-       segP = shmInvalBuffer;
-       numMsgs = segP->maxMsgNum - segP->minMsgNum;
+               LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
 
-       /* Is the buffer full? */
-       if (numMsgs >= MAXNUMMESSAGES)
-       {
                /*
-                * Don't panic just yet: slowest backend might have consumed some
-                * messages but not yet have done SIDelExpiredDataEntries() to advance
-                * minMsgNum.  So, make sure minMsgNum is up-to-date.
+                * If the buffer is full, we *must* acquire some space.  Clean the
+                * queue and reset anyone who is preventing space from being freed.
+                * Otherwise, clean the queue only when it's exceeded the next
+                * fullness threshold.
                 */
-               SIDelExpiredDataEntries(true);
                numMsgs = segP->maxMsgNum - segP->minMsgNum;
-               if (numMsgs >= MAXNUMMESSAGES)
+               if (numMsgs + nthistime > MAXNUMMESSAGES)
                {
-                       /* Yup, it's definitely full, no choice but to reset */
-                       SISetProcStateInvalid(segP);
-                       LWLockRelease(SInvalLock);
-                       return false;
+                       SICleanupQueue(true, nthistime);
+                       Assert((segP->maxMsgNum - segP->minMsgNum + nthistime) <= MAXNUMMESSAGES);
                }
-       }
-
-       /*
-        * Try to prevent table overflow.  When the table is 70% full send a
-        * WAKEN_CHILDREN request to the postmaster.  The postmaster will send a
-        * SIGUSR1 signal to all the backends, which will cause sinval.c to read
-        * any pending SI entries.
-        *
-        * This should never happen if all the backends are actively executing
-        * queries, but if a backend is sitting idle then it won't be starting
-        * transactions and so won't be reading SI entries.
-        */
-       if (numMsgs == (MAXNUMMESSAGES * 70 / 100) && IsUnderPostmaster)
-               signal_postmaster = true;
-
-       /*
-        * Insert new message into proper slot of circular buffer
-        */
-       segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data;
-       segP->maxMsgNum++;
-
-       LWLockRelease(SInvalLock);
-
-       if (signal_postmaster)
-       {
-               elog(DEBUG4, "SI table is 70%% full, signaling postmaster");
-               SendPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN);
-       }
-
-       return true;
-}
-
-/*
- * SISetProcStateInvalid
- *             Flush pending messages from buffer, assert reset flag for each backend
- *
- * This is used only to recover from SI buffer overflow.
- */
-static void
-SISetProcStateInvalid(SISeg *segP)
-{
-       int                     i;
-
-       segP->minMsgNum = 0;
-       segP->maxMsgNum = 0;
+               else if (numMsgs >= segP->nextThreshold)
+                       SICleanupQueue(true, 0);
 
-       for (i = 0; i < segP->lastBackend; i++)
-       {
-               if (segP->procState[i].nextMsgNum >= 0) /* active backend? */
+               /*
+                * Insert new message(s) into proper slot of circular buffer
+                */
+               while (nthistime-- > 0)
                {
-                       segP->procState[i].resetState = true;
-                       segP->procState[i].nextMsgNum = 0;
+                       segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data++;
+                       segP->maxMsgNum++;
                }
+
+               LWLockRelease(SInvalWriteLock);
        }
 }
 
 /*
- * SIGetDataEntry
- *             get next SI message for specified backend, if there is one
+ * SIGetDataEntries
+ *             get next SI message(s) for current backend, if there are any
  *
  * Possible return values:
- *     0: no SI message available
- *     1: next SI message has been extracted into *data
- *             (there may be more messages available after this one!)
- * -1: SI reset message extracted
+ *     0:   no SI message available
+ *     n>0: next n SI messages have been extracted into data[]
+ * -1:   SI reset message extracted
+ *
+ * If the return value is less than the array size "datasize", the caller
+ * can assume that there are no more SI messages after the one(s) returned.
+ * Otherwise, another call is needed to collect more messages.
  *
- * NB: this can run in parallel with other instances of SIGetDataEntry
+ * NB: this can run in parallel with other instances of SIGetDataEntries
  * executing on behalf of other backends, since each instance will modify only
  * fields of its own backend's ProcState, and no instance will look at fields
- * of other backends' ProcStates.  We express this by grabbing SInvalLock in
- * shared mode.  Note that this is not exactly the normal (read-only)
+ * of other backends' ProcStates.  We express this by grabbing SInvalReadLock
+ * in shared mode.  Note that this is not exactly the normal (read-only)
  * interpretation of a shared lock! Look closely at the interactions before
- * allowing SInvalLock to be grabbed in shared mode for any other reason!
+ * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
+ *
+ * NB: this can also run in parallel with SIInsertDataEntries.  It is not
+ * guaranteed that we will return any messages added after the routine is
+ * entered.
+ *
+ * Note: we assume that "datasize" is not so large that it might be important
+ * to break our hold on SInvalReadLock into segments.
  */
 int
-SIGetDataEntry(int backendId, SharedInvalidationMessage *data)
+SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
 {
-       ProcState  *stateP;
        SISeg      *segP;
+       ProcState  *stateP;
+       int                     n;
        
-       LWLockAcquire(SInvalLock, LW_SHARED);
+       LWLockAcquire(SInvalReadLock, LW_SHARED);
 
        segP = shmInvalBuffer;
-       stateP = &segP->procState[backendId - 1];
+       stateP = &segP->procState[MyBackendId - 1];
 
        if (stateP->resetState)
        {
                /*
                 * Force reset.  We can say we have dealt with any messages added
-                * since the reset, as well...
+                * since the reset, as well; and that means we should clear the
+                * signaled flag, too.
                 */
-               stateP->resetState = false;
                stateP->nextMsgNum = segP->maxMsgNum;
-               LWLockRelease(SInvalLock);
+               stateP->resetState = false;
+               stateP->signaled = false;
+               LWLockRelease(SInvalReadLock);
                return -1;
        }
 
-       if (stateP->nextMsgNum >= segP->maxMsgNum)
-       {
-               LWLockRelease(SInvalLock);
-               return 0;                               /* nothing to read */
-       }
-
        /*
-        * Retrieve message and advance my counter.
+        * Retrieve messages and advance backend's counter, until data array is
+        * full or there are no more messages.
+        *
+        * There may be other backends that haven't read the message(s), so we
+        * cannot delete them here.  SICleanupQueue() will eventually remove them
+        * from the queue.
+        *
+        * Note: depending on the compiler, we might read maxMsgNum only once
+        * here, or each time through the loop.  It doesn't matter (as long as
+        * each fetch is atomic).
         */
-       *data = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
-       stateP->nextMsgNum++;
+       n = 0;
+       while (n < datasize && stateP->nextMsgNum < segP->maxMsgNum)
+       {
+               data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
+               stateP->nextMsgNum++;
+       }
 
        /*
-        * There may be other backends that haven't read the message, so we cannot
-        * delete it here. SIDelExpiredDataEntries() should be called to remove
-        * dead messages.
+        * Reset our "signaled" flag whenever we have caught up completely.
         */
+       if (stateP->nextMsgNum >= segP->maxMsgNum)
+               stateP->signaled = false;
 
-       LWLockRelease(SInvalLock);
-       return 1;                                       /* got a message */
+       LWLockRelease(SInvalReadLock);
+       return n;
 }
 
 /*
- * SIDelExpiredDataEntries
+ * SICleanupQueue
  *             Remove messages that have been consumed by all active backends
+ *
+ * callerHasWriteLock is TRUE if caller is holding SInvalWriteLock.
+ * minFree is the minimum number of free message slots required at completion.
+ *
+ * Possible side effects of this routine include marking one or more
+ * backends as "reset" in the array, and sending a catchup interrupt (SIGUSR1)
+ * to some backend that seems to be getting too far behind.  We signal at
+ * most one backend at a time, for reasons explained at the top of the file.
  */
 void
-SIDelExpiredDataEntries(bool locked)
+SICleanupQueue(bool callerHasWriteLock, int minFree)
 {
        SISeg      *segP = shmInvalBuffer;
        int                     min,
-                               i,
-                               h;
+                               minsig,
+                               lowbound,
+                               numMsgs,
+                               i;
+       ProcState  *needSig = NULL;
 
-       if (!locked)
-               LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
+       /* Lock out all writers and readers */
+       if (!callerHasWriteLock)
+               LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
+       LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
 
+       /*
+        * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify
+        * the furthest-back backend that needs signaling (if any), and reset
+        * any backends that are too far back.
+        */
        min = segP->maxMsgNum;
-       if (min == segP->minMsgNum)
-       {
-               if (!locked)
-                       LWLockRelease(SInvalLock);
-               return;                                 /* fast path if no messages exist */
-       }
-
-       /* Recompute minMsgNum = minimum of all backends' nextMsgNum */
+       minsig = min - SIG_THRESHOLD;
+       lowbound = min - MAXNUMMESSAGES + minFree;
 
        for (i = 0; i < segP->lastBackend; i++)
        {
-               h = segP->procState[i].nextMsgNum;
-               if (h >= 0)
-               {                                               /* backend active */
-                       if (h < min)
-                               min = h;
+               ProcState  *stateP = &segP->procState[i];
+               int             n = stateP->nextMsgNum;
+
+               /* Ignore if inactive or already in reset state */
+               if (stateP->procPid == 0 || stateP->resetState)
+                       continue;
+
+               /*
+                * If we must free some space and this backend is preventing it,
+                * force him into reset state and then ignore until he catches up.
+                */
+               if (n < lowbound)
+               {
+                       stateP->resetState = true;
+                       /* no point in signaling him ... */
+                       continue;
+               }
+
+               /* Track the global minimum nextMsgNum */
+               if (n < min)
+                       min = n;
+
+               /* Also see who's furthest back of the unsignaled backends */
+               if (n < minsig && !stateP->signaled)
+               {
+                       minsig = n;
+                       needSig = stateP;
                }
        }
        segP->minMsgNum = min;
 
        /*
         * When minMsgNum gets really large, decrement all message counters so as
-        * to forestall overflow of the counters.
+        * to forestall overflow of the counters.  This happens seldom enough
+        * that folding it into the previous loop would be a loser.
         */
        if (min >= MSGNUMWRAPAROUND)
        {
@@ -475,13 +562,43 @@ SIDelExpiredDataEntries(bool locked)
                segP->maxMsgNum -= MSGNUMWRAPAROUND;
                for (i = 0; i < segP->lastBackend; i++)
                {
-                       if (segP->procState[i].nextMsgNum >= 0)
-                               segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
+                       /* we don't bother skipping inactive entries here */
+                       segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
                }
        }
 
-       if (!locked)
-               LWLockRelease(SInvalLock);
+       /*
+        * Determine how many messages are still in the queue, and set the
+        * threshold at which we should repeat SICleanupQueue().
+        */
+       numMsgs = segP->maxMsgNum - segP->minMsgNum;
+       if (numMsgs < CLEANUP_MIN)
+               segP->nextThreshold = CLEANUP_MIN;
+       else
+               segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
+
+       /*
+        * Lastly, signal anyone who needs a catchup interrupt.  Since kill()
+        * might not be fast, we don't want to hold locks while executing it.
+        */
+       if (needSig)
+       {
+               pid_t   his_pid = needSig->procPid;
+
+               needSig->signaled = true;
+               LWLockRelease(SInvalReadLock);
+               LWLockRelease(SInvalWriteLock);
+               elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
+               kill(his_pid, SIGUSR1);
+               if (callerHasWriteLock)
+                       LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
+       }
+       else
+       {
+               LWLockRelease(SInvalReadLock);
+               if (!callerHasWriteLock)
+                       LWLockRelease(SInvalWriteLock);
+       }
 }
 
 
index 50e27923566493f32df7cc2ff0d600f1f16e0cad..050d7cc88de2a8eb4afff32ceaff49f9f0a5fdec 100644 (file)
@@ -80,7 +80,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/utils/cache/inval.c,v 1.85 2008/06/19 00:46:05 alvherre Exp $
+ *       $PostgreSQL: pgsql/src/backend/utils/cache/inval.c,v 1.86 2008/06/19 21:32:56 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -203,7 +203,7 @@ AddInvalidationMessage(InvalidationChunk **listHdr,
        if (chunk == NULL)
        {
                /* First time through; create initial chunk */
-#define FIRSTCHUNKSIZE 16
+#define FIRSTCHUNKSIZE 32
                chunk = (InvalidationChunk *)
                        MemoryContextAlloc(CurTransactionContext,
                                                           sizeof(InvalidationChunk) +
@@ -275,6 +275,23 @@ AppendInvalidationMessageList(InvalidationChunk **destHdr,
                } \
        } while (0)
 
+/*
+ * Process a list of invalidation messages group-wise.
+ *
+ * As above, but the code fragment can handle an array of messages.
+ * The fragment should refer to the messages as msgs[], with n entries.
+ */
+#define ProcessMessageListMulti(listHdr, codeFragment) \
+       do { \
+               InvalidationChunk *_chunk; \
+               for (_chunk = (listHdr); _chunk != NULL; _chunk = _chunk->next) \
+               { \
+                       SharedInvalidationMessage *msgs = _chunk->msgs; \
+                       int             n = _chunk->nitems; \
+                       codeFragment; \
+               } \
+       } while (0)
+
 
 /* ----------------------------------------------------------------
  *                             Invalidation set support functions
@@ -371,6 +388,18 @@ ProcessInvalidationMessages(InvalidationListHeader *hdr,
        ProcessMessageList(hdr->rclist, func(msg));
 }
 
+/*
+ * As above, but the function is able to process an array of messages
+ * rather than just one at a time.
+ */
+static void
+ProcessInvalidationMessagesMulti(InvalidationListHeader *hdr,
+                                                                void (*func) (const SharedInvalidationMessage *msgs, int n))
+{
+       ProcessMessageListMulti(hdr->cclist, func(msgs, n));
+       ProcessMessageListMulti(hdr->rclist, func(msgs, n));
+}
+
 /* ----------------------------------------------------------------
  *                                       private support functions
  * ----------------------------------------------------------------
@@ -792,7 +821,7 @@ inval_twophase_postcommit(TransactionId xid, uint16 info,
                case TWOPHASE_INFO_MSG:
                        msg = (SharedInvalidationMessage *) recdata;
                        Assert(len == sizeof(SharedInvalidationMessage));
-                       SendSharedInvalidMessage(msg);
+                       SendSharedInvalidMessages(msg, 1);
                        break;
                case TWOPHASE_INFO_FILE_BEFORE:
                        RelationCacheInitFileInvalidate(true);
@@ -850,8 +879,8 @@ AtEOXact_Inval(bool isCommit)
                AppendInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs,
                                                                   &transInvalInfo->CurrentCmdInvalidMsgs);
 
-               ProcessInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs,
-                                                                       SendSharedInvalidMessage);
+               ProcessInvalidationMessagesMulti(&transInvalInfo->PriorCmdInvalidMsgs,
+                                                                                SendSharedInvalidMessages);
 
                if (transInvalInfo->RelcacheInitFileInval)
                        RelationCacheInitFileInvalidate(false);
@@ -1033,8 +1062,8 @@ EndNonTransactionalInvalidation(void)
        /* Send out the invals */
        ProcessInvalidationMessages(&transInvalInfo->CurrentCmdInvalidMsgs,
                                                                LocalExecuteInvalidationMessage);
-       ProcessInvalidationMessages(&transInvalInfo->CurrentCmdInvalidMsgs,
-                                                               SendSharedInvalidMessage);
+       ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
+                                                                        SendSharedInvalidMessages);
 
        /* Clean up and release memory */
        for (chunk = transInvalInfo->CurrentCmdInvalidMsgs.cclist;
index baccfbf5a68233670124516a291c8040cd9da3da..b1088fcd33d985883408a362bf6f8d07eb3782a3 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/storage/lwlock.h,v 1.38 2008/01/01 19:45:59 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/storage/lwlock.h,v 1.39 2008/06/19 21:32:56 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -43,7 +43,8 @@ typedef enum LWLockId
        OidGenLock,
        XidGenLock,
        ProcArrayLock,
-       SInvalLock,
+       SInvalReadLock,
+       SInvalWriteLock,
        FreeSpaceLock,
        WALInsertLock,
        WALWriteLock,
index c02593e5a86914d67bbafbaa6b7d57219a82367f..94f1770ffce7a471826326ce0437b6b02ce91425 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/storage/pmsignal.h,v 1.19 2008/01/01 19:45:59 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/storage/pmsignal.h,v 1.20 2008/06/19 21:32:56 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -23,7 +23,6 @@
 typedef enum
 {
        PMSIGNAL_PASSWORD_CHANGE,       /* pg_auth file has changed */
-       PMSIGNAL_WAKEN_CHILDREN,        /* send a SIGUSR1 signal to all backends */
        PMSIGNAL_WAKEN_ARCHIVER,        /* send a NOTIFY signal to xlog archiver */
        PMSIGNAL_ROTATE_LOGFILE,        /* send SIGUSR1 to syslogger to rotate logfile */
        PMSIGNAL_START_AUTOVAC_LAUNCHER,        /* start an autovacuum launcher */
index 343c8d94bdb4c84e645e9d36154e21083caf86a9..3601216f1b61052af7f9cde57bf60a7a3cdb591f 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/storage/sinval.h,v 1.47 2008/03/16 19:47:34 alvherre Exp $
+ * $PostgreSQL: pgsql/src/include/storage/sinval.h,v 1.48 2008/06/19 21:32:56 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -83,7 +83,8 @@ typedef union
 } SharedInvalidationMessage;
 
 
-extern void SendSharedInvalidMessage(SharedInvalidationMessage *msg);
+extern void SendSharedInvalidMessages(const SharedInvalidationMessage *msgs,
+                                                                         int n);
 extern void ReceiveSharedInvalidMessages(
                                          void (*invalFunction) (SharedInvalidationMessage *msg),
                                                         void (*resetFunction) (void));
index 8535cba0f065350bc3badd2e969dd1e0e61f68d2..1748f8821b42d64233eafd9c182af63ed4e35ba3 100644 (file)
@@ -1,12 +1,13 @@
 /*-------------------------------------------------------------------------
  *
  * sinvaladt.h
- *       POSTGRES shared cache invalidation segment definitions.
+ *       POSTGRES shared cache invalidation data manager.
  *
  * The shared cache invalidation manager is responsible for transmitting
  * invalidation messages between backends.     Any message sent by any backend
  * must be delivered to all already-running backends before it can be
- * forgotten.
+ * forgotten.  (If we run out of space, we instead deliver a "RESET"
+ * message to backends that have fallen too far behind.)
  * 
  * The struct type SharedInvalidationMessage, defining the contents of
  * a single message, is defined in sinval.h.
@@ -14,7 +15,7 @@
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/storage/sinvaladt.h,v 1.47 2008/03/17 11:50:27 alvherre Exp $
+ * $PostgreSQL: pgsql/src/include/storage/sinvaladt.h,v 1.48 2008/06/19 21:32:56 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -23,7 +24,6 @@
 
 #include "storage/sinval.h"
 
-
 /*
  * prototypes for functions in sinvaladt.c
  */
@@ -31,9 +31,9 @@ extern Size SInvalShmemSize(void);
 extern void CreateSharedInvalidationState(void);
 extern void SharedInvalBackendInit(void);
 
-extern bool SIInsertDataEntry(SharedInvalidationMessage *data);
-extern int SIGetDataEntry(int backendId, SharedInvalidationMessage *data);
-extern void SIDelExpiredDataEntries(bool locked);
+extern void SIInsertDataEntries(const SharedInvalidationMessage *data, int n);
+extern int SIGetDataEntries(SharedInvalidationMessage *data, int datasize);
+extern void SICleanupQueue(bool callerHasWriteLock, int minFree);
 
 extern LocalTransactionId GetNextLocalTransactionId(void);