static TransactionId KnownAssignedXidsGetOldestXmin(void);
static void KnownAssignedXidsDisplay(int trace_level);
static void KnownAssignedXidsReset(void);
+static inline void ProcArrayEndTransactionInternal(PGPROC *proc,
+ PGXACT *pgxact, TransactionId latestXid);
+static void ProcArrayGroupClearXid(PGPROC *proc, TransactionId latestXid);
/*
* Report shared-memory space needed by CreateSharedProcArray.
*/
Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
- LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
-
- pgxact->xid = InvalidTransactionId;
- proc->lxid = InvalidLocalTransactionId;
- pgxact->xmin = InvalidTransactionId;
- /* must be cleared with xid/xmin: */
- pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
- pgxact->delayChkpt = false; /* be sure this is cleared in abort */
- proc->recoveryConflictPending = false;
-
- /* Clear the subtransaction-XID cache too while holding the lock */
- pgxact->nxids = 0;
- pgxact->overflowed = false;
-
- /* Also advance global latestCompletedXid while holding the lock */
- if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
- latestXid))
- ShmemVariableCache->latestCompletedXid = latestXid;
-
- LWLockRelease(ProcArrayLock);
+ /*
+ * If we can immediately acquire ProcArrayLock, we clear our own XID
+ * and release the lock. If not, use group XID clearing to improve
+ * efficiency.
+ */
+ if (LWLockConditionalAcquire(ProcArrayLock, LW_EXCLUSIVE))
+ {
+ ProcArrayEndTransactionInternal(proc, pgxact, latestXid);
+ LWLockRelease(ProcArrayLock);
+ }
+ else
+ ProcArrayGroupClearXid(proc, latestXid);
}
else
{
}
}
+/*
+ * Mark a write transaction as no longer running.
+ *
+ * We don't do any locking here; caller must handle that.
+ */
+static inline void
+ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact,
+ TransactionId latestXid)
+{
+ pgxact->xid = InvalidTransactionId;
+ proc->lxid = InvalidLocalTransactionId;
+ pgxact->xmin = InvalidTransactionId;
+ /* must be cleared with xid/xmin: */
+ pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
+ pgxact->delayChkpt = false; /* be sure this is cleared in abort */
+ proc->recoveryConflictPending = false;
+
+ /* Clear the subtransaction-XID cache too while holding the lock */
+ pgxact->nxids = 0;
+ pgxact->overflowed = false;
+
+ /* Also advance global latestCompletedXid while holding the lock */
+ if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
+ latestXid))
+ ShmemVariableCache->latestCompletedXid = latestXid;
+}
+
+/*
+ * ProcArrayGroupClearXid -- group XID clearing
+ *
+ * When we cannot immediately acquire ProcArrayLock in exclusive mode at
+ * commit time, add ourselves to a list of processes that need their XIDs
+ * cleared. The first process to add itself to the list will acquire
+ * ProcArrayLock in exclusive mode and perform ProcArrayEndTransactionInternal
+ * on behalf of all group members. This avoids a great deal of context
+ * switching when many processes are trying to commit at once, since the lock
+ * only needs to be handed from the last share-locker to one process waiting
+ * for the exclusive lock, rather than to each one in turn.
+ */
+static void
+ProcArrayGroupClearXid(PGPROC *proc, TransactionId latestXid)
+{
+ volatile PROC_HDR *procglobal = ProcGlobal;
+ uint32 nextidx;
+ uint32 wakeidx;
+ int extraWaits = -1;
+
+ /* We should definitely have an XID to clear. */
+ Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
+
+ /* Add ourselves to the list of processes needing a group XID clear. */
+ proc->backendLatestXid = latestXid;
+ while (true)
+ {
+ nextidx = pg_atomic_read_u32(&procglobal->nextClearXidElem);
+ pg_atomic_write_u32(&proc->nextClearXidElem, nextidx);
+
+ if (pg_atomic_compare_exchange_u32(&procglobal->nextClearXidElem,
+ &nextidx,
+ (uint32) proc->pgprocno))
+ break;
+ }
+
+ /* If the list was not empty, the leader will clear our XID. */
+ if (nextidx != INVALID_PGPROCNO)
+ {
+ /* Sleep until the leader clears our XID. */
+ while (pg_atomic_read_u32(&proc->nextClearXidElem) != INVALID_PGPROCNO)
+ {
+ extraWaits++;
+ PGSemaphoreLock(&proc->sem);
+ }
+
+ /* Fix semaphore count for any absorbed wakeups */
+ while (extraWaits-- > 0)
+ PGSemaphoreUnlock(&proc->sem);
+ return;
+ }
+
+ /* We are the leader. Acquire the lock on behalf of everyone. */
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+ /*
+ * Now that we've got the lock, clear the list of processes waiting for
+ * group XID clearing, saving a pointer to the head of the list.
+ */
+ while (true)
+ {
+ nextidx = pg_atomic_read_u32(&procglobal->nextClearXidElem);
+ if (pg_atomic_compare_exchange_u32(&procglobal->nextClearXidElem,
+ &nextidx,
+ INVALID_PGPROCNO))
+ break;
+ }
+
+ /* Remember head of list so we can perform wakeups after dropping lock. */
+ wakeidx = nextidx;
+
+ /* Walk the list and clear all XIDs. */
+ while (nextidx != INVALID_PGPROCNO)
+ {
+ PGPROC *proc = &allProcs[nextidx];
+ PGXACT *pgxact = &allPgXact[nextidx];
+
+ ProcArrayEndTransactionInternal(proc, pgxact, proc->backendLatestXid);
+
+ /* Move to next proc in list. */
+ nextidx = pg_atomic_read_u32(&proc->nextClearXidElem);
+ }
+
+ /* We're done with the lock now. */
+ LWLockRelease(ProcArrayLock);
+
+ /*
+ * Now that we've released the lock, go back and wake everybody up. We
+ * don't do this under the lock so as to keep lock hold times to a
+ * minimum. The system calls we need to perform to wake other processes
+ * up are probably much slower than the simple memory writes we did while
+ * holding the lock.
+ */
+ while (wakeidx != INVALID_PGPROCNO)
+ {
+ PGPROC *proc = &allProcs[wakeidx];
+
+ wakeidx = pg_atomic_read_u32(&proc->nextClearXidElem);
+ pg_atomic_write_u32(&proc->nextClearXidElem, INVALID_PGPROCNO);
+
+ if (proc != MyProc)
+ PGSemaphoreUnlock(&proc->sem);
+ }
+}
/*
* ProcArrayClearTransaction -- clear the transaction fields