]> granicus.if.org Git - postgresql/blob - src/backend/storage/ipc/sinvaladt.c
Code review for superuser_reserved_connections patch. Don't try to do
[postgresql] / src / backend / storage / ipc / sinvaladt.c
1 /*-------------------------------------------------------------------------
2  *
3  * sinvaladt.c
4  *        POSTGRES shared cache invalidation segment definitions.
5  *
6  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.49 2002/11/21 06:36:08 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "miscadmin.h"
18 #include "storage/backendid.h"
19 #include "storage/ipc.h"
20 #include "storage/pmsignal.h"
21 #include "storage/proc.h"
22 #include "storage/sinvaladt.h"
23
24 SISeg      *shmInvalBuffer;
25
26 static void CleanupInvalidationState(int status, Datum arg);
27 static void SISetProcStateInvalid(SISeg *segP);
28
29
30 /*
31  * SInvalShmemSize --- return shared-memory space needed
32  */
33 int
34 SInvalShmemSize(int maxBackends)
35 {
36         /*
37          * Figure space needed. Note sizeof(SISeg) includes the first
38          * ProcState entry.
39          */
40         return sizeof(SISeg) + sizeof(ProcState) * (maxBackends - 1);
41 }
42
43 /*
44  * SIBufferInit
45  *              Create and initialize a new SI message buffer
46  */
47 void
48 SIBufferInit(int maxBackends)
49 {
50         int                     segSize;
51         SISeg      *segP;
52         int                     i;
53
54         /* Allocate space in shared memory */
55         segSize = SInvalShmemSize(maxBackends);
56         shmInvalBuffer = segP = (SISeg *) ShmemAlloc(segSize);
57
58         /* Clear message counters, save size of procState array */
59         segP->minMsgNum = 0;
60         segP->maxMsgNum = 0;
61         segP->lastBackend = 0;
62         segP->maxBackends = maxBackends;
63         segP->freeBackends = maxBackends;
64
65         /* The buffer[] array is initially all unused, so we need not fill it */
66
67         /* Mark all backends inactive */
68         for (i = 0; i < maxBackends; i++)
69         {
70                 segP->procState[i].nextMsgNum = -1;             /* inactive */
71                 segP->procState[i].resetState = false;
72                 segP->procState[i].procStruct = INVALID_OFFSET;
73         }
74 }
75
76 /*
77  * SIBackendInit
78  *              Initialize a new backend to operate on the sinval buffer
79  *
80  * Returns:
81  *         >0   A-OK
82  *              0       Failed to find a free procState slot (ie, MaxBackends exceeded)
83  *         <0   Some other failure (not currently used)
84  *
85  * NB: this routine, and all following ones, must be executed with the
86  * SInvalLock lock held, since there may be multiple backends trying
87  * to access the buffer.
88  */
89 int
90 SIBackendInit(SISeg *segP)
91 {
92         int                     index;
93         ProcState  *stateP = NULL;
94
95         /* Look for a free entry in the procState array */
96         for (index = 0; index < segP->lastBackend; index++)
97         {
98                 if (segP->procState[index].nextMsgNum < 0)              /* inactive slot? */
99                 {
100                         stateP = &segP->procState[index];
101                         break;
102                 }
103         }
104
105         if (stateP == NULL)
106         {
107                 if (segP->lastBackend < segP->maxBackends)
108                 {
109                         stateP = &segP->procState[segP->lastBackend];
110                         Assert(stateP->nextMsgNum < 0);
111                         segP->lastBackend++;
112                 }
113                 else
114                 {
115                         /* out of procState slots */
116                         MyBackendId = InvalidBackendId;
117                         return 0;
118                 }
119         }
120
121         MyBackendId = (stateP - &segP->procState[0]) + 1;
122
123 #ifdef  INVALIDDEBUG
124         elog(DEBUG1, "SIBackendInit: backend id %d", MyBackendId);
125 #endif   /* INVALIDDEBUG */
126
127         /* Reduce free slot count */
128         segP->freeBackends--;
129
130         /* mark myself active, with all extant messages already read */
131         stateP->nextMsgNum = segP->maxMsgNum;
132         stateP->resetState = false;
133         stateP->procStruct = MAKE_OFFSET(MyProc);
134
135         /* register exit routine to mark my entry inactive at exit */
136         on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
137
138         return 1;
139 }
140
141 /*
142  * CleanupInvalidationState
143  *              Mark the current backend as no longer active.
144  *
145  * This function is called via on_shmem_exit() during backend shutdown,
146  * so the caller has NOT acquired the lock for us.
147  *
148  * arg is really of type "SISeg*".
149  */
150 static void
151 CleanupInvalidationState(int status, Datum arg)
152 {
153         SISeg      *segP = (SISeg *) DatumGetPointer(arg);
154         int                     i;
155
156         Assert(PointerIsValid(segP));
157
158         LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
159
160         /* Mark myself inactive */
161         segP->procState[MyBackendId - 1].nextMsgNum = -1;
162         segP->procState[MyBackendId - 1].resetState = false;
163         segP->procState[MyBackendId - 1].procStruct = INVALID_OFFSET;
164
165         /* Recompute index of last active backend */
166         for (i = segP->lastBackend; i > 0; i--)
167         {
168                 if (segP->procState[i - 1].nextMsgNum >= 0)
169                         break;
170         }
171         segP->lastBackend = i;
172
173         /* Adjust free slot count */
174         segP->freeBackends++;
175
176         LWLockRelease(SInvalLock);
177 }
178
179 /*
180  * SIInsertDataEntry
181  *              Add a new invalidation message to the buffer.
182  *
183  * If we are unable to insert the message because the buffer is full,
184  * then clear the buffer and assert the "reset" flag to each backend.
185  * This will cause all the backends to discard *all* invalidatable state.
186  *
187  * Returns true for normal successful insertion, false if had to reset.
188  */
189 bool
190 SIInsertDataEntry(SISeg *segP, SharedInvalidationMessage *data)
191 {
192         int                     numMsgs = segP->maxMsgNum - segP->minMsgNum;
193
194         /* Is the buffer full? */
195         if (numMsgs >= MAXNUMMESSAGES)
196         {
197                 /*
198                  * Don't panic just yet: slowest backend might have consumed some
199                  * messages but not yet have done SIDelExpiredDataEntries() to
200                  * advance minMsgNum.  So, make sure minMsgNum is up-to-date.
201                  */
202                 SIDelExpiredDataEntries(segP);
203                 numMsgs = segP->maxMsgNum - segP->minMsgNum;
204                 if (numMsgs >= MAXNUMMESSAGES)
205                 {
206                         /* Yup, it's definitely full, no choice but to reset */
207                         SISetProcStateInvalid(segP);
208                         return false;
209                 }
210         }
211
212         /*
213          * Try to prevent table overflow.  When the table is 70% full send a
214          * WAKEN_CHILDREN request to the postmaster.  The postmaster will send
215          * a SIGUSR2 signal (ordinarily a NOTIFY signal) to all the backends.
216          * This will force idle backends to execute a transaction to look
217          * through pg_listener for NOTIFY messages, and as a byproduct of the
218          * transaction start they will read SI entries.
219          *
220          * This should never happen if all the backends are actively executing
221          * queries, but if a backend is sitting idle then it won't be starting
222          * transactions and so won't be reading SI entries.
223          *
224          * dz - 27 Jan 1998
225          */
226         if (numMsgs == (MAXNUMMESSAGES * 70 / 100) &&
227                 IsUnderPostmaster)
228         {
229                 elog(DEBUG3, "SIInsertDataEntry: table is 70%% full, signaling postmaster");
230                 SendPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN);
231         }
232
233         /*
234          * Insert new message into proper slot of circular buffer
235          */
236         segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data;
237         segP->maxMsgNum++;
238
239         return true;
240 }
241
242 /*
243  * SISetProcStateInvalid
244  *              Flush pending messages from buffer, assert reset flag for each backend
245  *
246  * This is used only to recover from SI buffer overflow.
247  */
248 static void
249 SISetProcStateInvalid(SISeg *segP)
250 {
251         int                     i;
252
253         segP->minMsgNum = 0;
254         segP->maxMsgNum = 0;
255
256         for (i = 0; i < segP->lastBackend; i++)
257         {
258                 if (segP->procState[i].nextMsgNum >= 0) /* active backend? */
259                 {
260                         segP->procState[i].resetState = true;
261                         segP->procState[i].nextMsgNum = 0;
262                 }
263         }
264 }
265
266 /*
267  * SIGetDataEntry
268  *              get next SI message for specified backend, if there is one
269  *
270  * Possible return values:
271  *      0: no SI message available
272  *      1: next SI message has been extracted into *data
273  *              (there may be more messages available after this one!)
274  * -1: SI reset message extracted
275  *
276  * NB: this can run in parallel with other instances of SIGetDataEntry
277  * executing on behalf of other backends.  See comments in sinval.c in
278  * ReceiveSharedInvalidMessages().
279  */
280 int
281 SIGetDataEntry(SISeg *segP, int backendId,
282                            SharedInvalidationMessage *data)
283 {
284         ProcState  *stateP = &segP->procState[backendId - 1];
285
286         if (stateP->resetState)
287         {
288                 /*
289                  * Force reset.  We can say we have dealt with any messages added
290                  * since the reset, as well...
291                  */
292                 stateP->resetState = false;
293                 stateP->nextMsgNum = segP->maxMsgNum;
294                 return -1;
295         }
296
297         if (stateP->nextMsgNum >= segP->maxMsgNum)
298                 return 0;                               /* nothing to read */
299
300         /*
301          * Retrieve message and advance my counter.
302          */
303         *data = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
304         stateP->nextMsgNum++;
305
306         /*
307          * There may be other backends that haven't read the message, so we
308          * cannot delete it here. SIDelExpiredDataEntries() should be called
309          * to remove dead messages.
310          */
311         return 1;                                       /* got a message */
312 }
313
314 /*
315  * SIDelExpiredDataEntries
316  *              Remove messages that have been consumed by all active backends
317  */
318 void
319 SIDelExpiredDataEntries(SISeg *segP)
320 {
321         int                     min,
322                                 i,
323                                 h;
324
325         min = segP->maxMsgNum;
326         if (min == segP->minMsgNum)
327                 return;                                 /* fast path if no messages exist */
328
329         /* Recompute minMsgNum = minimum of all backends' nextMsgNum */
330
331         for (i = 0; i < segP->lastBackend; i++)
332         {
333                 h = segP->procState[i].nextMsgNum;
334                 if (h >= 0)
335                 {                                               /* backend active */
336                         if (h < min)
337                                 min = h;
338                 }
339         }
340         segP->minMsgNum = min;
341
342         /*
343          * When minMsgNum gets really large, decrement all message counters so
344          * as to forestall overflow of the counters.
345          */
346         if (min >= MSGNUMWRAPAROUND)
347         {
348                 segP->minMsgNum -= MSGNUMWRAPAROUND;
349                 segP->maxMsgNum -= MSGNUMWRAPAROUND;
350                 for (i = 0; i < segP->lastBackend; i++)
351                 {
352                         if (segP->procState[i].nextMsgNum >= 0)
353                                 segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
354                 }
355         }
356 }