1 /*-------------------------------------------------------------------------
4 * POSTGRES shared cache invalidation segment definitions.
6 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.43 2001/11/05 17:46:28 momjian Exp $
13 *-------------------------------------------------------------------------
17 #include "miscadmin.h"
18 #include "storage/backendid.h"
19 #include "storage/pmsignal.h"
20 #include "storage/proc.h"
21 #include "storage/sinvaladt.h"
/*
 * File-scope (non-static) pointer to the SI segment.  SIBufferInit()
 * assigns it from the result of ShmemAlloc().
 */
23 SISeg *shmInvalBuffer;
/* Forward declarations of the file-local (static) helpers defined further down. */
25 static void CleanupInvalidationState(int status, Datum arg);
26 static void SISetProcStateInvalid(SISeg *segP);
30 * SInvalShmemSize --- return shared-memory space needed
/*
 * maxBackends: number of ProcState slots the segment has to hold.
 *
 * The result is sizeof(SISeg) plus room for (maxBackends - 1) further
 * ProcState entries.
 *
 * NOTE(review): the return-type line and the braces of this function are
 * not present in this listing (its numbering skips 31-32, 34-35, 40+);
 * confirm them against the complete file before editing.
 * NOTE(review): the formula presumes maxBackends >= 1.  With 0, the factor
 * (maxBackends - 1) is negative before it is multiplied by a sizeof value
 * -- TODO confirm that callers never pass 0.
 */
33 SInvalShmemSize(int maxBackends)
36 * Figure space needed. Note sizeof(SISeg) includes the first
39 return sizeof(SISeg) + sizeof(ProcState) * (maxBackends - 1);
44 * Create and initialize a new SI message buffer
/*
 * maxBackends: number of entries in the procState array; it is passed to
 * SInvalShmemSize() and stored in segP->maxBackends.
 *
 * Side effects: obtains the segment from ShmemAlloc() and stores the
 * pointer in the file-scope variable shmInvalBuffer.
 *
 * NOTE(review): this listing omits several original lines of the function
 * (return type, braces, the declarations of segSize/segP/i, and lines
 * 58-59, which presumably reset the message counters that the comment on
 * line 57 mentions) -- consult the complete file before editing.
 */
47 SIBufferInit(int maxBackends)
53 /* Allocate space in shared memory */
54 segSize = SInvalShmemSize(maxBackends);
/* NOTE(review): no check of the ShmemAlloc() result is visible here -- confirm how an allocation failure is reported. */
55 shmInvalBuffer = segP = (SISeg *) ShmemAlloc(segSize);
57 /* Clear message counters, save size of procState array */
/* No procState slot has been handed out yet. */
60 segP->lastBackend = 0;
61 segP->maxBackends = maxBackends;
63 /* The buffer[] array is initially all unused, so we need not fill it */
65 /* Mark all backends inactive */
66 for (i = 0; i < maxBackends; i++)
/* A negative nextMsgNum is the "slot is free" marker that SIBackendInit() tests for. */
68 segP->procState[i].nextMsgNum = -1; /* inactive */
69 segP->procState[i].resetState = false;
70 segP->procState[i].procStruct = INVALID_OFFSET;
76 * Initialize a new backend to operate on the sinval buffer
80 * 0 Failed to find a free procState slot (ie, MaxBackends exceeded)
81 * <0 Some other failure (not currently used)
83 * NB: this routine, and all following ones, must be executed with the
84 * SInvalLock lock held, since there may be multiple backends trying
85 * to access the buffer.
/*
 * segP: the SI segment in which a procState slot is to be claimed.
 *
 * Side effects visible below: sets the global MyBackendId, fills in the
 * chosen procState entry, and registers CleanupInvalidationState() with
 * on_shmem_exit().
 *
 * NOTE(review): the return-type line, the braces, the declaration of
 * "index", the statements that leave the search loop, and every return
 * statement are missing from this listing (its numbering has gaps), so the
 * exact control flow must be confirmed against the complete file.
 */
88 SIBackendInit(SISeg *segP)
91 ProcState *stateP = NULL;
93 /* Look for a free entry in the procState array */
94 for (index = 0; index < segP->lastBackend; index++)
96 if (segP->procState[index].nextMsgNum < 0) /* inactive slot? */
98 stateP = &segP->procState[index];
/*
 * Otherwise try the first slot beyond lastBackend, provided the array is
 * not yet used up to maxBackends.
 * NOTE(review): the enclosing test (presumably "no free slot found") and
 * the statement that advances lastBackend are not shown here.
 */
