]> granicus.if.org Git - pdns/commitdiff
signingpipe is all zero-copy and hyperefficient now. but not any faster ;-(
authorBert Hubert <bert.hubert@netherlabs.nl>
Thu, 3 Feb 2011 20:46:30 +0000 (20:46 +0000)
committerBert Hubert <bert.hubert@netherlabs.nl>
Thu, 3 Feb 2011 20:46:30 +0000 (20:46 +0000)
git-svn-id: svn://svn.powerdns.com/pdns/trunk/pdns@1972 d19b8d6e-7fed-0310-83ef-9ca221ded41b

pdns/signingpipe.cc
pdns/signingpipe.hh

index 25bf5ef1b25a911c72bf9299538c23f54192c03e..c06798aa7c255390526db821b59092083ec1c274 100644 (file)
 #include "signingpipe.hh"
+#include <boost/foreach.hpp>
 
-AtomicCounter ChunkedSigningPipe::s_workerid;
+struct StartHelperStruct
+{
+  StartHelperStruct(ChunkedSigningPipe* csp, int id) : d_csp(csp), d_id(id){}
+  ChunkedSigningPipe* d_csp;
+  int d_id;
+};
 
 void* ChunkedSigningPipe::helperWorker(void* p)
 try
 {
-  ChunkedSigningPipe* us = (ChunkedSigningPipe*)p;
-  us->worker();
+  StartHelperStruct shs=*(StartHelperStruct*)p;
+  delete (StartHelperStruct*)p;
+  
+  shs.d_csp->worker(shs.d_id);
   return 0;
 }
 catch(std::exception& e) {
   cerr<<"Signing thread died with error "<<e.what()<<endl;
+  return 0;
 }
 
 ChunkedSigningPipe::ChunkedSigningPipe(DNSSECKeeper& dk, UeberBackend& db, const std::string& signerName, bool mustSign, unsigned int workers) 
-  : d_dk(dk), d_db(db), d_signer(signerName), d_chunkrecords(100), d_outstanding(0), d_numworkers(workers), d_tids(d_numworkers),
+  : d_queued(0), d_outstanding(0), d_dk(dk), d_db(db),  d_signer(signerName), d_maxchunkrecords(100), d_numworkers(workers), d_tids(d_numworkers),
     d_mustSign(mustSign)
 {
+  d_rrsetToSign = new rrset_t;
+  d_chunks.push_back(vector<DNSResourceRecord>());
   if(!d_mustSign)
     return;
-  if(pipe(d_uppipe) < 0 || pipe(d_backpipe))
+  
+  if(pipe(d_backpipe) < 0)
     throw runtime_error("Unable to create communication pipes in for ChunkedSigningPipe");
   
   Utility::setNonBlocking(d_backpipe[0]);
+  int fds[2];
+  
   for(unsigned int n=0; n < d_numworkers; ++n) {
-    pthread_create(&d_tids[n], 0, helperWorker, (void*) this);
+    if(pipe(fds) < 0)
+      throw runtime_error("Unable to create communication uppipes in for ChunkedSigningPipe");
+    d_uppipes.push_back(make_pair(fds[0], fds[1]));
+    
+    pthread_create(&d_tids[n], 0, helperWorker, (void*) new StartHelperStruct(this, n));
   }
 }
 
 ChunkedSigningPipe::~ChunkedSigningPipe()
 {
+  delete d_rrsetToSign;
   if(!d_mustSign)
     return;
-  close(d_uppipe[1]); // this will trigger all threads to exit
+  for(vector<pair<int, int> >::const_iterator iter = d_uppipes.begin(); iter != d_uppipes.end(); ++iter)
+    close(iter->second); // this will trigger all threads to exit
+    
   void* res;
   for(unsigned int n = 0; n < d_numworkers; ++n)
     pthread_join(d_tids[n], &res);
   
   close(d_backpipe[1]);
   close(d_backpipe[0]);
-  close(d_uppipe[0]);
+  for(vector<pair<int, int> >::const_iterator iter = d_uppipes.begin(); iter != d_uppipes.end(); ++iter)
+    close(iter->first); 
+  cout<<"Did: "<<d_signed<<endl;
 }
 
 bool ChunkedSigningPipe::submit(const DNSResourceRecord& rr)
 {
-  if(!d_toSign.empty() && (d_toSign.begin()->qtype.getCode() != rr.qtype.getCode()  ||  !pdns_iequals(d_toSign.begin()->qname, rr.qname))) 
+  if(!d_rrsetToSign->empty() && (d_rrsetToSign->begin()->qtype.getCode() != rr.qtype.getCode()  ||  !pdns_iequals(d_rrsetToSign->begin()->qname, rr.qname))) 
   {
-    flushToSign();
+    sendRRSetToWorker();
   }
-  d_toSign.push_back(rr);
-  return d_chunk.size() > d_chunkrecords;
+  d_rrsetToSign->push_back(rr);
+  return !d_chunks.empty() && d_chunks.back().size() > d_maxchunkrecords;
 }
 
-void ChunkedSigningPipe::sendChunkToSign()
+void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
 {
+  if(d_chunks.empty()) {
+    cerr<<"Help!"<<endl;
+  }
   if(!d_mustSign) {
-    d_chunk.insert(d_chunk.end(), d_toSign.begin(), d_toSign.end());
-    d_toSign.clear();
+    d_chunks.back().insert(d_chunks.back().end(), d_rrsetToSign->begin(), d_rrsetToSign->end());
+    d_rrsetToSign->clear();
     return;
   }
-  if(!d_toSign.empty()) {
-    chunk_t* toSign = new chunk_t(d_toSign);
-    
-    if(write(d_uppipe[1], &toSign, sizeof(toSign)) != sizeof(toSign)) 
+  
+  if(!d_rrsetToSign->empty()) {
+    static int counter;
+    d_rrsetToSign->reserve(2*d_rrsetToSign->size());
+    if(write(d_uppipes[++counter % d_uppipes.size()].second, &d_rrsetToSign, sizeof(d_rrsetToSign)) != sizeof(d_rrsetToSign)) 
       throw runtime_error("Partial write or error communicating to signing thread");
+    d_rrsetToSign = new rrset_t;
     d_outstanding++;
+    d_queued++;
   }
   chunk_t* signedChunk;
   
   while(d_outstanding && read(d_backpipe[0], &signedChunk, sizeof(signedChunk)) > 0) {
     --d_outstanding;
-    d_chunk.insert(d_chunk.end(), signedChunk->begin(), signedChunk->end());
+    d_chunks.back().insert(d_chunks.back().end(), signedChunk->begin(), signedChunk->end());
     delete signedChunk;
+    if(d_chunks.back().size() > d_maxchunkrecords) {
+      d_chunks.push_back(vector<DNSResourceRecord>());
+      break;
+    }    
   }
-  
-  d_toSign.clear();
 }
 
-void ChunkedSigningPipe::worker()
+unsigned int ChunkedSigningPipe::getReady()
+{
+   unsigned int sum=0; 
+   BOOST_FOREACH(const std::vector<DNSResourceRecord>& v, d_chunks) {
+     sum += v.size(); 
+   }
+   return sum;
+}
+void ChunkedSigningPipe::worker(int id)
 {
-  //int my_id = ++s_workerid;
-  // cout<<my_id<<" worker reporting!"<<endl;
-  chunk_t* chunk;
+  chunk_t* chunks[64];
   
   DNSSECKeeper dk;
   int res;
   for(;;) {
-    res=read(d_uppipe[0], &chunk, sizeof(chunk));
+    res=read(d_uppipes[id].first, &chunks[0], 64*sizeof(chunk_t*));
     if(!res) {
-      // cerr<<my_id<<" exiting"<<endl;
+      //cerr<<id<<" exiting"<<endl;
       break;
     }
-    if(res != sizeof(chunk))
+    if(res % sizeof(chunk_t*))
       unixDie("error or partial read from ChunkedSigningPipe main thread");
-    // cout<< my_id <<" worker signing!"<<endl;
-    addRRSigs(dk, d_db, d_signer, *chunk); // should start returning sigs separately instead of interleaved  
-    if(write(d_backpipe[1], &chunk, sizeof(chunk)) != sizeof(chunk))
+    //cerr<<"Got "<<res/sizeof(chunk_t*)<<endl;
+    for(unsigned int n = 0; n < res/sizeof(chunk_t*); ++n) {
+      ++d_signed;
+      addRRSigs(dk, d_db, d_signer, *chunks[n]); 
+    }
+      
+    if(write(d_backpipe[1], &chunks[0], res) != res)
       unixDie("error writing back to ChunkedSigningPipe");
+    
   }
 }
 
 void ChunkedSigningPipe::flushToSign()
 {
-  sendChunkToSign();
-  d_toSign.clear();
+  sendRRSetToWorker();
+  d_rrsetToSign->clear();
 }
 
 vector<DNSResourceRecord> ChunkedSigningPipe::getChunk(bool final)
@@ -113,10 +155,9 @@ vector<DNSResourceRecord> ChunkedSigningPipe::getChunk(bool final)
     flushToSign();
   }
   
-  chunk_t::size_type amount=min(d_chunkrecords, d_chunk.size());
-  chunk_t chunk(d_chunk.begin(), d_chunk.begin() + amount);
-  
-  d_chunk.erase(d_chunk.begin(), d_chunk.begin() + amount);
-  
-  return chunk;
+  vector<DNSResourceRecord> front=d_chunks.front();
+  d_chunks.pop_front();
+  if(d_chunks.empty())
+    d_chunks.push_back(vector<DNSResourceRecord>());
+  return front;
 }
