]> granicus.if.org Git - postgresql/blob - src/backend/storage/ipc/sinvaladt.c
Banish caddr_t (mostly), use Datum where appropriate.
[postgresql] / src / backend / storage / ipc / sinvaladt.c
1 /*-------------------------------------------------------------------------
2  *
3  * sinvaladt.c
4  *        POSTGRES shared cache invalidation segment definitions.
5  *
6  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.33 2000/10/02 19:42:48 petere Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include <signal.h>
18 #include <unistd.h>
19
20 #include "miscadmin.h"
21 #include "storage/backendid.h"
22 #include "storage/proc.h"
23 #include "storage/sinval.h"
24 #include "storage/sinvaladt.h"
25
26 SISeg      *shmInvalBuffer;
27
28 static void SISegmentAttach(IpcMemoryId shmid);
29 static void SISegInit(SISeg *segP, int maxBackends);
30 static void CleanupInvalidationState(int status, SISeg *segP);
31 static void SISetProcStateInvalid(SISeg *segP);
32
33 /*
34  * SISegmentInit
35  *              Create a new SI memory segment, or attach to an existing one
36  *
37  * This is called with createNewSegment = true by the postmaster (or by
38  * a standalone backend), and subsequently with createNewSegment = false
39  * by backends started by the postmaster.
40  *
41  * Note: maxBackends param is only valid when createNewSegment is true
42  */
43 int
44 SISegmentInit(bool createNewSegment, IPCKey key, int maxBackends)
45 {
46         int                     segSize;
47         IpcMemoryId shmId;
48
49         if (createNewSegment)
50         {
51                 /* Kill existing segment, if any */
52                 IpcMemoryKill(key);
53
54                 /*
55                  * Figure space needed. Note sizeof(SISeg) includes the first
56                  * ProcState entry.
57                  */
58                 segSize = sizeof(SISeg) + sizeof(ProcState) * (maxBackends - 1);
59
60                 /* Get a shared segment */
61                 shmId = IpcMemoryCreate(key, segSize, IPCProtection);
62                 if (shmId < 0)
63                 {
64                         perror("SISegmentInit: segment create failed");
65                         return -1;                      /* an error */
66                 }
67
68                 /* Attach to the shared cache invalidation segment */
69                 /* sets the global variable shmInvalBuffer */
70                 SISegmentAttach(shmId);
71
72                 /* Init shared memory contents */
73                 SISegInit(shmInvalBuffer, maxBackends);
74         }
75         else
76         {
77                 /* find existing segment */
78                 shmId = IpcMemoryIdGet(key, 0);
79                 if (shmId < 0)
80                 {
81                         perror("SISegmentInit: segment get failed");
82                         return -1;                      /* an error */
83                 }
84
85                 /* Attach to the shared cache invalidation segment */
86                 /* sets the global variable shmInvalBuffer */
87                 SISegmentAttach(shmId);
88         }
89         return 1;
90 }
91
92 /*
93  * SISegmentAttach
94  *              Attach to specified shared memory segment
95  */
96 static void
97 SISegmentAttach(IpcMemoryId shmid)
98 {
99         shmInvalBuffer = (SISeg *) IpcMemoryAttach(shmid);
100
101         if (shmInvalBuffer == IpcMemAttachFailed)
102         {
103                 /* XXX use validity function */
104                 elog(FATAL, "SISegmentAttach: Could not attach segment: %m");
105         }
106 }
107
108 /*
109  * SISegInit
110  *              Initialize contents of a new shared memory sinval segment
111  */
112 static void
113 SISegInit(SISeg *segP, int maxBackends)
114 {
115         int                     i;
116
117         /* Clear message counters, save size of procState array */
118         segP->minMsgNum = 0;
119         segP->maxMsgNum = 0;
120         segP->maxBackends = maxBackends;
121
122         /* The buffer[] array is initially all unused, so we need not fill it */
123
124         /* Mark all backends inactive */
125         for (i = 0; i < maxBackends; i++)
126         {
127                 segP->procState[i].nextMsgNum = -1;             /* inactive */
128                 segP->procState[i].resetState = false;
129                 segP->procState[i].tag = InvalidBackendTag;
130                 segP->procState[i].procStruct = INVALID_OFFSET;
131         }
132 }
133
134 /*
135  * SIBackendInit
136  *              Initialize a new backend to operate on the sinval buffer
137  *
138  * NB: this routine, and all following ones, must be executed with the
139  * SInvalLock spinlock held, since there may be multiple backends trying
140  * to access the buffer.
141  */
142 int
143 SIBackendInit(SISeg *segP)
144 {
145         int                     index;
146         ProcState  *stateP = NULL;
147
148         Assert(MyBackendTag > 0);
149
150         /* Check for duplicate backend tags (should never happen) */
151         for (index = 0; index < segP->maxBackends; index++)
152         {
153                 if (segP->procState[index].tag == MyBackendTag)
154                         elog(FATAL, "SIBackendInit: tag %d already in use", MyBackendTag);
155         }
156
157         /* Look for a free entry in the procState array */
158         for (index = 0; index < segP->maxBackends; index++)
159         {
160                 if (segP->procState[index].tag == InvalidBackendTag)
161                 {
162                         stateP = &segP->procState[index];
163                         break;
164                 }
165         }
166
167         /*
168          * elog() with spinlock held is probably not too cool, but this
169          * condition should never happen anyway.
170          */
171         if (stateP == NULL)
172         {
173                 elog(NOTICE, "SIBackendInit: no free procState slot available");
174                 MyBackendId = InvalidBackendTag;
175                 return 0;
176         }
177
178         MyBackendId = (stateP - &segP->procState[0]) + 1;
179
180 #ifdef  INVALIDDEBUG
181         elog(DEBUG, "SIBackendInit: backend tag %d; backend id %d.",
182                  MyBackendTag, MyBackendId);
183 #endif   /* INVALIDDEBUG */
184
185         /* mark myself active, with all extant messages already read */
186         stateP->nextMsgNum = segP->maxMsgNum;
187         stateP->resetState = false;
188         stateP->tag = MyBackendTag;
189         stateP->procStruct = MAKE_OFFSET(MyProc);
190
191         /* register exit routine to mark my entry inactive at exit */
192         on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
193
194         return 1;
195 }
196
197 /*
198  * CleanupInvalidationState
199  *              Mark the current backend as no longer active.
200  *
201  * This function is called via on_shmem_exit() during backend shutdown,
202  * so the caller has NOT acquired the lock for us.
203  */
204 static void
205 CleanupInvalidationState(int status,
206                                                  SISeg *segP)
207 {
208         Assert(PointerIsValid(segP));
209
210         SpinAcquire(SInvalLock);
211
212         segP->procState[MyBackendId - 1].nextMsgNum = -1;
213         segP->procState[MyBackendId - 1].resetState = false;
214         segP->procState[MyBackendId - 1].tag = InvalidBackendTag;
215         segP->procState[MyBackendId - 1].procStruct = INVALID_OFFSET;
216
217         SpinRelease(SInvalLock);
218 }
219
220 /*
221  * SIInsertDataEntry
222  *              Add a new invalidation message to the buffer.
223  *
224  * If we are unable to insert the message because the buffer is full,
225  * then clear the buffer and assert the "reset" flag to each backend.
226  * This will cause all the backends to discard *all* invalidatable state.
227  *
228  * Returns true for normal successful insertion, false if had to reset.
229  */
230 bool
231 SIInsertDataEntry(SISeg *segP, SharedInvalidData *data)
232 {
233         int                     numMsgs = segP->maxMsgNum - segP->minMsgNum;
234
235         /* Is the buffer full? */
236         if (numMsgs >= MAXNUMMESSAGES)
237         {
238
239                 /*
240                  * Don't panic just yet: slowest backend might have consumed some
241                  * messages but not yet have done SIDelExpiredDataEntries() to
242                  * advance minMsgNum.  So, make sure minMsgNum is up-to-date.
243                  */
244                 SIDelExpiredDataEntries(segP);
245                 numMsgs = segP->maxMsgNum - segP->minMsgNum;
246                 if (numMsgs >= MAXNUMMESSAGES)
247                 {
248                         /* Yup, it's definitely full, no choice but to reset */
249                         SISetProcStateInvalid(segP);
250                         return false;
251                 }
252         }
253
254         /*
255          * Try to prevent table overflow.  When the table is 70% full send a
256          * SIGUSR2 (ordinarily a NOTIFY signal) to the postmaster, which will
257          * send it back to all the backends.  This will force idle backends to
258          * execute a transaction to look through pg_listener for NOTIFY
259          * messages, and as a byproduct of the transaction start they will
260          * read SI entries.
261          *
262          * This should never happen if all the backends are actively executing
263          * queries, but if a backend is sitting idle then it won't be starting
264          * transactions and so won't be reading SI entries.
265          *
266          * dz - 27 Jan 1998
267          */
268         if (numMsgs == (MAXNUMMESSAGES * 70 / 100) &&
269                 IsUnderPostmaster)
270         {
271                 if (DebugLvl >= 1)
272                         elog(DEBUG, "SIInsertDataEntry: table is 70%% full, signaling postmaster");
273                 kill(getppid(), SIGUSR2);
274         }
275
276         /*
277          * Insert new message into proper slot of circular buffer
278          */
279         segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data;
280         segP->maxMsgNum++;
281
282         return true;
283 }
284
285 /*
286  * SISetProcStateInvalid
287  *              Flush pending messages from buffer, assert reset flag for each backend
288  *
289  * This is used only to recover from SI buffer overflow.
290  */
291 static void
292 SISetProcStateInvalid(SISeg *segP)
293 {
294         int                     i;
295
296         segP->minMsgNum = 0;
297         segP->maxMsgNum = 0;
298
299         for (i = 0; i < segP->maxBackends; i++)
300         {
301                 if (segP->procState[i].nextMsgNum >= 0) /* active backend? */
302                 {
303                         segP->procState[i].resetState = true;
304                         segP->procState[i].nextMsgNum = 0;
305                 }
306         }
307 }
308
309 /*
310  * SIGetDataEntry
311  *              get next SI message for specified backend, if there is one
312  *
313  * Possible return values:
314  *      0: no SI message available
315  *      1: next SI message has been extracted into *data
316  *              (there may be more messages available after this one!)
317  * -1: SI reset message extracted
318  */
319 int
320 SIGetDataEntry(SISeg *segP, int backendId,
321                            SharedInvalidData *data)
322 {
323         ProcState  *stateP = &segP->procState[backendId - 1];
324
325         Assert(stateP->tag == MyBackendTag);
326
327         if (stateP->resetState)
328         {
329
330                 /*
331                  * Force reset.  We can say we have dealt with any messages added
332                  * since the reset, as well...
333                  */
334                 stateP->resetState = false;
335                 stateP->nextMsgNum = segP->maxMsgNum;
336                 return -1;
337         }
338
339         if (stateP->nextMsgNum >= segP->maxMsgNum)
340                 return 0;                               /* nothing to read */
341
342         /*
343          * Retrieve message and advance my counter.
344          */
345         *data = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
346         stateP->nextMsgNum++;
347
348         /*
349          * There may be other backends that haven't read the message, so we
350          * cannot delete it here. SIDelExpiredDataEntries() should be called
351          * to remove dead messages.
352          */
353         return 1;                                       /* got a message */
354 }
355
356 /*
357  * SIDelExpiredDataEntries
358  *              Remove messages that have been consumed by all active backends
359  */
360 void
361 SIDelExpiredDataEntries(SISeg *segP)
362 {
363         int                     min,
364                                 i,
365                                 h;
366
367         min = segP->maxMsgNum;
368         if (min == segP->minMsgNum)
369                 return;                                 /* fast path if no messages exist */
370
371         /* Recompute minMsgNum = minimum of all backends' nextMsgNum */
372
373         for (i = 0; i < segP->maxBackends; i++)
374         {
375                 h = segP->procState[i].nextMsgNum;
376                 if (h >= 0)
377                 {                                               /* backend active */
378                         if (h < min)
379                                 min = h;
380                 }
381         }
382         segP->minMsgNum = min;
383
384         /*
385          * When minMsgNum gets really large, decrement all message counters so
386          * as to forestall overflow of the counters.
387          */
388         if (min >= MSGNUMWRAPAROUND)
389         {
390                 segP->minMsgNum -= MSGNUMWRAPAROUND;
391                 segP->maxMsgNum -= MSGNUMWRAPAROUND;
392                 for (i = 0; i < segP->maxBackends; i++)
393                 {
394                         if (segP->procState[i].nextMsgNum >= 0)
395                                 segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
396                 }
397         }
398 }