1 /*-------------------------------------------------------------------------
4 * POSTGRES shared cache invalidation communication code.
6 * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinval.c,v 1.52 2002/09/04 20:31:25 momjian Exp $
13 *-------------------------------------------------------------------------
18 #include "storage/proc.h"
19 #include "storage/sinval.h"
20 #include "storage/sinvaladt.h"
21 #include "utils/tqual.h"
22 #include "miscadmin.h"
25 /****************************************************************************/
26 /* CreateSharedInvalidationState() Initialize SI buffer */
28 /* should be called only by the POSTMASTER */
29 /****************************************************************************/
/*
 * Initializes the shared-invalidation buffer by forwarding maxBackends
 * to SIBufferInit().
 *
 * maxBackends: passed through unchanged to SIBufferInit().
 */
31 CreateSharedInvalidationState(int maxBackends)
33 /* SInvalLock must be initialized already, during LWLock init */
34 SIBufferInit(maxBackends);
38 * InitBackendSharedInvalidationState
39 * Initialize new backend's state info in buffer segment.
/*
 * Calls SIBackendInit() on shmInvalBuffer while holding SInvalLock in
 * exclusive mode, then raises elog(FATAL) if the returned flag is
 * negative or zero.
 */
42 InitBackendSharedInvalidationState(void)
46 LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
47 flag = SIBackendInit(shmInvalBuffer);
48 LWLockRelease(SInvalLock);
/* The lock is released before flag is examined, so neither elog(FATAL) below is raised with SInvalLock held. */
49 if (flag < 0) /* unexpected problem */
50 elog(FATAL, "Backend cache invalidation initialization failed");
51 if (flag == 0) /* expected problem: MaxBackends exceeded */
52 elog(FATAL, "Sorry, too many clients already");
56 * SendSharedInvalidMessage
57 * Add a shared-cache-invalidation message to the global SI message queue.
/*
 * Hands msg to SIInsertDataEntry() for shmInvalBuffer while holding
 * SInvalLock in exclusive mode, and stores the outcome in insertOK.
 *
 * msg: the invalidation message passed on to SIInsertDataEntry().
 */
60 SendSharedInvalidMessage(SharedInvalidationMessage *msg)
64 LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
65 insertOK = SIInsertDataEntry(shmInvalBuffer, msg);
66 LWLockRelease(SInvalLock);
/* NOTE(review): the condition guarding this message is not visible in this view; presumably it tests insertOK -- confirm. The message is logged only at DEBUG3. */
68 elog(DEBUG3, "SendSharedInvalidMessage: SI buffer overflow");
72 * ReceiveSharedInvalidMessages
73 * Process shared-cache-invalidation messages waiting for this backend
/*
 * Fetches messages queued for MyBackendId with SIGetDataEntry() and passes
 * each normal data message to invalFunction.  Afterwards,
 * SIDelExpiredDataEntries() is run under an exclusive SInvalLock.
 *
 * invalFunction: callback invoked with a pointer to each normal message.
 * resetFunction: callback taking no arguments.  NOTE(review): no call to it
 *                appears on the lines visible here; presumably it is invoked
 *                on the reset-message path -- confirm.
 */