105 if (segP->lastBackend < segP->maxBackends)
107 stateP = &segP->procState[segP->lastBackend];
108 Assert(stateP->nextMsgNum < 0);
113 /* out of procState slots */
114 MyBackendId = InvalidBackendId;
/* Backend ids are 1-based: the slot's array index plus one. */
119 MyBackendId = (stateP - &segP->procState[0]) + 1;
122 elog(DEBUG, "SIBackendInit: backend id %d", MyBackendId);
123 #endif /* INVALIDDEBUG */
125 /* mark myself active, with all extant messages already read */
126 stateP->nextMsgNum = segP->maxMsgNum;
127 stateP->resetState = false;
128 stateP->procStruct = MAKE_OFFSET(MyProc);
130 /* register exit routine to mark my entry inactive at exit */
/* The segment pointer travels as the callback's Datum argument. */
131 on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
137 * CleanupInvalidationState
138 * Mark the current backend as no longer active.
140 * This function is called via on_shmem_exit() during backend shutdown,
141 * so the caller has NOT acquired the lock for us.
143 * arg is really of type "SISeg*".
/*
 * status: not referenced by any statement visible in this listing.
 * arg:    the SISeg pointer that SIBackendInit() wrapped with
 *         PointerGetDatum().
 *
 * The procState update and the lastBackend recomputation both happen
 * between LWLockAcquire(SInvalLock, LW_EXCLUSIVE) and LWLockRelease().
 *
 * NOTE(review): the braces, the declaration of "i", and the body line(s)
 * of the "if" inside the loop (listing line 164, presumably a break) are
 * missing from this listing -- confirm against the complete file.
 */
146 CleanupInvalidationState(int status, Datum arg)
148 SISeg *segP = (SISeg *) DatumGetPointer(arg);
151 Assert(PointerIsValid(segP));
153 LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
155 /* Mark myself inactive */
/* MyBackendId is 1-based (SIBackendInit() stores slot index + 1), hence the "- 1". */
156 segP->procState[MyBackendId - 1].nextMsgNum = -1;
157 segP->procState[MyBackendId - 1].resetState = false;
158 segP->procState[MyBackendId - 1].procStruct = INVALID_OFFSET;
160 /* Recompute index of last active backend */
/* Walk downward from the old lastBackend; slot i - 1 is active when its nextMsgNum is >= 0. */
161 for (i = segP->lastBackend; i > 0; i--)
163 if (segP->procState[i - 1].nextMsgNum >= 0)
166 segP->lastBackend = i;
168 LWLockRelease(SInvalLock);
173 * Add a new invalidation message to the buffer.
175 * If we are unable to insert the message because the buffer is full,
176 * then clear the buffer and assert the "reset" flag to each backend.
177 * This will cause all the backends to discard *all* invalidatable state.
179 * Returns true for normal successful insertion, false if had to reset.
/*
 * segP: the SI segment holding the circular message buffer.
 * data: the message to store; it is copied by value into segP->buffer[].
 *
 * NOTE(review): the return-type line, the braces, the return statements,
 * the remainder of the 70% condition (listing lines 219-221) and whatever
 * follows the buffer store (presumably the maxMsgNum advance) are missing
 * from this listing -- confirm against the complete file.
 */
182 SIInsertDataEntry(SISeg *segP, SharedInvalidationMessage *data)
/* Number of messages currently held: distance between the newest and oldest counters. */
184 int numMsgs = segP->maxMsgNum - segP->minMsgNum;
186 /* Is the buffer full? */
187 if (numMsgs >= MAXNUMMESSAGES)
190 * Don't panic just yet: slowest backend might have consumed some
191 * messages but not yet have done SIDelExpiredDataEntries() to
192 * advance minMsgNum. So, make sure minMsgNum is up-to-date.
/* Re-measure after the cleanup pass before deciding to reset. */
194 SIDelExpiredDataEntries(segP);
195 numMsgs = segP->maxMsgNum - segP->minMsgNum;
196 if (numMsgs >= MAXNUMMESSAGES)
198 /* Yup, it's definitely full, no choice but to reset */
199 SISetProcStateInvalid(segP);
205 * Try to prevent table overflow. When the table is 70% full send a
206 * WAKEN_CHILDREN request to the postmaster. The postmaster will send
207 * a SIGUSR2 signal (ordinarily a NOTIFY signal) to all the backends.
208 * This will force idle backends to execute a transaction to look
209 * through pg_listener for NOTIFY messages, and as a byproduct of the
210 * transaction start they will read SI entries.
212 * This should never happen if all the backends are actively executing
213 * queries, but if a backend is sitting idle then it won't be starting
214 * transactions and so won't be reading SI entries.
/*
 * The test is an equality, so it matches only when the count is exactly at
 * the 70% mark, not at every count above it.
 */
