]> granicus.if.org Git - postgresql/commitdiff
Use latch instead of select() in walreceiver
authorPeter Eisentraut <peter_e@gmx.net>
Wed, 30 Nov 2016 17:00:00 +0000 (12:00 -0500)
committerPeter Eisentraut <peter_e@gmx.net>
Fri, 2 Dec 2016 01:23:28 +0000 (20:23 -0500)
Replace use of poll()/select() by WaitLatchOrSocket(), which is more
portable and flexible.

Also change walreceiver to use its procLatch instead of a custom latch.

From: Petr Jelinek <petr@2ndquadrant.com>

src/backend/postmaster/pgstat.c
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/walreceiver.c
src/backend/replication/walreceiverfuncs.c
src/include/pgstat.h
src/include/replication/walreceiver.h

index a3921977c572d0d96aea464eed2188ccdc18426e..c7584cb1d3492e89684bf986c163e926a86d3bae 100644 (file)
@@ -3338,6 +3338,9 @@ pgstat_get_wait_client(WaitEventClient w)
                case WAIT_EVENT_WAL_RECEIVER_WAIT_START:
                        event_name = "WalReceiverWaitStart";
                        break;
+               case WAIT_EVENT_LIBPQWALRECEIVER_READ:
+                       event_name = "LibPQWalReceiverRead";
+                       break;
                case WAIT_EVENT_WAL_SENDER_WAIT_WAL:
                        event_name = "WalSenderWaitForWAL";
                        break;
index f1c843e868c08bedf5a520df3052564f9c7fae15..6c01e7b991853a3388f11a36db605cb2abbb61eb 100644 (file)
 #include "pqexpbuffer.h"
 #include "access/xlog.h"
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "replication/walreceiver.h"
+#include "storage/proc.h"
 #include "utils/builtins.h"
 
-#ifdef HAVE_POLL_H
-#include <poll.h>
-#endif
-#ifdef HAVE_SYS_POLL_H
-#include <sys/poll.h>
-#endif
-#ifdef HAVE_SYS_SELECT_H
-#include <sys/select.h>
-#endif
-
 PG_MODULE_MAGIC;
 
 void           _PG_init(void);
@@ -59,7 +51,6 @@ static void libpqrcv_send(const char *buffer, int nbytes);
 static void libpqrcv_disconnect(void);
 
 /* Prototypes for private functions */
-static bool libpq_select(int timeout_ms);
 static PGresult *libpqrcv_PQexec(const char *query);
 
 /*
@@ -366,67 +357,6 @@ libpqrcv_readtimelinehistoryfile(TimeLineID tli,
        PQclear(res);
 }
 
-/*
- * Wait until we can read WAL stream, or timeout.
- *
- * Returns true if data has become available for reading, false if timed out
- * or interrupted by signal.
- *
- * This is based on pqSocketCheck.
- */
-static bool
-libpq_select(int timeout_ms)
-{
-       int                     ret;
-
-       Assert(streamConn != NULL);
-       if (PQsocket(streamConn) < 0)
-               ereport(ERROR,
-                               (errcode_for_socket_access(),
-                                errmsg("invalid socket: %s", PQerrorMessage(streamConn))));
-
-       /* We use poll(2) if available, otherwise select(2) */
-       {
-#ifdef HAVE_POLL
-               struct pollfd input_fd;
-
-               input_fd.fd = PQsocket(streamConn);
-               input_fd.events = POLLIN | POLLERR;
-               input_fd.revents = 0;
-
-               ret = poll(&input_fd, 1, timeout_ms);
-#else                                                  /* !HAVE_POLL */
-
-               fd_set          input_mask;
-               struct timeval timeout;
-               struct timeval *ptr_timeout;
-
-               FD_ZERO(&input_mask);
-               FD_SET(PQsocket(streamConn), &input_mask);
-
-               if (timeout_ms < 0)
-                       ptr_timeout = NULL;
-               else
-               {
-                       timeout.tv_sec = timeout_ms / 1000;
-                       timeout.tv_usec = (timeout_ms % 1000) * 1000;
-                       ptr_timeout = &timeout;
-               }
-
-               ret = select(PQsocket(streamConn) + 1, &input_mask,
-                                        NULL, NULL, ptr_timeout);
-#endif   /* HAVE_POLL */
-       }
-
-       if (ret == 0 || (ret < 0 && errno == EINTR))
-               return false;
-       if (ret < 0)
-               ereport(ERROR,
-                               (errcode_for_socket_access(),
-                                errmsg("select() failed: %m")));
-       return true;
-}
-
 /*
  * Send a query and wait for the results by using the asynchronous libpq
  * functions and the backend version of select().
@@ -470,14 +400,31 @@ libpqrcv_PQexec(const char *query)
                 */
                while (PQisBusy(streamConn))
                {
+                       int                     rc;
+
                        /*
                         * We don't need to break down the sleep into smaller increments,
-                        * and check for interrupts after each nap, since we can just
-                        * elog(FATAL) within SIGTERM signal handler if the signal arrives
-                        * in the middle of establishment of replication connection.
+                        * since we'll get interrupted by signals and can either handle
+                        * interrupts here or elog(FATAL) within SIGTERM signal handler if
+                        * the signal arrives in the middle of establishment of
+                        * replication connection.
                         */
-                       if (!libpq_select(-1))
-                               continue;               /* interrupted */
+                       ResetLatch(&MyProc->procLatch);
+                       rc = WaitLatchOrSocket(&MyProc->procLatch,
+                                                                  WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
+                                                                  WL_LATCH_SET,
+                                                                  PQsocket(streamConn),
+                                                                  0,
+                                                                  WAIT_EVENT_LIBPQWALRECEIVER_READ);
+                       if (rc & WL_POSTMASTER_DEATH)
+                               exit(1);
+
+                       /* interrupted */
+                       if (rc & WL_LATCH_SET)
+                       {
+                               CHECK_FOR_INTERRUPTS();
+                               continue;
+                       }
                        if (PQconsumeInput(streamConn) == 0)
                                return NULL;    /* trouble */
                }
