1 /*-------------------------------------------------------------------------
4 * POSTGRES shared cache invalidation segment definitions.
6 * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.49 2002/11/21 06:36:08 tgl Exp $
13 *-------------------------------------------------------------------------
17 #include "miscadmin.h"
18 #include "storage/backendid.h"
19 #include "storage/ipc.h"
20 #include "storage/pmsignal.h"
21 #include "storage/proc.h"
22 #include "storage/sinvaladt.h"
24 SISeg *shmInvalBuffer;
26 static void CleanupInvalidationState(int status, Datum arg);
27 static void SISetProcStateInvalid(SISeg *segP);
31 * SInvalShmemSize --- return shared-memory space needed
34 SInvalShmemSize(int maxBackends)
37 * Figure space needed. Note sizeof(SISeg) includes the first
40 return sizeof(SISeg) + sizeof(ProcState) * (maxBackends - 1);
45 * Create and initialize a new SI message buffer
48 SIBufferInit(int maxBackends)
54 /* Allocate space in shared memory */
55 segSize = SInvalShmemSize(maxBackends);
56 shmInvalBuffer = segP = (SISeg *) ShmemAlloc(segSize);
58 /* Clear message counters, save size of procState array */
61 segP->lastBackend = 0;
62 segP->maxBackends = maxBackends;
63 segP->freeBackends = maxBackends;
65 /* The buffer[] array is initially all unused, so we need not fill it */
67 /* Mark all backends inactive */
68 for (i = 0; i < maxBackends; i++)
70 segP->procState[i].nextMsgNum = -1; /* inactive */
71 segP->procState[i].resetState = false;
72 segP->procState[i].procStruct = INVALID_OFFSET;
78 * Initialize a new backend to operate on the sinval buffer
82 * 0 Failed to find a free procState slot (ie, MaxBackends exceeded)
83 * <0 Some other failure (not currently used)
85 * NB: this routine, and all following ones, must be executed with the
86 * SInvalLock lock held, since there may be multiple backends trying
87 * to access the buffer.
90 SIBackendInit(SISeg *segP)
93 ProcState *stateP = NULL;
95 /* Look for a free entry in the procState array */
96 for (index = 0; index < segP->lastBackend; index++)
98 if (segP->procState[index].nextMsgNum < 0) /* inactive slot? */
100 stateP = &segP->procState[index];
107 if (segP->lastBackend < segP->maxBackends)
109 stateP = &segP->procState[segP->lastBackend];
110 Assert(stateP->nextMsgNum < 0);
115 /* out of procState slots */
116 MyBackendId = InvalidBackendId;
121 MyBackendId = (stateP - &segP->procState[0]) + 1;
124 elog(DEBUG1, "SIBackendInit: backend id %d", MyBackendId);
125 #endif /* INVALIDDEBUG */
127 /* Reduce free slot count */
128 segP->freeBackends--;
130 /* mark myself active, with all extant messages already read */
131 stateP->nextMsgNum = segP->maxMsgNum;
132 stateP->resetState = false;
133 stateP->procStruct = MAKE_OFFSET(MyProc);
135 /* register exit routine to mark my entry inactive at exit */
136 on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
142 * CleanupInvalidationState
143 * Mark the current backend as no longer active.
145 * This function is called via on_shmem_exit() during backend shutdown,
146 * so the caller has NOT acquired the lock for us.
148 * arg is really of type "SISeg*".
151 CleanupInvalidationState(int status, Datum arg)
153 SISeg *segP = (SISeg *) DatumGetPointer(arg);
156 Assert(PointerIsValid(segP));
158 LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
160 /* Mark myself inactive */
161 segP->procState[MyBackendId - 1].nextMsgNum = -1;
162 segP->procState[MyBackendId - 1].resetState = false;
163 segP->procState[MyBackendId - 1].procStruct = INVALID_OFFSET;
165 /* Recompute index of last active backend */
166 for (i = segP->lastBackend; i > 0; i--)
168 if (segP->procState[i - 1].nextMsgNum >= 0)
171 segP->lastBackend = i;
173 /* Adjust free slot count */
174 segP->freeBackends++;
176 LWLockRelease(SInvalLock);
181 * Add a new invalidation message to the buffer.
183 * If we are unable to insert the message because the buffer is full,
184 * then clear the buffer and assert the "reset" flag to each backend.
185 * This will cause all the backends to discard *all* invalidatable state.
187 * Returns true for normal successful insertion, false if had to reset.
190 SIInsertDataEntry(SISeg *segP, SharedInvalidationMessage *data)
192 int numMsgs = segP->maxMsgNum - segP->minMsgNum;
194 /* Is the buffer full? */
195 if (numMsgs >= MAXNUMMESSAGES)
198 * Don't panic just yet: slowest backend might have consumed some
199 * messages but not yet have done SIDelExpiredDataEntries() to
200 * advance minMsgNum. So, make sure minMsgNum is up-to-date.
202 SIDelExpiredDataEntries(segP);
203 numMsgs = segP->maxMsgNum - segP->minMsgNum;
204 if (numMsgs >= MAXNUMMESSAGES)
206 /* Yup, it's definitely full, no choice but to reset */
207 SISetProcStateInvalid(segP);
213 * Try to prevent table overflow. When the table is 70% full send a
214 * WAKEN_CHILDREN request to the postmaster. The postmaster will send
215 * a SIGUSR2 signal (ordinarily a NOTIFY signal) to all the backends.
216 * This will force idle backends to execute a transaction to look
217 * through pg_listener for NOTIFY messages, and as a byproduct of the
218 * transaction start they will read SI entries.
220 * This should never happen if all the backends are actively executing
221 * queries, but if a backend is sitting idle then it won't be starting
222 * transactions and so won't be reading SI entries.
226 if (numMsgs == (MAXNUMMESSAGES * 70 / 100) &&
229 elog(DEBUG3, "SIInsertDataEntry: table is 70%% full, signaling postmaster");
230 SendPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN);
234 * Insert new message into proper slot of circular buffer
236 segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data;
243 * SISetProcStateInvalid
244 * Flush pending messages from buffer, assert reset flag for each backend
246 * This is used only to recover from SI buffer overflow.
249 SISetProcStateInvalid(SISeg *segP)
256 for (i = 0; i < segP->lastBackend; i++)
258 if (segP->procState[i].nextMsgNum >= 0) /* active backend? */
260 segP->procState[i].resetState = true;
261 segP->procState[i].nextMsgNum = 0;
268 * get next SI message for specified backend, if there is one
270 * Possible return values:
271 * 0: no SI message available
272 * 1: next SI message has been extracted into *data
273 * (there may be more messages available after this one!)
274 * -1: SI reset message extracted
276 * NB: this can run in parallel with other instances of SIGetDataEntry
277 * executing on behalf of other backends. See comments in sinval.c in
278 * ReceiveSharedInvalidMessages().
281 SIGetDataEntry(SISeg *segP, int backendId,
282 SharedInvalidationMessage *data)
284 ProcState *stateP = &segP->procState[backendId - 1];
286 if (stateP->resetState)
289 * Force reset. We can say we have dealt with any messages added
290 * since the reset, as well...
292 stateP->resetState = false;
293 stateP->nextMsgNum = segP->maxMsgNum;
297 if (stateP->nextMsgNum >= segP->maxMsgNum)
298 return 0; /* nothing to read */
301 * Retrieve message and advance my counter.
303 *data = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
304 stateP->nextMsgNum++;
307 * There may be other backends that haven't read the message, so we
308 * cannot delete it here. SIDelExpiredDataEntries() should be called
309 * to remove dead messages.
311 return 1; /* got a message */
315 * SIDelExpiredDataEntries
316 * Remove messages that have been consumed by all active backends
319 SIDelExpiredDataEntries(SISeg *segP)
325 min = segP->maxMsgNum;
326 if (min == segP->minMsgNum)
327 return; /* fast path if no messages exist */
329 /* Recompute minMsgNum = minimum of all backends' nextMsgNum */
331 for (i = 0; i < segP->lastBackend; i++)
333 h = segP->procState[i].nextMsgNum;
335 { /* backend active */
340 segP->minMsgNum = min;
343 * When minMsgNum gets really large, decrement all message counters so
344 * as to forestall overflow of the counters.
346 if (min >= MSGNUMWRAPAROUND)
348 segP->minMsgNum -= MSGNUMWRAPAROUND;
349 segP->maxMsgNum -= MSGNUMWRAPAROUND;
350 for (i = 0; i < segP->lastBackend; i++)
352 if (segP->procState[i].nextMsgNum >= 0)
353 segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;