* commit time, add ourselves to a list of processes that need their XIDs
* cleared. The first process to add itself to the list will acquire
* ProcArrayLock in exclusive mode and perform ProcArrayEndTransactionInternal
- * on behalf of all group members. This avoids a great deal of context
- * switching when many processes are trying to commit at once, since the lock
- * only needs to be handed from the last share-locker to one process waiting
- * for the exclusive lock, rather than to each one in turn.
+ * on behalf of all group members. This avoids a great deal of contention
+ * around ProcArrayLock when many processes are trying to commit at once,
+ * since the lock need not be repeatedly handed off from one committing
+ * process to the next.
*/
static void
ProcArrayGroupClearXid(PGPROC *proc, TransactionId latestXid)
Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
/* Add ourselves to the list of processes needing a group XID clear. */
+ proc->clearXid = true;
proc->backendLatestXid = latestXid;
while (true)
{
- nextidx = pg_atomic_read_u32(&procglobal->nextClearXidElem);
+ nextidx = pg_atomic_read_u32(&procglobal->firstClearXidElem);
pg_atomic_write_u32(&proc->nextClearXidElem, nextidx);
- if (pg_atomic_compare_exchange_u32(&procglobal->nextClearXidElem,
+ if (pg_atomic_compare_exchange_u32(&procglobal->firstClearXidElem,
&nextidx,
(uint32) proc->pgprocno))
break;
}
- /* If the list was not empty, the leader will clear our XID. */
+ /*
+ * If the list was not empty, the leader will clear our XID. It is
+ * impossible to have followers without a leader because the first process
+ * that has added itself to the list will always have nextidx as
+ * INVALID_PGPROCNO.
+ */
if (nextidx != INVALID_PGPROCNO)
{
/* Sleep until the leader clears our XID. */
- while (pg_atomic_read_u32(&proc->nextClearXidElem) != INVALID_PGPROCNO)
+ for (;;)
{
- extraWaits++;
+ /* acts as a read barrier */
PGSemaphoreLock(&proc->sem);
+ if (!proc->clearXid)
+ break;
+ extraWaits++;
}
+ Assert(pg_atomic_read_u32(&proc->nextClearXidElem) == INVALID_PGPROCNO);
+
/* Fix semaphore count for any absorbed wakeups */
while (extraWaits-- > 0)
PGSemaphoreUnlock(&proc->sem);
/*
* Now that we've got the lock, clear the list of processes waiting for
- * group XID clearing, saving a pointer to the head of the list.
+ * group XID clearing, saving a pointer to the head of the list. Trying
+ * to pop elements one at a time could lead to an ABA problem.
*/
while (true)
{
- nextidx = pg_atomic_read_u32(&procglobal->nextClearXidElem);
- if (pg_atomic_compare_exchange_u32(&procglobal->nextClearXidElem,
+ nextidx = pg_atomic_read_u32(&procglobal->firstClearXidElem);
+ if (pg_atomic_compare_exchange_u32(&procglobal->firstClearXidElem,
&nextidx,
INVALID_PGPROCNO))
break;
wakeidx = pg_atomic_read_u32(&proc->nextClearXidElem);
pg_atomic_write_u32(&proc->nextClearXidElem, INVALID_PGPROCNO);
+ /* ensure all previous writes are visible before follower continues. */
+ pg_write_barrier();
+
+ proc->clearXid = false;
+
if (proc != MyProc)
PGSemaphoreUnlock(&proc->sem);
}
ProcGlobal->startupBufferPinWaitBufId = -1;
ProcGlobal->walwriterLatch = NULL;
ProcGlobal->checkpointerLatch = NULL;
- pg_atomic_init_u32(&ProcGlobal->nextClearXidElem, INVALID_PGPROCNO);
+ pg_atomic_init_u32(&ProcGlobal->firstClearXidElem, INVALID_PGPROCNO);
/*
* Create and initialize all the PGPROC structures we'll need. There are
SHMQueueElemInit(&(MyProc->syncRepLinks));
/* Initialize fields for group XID clearing. */
+ MyProc->clearXid = false;
MyProc->backendLatestXid = InvalidTransactionId;
pg_atomic_init_u32(&MyProc->nextClearXidElem, INVALID_PGPROCNO);
struct XidCache subxids; /* cache for subtransaction XIDs */
/* Support for group XID clearing. */
- volatile pg_atomic_uint32 nextClearXidElem;
+ bool clearXid;
+ pg_atomic_uint32 nextClearXidElem;
TransactionId backendLatestXid;
/* Per-backend LWLock. Protects fields below. */
/* Head of list of bgworker free PGPROC structures */
PGPROC *bgworkerFreeProcs;
/* First pgproc waiting for group XID clear */
- volatile pg_atomic_uint32 nextClearXidElem;
+ pg_atomic_uint32 firstClearXidElem;
/* WALWriter process's latch */
Latch *walwriterLatch;
/* Checkpointer process's latch */