1 /*-------------------------------------------------------------------------
4 * POSTGRES shared cache invalidation segment definitions.
6 * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.33 2000/10/02 19:42:48 petere Exp $
13 *-------------------------------------------------------------------------
20 #include "miscadmin.h"
21 #include "storage/backendid.h"
22 #include "storage/proc.h"
23 #include "storage/sinval.h"
24 #include "storage/sinvaladt.h"
/* Address of the attached SI segment; assigned in SISegmentAttach(). */
26 SISeg *shmInvalBuffer;
/* Prototypes for the file-local routines defined later in this file. */
28 static void SISegmentAttach(IpcMemoryId shmid);
29 static void SISegInit(SISeg *segP, int maxBackends);
30 static void CleanupInvalidationState(int status, SISeg *segP);
31 static void SISetProcStateInvalid(SISeg *segP);
35 * Create a new SI memory segment, or attach to an existing one
37 * This is called with createNewSegment = true by the postmaster (or by
38 * a standalone backend), and subsequently with createNewSegment = false
39 * by backends started by the postmaster.
41 * Note: maxBackends param is only valid when createNewSegment is true
/*
 * SISegmentInit - create or locate the SI segment for `key` and attach to it.
 *
 *	createNewSegment: chooses between the create sequence (first group of
 *		calls below) and the lookup sequence (second group).
 *	key: IPC key passed to IpcMemoryCreate() / IpcMemoryIdGet().
 *	maxBackends: used to size the segment and passed on to SISegInit().
 *
 * Both failure paths visible here report through perror() and return -1.
 *
 * NOTE(review): this listing omits lines of the function (braces, local
 * declarations, the tests guarding the error paths, the success return).
 * Confirm those against the complete file before relying on this summary.
 */
44 SISegmentInit(bool createNewSegment, IPCKey key, int maxBackends)
51 /* Kill existing segment, if any */
55 * Figure space needed. Note sizeof(SISeg) includes the first
/*
 * The sentence above is cut off in this listing; presumably it refers to the
 * first ProcState entry, which is why only (maxBackends - 1) further entries
 * are added to sizeof(SISeg) below.
 */
58 segSize = sizeof(SISeg) + sizeof(ProcState) * (maxBackends - 1);
60 /* Get a shared segment */
61 shmId = IpcMemoryCreate(key, segSize, IPCProtection);
64 perror("SISegmentInit: segment create failed");
65 return -1; /* an error */
68 /* Attach to the shared cache invalidation segment */
69 /* sets the global variable shmInvalBuffer */
70 SISegmentAttach(shmId);
72 /* Init shared memory contents */
/* Only the create sequence initializes the contents; the lookup sequence below does not call SISegInit(). */
73 SISegInit(shmInvalBuffer, maxBackends);
77 /* find existing segment */
78 shmId = IpcMemoryIdGet(key, 0);
81 perror("SISegmentInit: segment get failed");
82 return -1; /* an error */
85 /* Attach to the shared cache invalidation segment */
86 /* sets the global variable shmInvalBuffer */
87 SISegmentAttach(shmId);
94 * Attach to specified shared memory segment
/*
 * SISegmentAttach - attach to segment `shmid` and store the resulting address
 * in the global shmInvalBuffer.
 *
 * An attach result equal to IpcMemAttachFailed is reported via elog(FATAL).
 */
97 SISegmentAttach(IpcMemoryId shmid)
99 shmInvalBuffer = (SISeg *) IpcMemoryAttach(shmid);
101 if (shmInvalBuffer == IpcMemAttachFailed)
103 /* XXX use validity function */
104 elog(FATAL, "SISegmentAttach: Could not attach segment: %m");
110 * Initialize contents of a new shared memory sinval segment
/*
 * SISegInit - set up the contents of a newly created segment.
 *
 *	segP: segment to initialize.
 *	maxBackends: number of procState entries to reset; also recorded in
 *		segP->maxBackends for the later scans over the array.
 *
 * NOTE(review): the statements that clear the message counters are not
 * visible in this listing (only the comment announcing them is).
 */