76 ReceiveSharedInvalidMessages(
77 void (*invalFunction) (SharedInvalidationMessage *msg),
78 void (*resetFunction) (void))
80 SharedInvalidationMessage data;
82 bool gotMessage = false;
87 * We can run SIGetDataEntry in parallel with other backends
88 * running SIGetDataEntry for themselves, since each instance will
89 * modify only fields of its own backend's ProcState, and no
90 * instance will look at fields of other backends' ProcStates. We
91 * express this by grabbing SInvalLock in shared mode. Note that
92 * this is not exactly the normal (read-only) interpretation of a
93 * shared lock! Look closely at the interactions before allowing
94 * SInvalLock to be grabbed in shared mode for any other reason!
96 * The routines later in this file that use shared mode are okay with
97 * this, because they aren't looking at the ProcState fields
98 * associated with SI message transfer; they only use the
99 * ProcState array as an easy way to find all the PGPROC
/* The lock is held only around the single SIGetDataEntry() call; the callback below runs after it has been released. */
102 LWLockAcquire(SInvalLock, LW_SHARED);
103 getResult = SIGetDataEntry(shmInvalBuffer, MyBackendId, &data);
104 LWLockRelease(SInvalLock);
/* NOTE(review): the tests on getResult that select between the three outcomes below are not visible in this view. */
107 break; /* nothing more to do */
110 /* got a reset message */
111 elog(DEBUG3, "ReceiveSharedInvalidMessages: cache state reset");
116 /* got a normal data message */
117 invalFunction(&data);
122 /* If we got any messages, try to release dead messages */
/* Cleanup takes the lock in exclusive mode, unlike the shared-mode fetch above. */
125 LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
126 SIDelExpiredDataEntries(shmInvalBuffer);
127 LWLockRelease(SInvalLock);
132 /****************************************************************************/
133 /* Functions that need to scan the PGPROC structures of all running backends. */
134 /* It's a bit strange to keep these in sinval.c, since they don't have any */
135 /* direct relationship to shared-cache invalidation. But the procState */
136 /* array in the SI segment is the only place in the system where we have */
137 /* an array of per-backend data, so it is the most convenient place to keep */
138 /* pointers to the backends' PGPROC structures. We used to implement these */
139 /* functions with a slow, ugly search through the ShmemIndex hash table --- */
140 /* now they are simple loops over the SI ProcState array. */
141 /****************************************************************************/
145 * DatabaseHasActiveBackends -- are there any backends running in the given DB
147 * If 'ignoreMyself' is TRUE, ignore this particular backend while checking
148 * for backends in the target database.
150 * This function is used to interlock DROP DATABASE against there being
151 * any active backends in the target DB --- dropping the DB while active
152 * backends remain would be a Bad Thing. Note that we cannot detect here
153 * the possibility of a newly-started backend that is trying to connect
154 * to the doomed database, so additional interlocking is needed during
/*
 * Walks procState entries [0, lastBackend) under a shared SInvalLock and
 * compares each live PGPROC's databaseId with the requested one.
 *
 * databaseId:   database OID to look for.
 * ignoreMyself: when true, the entry whose PGPROC is MyProc gets special
 *               handling (see the test below).
 *
 * NOTE(review): the statements that record and return the result are not
 * visible in this view.
 */
159 DatabaseHasActiveBackends(Oid databaseId, bool ignoreMyself)
162 SISeg *segP = shmInvalBuffer;
163 ProcState *stateP = segP->procState;
166 LWLockAcquire(SInvalLock, LW_SHARED);
168 for (index = 0; index < segP->lastBackend; index++)
170 SHMEM_OFFSET pOffset = stateP[index].procStruct;
/* Slots whose procStruct is INVALID_OFFSET are not examined. */
172 if (pOffset != INVALID_OFFSET)
174 PGPROC *proc = (PGPROC *) MAKE_PTR(pOffset);
176 if (proc->databaseId == databaseId)
/* NOTE(review): presumably a match on our own PGPROC is skipped here when ignoreMyself is set -- the body of this test is not visible. */
178 if (ignoreMyself && proc == MyProc)
187 LWLockRelease(SInvalLock);
193 * TransactionIdIsInProgress -- is given transaction running by some backend
/*
 * Walks procState entries [0, lastBackend) under a shared SInvalLock and
 * compares each live PGPROC's xid with the given xid using
 * TransactionIdEquals().
 *
 * xid: the transaction id to search for.
 *
 * NOTE(review): the statements that record and return the result are not
 * visible in this view.
 */
196 TransactionIdIsInProgress(TransactionId xid)
199 SISeg *segP = shmInvalBuffer;
200 ProcState *stateP = segP->procState;
203 LWLockAcquire(SInvalLock, LW_SHARED);
205 for (index = 0; index < segP->lastBackend; index++)
207 SHMEM_OFFSET pOffset = stateP[index].procStruct;
/* Slots whose procStruct is INVALID_OFFSET are not examined. */
209 if (pOffset != INVALID_OFFSET)
211 PGPROC *proc = (PGPROC *) MAKE_PTR(pOffset);
213 /* Fetch xid just once - see GetNewTransactionId */
/* The comparison below uses the local copy pxid rather than re-reading proc->xid. */
214 TransactionId pxid = proc->xid;
216 if (TransactionIdEquals(pxid, xid))
224 LWLockRelease(SInvalLock);
230 * GetOldestXmin -- returns oldest transaction that was running
231 * when any current transaction was started.
233 * If allDbs is TRUE then all backends are considered; if allDbs is FALSE
234 * then only backends running in my own database are considered.
236 * This is used by VACUUM to decide which deleted tuples must be preserved
237 * in a table. allDbs = TRUE is needed for shared relations, but allDbs =
238 * FALSE is sufficient for non-shared relations, since only backends in my
239 * own database could ever see the tuples in them.
241 * Note: we include the currently running xids in the set of considered xids.
242 * This ensures that if a just-started xact has not yet set its snapshot,
243 * when it does set the snapshot it cannot set xmin less than what we compute.
/*
 * Starts from the caller's own transaction id (GetCurrentTransactionId())
 * and scans procState entries [0, lastBackend) under a shared SInvalLock,
 * testing candidate xids against the running result with
 * TransactionIdPrecedes().
 *
 * allDbs: when true every live PGPROC is considered; otherwise only those
 *         whose databaseId equals MyDatabaseId.
 *
 * NOTE(review): the assignments to result inside the loop and the final
 * return are not visible in this view.
 */
