]> granicus.if.org Git - pdns/commitdiff
Recursor HTTP: Fix processing of split requests
authorChristian Hofstaedtler <christian@hofstaedtler.name>
Tue, 25 Mar 2014 16:31:42 +0000 (17:31 +0100)
committerChristian Hofstaedtler <christian@hofstaedtler.name>
Tue, 25 Mar 2014 17:07:51 +0000 (18:07 +0100)
Requests that were split over multiple TCP packets (-> reads) were
not processed correctly. Also, writes are now integrated into MPlexer.

13 files changed:
pdns/lwres.cc
pdns/pdns_recursor.cc
pdns/session.cc
pdns/session.hh
pdns/sstuff.hh
pdns/syncres.hh
pdns/webserver.cc
pdns/webserver.hh
pdns/ws-auth.cc
pdns/ws-recursor.cc
pdns/ws-recursor.hh
regression-tests.api/test_Basics.py
regression-tests.api/test_helper.py

index 25718dfe632e4b3a848d14a87ecde655605a0869..a68e86aac3a16b5ce0c2f6aa964d109e44a3065e 100644 (file)
@@ -122,14 +122,14 @@ int asyncresolve(const ComboAddress& ip, const string& domain, int type, bool do
         return ret;
       
       packet.clear();
-      ret=arecvtcp(packet, 2, &s);
+      ret=arecvtcp(packet, 2, &s, false);
       if(!(ret > 0))
         return ret;
       
       memcpy(&tlen, packet.c_str(), 2);
       len=ntohs(tlen); // switch to the 'len' shared with the rest of the function
       
-      ret=arecvtcp(packet, len, &s);
+      ret=arecvtcp(packet, len, &s, false);
       if(!(ret > 0))
         return ret;
       
index adab81423aac461d28d05254c03c117c88106263..f02b37a6e64b250d2bdfc7504ac8ce129198bade 100644 (file)
@@ -188,12 +188,13 @@ int asendtcp(const string& data, Socket* sock)
 void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var);
 
 // -1 is error, 0 is timeout, 1 is success
