From: Remi Gacogne Date: Sun, 4 Aug 2019 20:58:25 +0000 (+0200) Subject: auth: Emulate a buffered read in the pipe backend, ~3x faster X-Git-Tag: dnsdist-1.4.0-rc3~18^2 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=214fd99e390fa5a97b3683ccb076e3b468888cd7;p=pdns auth: Emulate a buffered read in the pipe backend, ~3x faster --- diff --git a/modules/pipebackend/coprocess.cc b/modules/pipebackend/coprocess.cc index db0ed5c18..2da3dacd7 100644 --- a/modules/pipebackend/coprocess.cc +++ b/modules/pipebackend/coprocess.cc @@ -40,30 +40,24 @@ #include #include -CoProcess::CoProcess(const string &command,int timeout, int infd, int outfd) +CoProcess::CoProcess(const string &command,int timeout, int infd, int outfd): d_infd(infd), d_outfd(outfd), d_timeout(timeout) { - vector v; - split(v, command, is_any_of(" ")); + split(d_params, command, is_any_of(" ")); - std::vectorargv(v.size()+1); - argv[v.size()]=0; + d_argv.resize(d_params.size()+1); + d_argv[d_params.size()]=nullptr; - for (size_t n = 0; n < v.size(); n++) - argv[n]=v[n].c_str(); - // we get away with not copying since nobody resizes v - launch(argv.data(), timeout, infd, outfd); + for (size_t n = 0; n < d_params.size(); n++) { + d_argv[n]=d_params[n].c_str(); + } } -void CoProcess::launch(const char **argv, int timeout, int infd, int outfd) +void CoProcess::launch() { - d_timeout=timeout; - d_infd=infd; - d_outfd=outfd; - signal(SIGPIPE, SIG_IGN); - if(access(argv[0],X_OK)) // check before fork so we can throw - throw PDNSException("Command '"+string(argv[0])+"' cannot be executed: "+stringerror()); + if(access(d_argv[0],X_OK)) // check before fork so we can throw + throw PDNSException("Command '"+string(d_argv[0])+"' cannot be executed: "+stringerror()); if(pipe(d_fd1)<0 || pipe(d_fd2)<0) throw PDNSException("Unable to open pipe for coprocess: "+string(strerror(errno))); @@ -71,33 +65,34 @@ void CoProcess::launch(const char **argv, int timeout, int infd, int outfd) if((d_pid=fork())<0) throw PDNSException("Unable to fork for coprocess: "+stringerror()); else if(d_pid>0) { // parent speaking + // no need to keep this around + d_argv.clear(); close(d_fd1[0]); setCloseOnExec(d_fd1[1]); close(d_fd2[1]); setCloseOnExec(d_fd2[0]); - if(!(d_fp=fdopen(d_fd2[0],"r"))) - throw PDNSException("Unable to associate a file pointer with pipe: "+stringerror()); - if( d_timeout) - setbuf(d_fp,0); // no buffering please, confuses poll + + if (d_timeout) { + setNonBlocking(d_fd2[0]); + } } else if(!d_pid) { // child signal(SIGCHLD, SIG_DFL); // silence a warning from perl close(d_fd1[1]); close(d_fd2[0]); - if(d_fd1[0]!= infd) { - dup2(d_fd1[0], infd); + if(d_fd1[0]!= d_infd) { + dup2(d_fd1[0], d_infd); close(d_fd1[0]); } - if(d_fd2[1]!= outfd) { - dup2(d_fd2[1], outfd); + if(d_fd2[1]!= d_outfd) { + dup2(d_fd2[1], d_outfd); close(d_fd2[1]); } // stdin & stdout are now connected, fire up our coprocess! - - if(execv(argv[0], const_cast(argv))<0) // now what + if(execv(d_argv[0], const_cast(d_argv.data()))<0) // now what exit(123); /* not a lot we can do here. We shouldn't return because that will leave a forked process around. @@ -114,7 +109,7 @@ CoProcess::~CoProcess() } close(d_fd1[1]); - fclose(d_fp); + close(d_fd2[0]); } void CoProcess::checkStatus() @@ -162,19 +157,53 @@ void CoProcess::send(const string &snd) void CoProcess::receive(string &receive) { + bool first = true; receive.clear(); - - if(d_timeout) { - int ret = waitForData(fileno(d_fp), 0, d_timeout * 1000); - if(ret<0) - throw PDNSException("Error waiting on data from coprocess: "+stringerror()); - if(!ret) - throw PDNSException("Timeout waiting for data from coprocess"); + + // we might still have some remaining data from our last read + if (!d_remaining.empty()) { + receive = std::move(d_remaining); } - if(!stringfgets(d_fp, receive)) - throw PDNSException("Child closed pipe"); - + size_t lastPos = 0; + size_t eolPos; + while ((eolPos = receive.find('\n', lastPos)) == std::string::npos) { + size_t existingSize = receive.size(); + lastPos = existingSize; + receive.resize(existingSize + 4096); + ssize_t got = read(d_fd2[0], &receive.at(existingSize), 4096); + if (got == 0) { + throw PDNSException("Child closed pipe"); + } + else if (got < 0) { + receive.resize(existingSize); + int saved = errno; + if (saved == EINTR) { + continue; + } + if (saved == EAGAIN) { + if(d_timeout) { + int ret = waitForData(d_fd2[0], 0, d_timeout * 1000); + if(ret<0) + throw PDNSException("Error waiting on data from coprocess: "+string(strerror(saved))); + if(!ret) + throw PDNSException("Timeout waiting for data from coprocess"); + } + } + else { + throw PDNSException("Error reading from child's pipe:" + string(strerror(saved))); + } + } else { + receive.resize(existingSize + static_cast(got)); + } + } + + if (eolPos != receive.size() - 1) { + /* we have some data remaining after the first '\n', let's keep it for later */ + d_remaining.append(receive, eolPos + 1, receive.size() - eolPos - 1); + } + + receive.resize(eolPos); trim_right(receive); } diff --git a/modules/pipebackend/coprocess.hh b/modules/pipebackend/coprocess.hh index 4f168202d..dd581cc9b 100644 --- a/modules/pipebackend/coprocess.hh +++ b/modules/pipebackend/coprocess.hh @@ -43,18 +43,20 @@ class CoProcess : public CoRemote public: CoProcess(const string &command,int timeout=0, int infd=0, int outfd=1); ~CoProcess(); - void sendReceive(const string &send, string &receive); - void receive(string &rcv); - void send(const string &send); + void sendReceive(const string &send, string &receive) override; + void receive(string &rcv) override; + void send(const string &send) override; + void launch(); private: - void launch(const char **argv, int timeout=0, int infd=0, int outfd=1); void checkStatus(); + std::vector d_params; + std::vector d_argv; + std::string d_remaining; int d_fd1[2], d_fd2[2]; int d_pid; int d_infd; int d_outfd; int d_timeout; - FILE *d_fp; }; class UnixRemote : public CoRemote @@ -62,9 +64,9 @@ class UnixRemote : public CoRemote public: UnixRemote(const string &path, int timeout=0); ~UnixRemote(); - void sendReceive(const string &send, string &receive); - void receive(string &rcv); - void send(const string &send); + void sendReceive(const string &send, string &receive) override; + void receive(string &rcv) override; + void send(const string &send) override; private: int d_fd; FILE *d_fp; diff --git a/modules/pipebackend/pipebackend.cc b/modules/pipebackend/pipebackend.cc index 33950ecac..7821c49fc 100644 --- a/modules/pipebackend/pipebackend.cc +++ b/modules/pipebackend/pipebackend.cc @@ -69,10 +69,14 @@ void CoWrapper::launch() if(d_command.empty()) throw ArgException("pipe-command is not specified"); - if(isUnixSocket(d_command)) + if(isUnixSocket(d_command)) { d_cp = new UnixRemote(d_command, d_timeout); - else - d_cp = new CoProcess(d_command, d_timeout); + } + else { + auto coprocess = new CoProcess(d_command, d_timeout); + coprocess->launch(); + d_cp = coprocess; + } d_cp->send("HELO\t"+std::to_string(d_abiVersion)); string banner;