246 GetOldestXmin(bool allDbs)
248 SISeg *segP = shmInvalBuffer;
249 ProcState *stateP = segP->procState;
250 TransactionId result;
/* The seed value is obtained before the lock is taken. */
253 result = GetCurrentTransactionId();
255 LWLockAcquire(SInvalLock, LW_SHARED);
257 for (index = 0; index < segP->lastBackend; index++)
259 SHMEM_OFFSET pOffset = stateP[index].procStruct;
/* Slots whose procStruct is INVALID_OFFSET are not examined. */
261 if (pOffset != INVALID_OFFSET)
263 PGPROC *proc = (PGPROC *) MAKE_PTR(pOffset);
265 if (allDbs || proc->databaseId == MyDatabaseId)
267 /* Fetch xid just once - see GetNewTransactionId */
268 TransactionId xid = proc->xid;
/* Only xids accepted by TransactionIdIsNormal() are compared against result. */
270 if (TransactionIdIsNormal(xid))
272 if (TransactionIdPrecedes(xid, result))
/* NOTE(review): the next two tests repeat the previous pair verbatim; presumably xid is reassigned in between (e.g. to the backend's xmin) on a line not visible here -- confirm. */
275 if (TransactionIdIsNormal(xid))
276 if (TransactionIdPrecedes(xid, result))
283 LWLockRelease(SInvalLock);
289 * GetSnapshotData -- returns information about running transactions.
291 * The returned snapshot includes xmin (lowest still-running xact ID),
292 * xmax (next xact ID to be assigned), and a list of running xact IDs
293 * in the range xmin <= xid < xmax. It is used as follows:
294 * All xact IDs < xmin are considered finished.
295 * All xact IDs >= xmax are considered still running.
296 * For an xact ID xmin <= xid < xmax, consult list to see whether
297 * it is considered running or not.
298 * This ensures that the set of transactions seen as "running" by the
299 * current xact will not change after it takes the snapshot.
301 * Also, we compute the current global xmin (oldest xmin across all running
302 * transactions) and save it in RecentGlobalXmin. This is the same
303 * computation done by GetOldestXmin(TRUE).
/*
 * Allocates a SnapshotData with malloc(), plus an xip array sized for
 * MaxBackends entries, then fills it in by scanning procState entries
 * [0, lastBackend) while holding SInvalLock.  Also stores the computed
 * global xmin in RecentGlobalXmin.
 *
 * serializable: selects the lock mode -- LW_EXCLUSIVE when true, LW_SHARED
 *               otherwise.
 *
 * Fields written on the snapshot in the visible code: xip, xmin, xmax,
 * xcnt, curcid.
 *
 * NOTE(review): the return statement and several assignments inside the
 * scan loop are not visible in this view.
 */