218 if (numMsgs == (MAXNUMMESSAGES * 70 / 100) &&
222 elog(DEBUG, "SIInsertDataEntry: table is 70%% full, signaling postmaster");
223 SendPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN);
227 * Insert new message into proper slot of circular buffer
/* The running counter is reduced modulo MAXNUMMESSAGES to obtain the array slot. */
229 segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data;
236 * SISetProcStateInvalid
237 * Flush pending messages from buffer, assert reset flag for each backend
239 * This is used only to recover from SI buffer overflow.
/*
 * segP: the SI segment whose backends are to be told to reset.
 *
 * The visible loop covers the slots below lastBackend and, for each active
 * one, raises resetState and rewinds its read position to 0.
 *
 * NOTE(review): listing lines 243-248 (braces, the declaration of "i", and
 * presumably the statements that empty the buffer by resetting the
 * segment's own counters) are missing here -- confirm against the complete
 * file.
 */
242 SISetProcStateInvalid(SISeg *segP)
249 for (i = 0; i < segP->lastBackend; i++)
251 if (segP->procState[i].nextMsgNum >= 0) /* active backend? */
253 segP->procState[i].resetState = true;
254 segP->procState[i].nextMsgNum = 0;
261 * get next SI message for specified backend, if there is one
263 * Possible return values:
264 * 0: no SI message available
265 * 1: next SI message has been extracted into *data
266 * (there may be more messages available after this one!)
267 * -1: SI reset message extracted
269 * NB: this can run in parallel with other instances of SIGetDataEntry
270 * executing on behalf of other backends. See comments in sinval.c in
271 * ReceiveSharedInvalidMessages().
/*
 * segP:      the SI segment to read from.
 * backendId: 1-based id of the reading backend; it is converted to a
 *            procState index by subtracting 1.
 * data:      output; receives a copy of the next unread message when one
 *            is available.
 *
 * NOTE(review): the return-type line, the braces and the return statement
 * of the reset branch (presumably "return -1", per the list above) are
 * missing from this listing -- confirm against the complete file.
 */
274 SIGetDataEntry(SISeg *segP, int backendId,
275 SharedInvalidationMessage *data)
277 ProcState *stateP = &segP->procState[backendId - 1];
279 if (stateP->resetState)
282 * Force reset. We can say we have dealt with any messages added
283 * since the reset, as well...
/* Clear the flag and jump the read position to the newest message. */
285 stateP->resetState = false;
286 stateP->nextMsgNum = segP->maxMsgNum;
/* Read position has caught up with the insert position. */
290 if (stateP->nextMsgNum >= segP->maxMsgNum)
291 return 0; /* nothing to read */
294 * Retrieve message and advance my counter.
296 *data = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
297 stateP->nextMsgNum++;
300 * There may be other backends that haven't read the message, so we
301 * cannot delete it here. SIDelExpiredDataEntries() should be called
302 * to remove dead messages.
304 return 1; /* got a message */
308 * SIDelExpiredDataEntries
309 * Remove messages that have been consumed by all active backends
/*
 * segP: the SI segment whose minMsgNum is to be brought up to date.
 *
 * Nothing is erased from buffer[] in the visible code; advancing minMsgNum
 * is what makes older entries count as removed.
 *
 * NOTE(review): the return-type line, the braces, the declarations of
 * min/i/h, and the statements that lower "min" inside the first loop
 * (listing lines 327 and 329-331) are missing from this listing, and the
 * function may continue past the last line shown -- confirm against the
 * complete file.
 */
312 SIDelExpiredDataEntries(SISeg *segP)
/* Start from the newest counter; with no backend behind it, everything is consumed. */
318 min = segP->maxMsgNum;
319 if (min == segP->minMsgNum)
320 return; /* fast path if no messages exist */
322 /* Recompute minMsgNum = minimum of all backends' nextMsgNum */
324 for (i = 0; i < segP->lastBackend; i++)
326 h = segP->procState[i].nextMsgNum;
328 { /* backend active */
333 segP->minMsgNum = min;
336 * When minMsgNum gets really large, decrement all message counters so
337 * as to forestall overflow of the counters.
/*
 * All counters are lowered by the same amount, so their differences are
 * unchanged.  Slots with a negative nextMsgNum are skipped, which leaves
 * the inactive marker as it is.
 */
339 if (min >= MSGNUMWRAPAROUND)
341 segP->minMsgNum -= MSGNUMWRAPAROUND;
342 segP->maxMsgNum -= MSGNUMWRAPAROUND;
343 for (i = 0; i < segP->lastBackend; i++)
345 if (segP->procState[i].nextMsgNum >= 0)
346 segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;