From: Bert Hubert Date: Thu, 3 Feb 2011 20:46:30 +0000 (+0000) Subject: signingpipe is all zero-copy and hyperefficient now. but not any faster ;-( X-Git-Tag: auth-3.0~275 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=a2aaa80731425a4b20a1f7b2d80bff85f3157c89;p=pdns signingpipe is all zero-copy and hyperefficient now. but not any faster ;-( git-svn-id: svn://svn.powerdns.com/pdns/trunk/pdns@1972 d19b8d6e-7fed-0310-83ef-9ca221ded41b --- diff --git a/pdns/signingpipe.cc b/pdns/signingpipe.cc index 25bf5ef1b..c06798aa7 100644 --- a/pdns/signingpipe.cc +++ b/pdns/signingpipe.cc @@ -1,109 +1,151 @@ #include "signingpipe.hh" +#include -AtomicCounter ChunkedSigningPipe::s_workerid; +struct StartHelperStruct +{ + StartHelperStruct(ChunkedSigningPipe* csp, int id) : d_csp(csp), d_id(id){} + ChunkedSigningPipe* d_csp; + int d_id; +}; void* ChunkedSigningPipe::helperWorker(void* p) try { - ChunkedSigningPipe* us = (ChunkedSigningPipe*)p; - us->worker(); + StartHelperStruct shs=*(StartHelperStruct*)p; + delete (StartHelperStruct*)p; + + shs.d_csp->worker(shs.d_id); return 0; } catch(std::exception& e) { cerr<<"Signing thread died with error "<()); if(!d_mustSign) return; - if(pipe(d_uppipe) < 0 || pipe(d_backpipe)) + + if(pipe(d_backpipe) < 0) throw runtime_error("Unable to create communication pipes in for ChunkedSigningPipe"); Utility::setNonBlocking(d_backpipe[0]); + int fds[2]; + for(unsigned int n=0; n < d_numworkers; ++n) { - pthread_create(&d_tids[n], 0, helperWorker, (void*) this); + if(pipe(fds) < 0) + throw runtime_error("Unable to create communication uppipes in for ChunkedSigningPipe"); + d_uppipes.push_back(make_pair(fds[0], fds[1])); + + pthread_create(&d_tids[n], 0, helperWorker, (void*) new StartHelperStruct(this, n)); } } ChunkedSigningPipe::~ChunkedSigningPipe() { + delete d_rrsetToSign; if(!d_mustSign) return; - close(d_uppipe[1]); // this will trigger all threads to exit + for(vector >::const_iterator iter = d_uppipes.begin(); iter != d_uppipes.end(); ++iter) + close(iter->second); // this will trigger all threads to exit + void* res; for(unsigned int n = 0; n < d_numworkers; ++n) pthread_join(d_tids[n], &res); close(d_backpipe[1]); close(d_backpipe[0]); - close(d_uppipe[0]); + for(vector >::const_iterator iter = d_uppipes.begin(); iter != d_uppipes.end(); ++iter) + close(iter->first); + cout<<"Did: "<qtype.getCode() != rr.qtype.getCode() || !pdns_iequals(d_toSign.begin()->qname, rr.qname))) + if(!d_rrsetToSign->empty() && (d_rrsetToSign->begin()->qtype.getCode() != rr.qtype.getCode() || !pdns_iequals(d_rrsetToSign->begin()->qname, rr.qname))) { - flushToSign(); + sendRRSetToWorker(); } - d_toSign.push_back(rr); - return d_chunk.size() > d_chunkrecords; + d_rrsetToSign->push_back(rr); + return !d_chunks.empty() && d_chunks.back().size() > d_maxchunkrecords; } -void ChunkedSigningPipe::sendChunkToSign() +void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist! { + if(d_chunks.empty()) { + cerr<<"Help!"<begin(), d_rrsetToSign->end()); + d_rrsetToSign->clear(); return; } - if(!d_toSign.empty()) { - chunk_t* toSign = new chunk_t(d_toSign); - - if(write(d_uppipe[1], &toSign, sizeof(toSign)) != sizeof(toSign)) + + if(!d_rrsetToSign->empty()) { + static int counter; + d_rrsetToSign->reserve(2*d_rrsetToSign->size()); + if(write(d_uppipes[++counter % d_uppipes.size()].second, &d_rrsetToSign, sizeof(d_rrsetToSign)) != sizeof(d_rrsetToSign)) throw runtime_error("Partial write or error communicating to signing thread"); + d_rrsetToSign = new rrset_t; d_outstanding++; + d_queued++; } chunk_t* signedChunk; while(d_outstanding && read(d_backpipe[0], &signedChunk, sizeof(signedChunk)) > 0) { --d_outstanding; - d_chunk.insert(d_chunk.end(), signedChunk->begin(), signedChunk->end()); + d_chunks.back().insert(d_chunks.back().end(), signedChunk->begin(), signedChunk->end()); delete signedChunk; + if(d_chunks.back().size() > d_maxchunkrecords) { + d_chunks.push_back(vector()); + break; + } } - - d_toSign.clear(); } -void ChunkedSigningPipe::worker() +unsigned int ChunkedSigningPipe::getReady() +{ + unsigned int sum=0; + BOOST_FOREACH(const std::vector& v, d_chunks) { + sum += v.size(); + } + return sum; +} +void ChunkedSigningPipe::worker(int id) { - //int my_id = ++s_workerid; - // cout<clear(); } vector ChunkedSigningPipe::getChunk(bool final) @@ -113,10 +155,9 @@ vector ChunkedSigningPipe::getChunk(bool final) flushToSign(); } - chunk_t::size_type amount=min(d_chunkrecords, d_chunk.size()); - chunk_t chunk(d_chunk.begin(), d_chunk.begin() + amount); - - d_chunk.erase(d_chunk.begin(), d_chunk.begin() + amount); - - return chunk; + vector front=d_chunks.front(); + d_chunks.pop_front(); + if(d_chunks.empty()) + d_chunks.push_back(vector()); + return front; } diff --git a/pdns/signingpipe.hh b/pdns/signingpipe.hh index def6dbee1..d60820bca 100644 --- a/pdns/signingpipe.hh +++ b/pdns/signingpipe.hh @@ -2,6 +2,7 @@ #define PDNS_SIGNINGPIPE #include #include +#include #include "dnsseckeeper.hh" #include "dns.hh" using std::string; @@ -14,30 +15,37 @@ using std::vector; class ChunkedSigningPipe { public: - typedef vector chunk_t; + typedef vector rrset_t; + typedef rrset_t chunk_t; // for now ChunkedSigningPipe(DNSSECKeeper& dk, UeberBackend& db, const std::string& signerName, bool mustSign, unsigned int numWorkers=3); ~ChunkedSigningPipe(); bool submit(const DNSResourceRecord& rr); chunk_t getChunk(bool final=false); + int d_queued; + AtomicCounter d_signed; + int d_outstanding; + unsigned int getReady(); private: void flushToSign(); - void sendChunkToSign(); // dispatch chunk to worker - void worker(); + void sendRRSetToWorker(); // dispatch RRSET to worker + void worker(int n); static void* helperWorker(void* p); - chunk_t d_toSign, d_chunk; + rrset_t* d_rrsetToSign; + std::deque< std::vector > d_chunks; DNSSECKeeper& d_dk; UeberBackend& d_db; string d_signer; - chunk_t::size_type d_chunkrecords; - int d_uppipe[2], d_backpipe[2]; - int d_outstanding; + chunk_t::size_type d_maxchunkrecords; + + std::vector > d_uppipes; + int d_backpipe[2]; + unsigned int d_numworkers; vector d_tids; - static AtomicCounter s_workerid; bool d_mustSign; };