113 SISegInit(SISeg *segP, int maxBackends)
117 /* Clear message counters, save size of procState array */
120 segP->maxBackends = maxBackends;
122 /* The buffer[] array is initially all unused, so we need not fill it */
124 /* Mark all backends inactive */
125 for (i = 0; i < maxBackends; i++)
/* A negative nextMsgNum marks a free slot; other routines test nextMsgNum >= 0 to find active ones. */
127 segP->procState[i].nextMsgNum = -1; /* inactive */
128 segP->procState[i].resetState = false;
/* InvalidBackendTag is what SIBackendInit() searches for when picking a free slot. */
129 segP->procState[i].tag = InvalidBackendTag;
130 segP->procState[i].procStruct = INVALID_OFFSET;
136 * Initialize a new backend to operate on the sinval buffer
138 * NB: this routine, and all following ones except CleanupInvalidationState
139 * (which takes SInvalLock itself), must be executed with the SInvalLock
140 * spinlock held, since multiple backends may be accessing the buffer.
/*
 * SIBackendInit - claim a procState slot for the current backend.
 *
 *	segP: the SI segment whose procState array is searched.
 *
 * Steps visible here: verify no slot already carries MyBackendTag, pick a
 * slot whose tag is InvalidBackendTag, derive MyBackendId from the slot
 * position, mark the slot active, and register CleanupInvalidationState()
 * with on_shmem_exit().  When no slot is found a NOTICE is logged and
 * MyBackendId is set to InvalidBackendTag.
 *
 * NOTE(review): the return statements, the test on stateP and the opening
 * of the INVALIDDEBUG conditional are not visible in this listing.
 */
143 SIBackendInit(SISeg *segP)
/* Stays NULL unless the search loop below finds a free slot. */
146 ProcState *stateP = NULL;
148 Assert(MyBackendTag > 0);
150 /* Check for duplicate backend tags (should never happen) */
151 for (index = 0; index < segP->maxBackends; index++)
153 if (segP->procState[index].tag == MyBackendTag)
154 elog(FATAL, "SIBackendInit: tag %d already in use", MyBackendTag);
157 /* Look for a free entry in the procState array */
158 for (index = 0; index < segP->maxBackends; index++)
160 if (segP->procState[index].tag == InvalidBackendTag)
162 stateP = &segP->procState[index];
168 * elog() with spinlock held is probably not too cool, but this
169 * condition should never happen anyway.
173 elog(NOTICE, "SIBackendInit: no free procState slot available");
174 MyBackendId = InvalidBackendTag;
/* Backend ids are 1-based: slot index plus one.  Other routines index procState with (id - 1). */
178 MyBackendId = (stateP - &segP->procState[0]) + 1;
181 elog(DEBUG, "SIBackendInit: backend tag %d; backend id %d.",
182 MyBackendTag, MyBackendId);
183 #endif /* INVALIDDEBUG */
185 /* mark myself active, with all extant messages already read */
/* Starting at maxMsgNum means SIGetDataEntry() reports nothing to read until a new message is added. */
186 stateP->nextMsgNum = segP->maxMsgNum;
187 stateP->resetState = false;
188 stateP->tag = MyBackendTag;
189 stateP->procStruct = MAKE_OFFSET(MyProc);
191 /* register exit routine to mark my entry inactive at exit */
192 on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
198 * CleanupInvalidationState
199 * Mark the current backend as no longer active.
201 * This function is called via on_shmem_exit() during backend shutdown,
202 * so the caller has NOT acquired the lock for us.
/*
 * CleanupInvalidationState - release the procState slot of this backend.
 *
 * The slot at index (MyBackendId - 1) is reset to the same inactive values
 * that SISegInit() writes, with SInvalLock held for the duration.
 * `status` is not referenced in the lines shown here.
 *
 * NOTE(review): the line carrying the second parameter (segP) is missing
 * from this listing; the prototype near the top of the file shows it.
 */
