From ff44fba46c09c37dd9e60da1cb0b3a9339eb085f Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Sat, 17 Jan 2015 13:00:42 +0100 Subject: [PATCH] Replace walsender's latch with the general shared latch. Relying on the normal shared latch simplifies interrupt/signal handling because we can rely on all signal handlers setting the proc latch. That in turn allows us to avoid the use of ImmediateInterruptOK, which arguably isn't correct because WaitLatchOrSocket isn't declared to be immediately interruptible. Also change sections that wait on the walsender's latch to notice interrupts quicker/more reliably and make them more consistent with each other. This is part of a larger "get rid of ImmediateInterruptOK" series. Discussion: 20150115020335.GZ5245@awork2.anarazel.de --- src/backend/replication/basebackup.c | 10 ++- src/backend/replication/walsender.c | 86 +++++++++++---------- src/include/replication/walsender_private.h | 6 +- 3 files changed, 55 insertions(+), 47 deletions(-) diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index 07030a2ef0..3058ce921b 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -1294,15 +1294,21 @@ throttle(size_t increment) /* Only sleep if the transfer is faster than it should be. */ if (sleep > 0) { - ResetLatch(&MyWalSnd->latch); + ResetLatch(MyLatch); + + /* We're eating a potentially set latch, so check for interrupts */ + CHECK_FOR_INTERRUPTS(); /* * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be * the maximum time to sleep. Thus the cast to long is safe. */ - wait_result = WaitLatch(&MyWalSnd->latch, + wait_result = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, (long) (sleep / 1000)); + + if (wait_result & WL_LATCH_SET) + CHECK_FOR_INTERRUPTS(); } else { diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 86c36bf502..05d2339b15 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1081,6 +1081,11 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, if (!PostmasterIsAlive()) exit(1); + /* Clear any already-pending wakeups */ + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + /* Process any requests or signals received recently */ if (got_SIGHUP) { @@ -1092,9 +1097,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, /* Check for input from the client */ ProcessRepliesIfAny(); - /* Clear any already-pending wakeups */ - ResetLatch(&MyWalSnd->latch); - /* Try to flush pending output to the client */ if (pq_flush_if_writable() != 0) WalSndShutdown(); @@ -1117,15 +1119,12 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT; /* Sleep until something happens or we time out */ - ImmediateInterruptOK = true; - CHECK_FOR_INTERRUPTS(); - WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents, + WaitLatchOrSocket(MyLatch, wakeEvents, MyProcPort->sock, sleeptime); - ImmediateInterruptOK = false; } /* reactivate latch so WalSndLoop knows to continue */ - SetLatch(&MyWalSnd->latch); + SetLatch(MyLatch); } /* @@ -1165,6 +1164,11 @@ WalSndWaitForWal(XLogRecPtr loc) if (!PostmasterIsAlive()) exit(1); + /* Clear any already-pending wakeups */ + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + /* Process any requests or signals received recently */ if (got_SIGHUP) { @@ -1176,9 +1180,6 @@ WalSndWaitForWal(XLogRecPtr loc) /* Check for input from the client */ ProcessRepliesIfAny(); - /* Clear any already-pending wakeups */ - ResetLatch(&MyWalSnd->latch); - /* Update our idea of the currently flushed position. */ if (!RecoveryInProgress()) RecentFlushPtr = GetFlushRecPtr(); @@ -1244,15 +1245,12 @@ WalSndWaitForWal(XLogRecPtr loc) wakeEvents |= WL_SOCKET_WRITEABLE; /* Sleep until something happens or we time out */ - ImmediateInterruptOK = true; - CHECK_FOR_INTERRUPTS(); - WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents, + WaitLatchOrSocket(MyLatch, wakeEvents, MyProcPort->sock, sleeptime); - ImmediateInterruptOK = false; } /* reactivate latch so WalSndLoop knows to continue */ - SetLatch(&MyWalSnd->latch); + SetLatch(MyLatch); return RecentFlushPtr; } @@ -1813,6 +1811,11 @@ WalSndLoop(WalSndSendDataCallback send_data) if (!PostmasterIsAlive()) exit(1); + /* Clear any already-pending wakeups */ + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + /* Process any requests or signals received recently */ if (got_SIGHUP) { @@ -1821,14 +1824,9 @@ WalSndLoop(WalSndSendDataCallback send_data) SyncRepInitConfig(); } - CHECK_FOR_INTERRUPTS(); - /* Check for input from the client */ ProcessRepliesIfAny(); - /* Clear any already-pending wakeups */ - ResetLatch(&MyWalSnd->latch); - /* * If we have received CopyDone from the client, sent CopyDone * ourselves, and the output buffer is empty, it's time to exit @@ -1912,11 +1910,8 @@ WalSndLoop(WalSndSendDataCallback send_data) wakeEvents |= WL_SOCKET_WRITEABLE; /* Sleep until something happens or we time out */ - ImmediateInterruptOK = true; - CHECK_FOR_INTERRUPTS(); - WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents, + WaitLatchOrSocket(MyLatch, wakeEvents, MyProcPort->sock, sleeptime); - ImmediateInterruptOK = false; } } return; @@ -1959,9 +1954,9 @@ InitWalSenderSlot(void) walsnd->pid = MyProcPid; walsnd->sentPtr = InvalidXLogRecPtr; walsnd->state = WALSNDSTATE_STARTUP; + walsnd->latch = &MyProc->procLatch; SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ - OwnLatch((Latch *) &walsnd->latch); MyWalSnd = (WalSnd *) walsnd; break; @@ -1986,19 +1981,14 @@ WalSndKill(int code, Datum arg) Assert(walsnd != NULL); - /* - * Clear MyWalSnd first; then disown the latch. This is so that signal - * handlers won't try to touch the latch after it's no longer ours. - */ MyWalSnd = NULL; - DisownLatch(&walsnd->latch); - - /* - * Mark WalSnd struct no longer in use. Assume that no lock is required - * for this. - */ + SpinLockAcquire(&walsnd->mutex); + /* clear latch while holding the spinlock, so it can safely be read */ + walsnd->latch = NULL; + /* Mark WalSnd struct as no longer being in use. */ walsnd->pid = 0; + SpinLockRelease(&walsnd->mutex); } /* @@ -2570,8 +2560,8 @@ WalSndSigHupHandler(SIGNAL_ARGS) int save_errno = errno; got_SIGHUP = true; - if (MyWalSnd) - SetLatch(&MyWalSnd->latch); + + SetLatch(MyLatch); errno = save_errno; } @@ -2603,8 +2593,7 @@ WalSndLastCycleHandler(SIGNAL_ARGS) kill(MyProcPid, SIGTERM); walsender_ready_to_stop = true; - if (MyWalSnd) - SetLatch(&MyWalSnd->latch); + SetLatch(MyLatch); errno = save_errno; } @@ -2668,7 +2657,6 @@ WalSndShmemInit(void) WalSnd *walsnd = &WalSndCtl->walsnds[i]; SpinLockInit(&walsnd->mutex); - InitSharedLatch(&walsnd->latch); } } } @@ -2685,7 +2673,21 @@ WalSndWakeup(void) int i; for (i = 0; i < max_wal_senders; i++) - SetLatch(&WalSndCtl->walsnds[i].latch); + { + Latch *latch; + WalSnd *walsnd = &WalSndCtl->walsnds[i]; + + /* + * Get latch pointer with spinlock held, for the unlikely case that + * pointer reads aren't atomic (as they're 8 bytes). + */ + SpinLockAcquire(&walsnd->mutex); + latch = walsnd->latch; + SpinLockRelease(&walsnd->mutex); + + if (latch != NULL) + SetLatch(latch); + } } /* Set state for current walsender (only called in walsender) */ diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index cc351d6f67..88677506f3 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -51,10 +51,10 @@ typedef struct WalSnd slock_t mutex; /* - * Latch used by backends to wake up this walsender when it has work to - * do. + * Pointer to the walsender's latch. Used by backends to wake up this + * walsender when it has work to do. NULL if the walsender isn't active. */ - Latch latch; + Latch *latch; /* * The priority order of the standby managed by this WALSender, as listed -- 2.40.0