307 GetSnapshotData(bool serializable)
309 Snapshot snapshot = (Snapshot) malloc(sizeof(SnapshotData));
310 SISeg *segP = shmInvalBuffer;
311 ProcState *stateP = segP->procState;
314 TransactionId globalxmin;
318 if (snapshot == NULL)
319 elog(ERROR, "Memory exhausted in GetSnapshotData");
322 * Allocating space for MaxBackends xids is usually overkill;
323 * lastBackend would be sufficient. But it seems better to do the
324 * malloc while not holding the lock, so we can't look at lastBackend.
326 snapshot->xip = (TransactionId *)
327 malloc(MaxBackends * sizeof(TransactionId));
/* NOTE(review): on this failure path the SnapshotData allocated above is not freed before elog(ERROR); if elog(ERROR) does not return to this function, that allocation is leaked -- confirm and consider free(snapshot) first. */
328 if (snapshot->xip == NULL)
329 elog(ERROR, "Memory exhausted in GetSnapshotData");
/* Both xmin and globalxmin start out as this backend's own transaction id. */
331 globalxmin = xmin = GetCurrentTransactionId();
334 * If we are going to set MyProc->xmin then we'd better get exclusive
335 * lock; if not, this is a read-only operation so it can be shared.
337 LWLockAcquire(SInvalLock, serializable ? LW_EXCLUSIVE : LW_SHARED);
339 /*--------------------
340 * Unfortunately, we have to call ReadNewTransactionId() after acquiring
341 * SInvalLock above. It's not good because ReadNewTransactionId() does
342 * LWLockAcquire(XidGenLock), but *necessary*. We need to be sure that
343 * no transactions exit the set of currently-running transactions
344 * between the time we fetch xmax and the time we finish building our
345 * snapshot. Otherwise we could have a situation like this:
347 * 1. Tx Old is running (in Read Committed mode).
348 * 2. Tx S reads new transaction ID into xmax, then
349 * is swapped out before acquiring SInvalLock.
350 * 3. Tx New gets new transaction ID (>= S' xmax),
351 * makes changes and commits.
352 * 4. Tx Old changes some row R changed by Tx New and commits.
353 * 5. Tx S finishes getting its snapshot data. It sees Tx Old as
354 * done, but sees Tx New as still running (since New >= xmax).
356 * Now S will see R changed by both Tx Old and Tx New, *but* does not
357 * see other changes made by Tx New. If S is supposed to be in
358 * Serializable mode, this is wrong.
360 * By locking SInvalLock before we read xmax, we ensure that TX Old
361 * cannot exit the set of running transactions seen by Tx S. Therefore
362 * both Old and New will be seen as still running => no inconsistency.
363 *--------------------
/* xmax is read while SInvalLock is already held. */
366 xmax = ReadNewTransactionId();
368 for (index = 0; index < segP->lastBackend; index++)
370 SHMEM_OFFSET pOffset = stateP[index].procStruct;
372 if (pOffset != INVALID_OFFSET)
374 PGPROC *proc = (PGPROC *) MAKE_PTR(pOffset);
376 /* Fetch xid just once - see GetNewTransactionId */
377 TransactionId xid = proc->xid;
380 * Ignore my own proc (dealt with my xid above), procs not
381 * running a transaction, and xacts started since we read the
382 * next transaction ID. There's no need to store XIDs above
383 * what we got from ReadNewTransactionId, since we'll treat
384 * them as running anyway. We also assume that such xacts
385 * can't compute an xmin older than ours, so they needn't be
386 * considered in computing globalxmin.
388 if (proc == MyProc ||
389 !TransactionIdIsNormal(xid) ||
390 TransactionIdFollowsOrEquals(xid, xmax))
393 if (TransactionIdPrecedes(xid, xmin))
/* NOTE(review): the increment of count is not visible in this view; count is later stored as snapshot->xcnt. */
395 snapshot->xip[count] = xid;
398 /* Update globalxmin to be the smallest valid xmin */
/* NOTE(review): presumably xid has been reassigned (e.g. to the backend's xmin) before these tests on a line not visible here -- confirm. */
400 if (TransactionIdIsNormal(xid))
401 if (TransactionIdPrecedes(xid, globalxmin))
409 LWLockRelease(SInvalLock);
411 /* Serializable snapshot must be computed before any other... */
412 Assert(TransactionIdIsValid(MyProc->xmin));
415 * Update globalxmin to include actual process xids. This is a
416 * slightly different way of computing it than GetOldestXmin uses, but
417 * should give the same result.
419 if (TransactionIdPrecedes(xmin, globalxmin))
/* The global value is published after SInvalLock has been released. */
422 RecentGlobalXmin = globalxmin;
424 snapshot->xmin = xmin;
425 snapshot->xmax = xmax;
426 snapshot->xcnt = count;
428 snapshot->curcid = GetCurrentCommandId();
434 * CountActiveBackends --- count backends (other than myself) that are in
435 * active transactions. This is used as a heuristic to decide if
436 * a pre-XLOG-flush delay is worthwhile during commit.
438 * An active transaction is something that has written at least one XLOG
439 * record; read-only transactions don't count. Also, do not count backends
440 * that are blocked waiting for locks, since they are not going to get to
441 * run until someone else commits.
/*
 * Scans procState entries [0, lastBackend) without taking SInvalLock and
 * skips any PGPROC whose logRec.xrecoff is zero or whose waitLock is
 * non-NULL.
 *
 * NOTE(review): the counter increment and the return statement are not
 * visible in this view.
 */
