]> granicus.if.org Git - pdns/commitdiff
auth: Emulate a buffered read in the pipe backend, ~3x faster
authorRemi Gacogne <remi.gacogne@powerdns.com>
Sun, 4 Aug 2019 20:58:25 +0000 (22:58 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Sun, 4 Aug 2019 20:58:25 +0000 (22:58 +0200)
modules/pipebackend/coprocess.cc
modules/pipebackend/coprocess.hh
modules/pipebackend/pipebackend.cc

index db0ed5c1833ce2a6d05d5baafe4beef844a6a04f..2da3dacd7ed896dc532972a767b8e52bba30d0ca 100644 (file)
 #include <boost/algorithm/string.hpp>
 #include <vector>
 
-CoProcess::CoProcess(const string &command,int timeout, int infd, int outfd)
+CoProcess::CoProcess(const string &command,int timeout, int infd, int outfd): d_infd(infd), d_outfd(outfd), d_timeout(timeout)
 {
-  vector <string> v;
-  split(v, command, is_any_of(" "));
+  split(d_params, command, is_any_of(" "));
 
-  std::vector<const char *>argv(v.size()+1);
-  argv[v.size()]=0;
+  d_argv.resize(d_params.size()+1);
+  d_argv[d_params.size()]=nullptr;
 
-  for (size_t n = 0; n < v.size(); n++)
-    argv[n]=v[n].c_str();
-  // we get away with not copying since nobody resizes v 
-  launch(argv.data(), timeout, infd, outfd);
+  for (size_t n = 0; n < d_params.size(); n++) {
+    d_argv[n]=d_params[n].c_str();
+  }
 }
 
-void CoProcess::launch(const char **argv, int timeout, int infd, int outfd)
+void CoProcess::launch()
 {
-  d_timeout=timeout;
-  d_infd=infd;
-  d_outfd=outfd;
-
   signal(SIGPIPE, SIG_IGN);
 
-  if(access(argv[0],X_OK)) // check before fork so we can throw
-    throw PDNSException("Command '"+string(argv[0])+"' cannot be executed: "+stringerror());
+  if(access(d_argv[0],X_OK)) // check before fork so we can throw
+    throw PDNSException("Command '"+string(d_argv[0])+"' cannot be executed: "+stringerror());
 
   if(pipe(d_fd1)<0 || pipe(d_fd2)<0)
     throw PDNSException("Unable to open pipe for coprocess: "+string(strerror(errno)));
@@ -71,33 +65,34 @@ void CoProcess::launch(const char **argv, int timeout, int infd, int outfd)
   if((d_pid=fork())<0)
     throw PDNSException("Unable to fork for coprocess: "+stringerror());
   else if(d_pid>0) { // parent speaking
+    // no need to keep this around
+    d_argv.clear();
     close(d_fd1[0]);
     setCloseOnExec(d_fd1[1]);
     close(d_fd2[1]);
     setCloseOnExec(d_fd2[0]);
-    if(!(d_fp=fdopen(d_fd2[0],"r")))
-      throw PDNSException("Unable to associate a file pointer with pipe: "+stringerror());
-    if( d_timeout)
-      setbuf(d_fp,0); // no buffering please, confuses poll
+
+    if (d_timeout) {
+      setNonBlocking(d_fd2[0]);
+    }
   }
   else if(!d_pid) { // child
     signal(SIGCHLD, SIG_DFL); // silence a warning from perl
     close(d_fd1[1]);
     close(d_fd2[0]);
 
-    if(d_fd1[0]!= infd) {
-      dup2(d_fd1[0], infd);
+    if(d_fd1[0]!= d_infd) {
+      dup2(d_fd1[0], d_infd);
       close(d_fd1[0]);
     }
 
-    if(d_fd2[1]!= outfd) {
-      dup2(d_fd2[1], outfd);
+    if(d_fd2[1]!= d_outfd) {
+      dup2(d_fd2[1], d_outfd);
       close(d_fd2[1]);
     }
 
     // stdin & stdout are now connected, fire up our coprocess!
-
-    if(execv(argv[0], const_cast<char * const *>(argv))<0) // now what
+    if(execv(d_argv[0], const_cast<char * const *>(d_argv.data()))<0) // now what
       exit(123);
 
     /* not a lot we can do here. We shouldn't return because that will leave a forked process around.
@@ -114,7 +109,7 @@ CoProcess::~CoProcess()
   }
   
   close(d_fd1[1]);
-  fclose(d_fp);
+  close(d_fd2[0]);
 }
 
 void CoProcess::checkStatus()
@@ -162,19 +157,53 @@ void CoProcess::send(const string &snd)
 
 void CoProcess::receive(string &receive)
 {
+  bool first = true;
   receive.clear();
-    
-  if(d_timeout) {
-    int ret = waitForData(fileno(d_fp), 0, d_timeout * 1000);
-    if(ret<0)
-      throw PDNSException("Error waiting on data from coprocess: "+stringerror());
-    if(!ret)
-      throw PDNSException("Timeout waiting for data from coprocess");
+
+  // we might still have some remaining data from our last read
+  if (!d_remaining.empty()) {
+    receive = std::move(d_remaining);
   }
 
-  if(!stringfgets(d_fp, receive))
-    throw PDNSException("Child closed pipe");
-  
+  size_t lastPos = 0;
+  size_t eolPos;
+  while ((eolPos = receive.find('\n', lastPos)) == std::string::npos) {
+    size_t existingSize = receive.size();
+    lastPos = existingSize;
+    receive.resize(existingSize + 4096);
+    ssize_t got = read(d_fd2[0], &receive.at(existingSize), 4096);
+    if (got == 0) {
+      throw PDNSException("Child closed pipe");
+    }
+    else if (got < 0) {
+      receive.resize(existingSize);
+      int saved = errno;
+      if (saved == EINTR) {
+        continue;
+      }
+      if (saved == EAGAIN) {
+        if(d_timeout) {
+          int ret = waitForData(d_fd2[0], 0, d_timeout * 1000);
+          if(ret<0)
+            throw PDNSException("Error waiting on data from coprocess: "+string(strerror(saved)));
+          if(!ret)
+            throw PDNSException("Timeout waiting for data from coprocess");
+        }
+      }
+      else {
+        throw PDNSException("Error reading from child's pipe:" + string(strerror(saved)));
+      }
+    } else {
+      receive.resize(existingSize + static_cast<size_t>(got));
+    }
+  }
+
+  if (eolPos != receive.size() - 1) {
+    /* we have some data remaining after the first '\n', let's keep it for later */
+    d_remaining.append(receive, eolPos + 1, receive.size() - eolPos - 1);
+  }
+
+  receive.resize(eolPos);
   trim_right(receive);
 }
 
index 4f168202da84dff2f5a2ece946043bb6ff16af66..dd581cc9b01fbde2e53dc72deda912397cb6a146 100644 (file)
@@ -43,18 +43,20 @@ class CoProcess : public CoRemote
 public:
   CoProcess(const string &command,int timeout=0, int infd=0, int outfd=1);
   ~CoProcess();
-  void sendReceive(const string &send, string &receive);
-  void receive(string &rcv);
-  void send(const string &send);
+  void sendReceive(const string &send, string &receive) override;
+  void receive(string &rcv) override;
+  void send(const string &send) override;
+  void launch();
 private:
-  void launch(const char **argv, int timeout=0, int infd=0, int outfd=1);
   void checkStatus();
+  std::vector<std::string> d_params;
+  std::vector<const char *> d_argv;
+  std::string d_remaining;
   int d_fd1[2], d_fd2[2];
   int d_pid;
   int d_infd;
   int d_outfd;
   int d_timeout;
-  FILE *d_fp;
 };
 
 class UnixRemote : public CoRemote
@@ -62,9 +64,9 @@ class UnixRemote : public CoRemote
 public:
   UnixRemote(const string &path, int timeout=0);
   ~UnixRemote();
-  void sendReceive(const string &send, string &receive);
-  void receive(string &rcv);
-  void send(const string &send);
+  void sendReceive(const string &send, string &receive) override;
+  void receive(string &rcv) override;
+  void send(const string &send) override;
 private:
   int d_fd;
   FILE *d_fp;
index 33950ecacd0c35f057db2c220bd9deaccc9af93b..7821c49fc5eba934429a662b84786c14127e8833 100644 (file)
@@ -69,10 +69,14 @@ void CoWrapper::launch()
    if(d_command.empty())
      throw ArgException("pipe-command is not specified");
 
-   if(isUnixSocket(d_command))
+   if(isUnixSocket(d_command)) {
      d_cp = new UnixRemote(d_command, d_timeout);
-   else
-     d_cp = new CoProcess(d_command, d_timeout);
+   }
+   else {
+     auto coprocess = new CoProcess(d_command, d_timeout);
+     coprocess->launch();
+     d_cp = coprocess;
+   }
 
    d_cp->send("HELO\t"+std::to_string(d_abiVersion));
    string banner;