]> granicus.if.org Git - postgresql/blob - src/backend/storage/ipc/sinval.c
87f7a2924523f542608ac643473a5e2a929696bb
[postgresql] / src / backend / storage / ipc / sinval.c
1 /*-------------------------------------------------------------------------
2  *
3  * sinval.c
4  *        POSTGRES shared cache invalidation communication code.
5  *
6  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinval.c,v 1.52 2002/09/04 20:31:25 momjian Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17
18 #include "storage/proc.h"
19 #include "storage/sinval.h"
20 #include "storage/sinvaladt.h"
21 #include "utils/tqual.h"
22 #include "miscadmin.h"
23
24
25 /****************************************************************************/
26 /*      CreateSharedInvalidationState()          Initialize SI buffer                           */
27 /*                                                                                                                                                      */
28 /*      should be called only by the POSTMASTER                                                                 */
29 /****************************************************************************/
30 void
31 CreateSharedInvalidationState(int maxBackends)
32 {
33         /* SInvalLock must be initialized already, during LWLock init */
34         SIBufferInit(maxBackends);
35 }
36
37 /*
38  * InitBackendSharedInvalidationState
39  *              Initialize new backend's state info in buffer segment.
40  */
41 void
42 InitBackendSharedInvalidationState(void)
43 {
44         int                     flag;
45
46         LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
47         flag = SIBackendInit(shmInvalBuffer);
48         LWLockRelease(SInvalLock);
49         if (flag < 0)                           /* unexpected problem */
50                 elog(FATAL, "Backend cache invalidation initialization failed");
51         if (flag == 0)                          /* expected problem: MaxBackends exceeded */
52                 elog(FATAL, "Sorry, too many clients already");
53 }
54
55 /*
56  * SendSharedInvalidMessage
57  *      Add a shared-cache-invalidation message to the global SI message queue.
58  */
59 void
60 SendSharedInvalidMessage(SharedInvalidationMessage *msg)
61 {
62         bool            insertOK;
63
64         LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
65         insertOK = SIInsertDataEntry(shmInvalBuffer, msg);
66         LWLockRelease(SInvalLock);
67         if (!insertOK)
68                 elog(DEBUG3, "SendSharedInvalidMessage: SI buffer overflow");
69 }
70
71 /*
72  * ReceiveSharedInvalidMessages
73  *              Process shared-cache-invalidation messages waiting for this backend
74  */
75 void
76 ReceiveSharedInvalidMessages(
77                                   void (*invalFunction) (SharedInvalidationMessage *msg),
78                                                          void (*resetFunction) (void))
79 {
80         SharedInvalidationMessage data;
81         int                     getResult;
82         bool            gotMessage = false;
83
84         for (;;)
85         {
86                 /*
87                  * We can run SIGetDataEntry in parallel with other backends
88                  * running SIGetDataEntry for themselves, since each instance will
89                  * modify only fields of its own backend's ProcState, and no
90                  * instance will look at fields of other backends' ProcStates.  We
91                  * express this by grabbing SInvalLock in shared mode.  Note that
92                  * this is not exactly the normal (read-only) interpretation of a
93                  * shared lock! Look closely at the interactions before allowing
94                  * SInvalLock to be grabbed in shared mode for any other reason!
95                  *
96                  * The routines later in this file that use shared mode are okay with
97                  * this, because they aren't looking at the ProcState fields
98                  * associated with SI message transfer; they only use the
99                  * ProcState array as an easy way to find all the PGPROC
100                  * structures.
101                  */
102                 LWLockAcquire(SInvalLock, LW_SHARED);
103                 getResult = SIGetDataEntry(shmInvalBuffer, MyBackendId, &data);
104                 LWLockRelease(SInvalLock);
105
106                 if (getResult == 0)
107                         break;                          /* nothing more to do */
108                 if (getResult < 0)
109                 {
110                         /* got a reset message */
111                         elog(DEBUG3, "ReceiveSharedInvalidMessages: cache state reset");
112                         resetFunction();
113                 }
114                 else
115                 {
116                         /* got a normal data message */
117                         invalFunction(&data);
118                 }
119                 gotMessage = true;
120         }
121
122         /* If we got any messages, try to release dead messages */
123         if (gotMessage)
124         {
125                 LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
126                 SIDelExpiredDataEntries(shmInvalBuffer);
127                 LWLockRelease(SInvalLock);
128         }
129 }
130
131
132 /****************************************************************************/
133 /* Functions that need to scan the PGPROC structures of all running backends. */
134 /* It's a bit strange to keep these in sinval.c, since they don't have any      */
135 /* direct relationship to shared-cache invalidation.  But the procState         */
136 /* array in the SI segment is the only place in the system where we have        */
137 /* an array of per-backend data, so it is the most convenient place to keep */
138 /* pointers to the backends' PGPROC structures.  We used to implement these     */
139 /* functions with a slow, ugly search through the ShmemIndex hash table --- */
140 /* now they are simple loops over the SI ProcState array.                                       */
141 /****************************************************************************/
142
143
144 /*
145  * DatabaseHasActiveBackends -- are there any backends running in the given DB
146  *
147  * If 'ignoreMyself' is TRUE, ignore this particular backend while checking
148  * for backends in the target database.
149  *
150  * This function is used to interlock DROP DATABASE against there being
151  * any active backends in the target DB --- dropping the DB while active
152  * backends remain would be a Bad Thing.  Note that we cannot detect here
153  * the possibility of a newly-started backend that is trying to connect
154  * to the doomed database, so additional interlocking is needed during
155  * backend startup.
156  */
157
158 bool
159 DatabaseHasActiveBackends(Oid databaseId, bool ignoreMyself)
160 {
161         bool            result = false;
162         SISeg      *segP = shmInvalBuffer;
163         ProcState  *stateP = segP->procState;
164         int                     index;
165
166         LWLockAcquire(SInvalLock, LW_SHARED);
167
168         for (index = 0; index < segP->lastBackend; index++)
169         {
170                 SHMEM_OFFSET pOffset = stateP[index].procStruct;
171
172                 if (pOffset != INVALID_OFFSET)
173                 {
174                         PGPROC     *proc = (PGPROC *) MAKE_PTR(pOffset);
175
176                         if (proc->databaseId == databaseId)
177                         {
178                                 if (ignoreMyself && proc == MyProc)
179                                         continue;
180
181                                 result = true;
182                                 break;
183                         }
184                 }
185         }
186
187         LWLockRelease(SInvalLock);
188
189         return result;
190 }
191
192 /*
193  * TransactionIdIsInProgress -- is given transaction running by some backend
194  */
195 bool
196 TransactionIdIsInProgress(TransactionId xid)
197 {
198         bool            result = false;
199         SISeg      *segP = shmInvalBuffer;
200         ProcState  *stateP = segP->procState;
201         int                     index;
202
203         LWLockAcquire(SInvalLock, LW_SHARED);
204
205         for (index = 0; index < segP->lastBackend; index++)
206         {
207                 SHMEM_OFFSET pOffset = stateP[index].procStruct;
208
209                 if (pOffset != INVALID_OFFSET)
210                 {
211                         PGPROC     *proc = (PGPROC *) MAKE_PTR(pOffset);
212
213                         /* Fetch xid just once - see GetNewTransactionId */
214                         TransactionId pxid = proc->xid;
215
216                         if (TransactionIdEquals(pxid, xid))
217                         {
218                                 result = true;
219                                 break;
220                         }
221                 }
222         }
223
224         LWLockRelease(SInvalLock);
225
226         return result;
227 }
228
229 /*
230  * GetOldestXmin -- returns oldest transaction that was running
231  *                                      when any current transaction was started.
232  *
233  * If allDbs is TRUE then all backends are considered; if allDbs is FALSE
234  * then only backends running in my own database are considered.
235  *
236  * This is used by VACUUM to decide which deleted tuples must be preserved
237  * in a table.  allDbs = TRUE is needed for shared relations, but allDbs =
238  * FALSE is sufficient for non-shared relations, since only backends in my
239  * own database could ever see the tuples in them.
240  *
241  * Note: we include the currently running xids in the set of considered xids.
242  * This ensures that if a just-started xact has not yet set its snapshot,
243  * when it does set the snapshot it cannot set xmin less than what we compute.
244  */
245 TransactionId
246 GetOldestXmin(bool allDbs)
247 {
248         SISeg      *segP = shmInvalBuffer;
249         ProcState  *stateP = segP->procState;
250         TransactionId result;
251         int                     index;
252
253         result = GetCurrentTransactionId();
254
255         LWLockAcquire(SInvalLock, LW_SHARED);
256
257         for (index = 0; index < segP->lastBackend; index++)
258         {
259                 SHMEM_OFFSET pOffset = stateP[index].procStruct;
260
261                 if (pOffset != INVALID_OFFSET)
262                 {
263                         PGPROC     *proc = (PGPROC *) MAKE_PTR(pOffset);
264
265                         if (allDbs || proc->databaseId == MyDatabaseId)
266                         {
267                                 /* Fetch xid just once - see GetNewTransactionId */
268                                 TransactionId xid = proc->xid;
269
270                                 if (TransactionIdIsNormal(xid))
271                                 {
272                                         if (TransactionIdPrecedes(xid, result))
273                                                 result = xid;
274                                         xid = proc->xmin;
275                                         if (TransactionIdIsNormal(xid))
276                                                 if (TransactionIdPrecedes(xid, result))
277                                                         result = xid;
278                                 }
279                         }
280                 }
281         }
282
283         LWLockRelease(SInvalLock);
284
285         return result;
286 }
287
288 /*----------
289  * GetSnapshotData -- returns information about running transactions.
290  *
291  * The returned snapshot includes xmin (lowest still-running xact ID),
292  * xmax (next xact ID to be assigned), and a list of running xact IDs
293  * in the range xmin <= xid < xmax.  It is used as follows:
294  *              All xact IDs < xmin are considered finished.
295  *              All xact IDs >= xmax are considered still running.
296  *              For an xact ID xmin <= xid < xmax, consult list to see whether
297  *              it is considered running or not.
298  * This ensures that the set of transactions seen as "running" by the
299  * current xact will not change after it takes the snapshot.
300  *
301  * Also, we compute the current global xmin (oldest xmin across all running
302  * transactions) and save it in RecentGlobalXmin.  This is the same
303  * computation done by GetOldestXmin(TRUE).
304  *----------
305  */
306 Snapshot
307 GetSnapshotData(bool serializable)
308 {
309         Snapshot        snapshot = (Snapshot) malloc(sizeof(SnapshotData));
310         SISeg      *segP = shmInvalBuffer;
311         ProcState  *stateP = segP->procState;
312         TransactionId xmin;
313         TransactionId xmax;
314         TransactionId globalxmin;
315         int                     index;
316         int                     count = 0;
317
318         if (snapshot == NULL)
319                 elog(ERROR, "Memory exhausted in GetSnapshotData");
320
321         /*
322          * Allocating space for MaxBackends xids is usually overkill;
323          * lastBackend would be sufficient.  But it seems better to do the
324          * malloc while not holding the lock, so we can't look at lastBackend.
325          */
326         snapshot->xip = (TransactionId *)
327                 malloc(MaxBackends * sizeof(TransactionId));
328         if (snapshot->xip == NULL)
329                 elog(ERROR, "Memory exhausted in GetSnapshotData");
330
331         globalxmin = xmin = GetCurrentTransactionId();
332
333         /*
334          * If we are going to set MyProc->xmin then we'd better get exclusive
335          * lock; if not, this is a read-only operation so it can be shared.
336          */
337         LWLockAcquire(SInvalLock, serializable ? LW_EXCLUSIVE : LW_SHARED);
338
339         /*--------------------
340          * Unfortunately, we have to call ReadNewTransactionId() after acquiring
341          * SInvalLock above.  It's not good because ReadNewTransactionId() does
342          * LWLockAcquire(XidGenLock), but *necessary*.  We need to be sure that
343          * no transactions exit the set of currently-running transactions
344          * between the time we fetch xmax and the time we finish building our
345          * snapshot.  Otherwise we could have a situation like this:
346          *
347          *              1. Tx Old is running (in Read Committed mode).
348          *              2. Tx S reads new transaction ID into xmax, then
349          *                 is swapped out before acquiring SInvalLock.
350          *              3. Tx New gets new transaction ID (>= S' xmax),
351          *                 makes changes and commits.
352          *              4. Tx Old changes some row R changed by Tx New and commits.
353          *              5. Tx S finishes getting its snapshot data.  It sees Tx Old as
354          *                 done, but sees Tx New as still running (since New >= xmax).
355          *
356          * Now S will see R changed by both Tx Old and Tx New, *but* does not
357          * see other changes made by Tx New.  If S is supposed to be in
358          * Serializable mode, this is wrong.
359          *
360          * By locking SInvalLock before we read xmax, we ensure that TX Old
361          * cannot exit the set of running transactions seen by Tx S.  Therefore
362          * both Old and New will be seen as still running => no inconsistency.
363          *--------------------
364          */
365
366         xmax = ReadNewTransactionId();
367
368         for (index = 0; index < segP->lastBackend; index++)
369         {
370                 SHMEM_OFFSET pOffset = stateP[index].procStruct;
371
372                 if (pOffset != INVALID_OFFSET)
373                 {
374                         PGPROC     *proc = (PGPROC *) MAKE_PTR(pOffset);
375
376                         /* Fetch xid just once - see GetNewTransactionId */
377                         TransactionId xid = proc->xid;
378
379                         /*
380                          * Ignore my own proc (dealt with my xid above), procs not
381                          * running a transaction, and xacts started since we read the
382                          * next transaction ID.  There's no need to store XIDs above
383                          * what we got from ReadNewTransactionId, since we'll treat
384                          * them as running anyway.      We also assume that such xacts
385                          * can't compute an xmin older than ours, so they needn't be
386                          * considered in computing globalxmin.
387                          */
388                         if (proc == MyProc ||
389                                 !TransactionIdIsNormal(xid) ||
390                                 TransactionIdFollowsOrEquals(xid, xmax))
391                                 continue;
392
393                         if (TransactionIdPrecedes(xid, xmin))
394                                 xmin = xid;
395                         snapshot->xip[count] = xid;
396                         count++;
397
398                         /* Update globalxmin to be the smallest valid xmin */
399                         xid = proc->xmin;
400                         if (TransactionIdIsNormal(xid))
401                                 if (TransactionIdPrecedes(xid, globalxmin))
402                                         globalxmin = xid;
403                 }
404         }
405
406         if (serializable)
407                 MyProc->xmin = xmin;
408
409         LWLockRelease(SInvalLock);
410
411         /* Serializable snapshot must be computed before any other... */
412         Assert(TransactionIdIsValid(MyProc->xmin));
413
414         /*
415          * Update globalxmin to include actual process xids.  This is a
416          * slightly different way of computing it than GetOldestXmin uses, but
417          * should give the same result.
418          */
419         if (TransactionIdPrecedes(xmin, globalxmin))
420                 globalxmin = xmin;
421
422         RecentGlobalXmin = globalxmin;
423
424         snapshot->xmin = xmin;
425         snapshot->xmax = xmax;
426         snapshot->xcnt = count;
427
428         snapshot->curcid = GetCurrentCommandId();
429
430         return snapshot;
431 }
432
433 /*
434  * CountActiveBackends --- count backends (other than myself) that are in
435  *              active transactions.  This is used as a heuristic to decide if
436  *              a pre-XLOG-flush delay is worthwhile during commit.
437  *
438  * An active transaction is something that has written at least one XLOG
439  * record; read-only transactions don't count.  Also, do not count backends
440  * that are blocked waiting for locks, since they are not going to get to
441  * run until someone else commits.
442  */
443 int
444 CountActiveBackends(void)
445 {
446         SISeg      *segP = shmInvalBuffer;
447         ProcState  *stateP = segP->procState;
448         int                     count = 0;
449         int                     index;
450
451         /*
452          * Note: for speed, we don't acquire SInvalLock.  This is a little bit
453          * bogus, but since we are only testing xrecoff for zero or nonzero,
454          * it should be OK.  The result is only used for heuristic purposes
455          * anyway...
456          */
457         for (index = 0; index < segP->lastBackend; index++)
458         {
459                 SHMEM_OFFSET pOffset = stateP[index].procStruct;
460
461                 if (pOffset != INVALID_OFFSET)
462                 {
463                         PGPROC     *proc = (PGPROC *) MAKE_PTR(pOffset);
464
465                         if (proc == MyProc)
466                                 continue;               /* do not count myself */
467                         if (proc->logRec.xrecoff == 0)
468                                 continue;               /* do not count if not in a transaction */
469                         if (proc->waitLock != NULL)
470                                 continue;               /* do not count if blocked on a lock */
471                         count++;
472                 }
473         }
474
475         return count;
476 }
477
478 /*
479  * GetUndoRecPtr -- returns oldest PGPROC->logRec.
480  */
481 XLogRecPtr
482 GetUndoRecPtr(void)
483 {
484         SISeg      *segP = shmInvalBuffer;
485         ProcState  *stateP = segP->procState;
486         XLogRecPtr      urec = {0, 0};
487         XLogRecPtr      tempr;
488         int                     index;
489
490         LWLockAcquire(SInvalLock, LW_SHARED);
491
492         for (index = 0; index < segP->lastBackend; index++)
493         {
494                 SHMEM_OFFSET pOffset = stateP[index].procStruct;
495
496                 if (pOffset != INVALID_OFFSET)
497                 {
498                         PGPROC     *proc = (PGPROC *) MAKE_PTR(pOffset);
499
500                         tempr = proc->logRec;
501                         if (tempr.xrecoff == 0)
502                                 continue;
503                         if (urec.xrecoff != 0 && XLByteLT(urec, tempr))
504                                 continue;
505                         urec = tempr;
506                 }
507         }
508
509         LWLockRelease(SInvalLock);
510
511         return (urec);
512 }
513
514 /*
515  * BackendIdGetProc - given a BackendId, find its PGPROC structure
516  *
517  * This is a trivial lookup in the ProcState array.  We assume that the caller
518  * knows that the backend isn't going to go away, so we do not bother with
519  * locking.
520  */
521 struct PGPROC *
522 BackendIdGetProc(BackendId procId)
523 {
524         SISeg      *segP = shmInvalBuffer;
525
526         if (procId > 0 && procId <= segP->lastBackend)
527         {
528                 ProcState  *stateP = &segP->procState[procId - 1];
529                 SHMEM_OFFSET pOffset = stateP->procStruct;
530
531                 if (pOffset != INVALID_OFFSET)
532                 {
533                         PGPROC     *proc = (PGPROC *) MAKE_PTR(pOffset);
534
535                         return proc;
536                 }
537         }
538
539         return NULL;
540 }
541
542 /*
543  * CountEmptyBackendSlots - count empty slots in backend process table
544  *
545  * Doesn't count since the procState array could be large and we've already
546  * allowed for that by running a freeBackends counter in the SI segment.
547  * Unlike CountActiveBackends() we do not need to interrogate the
548  * backends to determine the free slot count.
549  * Goes for a lock despite being a trival look up in case other backends
550  * are busy starting or exiting since there is scope for confusion.
551  */
552 int
553 CountEmptyBackendSlots(void)
554 {
555         int                     count;
556
557         LWLockAcquire(SInvalLock, LW_SHARED);
558
559         count = shmInvalBuffer->freeBackends;
560
561         LWLockRelease(SInvalLock);
562
563         return count;
564 }