1 /*-------------------------------------------------------------------------
4 * POSTGRES shared cache invalidation segment definitions.
6 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.67 2008/03/16 19:47:33 alvherre Exp $
13 *-------------------------------------------------------------------------
17 #include "miscadmin.h"
18 #include "storage/backendid.h"
19 #include "storage/ipc.h"
20 #include "storage/lwlock.h"
21 #include "storage/pmsignal.h"
22 #include "storage/proc.h"
23 #include "storage/shmem.h"
24 #include "storage/sinvaladt.h"
/*
 * Conceptually, the shared cache invalidation messages are stored in an
 * infinite array, where maxMsgNum is the next array subscript to store a
 * submitted message in, minMsgNum is the smallest array subscript containing a
 * message not yet read by all backends, and we always have maxMsgNum >=
 * minMsgNum.  (They are equal when there are no messages pending.)  For each
 * active backend, there is a nextMsgNum pointer indicating the next message it
 * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
 * active backend.
 *
 * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
 * entries.  We translate MsgNum values into circular-buffer indexes by
 * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
 * MAXNUMMESSAGES is a constant and a power of 2).  As long as maxMsgNum
 * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
 * in the buffer.  If the buffer does overflow, we reset it to empty and
 * force each backend to "reset", ie, discard all its invalidatable state.
 *
 * We would have problems if the MsgNum values overflow an integer, so
 * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
 * from all the MsgNum variables simultaneously.  MSGNUMWRAPAROUND can be
 * large so that we don't need to do this often.  It must be a multiple of
 * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
 * to be moved when we do it.
 */
/*
 * Configurable parameters.
 *
 * MAXNUMMESSAGES: capacity of the shared-inval circular buffer, in messages.
 * Keep it a power of 2 so that "MsgNum % MAXNUMMESSAGES" stays cheap.
 *
 * MSGNUMWRAPAROUND: threshold at which the MsgNum counters are reduced to
 * avoid integer overflow.  Must be a multiple of MAXNUMMESSAGES, so that
 * buffer slots keep their positions when the counters are reduced; should
 * be large, so that the reduction happens rarely.
 */

