]> granicus.if.org Git - postgresql/blob - src/backend/storage/ipc/sinvaladt.c
7c4956ae600697c932289fc454dffa41630879c0
[postgresql] / src / backend / storage / ipc / sinvaladt.c
1 /*-------------------------------------------------------------------------
2  *
3  * sinvaladt.c
4  *        POSTGRES shared cache invalidation segment definitions.
5  *
6  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.67 2008/03/16 19:47:33 alvherre Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "miscadmin.h"
18 #include "storage/backendid.h"
19 #include "storage/ipc.h"
20 #include "storage/lwlock.h"
21 #include "storage/pmsignal.h"
22 #include "storage/proc.h"
23 #include "storage/shmem.h"
24 #include "storage/sinvaladt.h"
25
26
27 /*
28  * Conceptually, the shared cache invalidation messages are stored in an
29  * infinite array, where maxMsgNum is the next array subscript to store a
30  * submitted message in, minMsgNum is the smallest array subscript containing a
31  * message not yet read by all backends, and we always have maxMsgNum >=
32  * minMsgNum.  (They are equal when there are no messages pending.)  For each
33  * active backend, there is a nextMsgNum pointer indicating the next message it
34  * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
35  * backend.
36  *
37  * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
38  * entries.  We translate MsgNum values into circular-buffer indexes by
39  * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
40  * MAXNUMMESSAGES is a constant and a power of 2).      As long as maxMsgNum
41  * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
42  * in the buffer.  If the buffer does overflow, we reset it to empty and
43  * force each backend to "reset", ie, discard all its invalidatable state.
44  *
45  * We would have problems if the MsgNum values overflow an integer, so
46  * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
47  * from all the MsgNum variables simultaneously.  MSGNUMWRAPAROUND can be
48  * large so that we don't need to do this often.  It must be a multiple of
49  * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
50  * to be moved when we do it.
51  */
52
53
54 /*
55  * Configurable parameters.
56  *
57  * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
58  * Must be a power of 2 for speed.
59  *
60  * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
61  * Must be a multiple of MAXNUMMESSAGES.  Should be large.
62  */
63
64 #define MAXNUMMESSAGES 4096
65 #define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 4096)
66
67
68 /* Shared cache invalidation memory segment */
69 typedef struct SISeg
70 {
71         /*
72          * General state information
73          */
74         int                     minMsgNum;              /* oldest message still needed */
75         int                     maxMsgNum;              /* next message number to be assigned */
76         int                     lastBackend;    /* index of last active procState entry, +1 */
77         int                     maxBackends;    /* size of procState array */
78         int                     freeBackends;   /* number of empty procState slots */
79
80         /*
81          * Next LocalTransactionId to use for each idle backend slot.  We keep
82          * this here because it is indexed by BackendId and it is convenient to
83          * copy the value to and from local memory when MyBackendId is set.
84          */
85         LocalTransactionId *nextLXID;           /* array of maxBackends entries */
86
87         /*
88          * Circular buffer holding shared-inval messages
89          */
90         SharedInvalidationMessage buffer[MAXNUMMESSAGES];
91
92         /*
93          * Per-backend state info.
94          *
95          * We declare procState as 1 entry because C wants a fixed-size array, but
96          * actually it is maxBackends entries long.
97          */
98         ProcState       procState[1];   /* reflects the invalidation state */
99 } SISeg;
100
101 static SISeg *shmInvalBuffer;   /* pointer to the shared inval buffer */
102
103
104 static LocalTransactionId nextLocalTransactionId;
105
106 static void CleanupInvalidationState(int status, Datum arg);
107 static void SISetProcStateInvalid(SISeg *segP);
108
109
110 /*
111  * SInvalShmemSize --- return shared-memory space needed
112  */
113 Size
114 SInvalShmemSize(void)
115 {
116         Size            size;
117
118         size = offsetof(SISeg, procState);
119         size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
120
121         size = add_size(size, mul_size(sizeof(LocalTransactionId), MaxBackends));
122
123         return size;
124 }
125
126 /*
127  * SharedInvalBufferInit
128  *              Create and initialize the SI message buffer
129  */
130 void
131 CreateSharedInvalidationState(void)
132 {
133         Size            size;
134         int                     i;
135         bool            found;
136
137         /* Allocate space in shared memory */
138         size = offsetof(SISeg, procState);
139         size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
140
141         shmInvalBuffer = (SISeg *)
142                 ShmemInitStruct("shmInvalBuffer", size, &found);
143         if (found)
144                 return;
145
146         shmInvalBuffer->nextLXID = ShmemAlloc(sizeof(LocalTransactionId) * MaxBackends);
147
148         /* Clear message counters, save size of procState array */
149         shmInvalBuffer->minMsgNum = 0;
150         shmInvalBuffer->maxMsgNum = 0;
151         shmInvalBuffer->lastBackend = 0;
152         shmInvalBuffer->maxBackends = MaxBackends;
153         shmInvalBuffer->freeBackends = MaxBackends;
154
155         /* The buffer[] array is initially all unused, so we need not fill it */
156
157         /* Mark all backends inactive, and initialize nextLXID */
158         for (i = 0; i < shmInvalBuffer->maxBackends; i++)
159         {
160                 shmInvalBuffer->procState[i].nextMsgNum = -1;           /* inactive */
161                 shmInvalBuffer->procState[i].resetState = false;
162                 shmInvalBuffer->nextLXID[i] = InvalidLocalTransactionId;
163         }
164 }
165
166 /*
167  * SharedInvalBackendInit
168  *              Initialize a new backend to operate on the sinval buffer
169  */
170 void
171 SharedInvalBackendInit(void)
172 {
173         int                     index;
174         ProcState  *stateP = NULL;
175         SISeg      *segP = shmInvalBuffer;
176
177         LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
178
179         /* Look for a free entry in the procState array */
180         for (index = 0; index < segP->lastBackend; index++)
181         {
182                 if (segP->procState[index].nextMsgNum < 0)              /* inactive slot? */
183                 {
184                         stateP = &segP->procState[index];
185                         break;
186                 }
187         }
188
189         if (stateP == NULL)
190         {
191                 if (segP->lastBackend < segP->maxBackends)
192                 {
193                         stateP = &segP->procState[segP->lastBackend];
194                         Assert(stateP->nextMsgNum < 0);
195                         segP->lastBackend++;
196                 }
197                 else
198                 {
199                         /*
200                          * out of procState slots: MaxBackends exceeded -- report normally
201                          */
202                         MyBackendId = InvalidBackendId;
203                         LWLockRelease(SInvalLock);
204                         ereport(FATAL,
205                                         (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
206                                          errmsg("sorry, too many clients already")));
207                 }
208         }
209
210         MyBackendId = (stateP - &segP->procState[0]) + 1;
211
212 #ifdef  INVALIDDEBUG
213         elog(DEBUG2, "my backend id is %d", MyBackendId);
214 #endif   /* INVALIDDEBUG */
215
216         /* Advertise assigned backend ID in MyProc */
217         MyProc->backendId = MyBackendId;
218
219         /* Reduce free slot count */
220         segP->freeBackends--;
221
222         /* Fetch next local transaction ID into local memory */
223         nextLocalTransactionId = segP->nextLXID[MyBackendId - 1];
224
225         /* mark myself active, with all extant messages already read */
226         stateP->nextMsgNum = segP->maxMsgNum;
227         stateP->resetState = false;
228
229         LWLockRelease(SInvalLock);
230
231         /* register exit routine to mark my entry inactive at exit */
232         on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
233 }
234
235 /*
236  * CleanupInvalidationState
237  *              Mark the current backend as no longer active.
238  *
239  * This function is called via on_shmem_exit() during backend shutdown,
240  * so the caller has NOT acquired the lock for us.
241  *
242  * arg is really of type "SISeg*".
243  */
244 static void
245 CleanupInvalidationState(int status, Datum arg)
246 {
247         SISeg      *segP = (SISeg *) DatumGetPointer(arg);
248         int                     i;
249
250         Assert(PointerIsValid(segP));
251
252         LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
253
254         /* Update next local transaction ID for next holder of this backendID */
255         segP->nextLXID[MyBackendId - 1] = nextLocalTransactionId;
256
257         /* Mark myself inactive */
258         segP->procState[MyBackendId - 1].nextMsgNum = -1;
259         segP->procState[MyBackendId - 1].resetState = false;
260
261         /* Recompute index of last active backend */
262         for (i = segP->lastBackend; i > 0; i--)
263         {
264                 if (segP->procState[i - 1].nextMsgNum >= 0)
265                         break;
266         }
267         segP->lastBackend = i;
268
269         /* Adjust free slot count */
270         segP->freeBackends++;
271
272         LWLockRelease(SInvalLock);
273 }
274
275 /*
276  * SIInsertDataEntry
277  *              Add a new invalidation message to the buffer.
278  *
279  * If we are unable to insert the message because the buffer is full,
280  * then clear the buffer and assert the "reset" flag to each backend.
281  * This will cause all the backends to discard *all* invalidatable state.
282  *
283  * Returns true for normal successful insertion, false if had to reset.
284  */
285 bool
286 SIInsertDataEntry(SharedInvalidationMessage *data)
287 {
288         int                     numMsgs;
289         bool            signal_postmaster = false;
290         SISeg      *segP;
291
292         LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
293
294         segP = shmInvalBuffer;
295         numMsgs = segP->maxMsgNum - segP->minMsgNum;
296
297         /* Is the buffer full? */
298         if (numMsgs >= MAXNUMMESSAGES)
299         {
300                 /*
301                  * Don't panic just yet: slowest backend might have consumed some
302                  * messages but not yet have done SIDelExpiredDataEntries() to advance
303                  * minMsgNum.  So, make sure minMsgNum is up-to-date.
304                  */
305                 SIDelExpiredDataEntries(true);
306                 numMsgs = segP->maxMsgNum - segP->minMsgNum;
307                 if (numMsgs >= MAXNUMMESSAGES)
308                 {
309                         /* Yup, it's definitely full, no choice but to reset */
310                         SISetProcStateInvalid(segP);
311                         LWLockRelease(SInvalLock);
312                         return false;
313                 }
314         }
315
316         /*
317          * Try to prevent table overflow.  When the table is 70% full send a
318          * WAKEN_CHILDREN request to the postmaster.  The postmaster will send a
319          * SIGUSR1 signal to all the backends, which will cause sinval.c to read
320          * any pending SI entries.
321          *
322          * This should never happen if all the backends are actively executing
323          * queries, but if a backend is sitting idle then it won't be starting
324          * transactions and so won't be reading SI entries.
325          */
326         if (numMsgs == (MAXNUMMESSAGES * 70 / 100) &&
327                 IsUnderPostmaster)
328         {
329                 elog(DEBUG4, "SI table is 70%% full, signaling postmaster");
330                 signal_postmaster = true;
331         }
332
333         /*
334          * Insert new message into proper slot of circular buffer
335          */
336         segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data;
337         segP->maxMsgNum++;
338
339         LWLockRelease(SInvalLock);
340
341         if (signal_postmaster)
342                 SendPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN);
343
344         return true;
345 }
346
347 /*
348  * SISetProcStateInvalid
349  *              Flush pending messages from buffer, assert reset flag for each backend
350  *
351  * This is used only to recover from SI buffer overflow.
352  */
353 static void
354 SISetProcStateInvalid(SISeg *segP)
355 {
356         int                     i;
357
358         segP->minMsgNum = 0;
359         segP->maxMsgNum = 0;
360
361         for (i = 0; i < segP->lastBackend; i++)
362         {
363                 if (segP->procState[i].nextMsgNum >= 0) /* active backend? */
364                 {
365                         segP->procState[i].resetState = true;
366                         segP->procState[i].nextMsgNum = 0;
367                 }
368         }
369 }
370
371 /*
372  * SIGetDataEntry
373  *              get next SI message for specified backend, if there is one
374  *
375  * Possible return values:
376  *      0: no SI message available
377  *      1: next SI message has been extracted into *data
378  *              (there may be more messages available after this one!)
379  * -1: SI reset message extracted
380  *
381  * NB: this can run in parallel with other instances of SIGetDataEntry
382  * executing on behalf of other backends, since each instance will modify only
383  * fields of its own backend's ProcState, and no instance will look at fields
384  * of other backends' ProcStates.  We express this by grabbing SInvalLock in
385  * shared mode.  Note that this is not exactly the normal (read-only)
386  * interpretation of a shared lock! Look closely at the interactions before
387  * allowing SInvalLock to be grabbed in shared mode for any other reason!
388  */
389 int
390 SIGetDataEntry(int backendId, SharedInvalidationMessage *data)
391 {
392         ProcState  *stateP;
393         SISeg      *segP;
394         
395         LWLockAcquire(SInvalLock, LW_SHARED);
396
397         segP = shmInvalBuffer;
398         stateP = &segP->procState[backendId - 1];
399
400         if (stateP->resetState)
401         {
402                 /*
403                  * Force reset.  We can say we have dealt with any messages added
404                  * since the reset, as well...
405                  */
406                 stateP->resetState = false;
407                 stateP->nextMsgNum = segP->maxMsgNum;
408                 LWLockRelease(SInvalLock);
409                 return -1;
410         }
411
412         if (stateP->nextMsgNum >= segP->maxMsgNum)
413         {
414                 LWLockRelease(SInvalLock);
415                 return 0;                               /* nothing to read */
416         }
417
418         /*
419          * Retrieve message and advance my counter.
420          */
421         *data = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
422         stateP->nextMsgNum++;
423
424         /*
425          * There may be other backends that haven't read the message, so we cannot
426          * delete it here. SIDelExpiredDataEntries() should be called to remove
427          * dead messages.
428          */
429
430         LWLockRelease(SInvalLock);
431         return 1;                                       /* got a message */
432 }
433
434 /*
435  * SIDelExpiredDataEntries
436  *              Remove messages that have been consumed by all active backends
437  */
438 void
439 SIDelExpiredDataEntries(bool locked)
440 {
441         SISeg      *segP = shmInvalBuffer;
442         int                     min,
443                                 i,
444                                 h;
445
446         if (!locked)
447                 LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
448
449         min = segP->maxMsgNum;
450         if (min == segP->minMsgNum)
451         {
452                 if (!locked)
453                         LWLockRelease(SInvalLock);
454                 return;                                 /* fast path if no messages exist */
455         }
456
457         /* Recompute minMsgNum = minimum of all backends' nextMsgNum */
458
459         for (i = 0; i < segP->lastBackend; i++)
460         {
461                 h = segP->procState[i].nextMsgNum;
462                 if (h >= 0)
463                 {                                               /* backend active */
464                         if (h < min)
465                                 min = h;
466                 }
467         }
468         segP->minMsgNum = min;
469
470         /*
471          * When minMsgNum gets really large, decrement all message counters so as
472          * to forestall overflow of the counters.
473          */
474         if (min >= MSGNUMWRAPAROUND)
475         {
476                 segP->minMsgNum -= MSGNUMWRAPAROUND;
477                 segP->maxMsgNum -= MSGNUMWRAPAROUND;
478                 for (i = 0; i < segP->lastBackend; i++)
479                 {
480                         if (segP->procState[i].nextMsgNum >= 0)
481                                 segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
482                 }
483         }
484
485         if (!locked)
486                 LWLockRelease(SInvalLock);
487 }
488
489
490 /*
491  * GetNextLocalTransactionId --- allocate a new LocalTransactionId
492  *
493  * We split VirtualTransactionIds into two parts so that it is possible
494  * to allocate a new one without any contention for shared memory, except
495  * for a bit of additional overhead during backend startup/shutdown.
496  * The high-order part of a VirtualTransactionId is a BackendId, and the
497  * low-order part is a LocalTransactionId, which we assign from a local
498  * counter.  To avoid the risk of a VirtualTransactionId being reused
499  * within a short interval, successive procs occupying the same backend ID
500  * slot should use a consecutive sequence of local IDs, which is implemented
501  * by copying nextLocalTransactionId as seen above.
502  */
503 LocalTransactionId
504 GetNextLocalTransactionId(void)
505 {
506         LocalTransactionId result;
507
508         /* loop to avoid returning InvalidLocalTransactionId at wraparound */
509         do
510         {
511                 result = nextLocalTransactionId++;
512         } while (!LocalTransactionIdIsValid(result));
513
514         return result;
515 }