-int arecvtcp(string& data, int len, Socket* sock
+int arecvtcp(string& data, int len, Socket* sock, bool incompleteOkay)
 {
   data.clear();
   PacketID pident;
   pident.sock=sock;
   pident.inNeeded=len;
+  pident.inIncompleteOkay=incompleteOkay;
   t_fdm->addReadFD(sock->getHandle(), handleTCPClientReadable, pident);
 
   int ret=MT->waitEvent(pident,&data, g_networkTimeoutMsec);
@@ -1400,7 +1401,7 @@ void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var)
   if(ret > 0) {
     pident->inMSG.append(&buffer[0], &buffer[ret]);
     pident->inNeeded-=ret;
-    if(!pident->inNeeded) {
+    if(!pident->inNeeded || pident->inIncompleteOkay) {
       //      cerr<<"Got entire load of "<<pident->inMSG.size()<<" bytes"<<endl;
       PacketID pid=*pident;
       string msg=pident->inMSG;
index 0634647ceb6b7e5444a5c2599e83cbf11d4869eb..ce7ac432de43e3c116bef196f8991aa6d17d117b 100644 (file)
@@ -1,6 +1,6 @@
 /*
     PowerDNS Versatile Database Driven Nameserver
-    Copyright (C) 2002 - 2012 PowerDNS.COM BV
+    Copyright (C) 2002 - 2014 PowerDNS.COM BV
 
     This program is free software; you can redistribute it and/or modify
     it under the terms of the GNU General Public License version 2
 #include "misc.hh"
 #include "iputils.hh"
 
-Session::Session(int s, ComboAddress r) : d_timeout(10), d_good(true)
+Socket* Server::accept()
 {
-  d_remote=r;
-  d_socket=s;
+  return d_server_socket.accept();
 }
-
-Session::Session() : d_socket(-1), d_timeout(10), d_good(false)
-{
-}
-
-int Session::close()
-{
-  int rc=0;
-  
-  if(d_socket>=0)
-    rc=Utility::closesocket(d_socket);
-
-  d_socket=-1;
-  return rc;
-}
-
-Session::~Session()
-{
-  /* NOT CLOSING AUTOMATICALLY ANYMORE!
-    if(d_socket>=0)
-    ::close(d_socket);
-  */  
-}
-
-//! This function makes a deep copy of Session
-Session::Session(const Session &s)
-{
-  d_socket=s.d_socket;
-  d_remote=s.d_remote;
-  d_good=s.d_good;
-  d_timeout=s.d_timeout;
-}
-
-void Session::setTimeout(unsigned int seconds)
-{
-  d_timeout=seconds;
-}
-
-bool Session::put(const string &s)
-{
-  int length=s.length();
-  int written=0;
-  int err;
-
-  while(written < length)
-    {
-      err=waitForRWData(d_socket, false, d_timeout, 0);
-      if(err<=0)
-        throw SessionException("nonblocking write failed: "+string(strerror(errno)));
-
-      err = send(d_socket, s.c_str() + written, length-written, 0);
-
-      if(err < 0)
-        return false;
-      
-      written+=err;
-    }
-
-  return true;
-}
-
-static int timeoutRead(int s, char *buf, size_t len, int timeout)
-{
-  int err = waitForRWData(s, true, timeout, 0);
-  
-  if(!err)
-    throw SessionTimeoutException("timeout reading");
-  if(err < 0)
-    throw SessionException("nonblocking read failed: "+string(strerror(errno)));
-  
-  return recv(s,buf,len,0);
-}
-
-bool Session::good()
-{
-  return d_good;
-}
-
-size_t Session::read(char* buf, size_t len)
-{
-  int bytes;
-  bytes = timeoutRead(d_socket, buf, len, d_timeout);
-
-  if(bytes<0)
-    throw SessionException("error on read from socket: "+string(strerror(errno)));
-
-  if(bytes==0)
-    d_good = false;
-
-  return bytes;
-}
-
-int Session::getSocket()
-{
-  return d_socket;
-}
-
-Session Server::accept()
-{
-  ComboAddress remote;
-  remote.sin4.sin_family = AF_INET6;
-  socklen_t remlen = remote.getSocklen();
-
-  int socket=-1;
-
-  while((socket=::accept(s, (struct sockaddr *)&remote, &remlen))==-1) // repeat until we have a successful connect
-    {
-      //      L<<Logger::Error<<"accept() returned: "<<strerror(errno)<<endl;
-      if(errno==EMFILE) {
-        throw SessionException("Out of file descriptors - won't recover from that");
-      }
-
-    }
-
-  Session session(socket, remote);
-  return session;
-}
-
-void Server::asyncNewConnection()
-{
-  try {
-    d_asyncNewConnectionCallback(accept());
-  } catch (SessionException &e) {
-    // we're running in a shared process/thread, so can't just terminate/abort.
-    return;
-  }
-}
-
-void Server::asyncWaitForConnections(FDMultiplexer* fdm, const newconnectioncb_t& callback)
-{
-  d_asyncNewConnectionCallback = callback;
-  fdm->addReadFD(s, boost::bind(&Server::asyncNewConnection, this));
-}
-
-Server::Server(const string &localaddress, int port)
-{
-  d_local = ComboAddress(localaddress.empty() ? "0.0.0.0" : localaddress, port);
-  s = socket(d_local.sin4.sin_family ,SOCK_STREAM,0);
-
-  if(s < 0)
-    throw SessionException(string("socket: ")+strerror(errno));
-
-  Utility::setCloseOnExec(s);
-
-  int tmp=1;
-  if(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char*)&tmp, static_cast<unsigned>(sizeof tmp))<0)
-    throw SessionException(string("Setsockopt failed: ")+strerror(errno));
-
-  if(bind(s, (sockaddr*)&d_local, d_local.getSocklen())<0)
-    throw SessionException("binding to "+d_local.toStringWithPort()+": "+strerror(errno));
-  
-  if(listen(s,128)<0)
-    throw SessionException("listen: "+stringerror());
-}
-
index be854578e4ac75c4be2c8ed551e088b35576d0e7..7a32a3853e464a7920f6933946125f7bc2cd0b15 100644 (file)
@@ -1,6 +1,6 @@
 /*
     PowerDNS Versatile Database Driven Nameserver
-    Copyright (C) 2002 - 2013  PowerDNS.COM BV
+    Copyright (C) 2002 - 2014  PowerDNS.COM BV
 
     This program is free software; you can redistribute it and/or modify
     it under the terms of the GNU General Public License version 2
 #include <sys/types.h>
 #include <strings.h>
 
+#include "sstuff.hh"
 #include "iputils.hh"
-#include "pdnsexception.hh"
 #include "mplexer.hh"
+#include "syncres.hh"
 
-class SessionException: public PDNSException
-{
-public:
-  SessionException(const string &reason) : PDNSException(reason){}
-};
-
-class SessionTimeoutException: public SessionException
-{
-public:
-  SessionTimeoutException(const string &reason) : SessionException(reason){}
-};
-
-//! The Session class represents a TCP/IP session, which can either be created or run on an existing socket
-class Session
-{
-public:
-  bool put(const string &s);
-  bool good();
-  size_t read(char* buf, size_t len);
-
-  Session(int s, ComboAddress r); //!< Start a session on an existing socket, and inform this class of the remotes name
-
-  /** Create a session to a remote host and port. This function reads a timeout value from the ArgvMap class 
-      and does a nonblocking connect to support this timeout. It should be noted that nonblocking connects 
-      suffer from bad portability problems, so look here if you see weird problems on new platforms */
-  Session(const string &remote, int port, int timeout=0); 
-
-  Session(const Session &s); 
-  Session();
-  
-  ~Session();
-  int getSocket(); //!< return the filedescriptor for layering violations
-  int close(); //!< close and disconnect the connection
-  void setTimeout(unsigned int seconds);
-private:
-  int d_socket;
-  ComboAddress d_remote;
-  int d_timeout;
-  bool d_good;
-};
 
 //! The server class can be used to create listening servers
 class Server
 {
 public:
-  Server(const string &localaddress, int port);
-  ComboAddress d_local;
+  Server(const string &localaddress, int port) : d_local(localaddress.empty() ? "0.0.0.0" : localaddress, port), d_server_socket(InterNetwork, Stream, 0) {
+    d_server_socket.setReuseAddr();
+    d_server_socket.bind(d_local);
+    d_server_socket.listen();
+  }
 
-  Session accept(); //!< Call accept() in an endless loop to accept new connections
+  ComboAddress d_local;
 
-  typedef boost::function< void(Session) > newconnectioncb_t;
-  void asyncWaitForConnections(FDMultiplexer* fdm, const newconnectioncb_t& callback);
+  Socket *accept(); //!< Call accept() in an endless loop to accept new connections
 
-private:
-  int s;
-  void asyncNewConnection();
-  newconnectioncb_t d_asyncNewConnectionCallback;
+protected:
+  Socket d_server_socket;
 };
 
 #endif /* SESSION_HH */
