From: Bert Hubert Date: Mon, 7 Feb 2011 09:29:03 +0000 (+0000) Subject: signingpipe was revamped 12 times, but is again simple. X-Git-Tag: auth-3.0~266 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=a6ef6f7a8ccb533e64fc8d5b1ea0412536dd9074;p=pdns signingpipe was revamped 12 times, but is again simple. git-svn-id: svn://svn.powerdns.com/pdns/trunk/pdns@1981 d19b8d6e-7fed-0310-83ef-9ca221ded41b --- diff --git a/pdns/signingpipe.cc b/pdns/signingpipe.cc index c06798aa7..fa9b9a9f8 100644 --- a/pdns/signingpipe.cc +++ b/pdns/signingpipe.cc @@ -1,11 +1,18 @@ #include "signingpipe.hh" +#include "misc.hh" +#include #include +#include +#include +#include +#include struct StartHelperStruct { - StartHelperStruct(ChunkedSigningPipe* csp, int id) : d_csp(csp), d_id(id){} + StartHelperStruct(ChunkedSigningPipe* csp, int id, int fd) : d_csp(csp), d_id(id), d_fd(fd){} ChunkedSigningPipe* d_csp; int d_id; + int d_fd; }; void* ChunkedSigningPipe::helperWorker(void* p) @@ -14,7 +21,7 @@ try StartHelperStruct shs=*(StartHelperStruct*)p; delete (StartHelperStruct*)p; - shs.d_csp->worker(shs.d_id); + shs.d_csp->worker(shs.d_id, shs.d_fd); return 0; } catch(std::exception& e) { @@ -22,27 +29,55 @@ catch(std::exception& e) { return 0; } -ChunkedSigningPipe::ChunkedSigningPipe(DNSSECKeeper& dk, UeberBackend& db, const std::string& signerName, bool mustSign, unsigned int workers) - : d_queued(0), d_outstanding(0), d_dk(dk), d_db(db), d_signer(signerName), d_maxchunkrecords(100), d_numworkers(workers), d_tids(d_numworkers), - d_mustSign(mustSign) +ChunkedSigningPipe::ChunkedSigningPipe(const std::string& signerName, bool mustSign, const pdns::string& servers, unsigned int workers) + : d_queued(0), d_outstanding(0), d_signer(signerName), d_maxchunkrecords(100), d_numworkers(workers), d_tids(d_numworkers), + d_mustSign(mustSign), d_final(false) { d_rrsetToSign = new rrset_t; d_chunks.push_back(vector()); if(!d_mustSign) return; - if(pipe(d_backpipe) < 0) - throw runtime_error("Unable to create communication pipes in for ChunkedSigningPipe"); - - Utility::setNonBlocking(d_backpipe[0]); int fds[2]; + ServiceTuple st; + ComboAddress remote; + if(!servers.empty()) { + st.port=2000; + parseService(servers, st); + remote=ComboAddress(st.host, st.port); + } + for(unsigned int n=0; n < d_numworkers; ++n) { - if(pipe(fds) < 0) - throw runtime_error("Unable to create communication uppipes in for ChunkedSigningPipe"); - d_uppipes.push_back(make_pair(fds[0], fds[1])); - - pthread_create(&d_tids[n], 0, helperWorker, (void*) new StartHelperStruct(this, n)); + if(!servers.empty()) { + fds[0] = socket(AF_INET, SOCK_STREAM, 0); + fds[1] = -1; + + if(connect(fds[0], (struct sockaddr*)&remote, remote.getSocklen()) < 0) + unixDie("Connecting to signing server"); + + //int tmp=1; + //setsockopt(fds[0], SOL_TCP, TCP_NODELAY, &tmp, sizeof(tmp)); + } + else { + if(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) + throw runtime_error("Unable to create communication socket in for ChunkedSigningPipe"); + pthread_create(&d_tids[n], 0, helperWorker, (void*) new StartHelperStruct(this, n, fds[1])); +#if 0 + signal(SIGCHLD, SIG_IGN); + if(!fork()) { // child + dup2(fds[1], 0); + execl("./pdnssec", "./pdnssec", "--config-dir=./", "signing-slave", NULL); + // helperWorker(new StartHelperStruct(this, n)); + return; + } + else + close(fds[1]); +#endif + } + + d_sockets.push_back(fds[0]); + Utility::setNonBlocking(fds[0]); } } @@ -51,17 +86,14 @@ ChunkedSigningPipe::~ChunkedSigningPipe() delete d_rrsetToSign; if(!d_mustSign) return; - for(vector >::const_iterator iter = d_uppipes.begin(); iter != d_uppipes.end(); ++iter) - close(iter->second); // this will trigger all threads to exit + BOOST_FOREACH(int fd, d_sockets) { + close(fd); // this will trigger all threads to exit + } void* res; - for(unsigned int n = 0; n < d_numworkers; ++n) - pthread_join(d_tids[n], &res); - - close(d_backpipe[1]); - close(d_backpipe[0]); - for(vector >::const_iterator iter = d_uppipes.begin(); iter != d_uppipes.end(); ++iter) - close(iter->first); + BOOST_FOREACH(pthread_t& tid, d_tids) { + pthread_join(tid, &res); + } cout<<"Did: "< d_maxchunkrecords; } + +namespace { +int readn(int fd, void* buffer, unsigned int len) +{ + unsigned int pos=0; + int res; + for(;;) { + res = read(fd, (char*)buffer + pos, len - pos); + if(res == 0) { + if(pos) + throw runtime_error("Signing Pipe remote shut down in the middle of a message"); + else { + cerr<<"Got decent EOF on "< buf(new char[len]); + readn(fd, buf.get(), len); + + msg.assign(buf.get(), len); + return true; +} +void writeLStringToSocket(int fd, const string& msg) +{ + string realmsg; + uint32_t len = htonl(msg.length()); + string tot((char*)&len, 4); + tot+=msg; + + writen2(fd, tot.c_str(), tot.length()); +} + +#endif + +pair, vector > ChunkedSigningPipe::waitForRW(bool rd, bool wr, int seconds) +{ + struct pollfd pfds[d_sockets.size()]; + + for(unsigned int n = 0; n < d_sockets.size(); ++n) { + + memset(&pfds[n], 0, sizeof(pfds[n])); + pfds[n].fd = d_sockets[n]; + if(!d_eof.count(n)) { + if(rd) + pfds[n].events |= POLLIN; + if(wr) + pfds[n].events |= POLLOUT; + } + } + + int res = poll(pfds, d_sockets.size(), seconds * 1000); // negative = infinite + if(res < 0) + unixDie("polling for activity from signers"); + pair, vector > vects; + for(unsigned int n = 0; n < d_sockets.size(); ++n) + if(pfds[n].revents & POLLIN) + vects.first.push_back(pfds[n].fd); + else if(pfds[n].revents & POLLOUT) + vects.second.push_back(pfds[n].fd); + + return vects; +} + + void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist! { if(d_chunks.empty()) { @@ -86,26 +210,66 @@ void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist! return; } - if(!d_rrsetToSign->empty()) { - static int counter; - d_rrsetToSign->reserve(2*d_rrsetToSign->size()); - if(write(d_uppipes[++counter % d_uppipes.size()].second, &d_rrsetToSign, sizeof(d_rrsetToSign)) != sizeof(d_rrsetToSign)) - throw runtime_error("Partial write or error communicating to signing thread"); + bool wantRead, wantWrite; + + wantWrite = !d_rrsetToSign->empty(); + wantRead = d_outstanding | wantWrite; // if we wrote, we want to read + + pair, vector > rwVect; + + waitForWrite:; + if(d_final) { + if(!d_outstanding) + return; + // cerr<<"Setting timeout to infinite, outstanding = " < 0) { - --d_outstanding; - d_chunks.back().insert(d_chunks.back().end(), signedChunk->begin(), signedChunk->end()); - delete signedChunk; - if(d_chunks.back().size() > d_maxchunkrecords) { - d_chunks.push_back(vector()); + } // if wantWrite && we couldn't, we must try again after reading a bit + + string str; + while(d_outstanding) { + bool gotSomething=false; + chunk_t* chunk; + BOOST_FOREACH(int fd, rwVect.first) { + if(d_eof.count(fd)) + continue; + int res = readn(fd, &chunk, sizeof(chunk)); + if(!res) { + d_eof.insert(fd); + break; + } + if(res < 0) + unixDie("Error reading signed chunk from thread"); + + --d_outstanding; + d_chunks.back().insert(d_chunks.back().end(), chunk->begin(), chunk->end()); + delete chunk; + + if(d_chunks.back().size() > d_maxchunkrecords) { + d_chunks.push_back(vector()); // we filled a chunk, and have no need to queue further now + break; + } + } + if(!gotSomething) + break; + if(d_chunks.back().empty()) // this means we've read a full chunk and should cut it out already break; - } } + + if(wantWrite && !d_rrsetToSign->empty()) { // we still have something to write, and should try again + wantRead = false; + goto waitForWrite; + } + } unsigned int ChunkedSigningPipe::getReady() @@ -116,30 +280,31 @@ unsigned int ChunkedSigningPipe::getReady() } return sum; } -void ChunkedSigningPipe::worker(int id) +void ChunkedSigningPipe::worker(int id, int fd) +try { - chunk_t* chunks[64]; - DNSSECKeeper dk; + UeberBackend db("key-only"); + + chunk_t* chunk; int res; for(;;) { - res=read(d_uppipes[id].first, &chunks[0], 64*sizeof(chunk_t*)); - if(!res) { - //cerr< ChunkedSigningPipe::getChunk(bool final) { - if(final) { - Utility::setBlocking(d_backpipe[0]); + if(final && !d_final) { + // this means we should keep on reading until d_outstanding == 0 + d_final = true; flushToSign(); + + BOOST_FOREACH(int fd, d_sockets) { + shutdown(fd, SHUT_WR); // perhaps this transmits EOF the other side + cerr<<"shutdown of "< front=d_chunks.front(); d_chunks.pop_front(); if(d_chunks.empty()) d_chunks.push_back(vector()); + if(d_final && front.empty()) + cerr<<"getChunk returning empty in final"< rrset_t; typedef rrset_t chunk_t; // for now - ChunkedSigningPipe(DNSSECKeeper& dk, UeberBackend& db, const std::string& signerName, bool mustSign, unsigned int numWorkers=3); + ChunkedSigningPipe(const std::string& signerName, bool mustSign, const pdns::string& servers=pdns::string(), unsigned int numWorkers=3); ~ChunkedSigningPipe(); bool submit(const DNSResourceRecord& rr); chunk_t getChunk(bool final=false); @@ -30,23 +33,23 @@ private: void flushToSign(); void sendRRSetToWorker(); // dispatch RRSET to worker - void worker(int n); + pair, vector > waitForRW(bool rd, bool wr, int seconds); + + void worker(int n, int fd); static void* helperWorker(void* p); rrset_t* d_rrsetToSign; std::deque< std::vector > d_chunks; - DNSSECKeeper& d_dk; - UeberBackend& d_db; string d_signer; chunk_t::size_type d_maxchunkrecords; - std::vector > d_uppipes; - int d_backpipe[2]; - + std::vector d_sockets; + std::set d_eof; unsigned int d_numworkers; vector d_tids; bool d_mustSign; + bool d_final; }; #endif