index 2bb3dce1b1c5bab7ac7d1e1aeb59e6e50b7bc7a3..8bfb041560823f2c3ca11ba09b42b7ece190c09f 100644 (file)
@@ -261,7 +261,7 @@ WalReceiverMain(void)
        /* Arrange to clean up at walreceiver exit */
        on_shmem_exit(WalRcvDie, 0);
 
-       OwnLatch(&walrcv->latch);
+       walrcv->latch = &MyProc->procLatch;
 
        /* Properly accept or ignore signals the postmaster might send us */
        pqsignal(SIGHUP, WalRcvSigHupHandler);          /* set flag to read config
@@ -483,7 +483,7 @@ WalReceiverMain(void)
                                 * avoiding some system calls.
                                 */
                                Assert(wait_fd != PGINVALID_SOCKET);
-                               rc = WaitLatchOrSocket(&walrcv->latch,
+                               rc = WaitLatchOrSocket(walrcv->latch,
                                                                   WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
                                                                           WL_TIMEOUT | WL_LATCH_SET,
                                                                           wait_fd,
@@ -491,7 +491,7 @@ WalReceiverMain(void)
                                                                           WAIT_EVENT_WAL_RECEIVER_MAIN);
                                if (rc & WL_LATCH_SET)
                                {
-                                       ResetLatch(&walrcv->latch);
+                                       ResetLatch(walrcv->latch);
                                        if (walrcv->force_reply)
                                        {
                                                /*
@@ -652,7 +652,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
        WakeupRecovery();
        for (;;)
        {
-               ResetLatch(&walrcv->latch);
+               ResetLatch(walrcv->latch);
 
                /*
                 * Emergency bailout if postmaster has died.  This is to avoid the
@@ -687,7 +687,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
                }
                SpinLockRelease(&walrcv->mutex);
 
-               WaitLatch(&walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
+               WaitLatch(walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
                                  WAIT_EVENT_WAL_RECEIVER_WAIT_START);
        }
 
@@ -763,7 +763,7 @@ WalRcvDie(int code, Datum arg)
        /* Ensure that all WAL records received are flushed to disk */
        XLogWalRcvFlush(true);
 
-       DisownLatch(&walrcv->latch);
+       walrcv->latch = NULL;
 
        SpinLockAcquire(&walrcv->mutex);
        Assert(walrcv->walRcvState == WALRCV_STREAMING ||
@@ -812,7 +812,8 @@ WalRcvShutdownHandler(SIGNAL_ARGS)
 
        got_SIGTERM = true;
 
-       SetLatch(&WalRcv->latch);
+       if (WalRcv->latch)
+               SetLatch(WalRcv->latch);
 
        /* Don't joggle the elbow of proc_exit */
        if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
@@ -1297,7 +1298,8 @@ void
 WalRcvForceReply(void)
 {
        WalRcv->force_reply = true;
-       SetLatch(&WalRcv->latch);
+       if (WalRcv->latch)
+               SetLatch(WalRcv->latch);
 }
 
 /*
index 5f6e423f1f63c74d53ed7607849a9bf81ac6eae9..01111a4c12bcd8f3ddde0ebe47edeab4ab3c304c 100644 (file)
@@ -64,7 +64,7 @@ WalRcvShmemInit(void)
                MemSet(WalRcv, 0, WalRcvShmemSize());
                WalRcv->walRcvState = WALRCV_STOPPED;
                SpinLockInit(&WalRcv->mutex);
-               InitSharedLatch(&WalRcv->latch);
+               WalRcv->latch = NULL;
        }
 }
 
@@ -279,8 +279,8 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
 
        if (launch)
                SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
-       else
-               SetLatch(&walrcv->latch);
+       else if (walrcv->latch)
+               SetLatch(walrcv->latch);
 }
 
 /*
index 0b85b7ad3ae9975eb20e6803c86c498e037ff0de..152ff0620852133323edda179e17539893fbe539 100644 (file)
@@ -763,6 +763,7 @@ typedef enum
        WAIT_EVENT_CLIENT_WRITE,
        WAIT_EVENT_SSL_OPEN_SERVER,
        WAIT_EVENT_WAL_RECEIVER_WAIT_START,
+       WAIT_EVENT_LIBPQWALRECEIVER_READ,
        WAIT_EVENT_WAL_SENDER_WAIT_WAL,
        WAIT_EVENT_WAL_SENDER_WRITE_DATA
 } WaitEventClient;
index cd787c92b3fc9eb2fb8de62d1199ceb64cc6c1c1..afbb8d8b9541e450d08702834406cc7b835be80c 100644 (file)
@@ -127,8 +127,9 @@ typedef struct
         * where to start streaming (after setting receiveStart and
         * receiveStartTLI), and also to tell it to send apply feedback to the
         * primary whenever specially marked commit records are applied.
+        * This is normally mapped to procLatch when walreceiver is running.
         */
-       Latch           latch;
+       Latch      *latch;
 } WalRcvData;
 
 extern WalRcvData *WalRcv;