]> granicus.if.org Git - pdns/commitdiff
Merged CoProcess into pipeconnector
authorAki Tuomi <cmouse@desteem.org>
Mon, 8 Jul 2013 09:46:01 +0000 (12:46 +0300)
committerAki Tuomi <cmouse@desteem.org>
Mon, 8 Jul 2013 09:46:01 +0000 (12:46 +0300)
modules/remotebackend/Makefile.am
modules/remotebackend/pipeconnector.cc
modules/remotebackend/remotebackend.hh

index c4bf57930f7548dbc6238195f1534f65a694800f..04d110a29d564f83ff4b9741be29f7ec015dfcb0 100644 (file)
@@ -31,7 +31,7 @@ libtestremotebackend_la_SOURCES=../../pdns/dnsbackend.hh ../../pdns/dnsbackend.c
         ../../pdns/aes/dns_random.cc ../../pdns/packetcache.hh ../../pdns/packetcache.cc \
         ../../pdns/aes/aescpp.h ../../pdns/dns.hh ../../pdns/dns.cc ../../pdns/json.hh ../../pdns/json.cc \
         ../../pdns/aes/aescrypt.c ../../pdns/aes/aes.h ../../pdns/aes/aeskey.c ../../pdns/aes/aes_modes.c ../../pdns/aes/aesopt.h \
-        ../../pdns/aes/aestab.c ../../pdns/aes/aestab.h ../../pdns/aes/brg_endian.h ../../pdns/aes/brg_types.h ../pipebackend/coprocess.cc \
+        ../../pdns/aes/aestab.c ../../pdns/aes/aestab.h ../../pdns/aes/brg_endian.h ../../pdns/aes/brg_types.h \
         remotebackend.hh remotebackend.cc unixconnector.cc httpconnector.cc pipeconnector.cc
 
 libtestremotebackend_la_CFLAGS=$(BOOST_CPPFLAGS) @THREADFLAGS@ $(LIBCURL_CFLAGS) -g -O0 -I../../pdns