index e674548eee104279a5fa174c37b3d1eb093cfcdb..40aa53d9c541721bf040150778207f280544df76 100644 (file)
@@ -87,6 +87,13 @@ public:
     Utility::setNonBlocking(d_socket);
   }
 
+  void setReuseAddr()
+  {
+    int tmp = 1;
+    if (setsockopt(d_socket, SOL_SOCKET, SO_REUSEADDR, (char*)&tmp, static_cast<unsigned>(sizeof tmp))<0)
+      throw NetworkError(string("Setsockopt failed: ")+strerror(errno));
+  }
+
   //! Bind the socket to a specified endpoint
   void bind(const ComboAddress &local)
   {
@@ -222,6 +229,33 @@ public:
     return res;
   }
 
+  void writenWithTimeout(const void *buffer, unsigned int n, int timeout)
+  {
+    unsigned int bytes=n;
+    const char *ptr = (char*)buffer;
+    int ret;
+    while(bytes) {
+      ret=::write(d_socket, ptr, bytes);
+      if(ret < 0) {
+        if(errno==EAGAIN) {
+          ret=waitForRWData(d_socket, false, timeout, 0);
+          if(ret < 0)
+            throw NetworkError("Waiting for data write");
+          if(!ret)
+            throw NetworkError("Timeout writing data");
+          continue;
+        }
+        else
+          throw NetworkError("Writing data: "+stringerror());
+      }
+      if(!ret) {
+        throw NetworkError("Did not fulfill TCP write due to EOF");
+      }
+
+      ptr += ret;
+      bytes -= ret;
+    }
+  }
 
   //! reads one character from the socket 
   int getChar()