#define MAXNUMMESSAGES		4096
#define MSGNUMWRAPAROUND	(MAXNUMMESSAGES * 4096)
68 /* Shared cache invalidation memory segment */
72 * General state information
74 int minMsgNum; /* oldest message still needed */
75 int maxMsgNum; /* next message number to be assigned */
76 int lastBackend; /* index of last active procState entry, +1 */
77 int maxBackends; /* size of procState array */
78 int freeBackends; /* number of empty procState slots */
81 * Next LocalTransactionId to use for each idle backend slot. We keep
82 * this here because it is indexed by BackendId and it is convenient to
83 * copy the value to and from local memory when MyBackendId is set.
85 LocalTransactionId *nextLXID; /* array of maxBackends entries */
88 * Circular buffer holding shared-inval messages
90 SharedInvalidationMessage buffer[MAXNUMMESSAGES];
93 * Per-backend state info.
95 * We declare procState as 1 entry because C wants a fixed-size array, but
96 * actually it is maxBackends entries long.
98 ProcState procState[1]; /* reflects the invalidation state */
101 static SISeg *shmInvalBuffer; /* pointer to the shared inval buffer */
104 static LocalTransactionId nextLocalTransactionId;
106 static void CleanupInvalidationState(int status, Datum arg);
107 static void SISetProcStateInvalid(SISeg *segP);
111 * SInvalShmemSize --- return shared-memory space needed
114 SInvalShmemSize(void)
118 size = offsetof(SISeg, procState);
119 size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
121 size = add_size(size, mul_size(sizeof(LocalTransactionId), MaxBackends));
127 * SharedInvalBufferInit
128 * Create and initialize the SI message buffer
131 CreateSharedInvalidationState(void)
137 /* Allocate space in shared memory */
138 size = offsetof(SISeg, procState);
139 size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
141 shmInvalBuffer = (SISeg *)
142 ShmemInitStruct("shmInvalBuffer", size, &found);
146 shmInvalBuffer->nextLXID = ShmemAlloc(sizeof(LocalTransactionId) * MaxBackends);
148 /* Clear message counters, save size of procState array */
149 shmInvalBuffer->minMsgNum = 0;
150 shmInvalBuffer->maxMsgNum = 0;
151 shmInvalBuffer->lastBackend = 0;
152 shmInvalBuffer->maxBackends = MaxBackends;
153 shmInvalBuffer->freeBackends = MaxBackends;
155 /* The buffer[] array is initially all unused, so we need not fill it */
157 /* Mark all backends inactive, and initialize nextLXID */
158 for (i = 0; i < shmInvalBuffer->maxBackends; i++)
160 shmInvalBuffer->procState[i].nextMsgNum = -1; /* inactive */
161 shmInvalBuffer->procState[i].resetState = false;
162 shmInvalBuffer->nextLXID[i] = InvalidLocalTransactionId;
167 * SharedInvalBackendInit
168 * Initialize a new backend to operate on the sinval buffer
171 SharedInvalBackendInit(void)
174 ProcState *stateP = NULL;
175 SISeg *segP = shmInvalBuffer;
177 LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
179 /* Look for a free entry in the procState array */
180 for (index = 0; index < segP->lastBackend; index++)
182 if (segP->procState[index].nextMsgNum < 0) /* inactive slot? */
184 stateP = &segP->procState[index];
191 if (segP->lastBackend < segP->maxBackends)
193 stateP = &segP->procState[segP->lastBackend];
194 Assert(stateP->nextMsgNum < 0);
200 * out of procState slots: MaxBackends exceeded -- report normally
202 MyBackendId = InvalidBackendId;
203 LWLockRelease(SInvalLock);
205 (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
206 errmsg("sorry, too many clients already")));
210 MyBackendId = (stateP - &segP->procState[0]) + 1;
213 elog(DEBUG2, "my backend id is %d", MyBackendId);
214 #endif /* INVALIDDEBUG */
216 /* Advertise assigned backend ID in MyProc */
217 MyProc->backendId = MyBackendId;
219 /* Reduce free slot count */
220 segP->freeBackends--;
222 /* Fetch next local transaction ID into local memory */
223 nextLocalTransactionId = segP->nextLXID[MyBackendId - 1];
225 /* mark myself active, with all extant messages already read */
226 stateP->nextMsgNum = segP->maxMsgNum;
227 stateP->resetState = false;
229 LWLockRelease(SInvalLock);
231 /* register exit routine to mark my entry inactive at exit */
232 on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
236 * CleanupInvalidationState
237 * Mark the current backend as no longer active.
239 * This function is called via on_shmem_exit() during backend shutdown,
240 * so the caller has NOT acquired the lock for us.
242 * arg is really of type "SISeg*".
245 CleanupInvalidationState(int status, Datum arg)
247 SISeg *segP = (SISeg *) DatumGetPointer(arg);
250 Assert(PointerIsValid(segP));
252 LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
254 /* Update next local transaction ID for next holder of this backendID */
255 segP->nextLXID[MyBackendId - 1] = nextLocalTransactionId;
257 /* Mark myself inactive */
258 segP->procState[MyBackendId - 1].nextMsgNum = -1;
259 segP->procState[MyBackendId - 1].resetState = false;
261 /* Recompute index of last active backend */
262 for (i = segP->lastBackend; i > 0; i--)
264 if (segP->procState[i - 1].nextMsgNum >= 0)
267 segP->lastBackend = i;
269 /* Adjust free slot count */
270 segP->freeBackends++;
272 LWLockRelease(SInvalLock);
277 * Add a new invalidation message to the buffer.
279 * If we are unable to insert the message because the buffer is full,
280 * then clear the buffer and assert the "reset" flag to each backend.
281 * This will cause all the backends to discard *all* invalidatable state.
283 * Returns true for normal successful insertion, false if had to reset.
286 SIInsertDataEntry(SharedInvalidationMessage *data)
289 bool signal_postmaster = false;
292 LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
294 segP = shmInvalBuffer;
295 numMsgs = segP->maxMsgNum - segP->minMsgNum;
297 /* Is the buffer full? */
298 if (numMsgs >= MAXNUMMESSAGES)
301 * Don't panic just yet: slowest backend might have consumed some
302 * messages but not yet have done SIDelExpiredDataEntries() to advance
303 * minMsgNum. So, make sure minMsgNum is up-to-date.
305 SIDelExpiredDataEntries(true);
306 numMsgs = segP->maxMsgNum - segP->minMsgNum;
307 if (numMsgs >= MAXNUMMESSAGES)
309 /* Yup, it's definitely full, no choice but to reset */
310 SISetProcStateInvalid(segP);
311 LWLockRelease(SInvalLock);
317 * Try to prevent table overflow. When the table is 70% full send a
318 * WAKEN_CHILDREN request to the postmaster. The postmaster will send a
319 * SIGUSR1 signal to all the backends, which will cause sinval.c to read
320 * any pending SI entries.
322 * This should never happen if all the backends are actively executing
323 * queries, but if a backend is sitting idle then it won't be starting
324 * transactions and so won't be reading SI entries.
326 if (numMsgs == (MAXNUMMESSAGES * 70 / 100) &&
329 elog(DEBUG4, "SI table is 70%% full, signaling postmaster");
330 signal_postmaster = true;
334 * Insert new message into proper slot of circular buffer
336 segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data;
339 LWLockRelease(SInvalLock);
341 if (signal_postmaster)
342 SendPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN);
348 * SISetProcStateInvalid
349 * Flush pending messages from buffer, assert reset flag for each backend
351 * This is used only to recover from SI buffer overflow.
354 SISetProcStateInvalid(SISeg *segP)
361 for (i = 0; i < segP->lastBackend; i++)
363 if (segP->procState[i].nextMsgNum >= 0) /* active backend? */
365 segP->procState[i].resetState = true;
366 segP->procState[i].nextMsgNum = 0;
373 * get next SI message for specified backend, if there is one
375 * Possible return values:
376 * 0: no SI message available
377 * 1: next SI message has been extracted into *data
378 * (there may be more messages available after this one!)
379 * -1: SI reset message extracted
381 * NB: this can run in parallel with other instances of SIGetDataEntry
382 * executing on behalf of other backends, since each instance will modify only
383 * fields of its own backend's ProcState, and no instance will look at fields
384 * of other backends' ProcStates. We express this by grabbing SInvalLock in
385 * shared mode. Note that this is not exactly the normal (read-only)
386 * interpretation of a shared lock! Look closely at the interactions before
387 * allowing SInvalLock to be grabbed in shared mode for any other reason!
390 SIGetDataEntry(int backendId, SharedInvalidationMessage *data)
395 LWLockAcquire(SInvalLock, LW_SHARED);
397 segP = shmInvalBuffer;
398 stateP = &segP->procState[backendId - 1];
400 if (stateP->resetState)
403 * Force reset. We can say we have dealt with any messages added
404 * since the reset, as well...
406 stateP->resetState = false;
407 stateP->nextMsgNum = segP->maxMsgNum;
408 LWLockRelease(SInvalLock);
412 if (stateP->nextMsgNum >= segP->maxMsgNum)
414 LWLockRelease(SInvalLock);
415 return 0; /* nothing to read */
419 * Retrieve message and advance my counter.
421 *data = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
422 stateP->nextMsgNum++;
425 * There may be other backends that haven't read the message, so we cannot
426 * delete it here. SIDelExpiredDataEntries() should be called to remove
430 LWLockRelease(SInvalLock);
431 return 1; /* got a message */
435 * SIDelExpiredDataEntries
436 * Remove messages that have been consumed by all active backends
439 SIDelExpiredDataEntries(bool locked)
441 SISeg *segP = shmInvalBuffer;
447 LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
449 min = segP->maxMsgNum;
450 if (min == segP->minMsgNum)
453 LWLockRelease(SInvalLock);
454 return; /* fast path if no messages exist */
457 /* Recompute minMsgNum = minimum of all backends' nextMsgNum */
459 for (i = 0; i < segP->lastBackend; i++)
461 h = segP->procState[i].nextMsgNum;
463 { /* backend active */
468 segP->minMsgNum = min;
471 * When minMsgNum gets really large, decrement all message counters so as
472 * to forestall overflow of the counters.
474 if (min >= MSGNUMWRAPAROUND)
476 segP->minMsgNum -= MSGNUMWRAPAROUND;
477 segP->maxMsgNum -= MSGNUMWRAPAROUND;
478 for (i = 0; i < segP->lastBackend; i++)
480 if (segP->procState[i].nextMsgNum >= 0)
481 segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
486 LWLockRelease(SInvalLock);
491 * GetNextLocalTransactionId --- allocate a new LocalTransactionId
493 * We split VirtualTransactionIds into two parts so that it is possible
494 * to allocate a new one without any contention for shared memory, except
495 * for a bit of additional overhead during backend startup/shutdown.
496 * The high-order part of a VirtualTransactionId is a BackendId, and the
497 * low-order part is a LocalTransactionId, which we assign from a local
498 * counter. To avoid the risk of a VirtualTransactionId being reused
499 * within a short interval, successive procs occupying the same backend ID
500 * slot should use a consecutive sequence of local IDs, which is implemented
501 * by copying nextLocalTransactionId as seen above.
504 GetNextLocalTransactionId(void)
506 LocalTransactionId result;
508 /* loop to avoid returning InvalidLocalTransactionId at wraparound */
511 result = nextLocalTransactionId++;
512 } while (!LocalTransactionIdIsValid(result));