]> granicus.if.org Git - curl/commitdiff
Add a polling loop in main to read from more than one socket at once. Add the O_NONBL...
authorJoe Mason <jmason@rim.com>
Thu, 2 Aug 2012 21:22:46 +0000 (17:22 -0400)
committerJoe Mason <jmason@rim.com>
Thu, 2 Aug 2012 22:52:38 +0000 (18:52 -0400)
SO_KEEPALIVE flag to all sockets. Note that several loops which used to continue on a return value
of 0 (theoretical since 0 would never be returned without O_NONBLOCK) now break on 0 so that they
won't continue reading until after poll is called again.

tests/server/sws.c

index d60a52a54899e2c66ba21931187b612d002cc8cb..fc0d0ddf31202aa43487a41d2132e11a859c6926 100644 (file)
@@ -50,6 +50,9 @@
 #include <netinet/tcp.h> /* for TCP_NODELAY */
 #endif
 
+#include <fcntl.h>
+#include <poll.h>
+
 #define ENABLE_CURLX_PRINTF
 /* make the curlx header define all printf() functions to use the curlx_*
    versions instead */
@@ -113,6 +116,11 @@ struct httprequest {
   int done_processing;
 };
 
+#define MAX_SOCKETS 1024
+
+static struct pollfd all_sockets[MAX_SOCKETS];
+static nfds_t num_sockets = 0;
+
 static int ProcessRequest(struct httprequest *req);
 static void storerequest(char *reqbuf, size_t totalsize);
 
@@ -1708,7 +1716,12 @@ static int accept_connection(int sock)
 {
   curl_socket_t msgsock = CURL_SOCKET_BAD;
   int error;
-  int flag;
+  int flag = 1;
+
+  if(MAX_SOCKETS == num_sockets) {
+    logmsg("Too many open sockets!");
+    return CURL_SOCKET_BAD;
+  }
 
   msgsock = accept(sock, NULL, NULL);
 
@@ -1729,23 +1742,45 @@ static int accept_connection(int sock)
     return CURL_SOCKET_BAD;
   }
 
+  if(0 != fcntl(msgsock, F_SETFL, O_NONBLOCK)) {
+    error = SOCKERRNO;
+    logmsg("fcntl(O_NONBLOCK) failed with error: (%d) %s",
+           error, strerror(error));
+    sclose(msgsock);
+    return CURL_SOCKET_BAD;
+  }
+
+  if(0 != setsockopt(msgsock, SOL_SOCKET, SO_KEEPALIVE,
+                     (void *)&flag, sizeof(flag))) {
+    error = SOCKERRNO;
+    logmsg("setsockopt(SO_KEEPALIVE) failed with error: (%d) %s",
+           error, strerror(error));
+    sclose(msgsock);
+    return CURL_SOCKET_BAD;
+  }
+
   /*
-  ** As soon as this server acepts a connection from the test harness it
+  ** As soon as this server accepts a connection from the test harness it
   ** must set the server logs advisor read lock to indicate that server
   ** logs should not be read until this lock is removed by this server.
   */
 
-  set_advisor_read_lock(SERVERLOGS_LOCK);
-  serverlogslocked = 1;
+  if(!serverlogslocked)
+    set_advisor_read_lock(SERVERLOGS_LOCK);
+  serverlogslocked += 1;
 
   logmsg("====> Client connect");
 
+  all_sockets[num_sockets].fd = msgsock;
+  all_sockets[num_sockets].events = POLLIN;
+  all_sockets[num_sockets].revents = 0;
+  num_sockets += 1;
+
 #ifdef TCP_NODELAY
   /*
    * Disable the Nagle algorithm to make it easier to send out a large
    * response in many small segments to torture the clients more.
    */
-  flag = 1;
   if(0 != setsockopt(msgsock, IPPROTO_TCP, TCP_NODELAY,
                      (void *)&flag, sizeof(flag)))
     logmsg("====> TCP_NODELAY failed");
