From 674da8ae07e3780b393bd1aebb7b7f973433c29f Mon Sep 17 00:00:00 2001 From: Joe Mason Date: Thu, 2 Aug 2012 17:22:46 -0400 Subject: [PATCH] Add a polling loop in main to read from more than one socket at once. Add the O_NONBLOCK and SO_KEEPALIVE flag to all sockets. Note that several loops which used to continue on a return value of 0 (theoretical since 0 would never be returned without O_NONBLOCK) now break on 0 so that they won't continue reading until after poll is called again. --- tests/server/sws.c | 193 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 147 insertions(+), 46 deletions(-) diff --git a/tests/server/sws.c b/tests/server/sws.c index d60a52a54..fc0d0ddf3 100644 --- a/tests/server/sws.c +++ b/tests/server/sws.c @@ -50,6 +50,9 @@ #include /* for TCP_NODELAY */ #endif +#include +#include + #define ENABLE_CURLX_PRINTF /* make the curlx header define all printf() functions to use the curlx_* versions instead */ @@ -113,6 +116,11 @@ struct httprequest { int done_processing; }; +#define MAX_SOCKETS 1024 + +static struct pollfd all_sockets[MAX_SOCKETS]; +static nfds_t num_sockets = 0; + static int ProcessRequest(struct httprequest *req); static void storerequest(char *reqbuf, size_t totalsize); @@ -1708,7 +1716,12 @@ static int accept_connection(int sock) { curl_socket_t msgsock = CURL_SOCKET_BAD; int error; - int flag; + int flag = 1; + + if(MAX_SOCKETS == num_sockets) { + logmsg("Too many open sockets!"); + return CURL_SOCKET_BAD; + } msgsock = accept(sock, NULL, NULL); @@ -1729,23 +1742,45 @@ static int accept_connection(int sock) return CURL_SOCKET_BAD; } + if(0 != fcntl(msgsock, F_SETFL, O_NONBLOCK)) { + error = SOCKERRNO; + logmsg("fcntl(O_NONBLOCK) failed with error: (%d) %s", + error, strerror(error)); + sclose(msgsock); + return CURL_SOCKET_BAD; + } + + if(0 != setsockopt(msgsock, SOL_SOCKET, SO_KEEPALIVE, + (void *)&flag, sizeof(flag))) { + error = SOCKERRNO; + logmsg("setsockopt(SO_KEEPALIVE) failed with error: (%d) %s", + error, strerror(error)); + sclose(msgsock); + return CURL_SOCKET_BAD; + } + /* - ** As soon as this server acepts a connection from the test harness it + ** As soon as this server accepts a connection from the test harness it ** must set the server logs advisor read lock to indicate that server ** logs should not be read until this lock is removed by this server. */ - set_advisor_read_lock(SERVERLOGS_LOCK); - serverlogslocked = 1; + if(!serverlogslocked) + set_advisor_read_lock(SERVERLOGS_LOCK); + serverlogslocked += 1; logmsg("====> Client connect"); + all_sockets[num_sockets].fd = msgsock; + all_sockets[num_sockets].events = POLLIN; + all_sockets[num_sockets].revents = 0; + num_sockets += 1; + #ifdef TCP_NODELAY /* * Disable the Nagle algorithm to make it easier to send out a large * response in many small segments to torture the clients more. */ - flag = 1; if(0 != setsockopt(msgsock, IPPROTO_TCP, TCP_NODELAY, (void *)&flag, sizeof(flag))) logmsg("====> TCP_NODELAY failed"); @@ -1764,9 +1799,9 @@ static int service_connection(int msgsock, struct httprequest *req, if(got_exit_signal) return -1; - init_httprequest(req); while(!req->done_processing) { int rc = get_request(msgsock, req); + logmsg("get_request %d returned %d", msgsock, rc); if (rc <= 0) { /* Nothing further to read now (possibly because the socket was closed */ return rc; @@ -1827,7 +1862,6 @@ int main(int argc, char *argv[]) { srvr_sockaddr_union_t me; curl_socket_t sock = CURL_SOCKET_BAD; - curl_socket_t msgsock = CURL_SOCKET_BAD; int wrotepidfile = 0; int flag; unsigned short port = DEFAULT_PORT; @@ -1838,6 +1872,7 @@ int main(int argc, char *argv[]) int arg=1; long pid; const char *hostport = "127.0.0.1"; + nfds_t socket_idx; memset(&req, 0, sizeof(req)); @@ -1949,6 +1984,11 @@ int main(int argc, char *argv[]) sock = socket(AF_INET6, SOCK_STREAM, 0); #endif + all_sockets[0].fd = sock; + all_sockets[0].events = POLLIN; + all_sockets[0].revents = 0; + num_sockets = 1; + if(CURL_SOCKET_BAD == sock) { error = SOCKERRNO; logmsg("Error creating socket: (%d) %s", @@ -1964,6 +2004,12 @@ int main(int argc, char *argv[]) error, strerror(error)); goto sws_cleanup; } + if(0 != fcntl(sock, F_SETFL, O_NONBLOCK)) { + error = SOCKERRNO; + logmsg("fcntl(O_NONBLOCK) failed with error: (%d) %s", + error, strerror(error)); + goto sws_cleanup; + } #ifdef ENABLE_IPV6 if(!use_ipv6) { @@ -2011,59 +2057,114 @@ int main(int argc, char *argv[]) if(!wrotepidfile) goto sws_cleanup; - for (;;) { - do { - msgsock = accept_connection(sock); - if (CURL_SOCKET_BAD == msgsock) - goto sws_cleanup; - } while (msgsock >= 0); + /* initialization of httprequest struct is done before get_request(), but + the pipelining struct field must be initialized previously to FALSE + every time a new connection arrives. */ + + req.pipelining = FALSE; + init_httprequest(&req); + + for(;;) { + /* Clear out closed sockets */ + for (socket_idx = num_sockets - 1; socket_idx >= 1; --socket_idx) { + if (CURL_SOCKET_BAD == all_sockets[socket_idx].fd) { + char* dst = (char *) all_sockets + socket_idx; + char* src = (char *) all_sockets + socket_idx + 1; + char* end = (char *) all_sockets + num_sockets; + memmove(dst, src, end - src); + num_sockets -= 1; + } + } - /* initialization of httprequest struct is done before get_request(), but - the pipelining struct field must be initialized previously to FALSE - every time a new connection arrives. */ + rc = poll(all_sockets, num_sockets, -1); - req.pipelining = FALSE; + if (rc < 0) { + error = SOCKERRNO; + logmsg("poll() failed with error: (%d) %s", + error, strerror(error)); + goto sws_cleanup; + } - do { - rc = service_connection(msgsock, &req, sock, hostport); - if(got_exit_signal) - goto sws_cleanup; + /* Check if the listening socket is ready to accept */ + if ((all_sockets[0].revents & POLLIN) == POLLIN) { + /* Service all queued connections */ + curl_socket_t msgsock; + do { + msgsock = accept_connection(sock); + logmsg("accept_connection %d returned %d", sock, msgsock); + if (CURL_SOCKET_BAD == msgsock) + goto sws_cleanup; + } while (msgsock > 0); + } + else if (all_sockets[0].revents != 0) { + logmsg("unexpected poll event on listening socket: %d", + all_sockets[0].revents); + goto sws_cleanup; + } - if (rc < 0) { - logmsg("====> Client disconnect %d", req.connmon); + /* Service all connections that are ready */ + for (socket_idx = 1; socket_idx < num_sockets; ++socket_idx) { + if ((all_sockets[socket_idx].revents & POLLIN) == POLLIN) { + if(got_exit_signal) + goto sws_cleanup; - if(req.connmon) { - const char *keepopen="[DISCONNECT]\n"; - storerequest((char *)keepopen, strlen(keepopen)); - } + /* Service this connection until it has nothing available */ + do { + rc = service_connection(all_sockets[socket_idx].fd, &req, sock, hostport); + logmsg("service_connection %d returned %d", all_sockets[socket_idx].fd, rc); + if(got_exit_signal) + goto sws_cleanup; - if(!req.open) - /* When instructed to close connection after server-reply we - wait a very small amount of time before doing so. If this - is not done client might get an ECONNRESET before reading - a single byte of server-reply. */ - wait_ms(50); + if (rc < 0) { + logmsg("====> Client disconnect %d", req.connmon); - if(msgsock != CURL_SOCKET_BAD) { - sclose(msgsock); - msgsock = CURL_SOCKET_BAD; - } + if(req.connmon) { + const char *keepopen="[DISCONNECT]\n"; + storerequest((char *)keepopen, strlen(keepopen)); + } - if(serverlogslocked) { - serverlogslocked = 0; - clear_advisor_read_lock(SERVERLOGS_LOCK); - } + if(!req.open) + /* When instructed to close connection after server-reply we + wait a very small amount of time before doing so. If this + is not done client might get an ECONNRESET before reading + a single byte of server-reply. */ + wait_ms(50); - if (req.testno == DOCNUMBER_QUIT) - goto sws_cleanup; + if(all_sockets[socket_idx].fd != CURL_SOCKET_BAD) { + sclose(all_sockets[socket_idx].fd); + all_sockets[socket_idx].fd = CURL_SOCKET_BAD; + } + + serverlogslocked -= 1; + if(!serverlogslocked) + clear_advisor_read_lock(SERVERLOGS_LOCK); + + if (req.testno == DOCNUMBER_QUIT) + goto sws_cleanup; + } + + /* Reset the request, unless we're still in the middle of reading */ + if (rc != 0) + init_httprequest(&req); + } while (rc > 0); + } + else if (all_sockets[socket_idx].revents != 0) { + logmsg("unexpected poll event on socket %d: %d", + socket_idx, all_sockets[socket_idx].revents); + goto sws_cleanup; } - } while (rc > = 0); + } + + if(got_exit_signal) + goto sws_cleanup; } sws_cleanup: - if((msgsock != sock) && (msgsock != CURL_SOCKET_BAD)) - sclose(msgsock); + for (socket_idx = 1; socket_idx < num_sockets; ++socket_idx) + if((all_sockets[socket_idx].fd != sock) && + (all_sockets[socket_idx].fd != CURL_SOCKET_BAD)) + sclose(all_sockets[socket_idx].fd); if(sock != CURL_SOCKET_BAD) sclose(sock); -- 2.40.0