@@ -261,7 +295,18 @@ public:
     if(res<0) 
       throw NetworkError("Reading from a socket: "+string(strerror(errno)));
     return res;
+  }
+
+  int readWithTimeout(char* buffer, int n, int timeout)
+  {
+    int err = waitForRWData(d_socket, true, timeout, 0);
+
+    if(err == 0)
+      throw NetworkError("timeout reading");
+    if(err < 0)
+      throw NetworkError("nonblocking read failed: "+string(strerror(errno)));
 
+    return read(buffer, n);
   }
 
   //! Sets the socket to listen with a default listen backlog of 10 bytes 
index 67beafcee360bff924ca4590a6160323ff2f00e7..428ee58f558e0dc212cc7386c36fc0a3c3a3ab25 100644 (file)
@@ -471,12 +471,12 @@ extern __thread SyncRes::StaticStorage* t_sstorage;
 class Socket;
 /* external functions, opaque to us */
 int asendtcp(const string& data, Socket* sock);
-int arecvtcp(string& data, int len, Socket* sock);
+int arecvtcp(string& data, int len, Socket* sock, bool incompleteOkay);
 
 
 struct PacketID
 {
-  PacketID() : id(0), type(0), sock(0), inNeeded(0), outPos(0), nearMisses(0), fd(-1)
+  PacketID() : id(0), type(0), sock(0), inNeeded(0), inIncompleteOkay(false), outPos(0), nearMisses(0), fd(-1)
   {
     memset(&remote, 0, sizeof(remote));
   }
@@ -489,6 +489,7 @@ struct PacketID
   Socket* sock;  // or wait for an event on a TCP fd
   int inNeeded; // if this is set, we'll read until inNeeded bytes are read
   string inMSG; // they'll go here
+  bool inIncompleteOkay;
 
   string outMSG; // the outgoing message that needs to be sent
   string::size_type outPos;    // how far we are along in the outMSG
@@ -535,7 +536,6 @@ extern __thread RecursorPacketCache* t_packetCache;
 typedef MTasker<PacketID,string> MT_t;
 extern __thread MT_t* MT;
 
-
 struct RecursorStats
 {
   uint64_t servFails;
index b3f9f59ebd5a81c5b118c574ec6fea93edade151..09a177403a8d0ec04dce8eb487664645c4b43277 100644 (file)
 #include "dns.hh"
 #include "base64.hh"
 #include "json.hh"
-#include "mplexer.hh"
-
-const char* INVALID_REQUEST_RESPONSE = "HTTP/1.0 400 Bad Request\r\nConnection: close\r\n\r\nYour Browser sent a request that this server failed to understand.\r\n";
 
 struct connectionThreadData {
   WebServer* webServer;
-  Session client;
+  Socket* client;
 };
 
 void HttpRequest::json(rapidjson::Document& document)
@@ -178,8 +175,7 @@ static void *WebServerConnectionThreadStart(void *p) {
   pthread_detach(pthread_self());
   data->webServer->serveConnection(data->client);
 
-  data->client.close();
-
+  delete data->client; // close socket
   delete data;
 
   return NULL;
@@ -188,12 +184,17 @@ static void *WebServerConnectionThreadStart(void *p) {
 HttpResponse WebServer::handleRequest(HttpRequest req)
 {
   HttpResponse resp(req);
+
   // set default headers
   resp.headers["Content-Type"] = "text/html; charset=utf-8";
 
-  L<<Logger::Debug<<"HTTP: Handling request \"" << req.url.path << "\"" << endl;
-
   try {
+    if (!req.complete) {
+      throw HttpBadRequestException();
+    }
+
+    L<<Logger::Debug<<"HTTP: Handling request \"" << req.url.path << "\"" << endl;
+
     YaHTTP::strstr_map_t::iterator header;
 
     if ((header = req.headers.find("accept")) != req.headers.end()) {
@@ -273,52 +274,45 @@ HttpResponse WebServer::handleRequest(HttpRequest req)
   return resp;
 }
 
-void WebServer::serveConnection(Session client)
+void WebServer::serveConnection(Socket *client)
 try {
   HttpRequest req;
   YaHTTP::AsyncRequestLoader yarl(&req);
+  int timeout = 5;
+  client->setNonBlocking();
 
-  client.setTimeout(5);
-
-  bool complete = false;
   try {
-    while(client.good()) {
+    while(!req.complete) {
       int bytes;
       char buf[1024];
-      bytes = client.read(buf, sizeof(buf));
-      if (bytes) {
+      bytes = client->readWithTimeout(buf, sizeof(buf), timeout);
+      if (bytes > 0) {
         string data = string(buf, bytes);
-        if (yarl.feed(data)) {
-          complete = true;
-          break;
-        }
+        req.complete = yarl.feed(data);
+      } else {
+        // read error OR EOF
+        break;
       }
     }
   } catch (YaHTTP::ParseError &e) {
-    complete = false;
-  }
-
-  if (!complete) {
-    client.put(INVALID_REQUEST_RESPONSE);
-    return;
+    // request stays incomplete
   }
 
   HttpResponse resp = WebServer::handleRequest(req);
   ostringstream ss;
   resp.write(ss);
-  client.put(ss.str());
-}
-catch(SessionTimeoutException &e) {
-  // L<<Logger::Error<<"Timeout in webserver"<<endl;
+  string reply = ss.str();
+
+  client->writenWithTimeout(reply.c_str(), reply.size(), timeout);
 }
 catch(PDNSException &e) {
-  L<<Logger::Error<<"Exception in webserver: "<<e.reason<<endl;
+  L<<Logger::Error<<"HTTP Exception: "<<e.reason<<endl;
 }
 catch(std::exception &e) {
-  L<<Logger::Error<<"STL Exception in webserver: "<<e.what()<<endl;
+  L<<Logger::Error<<"HTTP STL Exception: "<<e.what()<<endl;
 }
 catch(...) {
-  L<<Logger::Error<<"Unknown exception in webserver"<<endl;
+  L<<Logger::Error<<"HTTP: Unknown exception"<<endl;
 }
 
 WebServer::WebServer(const string &listenaddress, int port, const string &password)
@@ -326,11 +320,16 @@ WebServer::WebServer(const string &listenaddress, int port, const string &passwo
   d_listenaddress=listenaddress;
   d_port=port;
   d_password=password;
+}
+
+void WebServer::bind()
+{
   try {
-    d_server = new Server(d_listenaddress, d_port);
+    d_server = createServer();
+    L<<Logger::Warning<<"Listening for HTTP requests on "<<d_server->d_local.toStringWithPort()<<endl;
   }
-  catch(SessionException &e) {
-    L<<Logger::Error<<"Fatal error in webserver: "<<e.reason<<endl;
+  catch(NetworkError &e) {
+    L<<Logger::Error<<"Listening on HTTP socket failed: "<<e.what()<<endl;
     d_server = NULL;
   }
 }
@@ -341,23 +340,15 @@ void WebServer::go()
     return;
   try {
     pthread_t tid;
-    
-    L<<Logger::Error<<"Launched webserver on " << d_server->d_local.toStringWithPort() <<endl;
 
     while(true) {
-      // will be freed by thread
+      // data and data->client will be freed by thread
       connectionThreadData *data = new connectionThreadData;
       data->webServer = this;
       data->client = d_server->accept();
       pthread_create(&tid, 0, &WebServerConnectionThreadStart, (void *)data);
     }
   }
-  catch(SessionTimeoutException &e) {
-    //    L<<Logger::Error<<"Timeout in webserver"<<endl;
-  }
-  catch(PDNSException &e) {
-    L<<Logger::Error<<"Exception in main webserver thread: "<<e.reason<<endl;
-  }
   catch(std::exception &e) {
     L<<Logger::Error<<"STL Exception in main webserver thread: "<<e.what()<<endl;
   }
@@ -365,74 +356,4 @@ void WebServer::go()
     L<<Logger::Error<<"Unknown exception in main webserver thread"<<endl;
   }
   exit(1);
-
-}
-
-void AsyncWebServer::go()
-{
-  if (!d_server)
-    return;
-
-  d_server->asyncWaitForConnections(d_fdm, boost::bind(&AsyncWebServer::newConnection, this, _1));
-}
-
-void AsyncWebServer::newConnection(Session session)
-{
-  int fd = session.getSocket();
-  Utility::setNonBlocking(fd);
-  d_fdm->addReadFD(fd, boost::bind(&AsyncWebServer::serveConnection, this, session));
-}
-
-void AsyncWebServer::serveConnection(Session session)
-{
-  int fd = session.getSocket();
-  d_fdm->removeReadFD(fd);
-
-  try {
-    char buffer[16384];
-    int res = read(fd, buffer, sizeof(buffer)-1);
-    if (res <= 0) {
-      throw PDNSException("Reading from client failed");
-      return;
-    }
-    buffer[res]=0;
-
-    HttpRequest req;
-    YaHTTP::AsyncRequestLoader yarl(&req);
-
-    bool complete = false;
-    string reply;
-
-    try {
-      if (yarl.feed(buffer)) {
-        complete = true;
-      }
-    } catch (YaHTTP::ParseError &e) {
-      complete = false;
-    }
-
-    if (complete) {
-      HttpResponse resp = handleRequest(req);
-      ostringstream ss;
-      resp.write(ss);
-      reply = ss.str();
-    } else {
-      reply = INVALID_REQUEST_RESPONSE;
-    }
-
-    Utility::setBlocking(fd);
-    writen2(fd, reply.c_str(), reply.length());
-    Utility::setNonBlocking(fd);
-  }
-  catch(PDNSException &e) {
-    L<<Logger::Error<<"Exception in webserver: "<<e.reason<<endl;
-  }
-  catch(std::exception &e) {
-    L<<Logger::Error<<"STL Exception in webserver: "<<e.what()<<endl;
-  }
-  catch(...) {
-    L<<Logger::Error<<"Unknown exception in webserver"<<endl;
-  }
-
-  close(fd);
 }
index 9dc7a397b90c0bf1467bca6735eab280fa7ce034..1c046420b6bf8d30c0d3baf231723415118ba34c 100644 (file)
 #include "rapidjson/stringbuffer.h"
 #include "rapidjson/writer.h"
 #include "namespaces.hh"
-
-class Server;
-class Session;
+#include "sstuff.hh"
+#include "session.hh"
 
 class HttpRequest : public YaHTTP::Request {
 public:
-  HttpRequest() : YaHTTP::Request(), accept_json(false), accept_html(false) { };
+  HttpRequest() : YaHTTP::Request(), accept_json(false), accept_html(false), complete(false) { };
 
   map<string,string> path_parameters;
   bool accept_json;
   bool accept_html;
+  bool complete;
   void json(rapidjson::Document& document);
 };
 
@@ -110,9 +110,10 @@ class WebServer : public boost::noncopyable
 {
 public:
   WebServer(const string &listenaddress, int port, const string &password="");
+  void bind();
   void go();
 
-  void serveConnection(Session client);
+  void serveConnection(Socket *client);
   HttpResponse handleRequest(HttpRequest request);
 
   typedef boost::function<void(HttpRequest* req, HttpResponse* resp)> HandlerFunction;
@@ -130,6 +131,10 @@ protected:
   static int B64Decode(const std::string& strInput, std::string& strOutput);
   bool route(const std::string& url, std::map<std::string, std::string>& urlArgs, HandlerFunction** handler);
 
+  virtual Server* createServer() {
+    return new Server(d_listenaddress, d_port);
+  }
+
   string d_listenaddress;
   int d_port;
   std::list<HandlerRegistration> d_handlers;
@@ -137,20 +142,4 @@ protected:
   Server* d_server;
 };
 
-class FDMultiplexer;
-
-class AsyncWebServer : public WebServer
-{
-public:
-  AsyncWebServer(FDMultiplexer* fdm, const string &listenaddress, int port, const string &password="") :
-    WebServer(listenaddress, port, password), d_fdm(fdm) { };
-  void go();
-
-private:
-  FDMultiplexer* d_fdm;
-
-  void newConnection(Session session);
-  void serveConnection(Session session);
-};
-
 #endif /* WEBSERVER_HH */
index 9569e14ee34b27618d414e0b1379786b8736d76b..551a1c6045037c70a2d0e65c884438a0068e960c 100644 (file)
@@ -55,8 +55,10 @@ AuthWebServer::AuthWebServer()
   d_min10=d_min5=d_min1=0;
   d_ws = 0;
   d_tid = 0;
-  if(arg().mustDo("webserver"))
+  if(arg().mustDo("webserver")) {
     d_ws = new WebServer(arg()["webserver-address"], arg().asNum("webserver-port"),arg()["webserver-password"]);
+    d_ws->bind();
+  }
 }
 
 void AuthWebServer::go()
index fd22a1de912a0000123cc15e317a2ea27ccdc8db..e62a210b49840f7c8f674a59eca646922a258500 100644 (file)
@@ -1,6 +1,6 @@
 /*
     PowerDNS Versatile Database Driven Nameserver
-    Copyright (C) 2003 - 2012  PowerDNS.COM BV
+    Copyright (C) 2003 - 2014  PowerDNS.COM BV
 
     This program is free software; you can redistribute it and/or modify
     it under the terms of the GNU General Public License version 2
@@ -37,6 +37,8 @@
 #include "ws-api.hh"
 #include "logger.hh"
 
+extern __thread FDMultiplexer* t_fdm;
+
 using namespace rapidjson;
 
 void productServerStatisticsFetch(map<string,string>& out)
@@ -408,6 +410,7 @@ RecursorWebServer::RecursorWebServer(FDMultiplexer* fdm)
   }
 
   d_ws = new AsyncWebServer(fdm, arg()["experimental-webserver-address"], arg().asNum("experimental-webserver-port"), arg()["experimental-webserver-password"]);
+  d_ws->bind();
 
   // legacy dispatch
   d_ws->registerApiHandler("/jsonstat", boost::bind(&RecursorWebServer::jsonstat, this, _1, _2));
@@ -541,3 +544,67 @@ void RecursorWebServer::jsonstat(HttpRequest* req, HttpResponse *resp)
     resp->body = returnJsonError("Not found");
   }
 }
+
+
+void AsyncServerNewConnectionMT(void *p) {
+  AsyncServer *server = (AsyncServer*)p;
+  try {
+    Socket* socket = server->accept();
+    server->d_asyncNewConnectionCallback(socket);
+    delete socket;
+  } catch (NetworkError &e) {
+    // we're running in a shared process/thread, so can't just terminate/abort.
+    return;
+  }
+}
+
+void AsyncServer::asyncWaitForConnections(FDMultiplexer* fdm, const newconnectioncb_t& callback)
+{
+  d_asyncNewConnectionCallback = callback;
+  fdm->addReadFD(d_server_socket.getHandle(), boost::bind(&AsyncServer::newConnection, this));
+}
+
+void AsyncServer::newConnection()
+{
+  MT->makeThread(&AsyncServerNewConnectionMT, this);
+}
+
+
+void AsyncWebServer::serveConnection(Socket *client)
+{
+  HttpRequest req;
+  YaHTTP::AsyncRequestLoader yarl(&req);
+  client->setNonBlocking();
+
+  string data;
+  try {
+    while(!req.complete) {
+      data.empty();
+      int bytes = arecvtcp(data, 16384, client, true);
+      if (bytes > 0) {
+        req.complete = yarl.feed(data);
+      } else {
+        // read error OR EOF
+        break;
+      }
+    }
+  } catch (YaHTTP::ParseError &e) {
+    // request stays incomplete
+  }
+
+  HttpResponse resp = handleRequest(req);
+  ostringstream ss;
+  resp.write(ss);
+  data = ss.str();
+
+  // now send the reply
+  if (asendtcp(data, client) == -1 || data.empty()) {
+    L<<Logger::Error<<"Failed sending reply to HTTP client"<<endl;
+  }
+}
+
+void AsyncWebServer::go() {
+  if (!d_server)
+    return;
+  ((AsyncServer*)d_server)->asyncWaitForConnections(d_fdm, boost::bind(&AsyncWebServer::serveConnection, this, _1));
+}
index f729e6b71bc53c18c8accfb83c60d1bb1847121c..a8052a5378e5ba35b32621551e7efbc318e18b2b 100644 (file)
@@ -1,6 +1,6 @@
 /*
     PowerDNS Versatile Database Driven Nameserver
-    Copyright (C) 2003 - 2011  PowerDNS.COM BV
+    Copyright (C) 2003 - 2014  PowerDNS.COM BV
 
     This program is free software; you can redistribute it and/or modify
     it under the terms of the GNU General Public License version 2 
 #include <boost/utility.hpp> 
 #include "namespaces.hh"
 #include "mplexer.hh"
+#include "session.hh"
+#include "webserver.hh"
 
-class AsyncWebServer;
 class HttpRequest;
 class HttpResponse;
 
+class AsyncServer : public Server {
+public:
+  AsyncServer(const string &localaddress, int port) : Server(localaddress, port) { };
+
+  friend void AsyncServerNewConnectionMT(void *p);
+
+  typedef boost::function< void(Socket*) > newconnectioncb_t;
+  void asyncWaitForConnections(FDMultiplexer* fdm, const newconnectioncb_t& callback);
+
+private:
+  void newConnection();
+
+  newconnectioncb_t d_asyncNewConnectionCallback;
+};
+
+class AsyncWebServer : public WebServer
+{
+public:
+  AsyncWebServer(FDMultiplexer* fdm, const string &listenaddress, int port, const string &password="") :
+    WebServer(listenaddress, port, password), d_fdm(fdm) { };
+  void go();
+
+private:
+  FDMultiplexer* d_fdm;
+  void serveConnection(Socket *socket);
+
+protected:
+  virtual Server* createServer() {
+    return new AsyncServer(d_listenaddress, d_port);
+  };
+};
+
 class RecursorWebServer : public boost::noncopyable
 {
 public:
@@ -36,5 +69,3 @@ public:
 private:
   AsyncWebServer* d_ws;
 };
-
-string returnJSONStats(const map<string, string>& items);
index ece0e3e81b2e3021f43b651c00ef0a5172f6e17c..dd3de3ef7c2eefb158ba1f43b76ad5d4aa8237d3 100644 (file)
@@ -1,5 +1,8 @@
 import unittest
 import requests
+import socket
+import pprint
+import time
 from test_helper import ApiTestCase
 
 
@@ -8,3 +11,26 @@ class TestBasics(ApiTestCase):
     def test_Unauth(self):
         r = requests.get(self.url("/servers/localhost"))
         self.assertEquals(r.status_code, requests.codes.unauthorized)
+
+    def test_SplitRequest(self):
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+        s.connect((self.server_address, self.server_port))
+
+        parts = ("GET / HTTP/1.0\r\n", "Content-Type: text/plain\r\n\r\n")
+
+        print("Sending request")
+        for part in parts:
+            print("Sending %s" % part)
+            s.sendall(part)
+            time.sleep(0.5)
+
+        resp = s.recv(4096, socket.MSG_WAITALL)
+        s.close()
+
+        print "response", repr(resp)
+
+        status = resp.splitlines(0)[0]
+        if '400' in status:
+            raise Exception('Got unwanted response: %s' % status)
+            print 'Got', status
index b513bb7025441b721b1e998851d139abd61f201d..d44eea95648b5b692a095057112e02a46677efde 100644 (file)
@@ -11,7 +11,9 @@ class ApiTestCase(unittest.TestCase):
 
     def setUp(self):
         # TODO: config
-        self.server_url = 'http://127.0.0.1:%s/' % (os.environ.get('WEBPORT', '5580'))
+        self.server_address = '127.0.0.1'
+        self.server_port = int(os.environ.get('WEBPORT', '5580'))
+        self.server_url = 'http://%s:%s/' % (self.server_address, self.server_port)
         self.session = requests.Session()
         self.session.auth = ('admin', os.environ.get('WEBPASSWORD', 'changeme'))