if (!PostmasterIsAlive())
exit(1);
+ /* Clear any already-pending wakeups */
+ ResetLatch(MyLatch);
+
+ CHECK_FOR_INTERRUPTS();
+
/* Process any requests or signals received recently */
if (got_SIGHUP)
{
/* Check for input from the client */
ProcessRepliesIfAny();
- /* Clear any already-pending wakeups */
- ResetLatch(&MyWalSnd->latch);
-
/* Try to flush pending output to the client */
if (pq_flush_if_writable() != 0)
WalSndShutdown();
WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
/* Sleep until something happens or we time out */
- ImmediateInterruptOK = true;
- CHECK_FOR_INTERRUPTS();
- WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
+ WaitLatchOrSocket(MyLatch, wakeEvents,
MyProcPort->sock, sleeptime);
- ImmediateInterruptOK = false;
}
/* reactivate latch so WalSndLoop knows to continue */
- SetLatch(&MyWalSnd->latch);
+ SetLatch(MyLatch);
}
/*
if (!PostmasterIsAlive())
exit(1);
+ /* Clear any already-pending wakeups */
+ ResetLatch(MyLatch);
+
+ CHECK_FOR_INTERRUPTS();
+
/* Process any requests or signals received recently */
if (got_SIGHUP)
{
/* Check for input from the client */
ProcessRepliesIfAny();
- /* Clear any already-pending wakeups */
- ResetLatch(&MyWalSnd->latch);
-
/* Update our idea of the currently flushed position. */
if (!RecoveryInProgress())
RecentFlushPtr = GetFlushRecPtr();
wakeEvents |= WL_SOCKET_WRITEABLE;
/* Sleep until something happens or we time out */
- ImmediateInterruptOK = true;
- CHECK_FOR_INTERRUPTS();
- WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
+ WaitLatchOrSocket(MyLatch, wakeEvents,
MyProcPort->sock, sleeptime);
- ImmediateInterruptOK = false;
}
/* reactivate latch so WalSndLoop knows to continue */
- SetLatch(&MyWalSnd->latch);
+ SetLatch(MyLatch);
return RecentFlushPtr;
}
if (!PostmasterIsAlive())
exit(1);
+ /* Clear any already-pending wakeups */
+ ResetLatch(MyLatch);
+
+ CHECK_FOR_INTERRUPTS();
+
/* Process any requests or signals received recently */
if (got_SIGHUP)
{
SyncRepInitConfig();
}
- CHECK_FOR_INTERRUPTS();
-
/* Check for input from the client */
ProcessRepliesIfAny();
- /* Clear any already-pending wakeups */
- ResetLatch(&MyWalSnd->latch);
-
/*
* If we have received CopyDone from the client, sent CopyDone
* ourselves, and the output buffer is empty, it's time to exit
wakeEvents |= WL_SOCKET_WRITEABLE;
/* Sleep until something happens or we time out */
- ImmediateInterruptOK = true;
- CHECK_FOR_INTERRUPTS();
- WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
+ WaitLatchOrSocket(MyLatch, wakeEvents,
MyProcPort->sock, sleeptime);
- ImmediateInterruptOK = false;
}
}
return;
walsnd->pid = MyProcPid;
walsnd->sentPtr = InvalidXLogRecPtr;
walsnd->state = WALSNDSTATE_STARTUP;
+ walsnd->latch = &MyProc->procLatch;
SpinLockRelease(&walsnd->mutex);
/* don't need the lock anymore */
- OwnLatch((Latch *) &walsnd->latch);
MyWalSnd = (WalSnd *) walsnd;
break;
Assert(walsnd != NULL);
- /*
- * Clear MyWalSnd first; then disown the latch. This is so that signal
- * handlers won't try to touch the latch after it's no longer ours.
- */
MyWalSnd = NULL;
- DisownLatch(&walsnd->latch);
-
- /*
- * Mark WalSnd struct no longer in use. Assume that no lock is required
- * for this.
- */
+ SpinLockAcquire(&walsnd->mutex);
+ /* clear latch while holding the spinlock, so it can safely be read */
+ walsnd->latch = NULL;
+ /* Mark WalSnd struct as no longer being in use. */
walsnd->pid = 0;
+ SpinLockRelease(&walsnd->mutex);
}
/*
int save_errno = errno;
got_SIGHUP = true;
- if (MyWalSnd)
- SetLatch(&MyWalSnd->latch);
+
+ SetLatch(MyLatch);
errno = save_errno;
}
kill(MyProcPid, SIGTERM);
walsender_ready_to_stop = true;
- if (MyWalSnd)
- SetLatch(&MyWalSnd->latch);
+ SetLatch(MyLatch);
errno = save_errno;
}
WalSnd *walsnd = &WalSndCtl->walsnds[i];
SpinLockInit(&walsnd->mutex);
- InitSharedLatch(&walsnd->latch);
}
}
}
int i;
for (i = 0; i < max_wal_senders; i++)
- SetLatch(&WalSndCtl->walsnds[i].latch);
+ {
+ Latch *latch;
+ WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+ /*
+ * Get latch pointer with spinlock held, for the unlikely case that
+ * pointer reads aren't atomic (as they're 8 bytes).
+ */
+ SpinLockAcquire(&walsnd->mutex);
+ latch = walsnd->latch;
+ SpinLockRelease(&walsnd->mutex);
+
+ if (latch != NULL)
+ SetLatch(latch);
+ }
}
/* Set state for current walsender (only called in walsender) */