@@ -1764,9 +1799,9 @@ static int service_connection(int msgsock, struct httprequest *req,
   if(got_exit_signal)
     return -1;
 
-  init_httprequest(req);
   while(!req->done_processing) {
     int rc = get_request(msgsock, req);
+    logmsg("get_request %d returned %d", msgsock, rc);
     if (rc <= 0) {
       /* Nothing further to read now (possibly because the socket was closed */
       return rc;
@@ -1827,7 +1862,6 @@ int main(int argc, char *argv[])
 {
   srvr_sockaddr_union_t me;
   curl_socket_t sock = CURL_SOCKET_BAD;
-  curl_socket_t msgsock = CURL_SOCKET_BAD;
   int wrotepidfile = 0;
   int flag;
   unsigned short port = DEFAULT_PORT;
@@ -1838,6 +1872,7 @@ int main(int argc, char *argv[])
   int arg=1;
   long pid;
   const char *hostport = "127.0.0.1";
+  nfds_t socket_idx;
 
   memset(&req, 0, sizeof(req));
 
@@ -1949,6 +1984,11 @@ int main(int argc, char *argv[])
     sock = socket(AF_INET6, SOCK_STREAM, 0);
 #endif
 
+  all_sockets[0].fd = sock;
+  all_sockets[0].events = POLLIN;
+  all_sockets[0].revents = 0;
+  num_sockets = 1;
+
   if(CURL_SOCKET_BAD == sock) {
     error = SOCKERRNO;
     logmsg("Error creating socket: (%d) %s",
@@ -1964,6 +2004,12 @@ int main(int argc, char *argv[])
            error, strerror(error));
     goto sws_cleanup;
   }
+  if(0 != fcntl(sock, F_SETFL, O_NONBLOCK)) {
+    error = SOCKERRNO;
+    logmsg("fcntl(O_NONBLOCK) failed with error: (%d) %s",
+           error, strerror(error));
+    goto sws_cleanup;
+  }
 
 #ifdef ENABLE_IPV6
   if(!use_ipv6) {
@@ -2011,59 +2057,114 @@ int main(int argc, char *argv[])
   if(!wrotepidfile)
     goto sws_cleanup;
 
-  for (;;) {
-    do {
-      msgsock = accept_connection(sock);
-      if (CURL_SOCKET_BAD == msgsock)
-        goto sws_cleanup;
-    } while (msgsock >= 0);
+  /* initialization of httprequest struct is done before get_request(), but
+     the pipelining struct field must be initialized previously to FALSE
+     every time a new connection arrives. */
+
+  req.pipelining = FALSE;
+  init_httprequest(&req);
+
+  for(;;) {
+    /* Clear out closed sockets */
+    for (socket_idx = num_sockets - 1; socket_idx >= 1; --socket_idx) {
+      if (CURL_SOCKET_BAD == all_sockets[socket_idx].fd) {
+        char* dst = (char *) all_sockets + socket_idx;
+        char* src = (char *) all_sockets + socket_idx + 1;
+        char* end = (char *) all_sockets + num_sockets;
+        memmove(dst, src, end - src);
+        num_sockets -= 1;
+      }
+    }
 
-    /* initialization of httprequest struct is done before get_request(), but
-       the pipelining struct field must be initialized previously to FALSE
-       every time a new connection arrives. */
+    rc = poll(all_sockets, num_sockets, -1);
 
-    req.pipelining = FALSE;
+    if (rc < 0) {
+      error = SOCKERRNO;
+      logmsg("poll() failed with error: (%d) %s",
+             error, strerror(error));
+      goto sws_cleanup;
+    }
 
-    do {
-      rc = service_connection(msgsock, &req, sock, hostport);
-      if(got_exit_signal)
-        goto sws_cleanup;
+    /* Check if the listening socket is ready to accept */
+    if ((all_sockets[0].revents & POLLIN) == POLLIN) {
+      /* Service all queued connections */
+      curl_socket_t msgsock;
+      do {
+        msgsock = accept_connection(sock);
+        logmsg("accept_connection %d returned %d", sock, msgsock);
+        if (CURL_SOCKET_BAD == msgsock)
+          goto sws_cleanup;
+      } while (msgsock > 0);
+    }
+    else if (all_sockets[0].revents != 0) {
+      logmsg("unexpected poll event on listening socket: %d",
+             all_sockets[0].revents);
+      goto sws_cleanup;
+    }
 
-      if (rc < 0) {
-        logmsg("====> Client disconnect %d", req.connmon);
+    /* Service all connections that are ready */
+    for (socket_idx = 1; socket_idx < num_sockets; ++socket_idx) {
+      if ((all_sockets[socket_idx].revents & POLLIN) == POLLIN) {
+        if(got_exit_signal)
+          goto sws_cleanup;
 
-        if(req.connmon) {
-          const char *keepopen="[DISCONNECT]\n";
-          storerequest((char *)keepopen, strlen(keepopen));
-        }
+        /* Service this connection until it has nothing available */
+        do {
+          rc = service_connection(all_sockets[socket_idx].fd, &req, sock, hostport);
+          logmsg("service_connection %d returned %d", all_sockets[socket_idx].fd, rc);
+          if(got_exit_signal)
+            goto sws_cleanup;
 
-        if(!req.open)
-          /* When instructed to close connection after server-reply we
-             wait a very small amount of time before doing so. If this
-             is not done client might get an ECONNRESET before reading
-             a single byte of server-reply. */
-          wait_ms(50);
+          if (rc < 0) {
+            logmsg("====> Client disconnect %d", req.connmon);
 
-        if(msgsock != CURL_SOCKET_BAD) {
-          sclose(msgsock);
-          msgsock = CURL_SOCKET_BAD;
-        }
+            if(req.connmon) {
+              const char *keepopen="[DISCONNECT]\n";
+              storerequest((char *)keepopen, strlen(keepopen));
+            }
 
-        if(serverlogslocked) {
-          serverlogslocked = 0;
-          clear_advisor_read_lock(SERVERLOGS_LOCK);
-        }
+            if(!req.open)
+              /* When instructed to close connection after server-reply we
+                 wait a very small amount of time before doing so. If this
+                 is not done client might get an ECONNRESET before reading
+                 a single byte of server-reply. */
+              wait_ms(50);
 
-        if (req.testno == DOCNUMBER_QUIT)
-          goto sws_cleanup;
+            if(all_sockets[socket_idx].fd != CURL_SOCKET_BAD) {
+              sclose(all_sockets[socket_idx].fd);
+              all_sockets[socket_idx].fd = CURL_SOCKET_BAD;
+            }
+
+            serverlogslocked -= 1;
+            if(!serverlogslocked)
+              clear_advisor_read_lock(SERVERLOGS_LOCK);
+
+            if (req.testno == DOCNUMBER_QUIT)
+              goto sws_cleanup;
+          }
+
+          /* Reset the request, unless we're still in the middle of reading */
+          if (rc != 0)
+            init_httprequest(&req);
+        } while (rc > 0);
+      }
+      else if (all_sockets[socket_idx].revents != 0) {
+        logmsg("unexpected poll event on socket %d: %d",
+               socket_idx, all_sockets[socket_idx].revents);
+        goto sws_cleanup;
       }
-    } while (rc > = 0);
+    }
+
+    if(got_exit_signal)
+      goto sws_cleanup;
   }
 
 sws_cleanup:
 
-  if((msgsock != sock) && (msgsock != CURL_SOCKET_BAD))
-    sclose(msgsock);
+  for (socket_idx = 1; socket_idx < num_sockets; ++socket_idx)
+    if((all_sockets[socket_idx].fd != sock) &&
+     (all_sockets[socket_idx].fd != CURL_SOCKET_BAD))
+      sclose(all_sockets[socket_idx].fd);
 
   if(sock != CURL_SOCKET_BAD)
     sclose(sock);