]> granicus.if.org Git - postgresql/commitdiff
Replace walsender's latch with the general shared latch.
authorAndres Freund <andres@anarazel.de>
Sat, 17 Jan 2015 12:00:42 +0000 (13:00 +0100)
committerAndres Freund <andres@anarazel.de>
Sat, 17 Jan 2015 12:00:42 +0000 (13:00 +0100)
Relying on the normal shared latch simplifies interrupt/signal
handling because we can rely on all signal handlers setting the proc
latch. That in turn allows us to avoid the use of
ImmediateInterruptOK, which arguably isn't correct because
WaitLatchOrSocket isn't declared to be immediately interruptible.

Also change sections that wait on the walsender's latch to notice
interrupts quicker/more reliably and make them more consistent with
each other.

This is part of a larger "get rid of ImmediateInterruptOK" series.

Discussion: 20150115020335.GZ5245@awork2.anarazel.de

src/backend/replication/basebackup.c
src/backend/replication/walsender.c
src/include/replication/walsender_private.h

index 07030a2ef08d8f4fb4d7bb9354fd55605b55b5e8..3058ce921b03b3b6c79f3bf745d9a17550f9ee2f 100644 (file)
@@ -1294,15 +1294,21 @@ throttle(size_t increment)
        /* Only sleep if the transfer is faster than it should be. */
        if (sleep > 0)
        {
-               ResetLatch(&MyWalSnd->latch);
+               ResetLatch(MyLatch);
+
+               /* We're eating a potentially set latch, so check for interrupts */
+               CHECK_FOR_INTERRUPTS();
 
                /*
                 * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
                 * the maximum time to sleep. Thus the cast to long is safe.
                 */
-               wait_result = WaitLatch(&MyWalSnd->latch,
+               wait_result = WaitLatch(MyLatch,
                                                         WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
                                                                (long) (sleep / 1000));
+
+               if (wait_result & WL_LATCH_SET)
+                       CHECK_FOR_INTERRUPTS();
        }
        else
        {
index 86c36bf502c26e12b36d49a4ea49f3793cf29d1b..05d2339b15093701f8bac2ed96560c1fb303cb3b 100644 (file)
@@ -1081,6 +1081,11 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
                if (!PostmasterIsAlive())
                        exit(1);
 
+               /* Clear any already-pending wakeups */
+               ResetLatch(MyLatch);
+
+               CHECK_FOR_INTERRUPTS();
+
                /* Process any requests or signals received recently */
                if (got_SIGHUP)
                {
@@ -1092,9 +1097,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
                /* Check for input from the client */
                ProcessRepliesIfAny();
 
-               /* Clear any already-pending wakeups */
-               ResetLatch(&MyWalSnd->latch);
-
                /* Try to flush pending output to the client */
                if (pq_flush_if_writable() != 0)
                        WalSndShutdown();
@@ -1117,15 +1119,12 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
                        WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
 
                /* Sleep until something happens or we time out */
-               ImmediateInterruptOK = true;
-               CHECK_FOR_INTERRUPTS();
-               WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
+               WaitLatchOrSocket(MyLatch, wakeEvents,
                                                  MyProcPort->sock, sleeptime);
-               ImmediateInterruptOK = false;
        }
 
        /* reactivate latch so WalSndLoop knows to continue */
-       SetLatch(&MyWalSnd->latch);
+       SetLatch(MyLatch);
 }
 
 /*
@@ -1165,6 +1164,11 @@ WalSndWaitForWal(XLogRecPtr loc)
                if (!PostmasterIsAlive())
                        exit(1);
 
+               /* Clear any already-pending wakeups */
+               ResetLatch(MyLatch);
+
+               CHECK_FOR_INTERRUPTS();
+
                /* Process any requests or signals received recently */
                if (got_SIGHUP)
                {
@@ -1176,9 +1180,6 @@ WalSndWaitForWal(XLogRecPtr loc)
                /* Check for input from the client */
                ProcessRepliesIfAny();
 
-               /* Clear any already-pending wakeups */
-               ResetLatch(&MyWalSnd->latch);
-
                /* Update our idea of the currently flushed position. */
                if (!RecoveryInProgress())
                        RecentFlushPtr = GetFlushRecPtr();
@@ -1244,15 +1245,12 @@ WalSndWaitForWal(XLogRecPtr loc)
                        wakeEvents |= WL_SOCKET_WRITEABLE;
 
                /* Sleep until something happens or we time out */
-               ImmediateInterruptOK = true;
-               CHECK_FOR_INTERRUPTS();
-               WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
+               WaitLatchOrSocket(MyLatch, wakeEvents,
                                                  MyProcPort->sock, sleeptime);
-               ImmediateInterruptOK = false;
        }
 
        /* reactivate latch so WalSndLoop knows to continue */
-       SetLatch(&MyWalSnd->latch);
+       SetLatch(MyLatch);
        return RecentFlushPtr;
 }
 
@@ -1813,6 +1811,11 @@ WalSndLoop(WalSndSendDataCallback send_data)
                if (!PostmasterIsAlive())
                        exit(1);
 