index def6dbee1b4819c7a98f6e6625b41f50806067dc..d60820bca63d097bb48c47867d799215781c2c50 100644 (file)
@@ -2,6 +2,7 @@
 #define PDNS_SIGNINGPIPE
 #include <vector>
 #include <pthread.h>
+#include <stdio.h>
 #include "dnsseckeeper.hh"
 #include "dns.hh"
 using std::string;
@@ -14,30 +15,37 @@ using std::vector;
 class ChunkedSigningPipe
 {
 public:
-  typedef vector<DNSResourceRecord> chunk_t; 
+  typedef vector<DNSResourceRecord> rrset_t; 
+  typedef rrset_t chunk_t; // for now
   
   ChunkedSigningPipe(DNSSECKeeper& dk, UeberBackend& db, const std::string& signerName, bool mustSign, unsigned int numWorkers=3);
   ~ChunkedSigningPipe();
   bool submit(const DNSResourceRecord& rr);
   chunk_t getChunk(bool final=false);
+  int d_queued;
+  AtomicCounter d_signed;
+  int d_outstanding;
+  unsigned int getReady();
 private:
   void flushToSign();  
   
-  void sendChunkToSign(); // dispatch chunk to worker
-  void worker();
+  void sendRRSetToWorker(); // dispatch RRSET to worker
+  void worker(int n);
   
   static void* helperWorker(void* p);
-  chunk_t d_toSign, d_chunk;
+  rrset_t* d_rrsetToSign;
+  std::deque< std::vector<DNSResourceRecord> > d_chunks;
   DNSSECKeeper& d_dk;
   UeberBackend& d_db;
   string d_signer;
-  chunk_t::size_type d_chunkrecords;
   
-  int d_uppipe[2], d_backpipe[2];
-  int d_outstanding;
+  chunk_t::size_type d_maxchunkrecords;
+  
+  std::vector<std::pair<int, int> > d_uppipes;
+  int d_backpipe[2];
+  
   unsigned int d_numworkers;
   vector<pthread_t> d_tids;
-  static AtomicCounter s_workerid;
   bool d_mustSign;
 };