205 CleanupInvalidationState(int status,
208 Assert(PointerIsValid(segP));
/* Taken here because this routine runs as an exit callback, not under a lock held by a caller. */
210 SpinAcquire(SInvalLock);
212 segP->procState[MyBackendId - 1].nextMsgNum = -1;
213 segP->procState[MyBackendId - 1].resetState = false;
214 segP->procState[MyBackendId - 1].tag = InvalidBackendTag;
215 segP->procState[MyBackendId - 1].procStruct = INVALID_OFFSET;
217 SpinRelease(SInvalLock);
222 * Add a new invalidation message to the buffer.
224 * If we are unable to insert the message because the buffer is full,
225 * then clear the buffer and assert the "reset" flag to each backend.
226 * This will cause all the backends to discard *all* invalidatable state.
228 * Returns true for normal successful insertion, false if the buffer had to be reset.
/*
 * SIInsertDataEntry - copy *data into the circular message buffer.
 *
 *	segP: the SI segment.
 *	data: message stored at buffer[maxMsgNum % MAXNUMMESSAGES].
 *
 * If the number of outstanding messages (maxMsgNum - minMsgNum) has reached
 * MAXNUMMESSAGES, minMsgNum is first refreshed via SIDelExpiredDataEntries();
 * if the buffer is still full, SISetProcStateInvalid() is called.
 *
 * NOTE(review): return statements, braces and part of the 70% condition are
 * not visible in this listing.
 */
231 SIInsertDataEntry(SISeg *segP, SharedInvalidData *data)
233 int numMsgs = segP->maxMsgNum - segP->minMsgNum;
235 /* Is the buffer full? */
236 if (numMsgs >= MAXNUMMESSAGES)
240 * Don't panic just yet: slowest backend might have consumed some
241 * messages but not yet have done SIDelExpiredDataEntries() to
242 * advance minMsgNum. So, make sure minMsgNum is up-to-date.
244 SIDelExpiredDataEntries(segP);
245 numMsgs = segP->maxMsgNum - segP->minMsgNum;
246 if (numMsgs >= MAXNUMMESSAGES)
248 /* Yup, it's definitely full, no choice but to reset */
249 SISetProcStateInvalid(segP);
255 * Try to prevent table overflow. When the table is 70% full send a
256 * SIGUSR2 (ordinarily a NOTIFY signal) to the postmaster, which will
257 * send it back to all the backends. This will force idle backends to
258 * execute a transaction to look through pg_listener for NOTIFY
259 * messages, and as a byproduct of the transaction start they will
/*
 * NOTE(review): the sentence above is cut off in this listing; judging by
 * the following paragraph it presumably ends with the backends reading
 * their pending SI entries.
 */
262 * This should never happen if all the backends are actively executing
263 * queries, but if a backend is sitting idle then it won't be starting
264 * transactions and so won't be reading SI entries.
/* Equality, not >=: the signal is sent only by the insertion that finds numMsgs exactly at the 70% mark. */
268 if (numMsgs == (MAXNUMMESSAGES * 70 / 100) &&
272 elog(DEBUG, "SIInsertDataEntry: table is 70%% full, signaling postmaster");
/* The parent process is the target; the comment above identifies it as the postmaster. */
273 kill(getppid(), SIGUSR2);
277 * Insert new message into proper slot of circular buffer
279 segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data;
286 * SISetProcStateInvalid
287 * Flush pending messages from buffer, assert reset flag for each backend
289 * This is used only to recover from SI buffer overflow.
/*
 * SISetProcStateInvalid - flag every active backend for a reset.
 *
 *	segP: the SI segment.
 *
 * For each slot with nextMsgNum >= 0, resetState is set and nextMsgNum is
 * put back to 0; inactive slots (negative nextMsgNum) are left alone.
 *
 * NOTE(review): lines between the signature and the loop are missing from
 * this listing, so any changes to the segment counters are not shown.
 */
292 SISetProcStateInvalid(SISeg *segP)
299 for (i = 0; i < segP->maxBackends; i++)
301 if (segP->procState[i].nextMsgNum >= 0) /* active backend? */
/* SIGetDataEntry() checks this flag before looking at the message counters. */
303 segP->procState[i].resetState = true;
304 segP->procState[i].nextMsgNum = 0;
311 * get next SI message for specified backend, if there is one
313 * Possible return values:
314 * 0: no SI message available
315 * 1: next SI message has been extracted into *data
316 * (there may be more messages available after this one!)
317 * -1: SI reset message extracted
/*
 * SIGetDataEntry - fetch the next unread message for one backend.
 *
 *	segP: the SI segment.
 *	backendId: 1-based backend id; slot (backendId - 1) is used.
 *	data: output, receives a copy of the message when one is returned.
 *
 * Returns 0 when the slot has nothing to read and 1 after copying a message.
 * NOTE(review): the return statement of the reset branch is not visible in
 * this listing.
 */
320 SIGetDataEntry(SISeg *segP, int backendId,
321 SharedInvalidData *data)
323 ProcState *stateP = &segP->procState[backendId - 1];
/* The slot must belong to the calling backend. */
325 Assert(stateP->tag == MyBackendTag);
327 if (stateP->resetState)
331 * Force reset. We can say we have dealt with any messages added
332 * since the reset, as well...
334 stateP->resetState = false;
/* Skip everything currently in the buffer. */
335 stateP->nextMsgNum = segP->maxMsgNum;
339 if (stateP->nextMsgNum >= segP->maxMsgNum)
340 return 0; /* nothing to read */
343 * Retrieve message and advance my counter.
/* Same modulo mapping from message number to slot that SIInsertDataEntry() uses when storing. */
345 *data = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
346 stateP->nextMsgNum++;
349 * There may be other backends that haven't read the message, so we
350 * cannot delete it here. SIDelExpiredDataEntries() should be called
351 * to remove dead messages.
353 return 1; /* got a message */
357 * SIDelExpiredDataEntries
358 * Remove messages that have been consumed by all active backends
/*
 * SIDelExpiredDataEntries - advance segP->minMsgNum and keep the counters
 * from growing without bound.
 *
 *	segP: the SI segment.
 *
 * Returns immediately when maxMsgNum equals minMsgNum.  Otherwise minMsgNum
 * is recomputed from the nextMsgNum values of the backends, and once it
 * reaches MSGNUMWRAPAROUND all counters are shifted down by that amount.
 *
 * NOTE(review): the statements that compare h with min inside the loop are
 * not visible in this listing.
 */
361 SIDelExpiredDataEntries(SISeg *segP)
/* Start from the upper bound. */
367 min = segP->maxMsgNum;
368 if (min == segP->minMsgNum)
369 return; /* fast path if no messages exist */
371 /* Recompute minMsgNum = minimum of all backends' nextMsgNum */
373 for (i = 0; i < segP->maxBackends; i++)
375 h = segP->procState[i].nextMsgNum;
377 { /* backend active */
382 segP->minMsgNum = min;
385 * When minMsgNum gets really large, decrement all message counters so
386 * as to forestall overflow of the counters.
/*
 * Every counter drops by the same amount, so differences such as
 * maxMsgNum - minMsgNum are unchanged.  Buffer slot numbers
 * (counter % MAXNUMMESSAGES) stay the same only if MSGNUMWRAPAROUND is a
 * multiple of MAXNUMMESSAGES -- TODO confirm in the header.
 */
388 if (min >= MSGNUMWRAPAROUND)
390 segP->minMsgNum -= MSGNUMWRAPAROUND;
391 segP->maxMsgNum -= MSGNUMWRAPAROUND;
392 for (i = 0; i < segP->maxBackends; i++)
/* Inactive slots keep their negative marker. */
394 if (segP->procState[i].nextMsgNum >= 0)
395 segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;