*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/ipc/sinval.c,v 1.83 2008/01/01 19:45:51 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/ipc/sinval.c,v 1.84 2008/03/16 19:47:33 alvherre Exp $
*
*-------------------------------------------------------------------------
*/
static void ProcessCatchupEvent(void);
-/****************************************************************************/
-/* CreateSharedInvalidationState() Initialize SI buffer */
-/* */
-/* should be called only by the POSTMASTER */
-/****************************************************************************/
-void
-CreateSharedInvalidationState(void)
-{
- /* SInvalLock must be initialized already, during LWLock init */
- SIBufferInit();
-}
-
-/*
- * InitBackendSharedInvalidationState
- * Initialize new backend's state info in buffer segment.
- */
-void
-InitBackendSharedInvalidationState(void)
-{
- int flag;
-
- LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
- flag = SIBackendInit(shmInvalBuffer);
- LWLockRelease(SInvalLock);
- if (flag < 0) /* unexpected problem */
- elog(FATAL, "shared cache invalidation initialization failed");
- if (flag == 0) /* expected problem: MaxBackends exceeded */
- ereport(FATAL,
- (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
- errmsg("sorry, too many clients already")));
-}
-
/*
* SendSharedInvalidMessage
* Add a shared-cache-invalidation message to the global SI message queue.
{
bool insertOK;
- LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
- insertOK = SIInsertDataEntry(shmInvalBuffer, msg);
- LWLockRelease(SInvalLock);
+ insertOK = SIInsertDataEntry(msg);
if (!insertOK)
elog(DEBUG4, "SI buffer overflow");
}
*/
catchupInterruptOccurred = 0;
- /*
- * We can run SIGetDataEntry in parallel with other backends running
- * SIGetDataEntry for themselves, since each instance will modify only
- * fields of its own backend's ProcState, and no instance will look at
- * fields of other backends' ProcStates. We express this by grabbing
- * SInvalLock in shared mode. Note that this is not exactly the
- * normal (read-only) interpretation of a shared lock! Look closely at
- * the interactions before allowing SInvalLock to be grabbed in shared
- * mode for any other reason!
- */
- LWLockAcquire(SInvalLock, LW_SHARED);
- getResult = SIGetDataEntry(shmInvalBuffer, MyBackendId, &data);
- LWLockRelease(SInvalLock);
+ getResult = SIGetDataEntry(MyBackendId, &data);
if (getResult == 0)
break; /* nothing more to do */
/* If we got any messages, try to release dead messages */
if (gotMessage)
- {
- LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
- SIDelExpiredDataEntries(shmInvalBuffer);
- LWLockRelease(SInvalLock);
- }
+ SIDelExpiredDataEntries(false);
}
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.66 2008/01/01 19:45:51 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.67 2008/03/16 19:47:33 alvherre Exp $
*
*-------------------------------------------------------------------------
*/
#include "storage/sinvaladt.h"
-SISeg *shmInvalBuffer;
+/*
+ * Conceptually, the shared cache invalidation messages are stored in an
+ * infinite array, where maxMsgNum is the next array subscript to store a
+ * submitted message in, minMsgNum is the smallest array subscript containing a
+ * message not yet read by all backends, and we always have maxMsgNum >=
+ * minMsgNum. (They are equal when there are no messages pending.) For each
+ * active backend, there is a nextMsgNum pointer indicating the next message it
+ * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
+ * backend.
+ *
+ * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
+ * entries. We translate MsgNum values into circular-buffer indexes by
+ * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
+ * MAXNUMMESSAGES is a constant and a power of 2). As long as maxMsgNum
+ * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
+ * in the buffer. If the buffer does overflow, we reset it to empty and
+ * force each backend to "reset", ie, discard all its invalidatable state.
+ *
+ * We would have problems if the MsgNum values overflow an integer, so
+ * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
+ * from all the MsgNum variables simultaneously. MSGNUMWRAPAROUND can be
+ * large so that we don't need to do this often. It must be a multiple of
+ * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
+ * to be moved when we do it.
+ */
+
+
+/*
+ * Configurable parameters.
+ *
+ * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
+ * Must be a power of 2 for speed.
+ *
+ * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
+ * Must be a multiple of MAXNUMMESSAGES. Should be large.
+ */
+
+#define MAXNUMMESSAGES 4096
+#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 4096)
+
+
+/* Shared cache invalidation memory segment */
+typedef struct SISeg
+{
+ /*
+ * General state information
+ */
+ int minMsgNum; /* oldest message still needed */
+ int maxMsgNum; /* next message number to be assigned */
+ int lastBackend; /* index of last active procState entry, +1 */
+ int maxBackends; /* size of procState array */
+ int freeBackends; /* number of empty procState slots */
+
+ /*
+ * Next LocalTransactionId to use for each idle backend slot. We keep
+ * this here because it is indexed by BackendId and it is convenient to
+ * copy the value to and from local memory when MyBackendId is set.
+ */
+ LocalTransactionId *nextLXID; /* array of maxBackends entries */
+
+ /*
+ * Circular buffer holding shared-inval messages
+ */
+ SharedInvalidationMessage buffer[MAXNUMMESSAGES];
+
+ /*
+ * Per-backend state info.
+ *
+ * We declare procState as 1 entry because C wants a fixed-size array, but
+ * actually it is maxBackends entries long.
+ */
+ ProcState procState[1]; /* reflects the invalidation state */
+} SISeg;
+
+static SISeg *shmInvalBuffer; /* pointer to the shared inval buffer */
+
static LocalTransactionId nextLocalTransactionId;
}
/*
- * SIBufferInit
- * Create and initialize a new SI message buffer
+ * SharedInvalBufferInit
+ * Create and initialize the SI message buffer
*/
void
-SIBufferInit(void)
+CreateSharedInvalidationState(void)
{
- SISeg *segP;
Size size;
int i;
bool found;
size = offsetof(SISeg, procState);
size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
- shmInvalBuffer = segP = (SISeg *)
+ shmInvalBuffer = (SISeg *)
ShmemInitStruct("shmInvalBuffer", size, &found);
if (found)
return;
- segP->nextLXID = ShmemAlloc(sizeof(LocalTransactionId) * MaxBackends);
+ shmInvalBuffer->nextLXID = ShmemAlloc(sizeof(LocalTransactionId) * MaxBackends);
/* Clear message counters, save size of procState array */
- segP->minMsgNum = 0;
- segP->maxMsgNum = 0;
- segP->lastBackend = 0;
- segP->maxBackends = MaxBackends;
- segP->freeBackends = MaxBackends;
+ shmInvalBuffer->minMsgNum = 0;
+ shmInvalBuffer->maxMsgNum = 0;
+ shmInvalBuffer->lastBackend = 0;
+ shmInvalBuffer->maxBackends = MaxBackends;
+ shmInvalBuffer->freeBackends = MaxBackends;
/* The buffer[] array is initially all unused, so we need not fill it */
/* Mark all backends inactive, and initialize nextLXID */
- for (i = 0; i < segP->maxBackends; i++)
+ for (i = 0; i < shmInvalBuffer->maxBackends; i++)
{
- segP->procState[i].nextMsgNum = -1; /* inactive */
- segP->procState[i].resetState = false;
- segP->nextLXID[i] = InvalidLocalTransactionId;
+ shmInvalBuffer->procState[i].nextMsgNum = -1; /* inactive */
+ shmInvalBuffer->procState[i].resetState = false;
+ shmInvalBuffer->nextLXID[i] = InvalidLocalTransactionId;
}
}
/*
- * SIBackendInit
+ * SharedInvalBackendInit
* Initialize a new backend to operate on the sinval buffer
- *
- * Returns:
- * >0 A-OK
- * 0 Failed to find a free procState slot (ie, MaxBackends exceeded)
- * <0 Some other failure (not currently used)
- *
- * NB: this routine, and all following ones, must be executed with the
- * SInvalLock lock held, since there may be multiple backends trying
- * to access the buffer.
*/
-int
-SIBackendInit(SISeg *segP)
+void
+SharedInvalBackendInit(void)
{
int index;
ProcState *stateP = NULL;
+ SISeg *segP = shmInvalBuffer;
+
+ LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
/* Look for a free entry in the procState array */
for (index = 0; index < segP->lastBackend; index++)
}
else
{
- /* out of procState slots */
+ /*
+ * out of procState slots: MaxBackends exceeded -- report normally
+ */
MyBackendId = InvalidBackendId;
- return 0;
+ LWLockRelease(SInvalLock);
+ ereport(FATAL,
+ (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
+ errmsg("sorry, too many clients already")));
}
}
stateP->nextMsgNum = segP->maxMsgNum;
stateP->resetState = false;
+ LWLockRelease(SInvalLock);
+
/* register exit routine to mark my entry inactive at exit */
on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
-
- return 1;
}
/*
* Returns true for normal successful insertion, false if had to reset.
*/
bool
-SIInsertDataEntry(SISeg *segP, SharedInvalidationMessage *data)
+SIInsertDataEntry(SharedInvalidationMessage *data)
{
- int numMsgs = segP->maxMsgNum - segP->minMsgNum;
+ int numMsgs;
+ bool signal_postmaster = false;
+ SISeg *segP;
+
+ LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
+
+ segP = shmInvalBuffer;
+ numMsgs = segP->maxMsgNum - segP->minMsgNum;
/* Is the buffer full? */
if (numMsgs >= MAXNUMMESSAGES)
* messages but not yet have done SIDelExpiredDataEntries() to advance
* minMsgNum. So, make sure minMsgNum is up-to-date.
*/
- SIDelExpiredDataEntries(segP);
+ SIDelExpiredDataEntries(true);
numMsgs = segP->maxMsgNum - segP->minMsgNum;
if (numMsgs >= MAXNUMMESSAGES)
{
/* Yup, it's definitely full, no choice but to reset */
SISetProcStateInvalid(segP);
+ LWLockRelease(SInvalLock);
return false;
}
}
IsUnderPostmaster)
{
elog(DEBUG4, "SI table is 70%% full, signaling postmaster");
- SendPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN);
+ signal_postmaster = true;
}
/*
segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data;
segP->maxMsgNum++;
+ LWLockRelease(SInvalLock);
+
+ if (signal_postmaster)
+ SendPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN);
+
return true;
}
* -1: SI reset message extracted
*
* NB: this can run in parallel with other instances of SIGetDataEntry
- * executing on behalf of other backends. See comments in sinval.c in
- * ReceiveSharedInvalidMessages().
+ * executing on behalf of other backends, since each instance will modify only
+ * fields of its own backend's ProcState, and no instance will look at fields
+ * of other backends' ProcStates. We express this by grabbing SInvalLock in
+ * shared mode. Note that this is not exactly the normal (read-only)
+ * interpretation of a shared lock! Look closely at the interactions before
+ * allowing SInvalLock to be grabbed in shared mode for any other reason!
*/
int
-SIGetDataEntry(SISeg *segP, int backendId,
- SharedInvalidationMessage *data)
+SIGetDataEntry(int backendId, SharedInvalidationMessage *data)
{
- ProcState *stateP = &segP->procState[backendId - 1];
+ ProcState *stateP;
+ SISeg *segP;
+
+ LWLockAcquire(SInvalLock, LW_SHARED);
+
+ segP = shmInvalBuffer;
+ stateP = &segP->procState[backendId - 1];
if (stateP->resetState)
{
*/
stateP->resetState = false;
stateP->nextMsgNum = segP->maxMsgNum;
+ LWLockRelease(SInvalLock);
return -1;
}
if (stateP->nextMsgNum >= segP->maxMsgNum)
+ {
+ LWLockRelease(SInvalLock);
return 0; /* nothing to read */
+ }
/*
* Retrieve message and advance my counter.
* delete it here. SIDelExpiredDataEntries() should be called to remove
* dead messages.
*/
+
+ LWLockRelease(SInvalLock);
return 1; /* got a message */
}
* Remove messages that have been consumed by all active backends
*/
void
-SIDelExpiredDataEntries(SISeg *segP)
+SIDelExpiredDataEntries(bool locked)
{
+ SISeg *segP = shmInvalBuffer;
int min,
i,
h;
+ if (!locked)
+ LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
+
min = segP->maxMsgNum;
if (min == segP->minMsgNum)
+ {
+ if (!locked)
+ LWLockRelease(SInvalLock);
return; /* fast path if no messages exist */
+ }
/* Recompute minMsgNum = minimum of all backends' nextMsgNum */
segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
}
}
+
+ if (!locked)
+ LWLockRelease(SInvalLock);
}
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/include/storage/sinvaladt.h,v 1.45 2008/01/01 19:45:59 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/storage/sinvaladt.h,v 1.46 2008/03/16 19:47:34 alvherre Exp $
*
*-------------------------------------------------------------------------
*/
#include "storage/sinval.h"
-
/*
* The shared cache invalidation manager is responsible for transmitting
* invalidation messages between backends. Any message sent by any backend
* must be delivered to all already-running backends before it can be
* forgotten.
- *
- * Conceptually, the messages are stored in an infinite array, where
- * maxMsgNum is the next array subscript to store a submitted message in,
- * minMsgNum is the smallest array subscript containing a message not yet
- * read by all backends, and we always have maxMsgNum >= minMsgNum. (They
- * are equal when there are no messages pending.) For each active backend,
- * there is a nextMsgNum pointer indicating the next message it needs to read;
- * we have maxMsgNum >= nextMsgNum >= minMsgNum for every backend.
- *
- * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
- * entries. We translate MsgNum values into circular-buffer indexes by
- * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
- * MAXNUMMESSAGES is a constant and a power of 2). As long as maxMsgNum
- * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
- * in the buffer. If the buffer does overflow, we reset it to empty and
- * force each backend to "reset", ie, discard all its invalidatable state.
- *
- * We would have problems if the MsgNum values overflow an integer, so
- * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
- * from all the MsgNum variables simultaneously. MSGNUMWRAPAROUND can be
- * large so that we don't need to do this often. It must be a multiple of
- * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
- * to be moved when we do it.
- *
+ *
* The struct type SharedInvalidationMessage, defining the contents of
* a single message, is defined in sinval.h.
*/
-
-/*
- * Configurable parameters.
- *
- * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
- * Must be a power of 2 for speed.
- *
- * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
- * Must be a multiple of MAXNUMMESSAGES. Should be large.
- */
-
-#define MAXNUMMESSAGES 4096
-#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 4096)
-
-
/* Per-backend state in shared invalidation structure */
typedef struct ProcState
{
bool resetState; /* true, if backend has to reset its state */
} ProcState;
-/* Shared cache invalidation memory segment */
-typedef struct SISeg
-{
- /*
- * General state information
- */
- int minMsgNum; /* oldest message still needed */
- int maxMsgNum; /* next message number to be assigned */
- int lastBackend; /* index of last active procState entry, +1 */
- int maxBackends; /* size of procState array */
- int freeBackends; /* number of empty procState slots */
-
- /*
- * Next LocalTransactionId to use for each idle backend slot. We keep
- * this here because it is indexed by BackendId and it is convenient to
- * copy the value to and from local memory when MyBackendId is set.
- */
- LocalTransactionId *nextLXID; /* array of maxBackends entries */
-
- /*
- * Circular buffer holding shared-inval messages
- */
- SharedInvalidationMessage buffer[MAXNUMMESSAGES];
-
- /*
- * Per-backend state info.
- *
- * We declare procState as 1 entry because C wants a fixed-size array, but
- * actually it is maxBackends entries long.
- */
- ProcState procState[1]; /* reflects the invalidation state */
-} SISeg;
-
-
-extern SISeg *shmInvalBuffer; /* pointer to the shared inval buffer */
-
/*
* prototypes for functions in sinvaladt.c
*/
-extern void SIBufferInit(void);
-extern int SIBackendInit(SISeg *segP);
+extern Size SInvalShmemSize(void);
+extern void CreateSharedInvalidationState(void);
+extern void SharedInvalBackendInit(void);
-extern bool SIInsertDataEntry(SISeg *segP, SharedInvalidationMessage *data);
-extern int SIGetDataEntry(SISeg *segP, int backendId,
- SharedInvalidationMessage *data);
-extern void SIDelExpiredDataEntries(SISeg *segP);
+extern bool SIInsertDataEntry(SharedInvalidationMessage *data);
+extern int SIGetDataEntry(int backendId, SharedInvalidationMessage *data);
+extern void SIDelExpiredDataEntries(bool locked);
extern LocalTransactionId GetNextLocalTransactionId(void);