+               /* Clear any already-pending wakeups */
+               ResetLatch(MyLatch);
+
+               CHECK_FOR_INTERRUPTS();
+
                /* Process any requests or signals received recently */
                if (got_SIGHUP)
                {
@@ -1821,14 +1824,9 @@ WalSndLoop(WalSndSendDataCallback send_data)
                        SyncRepInitConfig();
                }
 
-               CHECK_FOR_INTERRUPTS();
-
                /* Check for input from the client */
                ProcessRepliesIfAny();
 
-               /* Clear any already-pending wakeups */
-               ResetLatch(&MyWalSnd->latch);
-
                /*
                 * If we have received CopyDone from the client, sent CopyDone
                 * ourselves, and the output buffer is empty, it's time to exit
@@ -1912,11 +1910,8 @@ WalSndLoop(WalSndSendDataCallback send_data)
                                wakeEvents |= WL_SOCKET_WRITEABLE;
 
                        /* Sleep until something happens or we time out */
-                       ImmediateInterruptOK = true;
-                       CHECK_FOR_INTERRUPTS();
-                       WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
+                       WaitLatchOrSocket(MyLatch, wakeEvents,
                                                          MyProcPort->sock, sleeptime);
-                       ImmediateInterruptOK = false;
                }
        }
        return;
@@ -1959,9 +1954,9 @@ InitWalSenderSlot(void)
                        walsnd->pid = MyProcPid;
                        walsnd->sentPtr = InvalidXLogRecPtr;
                        walsnd->state = WALSNDSTATE_STARTUP;
+                       walsnd->latch = &MyProc->procLatch;
                        SpinLockRelease(&walsnd->mutex);
                        /* don't need the lock anymore */
-                       OwnLatch((Latch *) &walsnd->latch);
                        MyWalSnd = (WalSnd *) walsnd;
 
                        break;
@@ -1986,19 +1981,14 @@ WalSndKill(int code, Datum arg)
 
        Assert(walsnd != NULL);
 
-       /*
-        * Clear MyWalSnd first; then disown the latch.  This is so that signal
-        * handlers won't try to touch the latch after it's no longer ours.
-        */
        MyWalSnd = NULL;
 
-       DisownLatch(&walsnd->latch);
-
-       /*
-        * Mark WalSnd struct no longer in use. Assume that no lock is required
-        * for this.
-        */
+       SpinLockAcquire(&walsnd->mutex);
+       /* clear latch while holding the spinlock, so it can safely be read */
+       walsnd->latch = NULL;
+       /* Mark WalSnd struct as no longer being in use. */
        walsnd->pid = 0;
+       SpinLockRelease(&walsnd->mutex);
 }
 
 /*
@@ -2570,8 +2560,8 @@ WalSndSigHupHandler(SIGNAL_ARGS)
        int                     save_errno = errno;
 
        got_SIGHUP = true;
-       if (MyWalSnd)
-               SetLatch(&MyWalSnd->latch);
+
+       SetLatch(MyLatch);
 
        errno = save_errno;
 }
@@ -2603,8 +2593,7 @@ WalSndLastCycleHandler(SIGNAL_ARGS)
                kill(MyProcPid, SIGTERM);
 
        walsender_ready_to_stop = true;
-       if (MyWalSnd)
-               SetLatch(&MyWalSnd->latch);
+       SetLatch(MyLatch);
 
        errno = save_errno;
 }
@@ -2668,7 +2657,6 @@ WalSndShmemInit(void)
                        WalSnd     *walsnd = &WalSndCtl->walsnds[i];
 
                        SpinLockInit(&walsnd->mutex);
-                       InitSharedLatch(&walsnd->latch);
                }
        }
 }
@@ -2685,7 +2673,21 @@ WalSndWakeup(void)
        int                     i;
 
        for (i = 0; i < max_wal_senders; i++)
-               SetLatch(&WalSndCtl->walsnds[i].latch);
+       {
+               Latch *latch;
+               WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+               /*
+                * Get latch pointer with spinlock held, for the unlikely case that
+                * pointer reads aren't atomic (as they're 8 bytes).
+                */
+               SpinLockAcquire(&walsnd->mutex);
+               latch = walsnd->latch;
+               SpinLockRelease(&walsnd->mutex);
+
+               if (latch != NULL)
+                       SetLatch(latch);
+       }
 }
 
 /* Set state for current walsender (only called in walsender) */
index cc351d6f6736df81dd77c753baef3c1e19932ef6..88677506f34ccd82481af777e8ae16c8a34792b4 100644 (file)
@@ -51,10 +51,10 @@ typedef struct WalSnd
        slock_t         mutex;
 
        /*
-        * Latch used by backends to wake up this walsender when it has work to
-        * do.
+        * Pointer to the walsender's latch. Used by backends to wake up this
+        * walsender when it has work to do. NULL if the walsender isn't active.
         */
-       Latch           latch;
+       Latch           *latch;
 
        /*
         * The priority order of the standby managed by this WALSender, as listed