]> granicus.if.org Git - postgresql/blob - src/backend/storage/ipc/sinvaladt.c
Add code to allow profiling of backends on Linux: save and restore the
[postgresql] / src / backend / storage / ipc / sinvaladt.c
1 /*-------------------------------------------------------------------------
2  *
3  * sinvaladt.c
4  *        POSTGRES shared cache invalidation segment definitions.
5  *
6  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.43 2001/11/05 17:46:28 momjian Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "miscadmin.h"
18 #include "storage/backendid.h"
19 #include "storage/pmsignal.h"
20 #include "storage/proc.h"
21 #include "storage/sinvaladt.h"
22
23 SISeg      *shmInvalBuffer;
24
25 static void CleanupInvalidationState(int status, Datum arg);
26 static void SISetProcStateInvalid(SISeg *segP);
27
28
29 /*
30  * SInvalShmemSize --- return shared-memory space needed
31  */
32 int
33 SInvalShmemSize(int maxBackends)
34 {
35         /*
36          * Figure space needed. Note sizeof(SISeg) includes the first
37          * ProcState entry.
38          */
39         return sizeof(SISeg) + sizeof(ProcState) * (maxBackends - 1);
40 }
41
42 /*
43  * SIBufferInit
44  *              Create and initialize a new SI message buffer
45  */
46 void
47 SIBufferInit(int maxBackends)
48 {
49         int                     segSize;
50         SISeg      *segP;
51         int                     i;
52
53         /* Allocate space in shared memory */
54         segSize = SInvalShmemSize(maxBackends);
55         shmInvalBuffer = segP = (SISeg *) ShmemAlloc(segSize);
56
57         /* Clear message counters, save size of procState array */
58         segP->minMsgNum = 0;
59         segP->maxMsgNum = 0;
60         segP->lastBackend = 0;
61         segP->maxBackends = maxBackends;
62
63         /* The buffer[] array is initially all unused, so we need not fill it */
64
65         /* Mark all backends inactive */
66         for (i = 0; i < maxBackends; i++)
67         {
68                 segP->procState[i].nextMsgNum = -1;             /* inactive */
69                 segP->procState[i].resetState = false;
70                 segP->procState[i].procStruct = INVALID_OFFSET;
71         }
72 }
73
74 /*
75  * SIBackendInit
76  *              Initialize a new backend to operate on the sinval buffer
77  *
78  * Returns:
79  *         >0   A-OK
80  *              0       Failed to find a free procState slot (ie, MaxBackends exceeded)
81  *         <0   Some other failure (not currently used)
82  *
83  * NB: this routine, and all following ones, must be executed with the
84  * SInvalLock lock held, since there may be multiple backends trying
85  * to access the buffer.
86  */
87 int
88 SIBackendInit(SISeg *segP)
89 {
90         int                     index;
91         ProcState  *stateP = NULL;
92
93         /* Look for a free entry in the procState array */
94         for (index = 0; index < segP->lastBackend; index++)
95         {
96                 if (segP->procState[index].nextMsgNum < 0)              /* inactive slot? */
97                 {
98                         stateP = &segP->procState[index];
99                         break;
100                 }
101         }
102
103         if (stateP == NULL)
104         {
105                 if (segP->lastBackend < segP->maxBackends)
106                 {
107                         stateP = &segP->procState[segP->lastBackend];
108                         Assert(stateP->nextMsgNum < 0);
109                         segP->lastBackend++;
110                 }
111                 else
112                 {
113                         /* out of procState slots */
114                         MyBackendId = InvalidBackendId;
115                         return 0;
116                 }
117         }
118
119         MyBackendId = (stateP - &segP->procState[0]) + 1;
120
121 #ifdef  INVALIDDEBUG
122         elog(DEBUG, "SIBackendInit: backend id %d", MyBackendId);
123 #endif   /* INVALIDDEBUG */
124
125         /* mark myself active, with all extant messages already read */
126         stateP->nextMsgNum = segP->maxMsgNum;
127         stateP->resetState = false;
128         stateP->procStruct = MAKE_OFFSET(MyProc);
129
130         /* register exit routine to mark my entry inactive at exit */
131         on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
132
133         return 1;
134 }
135
136 /*
137  * CleanupInvalidationState
138  *              Mark the current backend as no longer active.
139  *
140  * This function is called via on_shmem_exit() during backend shutdown,
141  * so the caller has NOT acquired the lock for us.
142  *
143  * arg is really of type "SISeg*".
144  */
145 static void
146 CleanupInvalidationState(int status, Datum arg)
147 {
148         SISeg      *segP = (SISeg *) DatumGetPointer(arg);
149         int                     i;
150
151         Assert(PointerIsValid(segP));
152
153         LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
154
155         /* Mark myself inactive */
156         segP->procState[MyBackendId - 1].nextMsgNum = -1;
157         segP->procState[MyBackendId - 1].resetState = false;
158         segP->procState[MyBackendId - 1].procStruct = INVALID_OFFSET;
159
160         /* Recompute index of last active backend */
161         for (i = segP->lastBackend; i > 0; i--)
162         {
163                 if (segP->procState[i - 1].nextMsgNum >= 0)
164                         break;
165         }
166         segP->lastBackend = i;
167
168         LWLockRelease(SInvalLock);
169 }
170
171 /*
172  * SIInsertDataEntry
173  *              Add a new invalidation message to the buffer.
174  *
175  * If we are unable to insert the message because the buffer is full,
176  * then clear the buffer and assert the "reset" flag to each backend.
177  * This will cause all the backends to discard *all* invalidatable state.
178  *
179  * Returns true for normal successful insertion, false if had to reset.
180  */
181 bool
182 SIInsertDataEntry(SISeg *segP, SharedInvalidationMessage *data)
183 {
184         int                     numMsgs = segP->maxMsgNum - segP->minMsgNum;
185
186         /* Is the buffer full? */
187         if (numMsgs >= MAXNUMMESSAGES)
188         {
189                 /*
190                  * Don't panic just yet: slowest backend might have consumed some
191                  * messages but not yet have done SIDelExpiredDataEntries() to
192                  * advance minMsgNum.  So, make sure minMsgNum is up-to-date.
193                  */
194                 SIDelExpiredDataEntries(segP);
195                 numMsgs = segP->maxMsgNum - segP->minMsgNum;
196                 if (numMsgs >= MAXNUMMESSAGES)
197                 {
198                         /* Yup, it's definitely full, no choice but to reset */
199                         SISetProcStateInvalid(segP);
200                         return false;
201                 }
202         }
203
204         /*
205          * Try to prevent table overflow.  When the table is 70% full send a
206          * WAKEN_CHILDREN request to the postmaster.  The postmaster will send
207          * a SIGUSR2 signal (ordinarily a NOTIFY signal) to all the backends.
208          * This will force idle backends to execute a transaction to look
209          * through pg_listener for NOTIFY messages, and as a byproduct of the
210          * transaction start they will read SI entries.
211          *
212          * This should never happen if all the backends are actively executing
213          * queries, but if a backend is sitting idle then it won't be starting
214          * transactions and so won't be reading SI entries.
215          *
216          * dz - 27 Jan 1998
217          */
218         if (numMsgs == (MAXNUMMESSAGES * 70 / 100) &&
219                 IsUnderPostmaster)
220         {
221                 if (DebugLvl >= 1)
222                         elog(DEBUG, "SIInsertDataEntry: table is 70%% full, signaling postmaster");
223                 SendPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN);
224         }
225
226         /*
227          * Insert new message into proper slot of circular buffer
228          */
229         segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data;
230         segP->maxMsgNum++;
231
232         return true;
233 }
234
235 /*
236  * SISetProcStateInvalid
237  *              Flush pending messages from buffer, assert reset flag for each backend
238  *
239  * This is used only to recover from SI buffer overflow.
240  */
241 static void
242 SISetProcStateInvalid(SISeg *segP)
243 {
244         int                     i;
245
246         segP->minMsgNum = 0;
247         segP->maxMsgNum = 0;
248
249         for (i = 0; i < segP->lastBackend; i++)
250         {
251                 if (segP->procState[i].nextMsgNum >= 0) /* active backend? */
252                 {
253                         segP->procState[i].resetState = true;
254                         segP->procState[i].nextMsgNum = 0;
255                 }
256         }
257 }
258
259 /*
260  * SIGetDataEntry
261  *              get next SI message for specified backend, if there is one
262  *
263  * Possible return values:
264  *      0: no SI message available
265  *      1: next SI message has been extracted into *data
266  *              (there may be more messages available after this one!)
267  * -1: SI reset message extracted
268  *
269  * NB: this can run in parallel with other instances of SIGetDataEntry
270  * executing on behalf of other backends.  See comments in sinval.c in
271  * ReceiveSharedInvalidMessages().
272  */
273 int
274 SIGetDataEntry(SISeg *segP, int backendId,
275                            SharedInvalidationMessage *data)
276 {
277         ProcState  *stateP = &segP->procState[backendId - 1];
278
279         if (stateP->resetState)
280         {
281                 /*
282                  * Force reset.  We can say we have dealt with any messages added
283                  * since the reset, as well...
284                  */
285                 stateP->resetState = false;
286                 stateP->nextMsgNum = segP->maxMsgNum;
287                 return -1;
288         }
289
290         if (stateP->nextMsgNum >= segP->maxMsgNum)
291                 return 0;                               /* nothing to read */
292
293         /*
294          * Retrieve message and advance my counter.
295          */
296         *data = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
297         stateP->nextMsgNum++;
298
299         /*
300          * There may be other backends that haven't read the message, so we
301          * cannot delete it here. SIDelExpiredDataEntries() should be called
302          * to remove dead messages.
303          */
304         return 1;                                       /* got a message */
305 }
306
307 /*
308  * SIDelExpiredDataEntries
309  *              Remove messages that have been consumed by all active backends
310  */
311 void
312 SIDelExpiredDataEntries(SISeg *segP)
313 {
314         int                     min,
315                                 i,
316                                 h;
317
318         min = segP->maxMsgNum;
319         if (min == segP->minMsgNum)
320                 return;                                 /* fast path if no messages exist */
321
322         /* Recompute minMsgNum = minimum of all backends' nextMsgNum */
323
324         for (i = 0; i < segP->lastBackend; i++)
325         {
326                 h = segP->procState[i].nextMsgNum;
327                 if (h >= 0)
328                 {                                               /* backend active */
329                         if (h < min)
330                                 min = h;
331                 }
332         }
333         segP->minMsgNum = min;
334
335         /*
336          * When minMsgNum gets really large, decrement all message counters so
337          * as to forestall overflow of the counters.
338          */
339         if (min >= MSGNUMWRAPAROUND)
340         {
341                 segP->minMsgNum -= MSGNUMWRAPAROUND;
342                 segP->maxMsgNum -= MSGNUMWRAPAROUND;
343                 for (i = 0; i < segP->lastBackend; i++)
344                 {
345                         if (segP->procState[i].nextMsgNum >= 0)
346                                 segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
347                 }
348         }
349 }