444 CountActiveBackends(void)
446 SISeg *segP = shmInvalBuffer;
447 ProcState *stateP = segP->procState;
452 * Note: for speed, we don't acquire SInvalLock. This is a little bit
453 * bogus, but since we are only testing xrecoff for zero or nonzero,
454 * it should be OK. The result is only used for heuristic purposes
457 for (index = 0; index < segP->lastBackend; index++)
459 SHMEM_OFFSET pOffset = stateP[index].procStruct;
/* Slots whose procStruct is INVALID_OFFSET are not examined. */
461 if (pOffset != INVALID_OFFSET)
463 PGPROC *proc = (PGPROC *) MAKE_PTR(pOffset);
/* NOTE(review): the test guarding the next continue is not visible; presumably it compares proc with MyProc -- confirm. */
466 continue; /* do not count myself */
467 if (proc->logRec.xrecoff == 0)
468 continue; /* do not count if not in a transaction */
469 if (proc->waitLock != NULL)
470 continue; /* do not count if blocked on a lock */
479 * GetUndoRecPtr -- returns oldest PGPROC->logRec.
/*
 * Scans procState entries [0, lastBackend) under a shared SInvalLock,
 * copying each live PGPROC's logRec into tempr and testing it against the
 * running value urec, which starts as {0, 0}.
 *
 * NOTE(review): the function's signature, the bodies of the two tests in
 * the loop, the update of urec and the return statement are not visible
 * in this view.
 */
484 SISeg *segP = shmInvalBuffer;
485 ProcState *stateP = segP->procState;
486 XLogRecPtr urec = {0, 0};
490 LWLockAcquire(SInvalLock, LW_SHARED);
492 for (index = 0; index < segP->lastBackend; index++)
494 SHMEM_OFFSET pOffset = stateP[index].procStruct;
/* Slots whose procStruct is INVALID_OFFSET are not examined. */
496 if (pOffset != INVALID_OFFSET)
498 PGPROC *proc = (PGPROC *) MAKE_PTR(pOffset);
/* logRec is copied into a local before its fields are inspected. */
500 tempr = proc->logRec;
501 if (tempr.xrecoff == 0)
/* The urec.xrecoff != 0 guard means the ordering test applies only once urec holds a non-zero record pointer. */
503 if (urec.xrecoff != 0 && XLByteLT(urec, tempr))
509 LWLockRelease(SInvalLock);
515 * BackendIdGetProc - given a BackendId, find its PGPROC structure
517 * This is a trivial lookup in the ProcState array. We assume that the caller
518 * knows that the backend isn't going to go away, so we do not bother with
/*
 * Looks up the PGPROC for a backend id in the procState array.  No lock
 * is taken on the lines visible here.
 *
 * procId: backend id; accepted only in the range 1..lastBackend.
 *
 * NOTE(review): the return statements are not visible in this view.
 */
522 BackendIdGetProc(BackendId procId)
524 SISeg *segP = shmInvalBuffer;
/* Backend ids are 1-based here: id N maps to procState[N - 1]. */
526 if (procId > 0 && procId <= segP->lastBackend)
528 ProcState *stateP = &segP->procState[procId - 1];
529 SHMEM_OFFSET pOffset = stateP->procStruct;
/* A slot whose procStruct is INVALID_OFFSET yields no PGPROC pointer. */
531 if (pOffset != INVALID_OFFSET)
533 PGPROC *proc = (PGPROC *) MAKE_PTR(pOffset);
543 * CountEmptyBackendSlots - count empty slots in backend process table
545 * Doesn't count since the procState array could be large and we've already
546 * allowed for that by running a freeBackends counter in the SI segment.
547 * Unlike CountActiveBackends() we do not need to interrogate the
548 * backends to determine the free slot count.
549 * Goes for a lock despite being a trivial look up in case other backends
550 * are busy starting or exiting since there is scope for confusion.
/*
 * Reads shmInvalBuffer->freeBackends into count while holding SInvalLock
 * in shared mode; the procState array itself is not scanned.
 *
 * NOTE(review): the return statement is not visible in this view.
 */
553 CountEmptyBackendSlots(void)
557 LWLockAcquire(SInvalLock, LW_SHARED);
559 count = shmInvalBuffer->freeBackends;
561 LWLockRelease(SInvalLock);