index 9e28980aad96e618f82635214f0abd906a05fc12..f86fd28c863a1575aaa1b564e1cab5f6c9142f3e 100644 (file)
@@ -1,3 +1,5 @@
+#include <sys/types.h>
+#include <sys/wait.h>
 #include "remotebackend.hh"
 
 PipeConnector::PipeConnector(std::map<std::string,std::string> options) {
@@ -7,26 +9,92 @@ PipeConnector::PipeConnector(std::map<std::string,std::string> options) {
   }
   this->command = options.find("command")->second;
   this->options = options;
-  this->coproc = NULL;
+  d_timeout=2000;
+
+  if (options.find("timeout") != options.end()) {
+     d_timeout = boost::lexical_cast<int>(options.find("timeout")->second);
+  }
+
+  d_pid = -1;
+  d_fp = NULL;
   launch();
 }
 
 PipeConnector::~PipeConnector(){
-  if (this->coproc != NULL) 
-    delete coproc; 
+  int status;
+  // just in case...
+  if (d_pid == -1) return;
+
+  if(!waitpid(d_pid, &status, WNOHANG)) {
+    kill(d_pid, 9);
+    waitpid(d_pid, &status, 0);
+  }
+
+  close(d_fd1[1]);
+  if (d_fp != NULL) fclose(d_fp);
 }
 
 void PipeConnector::launch() {
-  if (coproc != NULL) return;
+  // no relaunch
+  if (d_pid > 0 && checkStatus()) return;
+
+  std::vector <std::string> v;
+  split(v, command, is_any_of(" "));
+
+  const char *argv[v.size()+1];
+  argv[v.size()]=0;
+
+  for (size_t n = 0; n < v.size(); n++)
+    argv[n]=v[n].c_str();
+
+  signal(SIGPIPE, SIG_IGN);
+
+  if(access(argv[0],X_OK)) // check before fork so we can throw
+    throw AhuException("Command '"+string(argv[0])+"' cannot be executed: "+stringerror());
+
+  if(pipe(d_fd1)<0 || pipe(d_fd2)<0)
+    throw AhuException("Unable to open pipe for coprocess: "+string(strerror(errno)));
+
+  if((d_pid=fork())<0)
+    throw AhuException("Unable to fork for coprocess: "+stringerror());
+  else if(d_pid>0) { // parent speaking
+    close(d_fd1[0]);
+    Utility::setCloseOnExec(d_fd1[1]);
+    close(d_fd2[1]);
+    Utility::setCloseOnExec(d_fd2[0]);
+    if(!(d_fp=fdopen(d_fd2[0],"r")))
+      throw AhuException("Unable to associate a file pointer with pipe: "+stringerror());
+    setbuf(d_fp,0); // no buffering please, confuses select
+  }
+  else if(!d_pid) { // child
+    signal(SIGCHLD, SIG_DFL); // silence a warning from perl
+    close(d_fd1[1]);
+    close(d_fd2[0]);
+
+    if(d_fd1[0]!= 0) {
+      dup2(d_fd1[0], 0);
+      close(d_fd1[0]);
+    }
+
+    if(d_fd2[1]!= 1) {
+      dup2(d_fd2[1], 1);
+      close(d_fd2[1]);
+    }
+
+    // stdin & stdout are now connected, fire up our coprocess!
+
+    if(execv(argv[0], const_cast<char * const *>(argv))<0) // now what
+      exit(123);
+
+    /* not a lot we can do here. We shouldn't return because that will leave a forked process around.
+       no way to log this either - only thing we can do is make sure that our parent catches this soonest! */
+  }
+
   rapidjson::Value val;
   rapidjson::Document init,res;
-  int timeout=2000;
-  if (options.find("timeout") != options.end()) { 
-     timeout = boost::lexical_cast<int>(options.find("timeout")->second);
-  }
-  coproc = new CoProcess(this->command, timeout);
   init.SetObject();
   val = "initialize";
+
   init.AddMember("method",val, init.GetAllocator());
   val.SetObject();
   init.AddMember("parameters", val, init.GetAllocator());
@@ -44,42 +112,83 @@ void PipeConnector::launch() {
 
 int PipeConnector::send_message(const rapidjson::Document &input)
 {
-   std::string data;
+   std::string line;
+   line = makeStringFromDocument(input);
+   launch();
 
-   data = makeStringFromDocument(input);
+   line.append(1,'\n');
 
-   launch();
-   try {
-      coproc->send(data);
-      return 1;
+   unsigned int sent=0;
+   int bytes;
+
+   // writen routine - socket may not accept al data in one go
+   while(sent<line.size()) {
+     bytes=write(d_fd1[1],line.c_str()+sent,line.length()-sent);
+     if(bytes<0)
+       throw AhuException("Writing to coprocess failed: "+std::string(strerror(errno)));
+
+     sent+=bytes;
    }
-   catch(AhuException &ae) {
-      delete coproc;
-      coproc=NULL;
-      throw;
-   } 
+   return sent;
 }
 
 int PipeConnector::recv_message(rapidjson::Document &output) 
 {
+   std::string receive;
    rapidjson::GenericReader<rapidjson::UTF8<> , rapidjson::MemoryPoolAllocator<> > r;
    std::string tmp;
    std::string s_output;
-
    launch();
-   try {
-      while(1) {
-        coproc->receive(tmp);
-        s_output.append(tmp);
-        rapidjson::StringStream ss(s_output.c_str());
-        output.ParseStream<0>(ss); 
-        if (output.HasParseError() == false)
-          return s_output.size();
-      }
-   } catch(AhuException &ae) {
-      L<<Logger::Warning<<"[pipeconnector] "<<" unable to receive data from coprocess. "<<ae.reason<<endl;
-      delete coproc;
-      coproc = NULL;
-      throw;
+
+   while(1) {
+     receive.clear();
+     if(d_timeout) {
+       struct timeval tv;
+       tv.tv_sec = d_timeout/1000;
+       tv.tv_usec = (d_timeout % 1000) * 1000;
+       fd_set rds;
+       FD_ZERO(&rds);
+       FD_SET(fileno(d_fp),&rds);
+       int ret=select(fileno(d_fp)+1,&rds,0,0,&tv);
+       if(ret<0) 
+         throw AhuException("Error waiting on data from coprocess: "+stringerror());
+       if(!ret)
+         throw AhuException("Timeout waiting for data from coprocess");
+     }
+
+     if(!stringfgets(d_fp, receive))
+       throw AhuException("Child closed pipe");
+  
+      s_output.append(receive);
+      rapidjson::StringStream ss(s_output.c_str());
+      output.ParseStream<0>(ss); 
+      if (output.HasParseError() == false)
+        return s_output.size();
    }
+   return 0;
+}
+
+bool PipeConnector::checkStatus()
+{
+  int status;
+  int ret=waitpid(d_pid, &status, WNOHANG);
+  if(ret<0)
+    throw AhuException("Unable to ascertain status of coprocess "+itoa(d_pid)+" from "+itoa(getpid())+": "+string(strerror(errno)));
+  else if(ret) {
+    if(WIFEXITED(status)) {
+      int ret=WEXITSTATUS(status);
+      throw AhuException("Coprocess exited with code "+itoa(ret));
+    }
+    if(WIFSIGNALED(status)) {
+      int sig=WTERMSIG(status);
+      string reason="CoProcess died on receiving signal "+itoa(sig);
+#ifdef WCOREDUMP
+      if(WCOREDUMP(status))
+        reason+=". Dumped core";
+#endif
+
+      throw AhuException(reason);
+    }
+  }
+  return true;
 }
index 18b6a97ab4098008b8522cdbaf64dfb42d258551..2f2d717b839bf03ae1657a4e175232bd0b44ffb2 100644 (file)
@@ -92,9 +92,15 @@ class PipeConnector: public Connector {
   private:
 
   void launch();
-  CoProcess *coproc;
+  bool checkStatus();
+
   std::string command;
   std::map<std::string,std::string> options;
+  int d_fd1[2], d_fd2[2];
+  int d_pid;
+  int d_timeout;
+  FILE *d_fp;
 };
 
 class RemoteBackend : public DNSBackend
@@ -133,12 +139,13 @@ class RemoteBackend : public DNSBackend
   static DNSBackend *maker();
 
   private:
-    int build(const std::string &connstr);
+    int build();
     Connector *connector;
     bool d_dnssec;
     rapidjson::Document *d_result;
     int d_index;
     int64_t d_trxid;
+    std::string d_connstr;
 
     bool getBool(rapidjson::Value &value);
     int getInt(rapidjson::Value &value);
@@ -146,5 +153,8 @@ class RemoteBackend : public DNSBackend
     int64_t getInt64(rapidjson::Value &value);
     std::string getString(rapidjson::Value &value);
     double getDouble(rapidjson::Value &value);
+
+    bool send(rapidjson::Document &value);
+    bool recv(rapidjson::Document &value);
 };
 #endif