]> granicus.if.org Git - pdns/commitdiff
auth: Handle signing pipe worker dying with work still pending
authorRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 6 Oct 2017 10:48:26 +0000 (12:48 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 6 Oct 2017 10:48:26 +0000 (12:48 +0200)
pdns/pdnsutil.cc
pdns/signingpipe.cc
pdns/signingpipe.hh
pdns/tcpreceiver.cc

index db522db8d6b2f73160a82e510ed4e0275df81b55..69edd89054a4d502989aa20410f72db365c1c400 100644 (file)
@@ -1424,7 +1424,7 @@ void testSpeed(DNSSECKeeper& dk, const DNSName& zone, const string& remote, int
     throw runtime_error("No backends available for DNSSEC key storage");
   }
 
-  ChunkedSigningPipe csp(DNSName(zone), 1, remote, cores);
+  ChunkedSigningPipe csp(DNSName(zone), 1, cores);
   
   vector<DNSZoneRecord> signatures;
   uint32_t rnd;
index ffe5b29960e59d5fc54d951ca91ca46e18fe38ef..2f8a2acd6737ee33e3caaa2b16ef5576231c3242 100644 (file)
@@ -55,7 +55,7 @@ catch(...) {
   return nullptr;
 }
 
-ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign, const string& servers, unsigned int workers)
+ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign, unsigned int workers)
   : d_signed(0), d_queued(0), d_outstanding(0), d_numworkers(workers), d_submitted(0), d_signer(signerName),
     d_maxchunkrecords(100), d_threads(d_numworkers), d_mustSign(mustSign), d_final(false)
 {
@@ -75,6 +75,7 @@ ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign,
     d_threads[n] = std::thread(helperWorker, this, fds[1]);
     setNonBlocking(fds[0]);
     d_sockets.push_back(fds[0]);
+    d_outstandings[fds[0]] = 0;
   }
 }
 
@@ -131,7 +132,7 @@ bool ChunkedSigningPipe::submit(const DNSZoneRecord& rr)
 pair<vector<int>, vector<int> > ChunkedSigningPipe::waitForRW(bool rd, bool wr, int seconds)
 {
   vector<pollfd> pfds;
-  
+
   for(unsigned int n = 0; n < d_sockets.size(); ++n) {    
     if(d_eof.count(d_sockets[n]))  
       continue;
@@ -144,7 +145,7 @@ pair<vector<int>, vector<int> > ChunkedSigningPipe::waitForRW(bool rd, bool wr,
       pfd.events |= POLLOUT;
     pfds.push_back(pfd);
   }
-  
+
   int res = poll(&pfds[0], pfds.size(), (seconds < 0) ? -1 : (seconds * 1000)); // -1 = infinite
   if(res < 0)
     unixDie("polling for activity from signers, "+std::to_string(d_sockets.size()));
@@ -195,12 +196,13 @@ void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
   
   pair<vector<int>, vector<int> > rwVect;
   
-  rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen  
+  rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen
   
   if(wantWrite && !rwVect.second.empty()) {
     random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
     writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
     d_rrsetToSign = new rrset_t;
+    d_outstandings[*rwVect.second.begin()]++;
     d_outstanding++;
     d_queued++;
     wantWrite=false;
@@ -217,6 +219,9 @@ void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
         while(d_outstanding) {
           int res = readn(fd, &chunk, sizeof(chunk));
           if(!res) {
+            if (d_outstandings[fd] > 0) {
+              throw std::runtime_error("A signing pipe worker died while we were waiting for its result");
+            }
             d_eof.insert(fd);
             break;
           }
@@ -228,6 +233,7 @@ void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
           }
           
           --d_outstanding;
+          d_outstandings[fd]--;
           
           addSignedToChunks(chunk);
           
@@ -236,22 +242,23 @@ void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
       }
       if(!d_outstanding || !d_final)
         break;
-      rwVect = waitForRW(1, 0, -1); // wait for something to happen  
+      rwVect = waitForRW(true, false, -1); // wait for something to happen
     }
   }
   
   if(wantWrite) {  // our optimization above failed, we now wait synchronously
-    rwVect = waitForRW(0, wantWrite, -1); // wait for something to happen  
+    rwVect = waitForRW(false, wantWrite, -1); // wait for something to happen
     random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
     writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
     d_rrsetToSign = new rrset_t;
+    d_outstandings[*rwVect.second.begin()]++;
     d_outstanding++;
     d_queued++;
   }
   
 }
 
-unsigned int ChunkedSigningPipe::getReady()
+unsigned int ChunkedSigningPipe::getReady() const
 {
    unsigned int sum=0; 
    for(const auto& v :  d_chunks) {
index 7bea5c269c6c8f387760b428db2d09dfe445662e..eb10b2fa60b6c089d657368742726bb0f318298f 100644 (file)
@@ -41,15 +41,16 @@ public:
   typedef vector<DNSZoneRecord> rrset_t; 
   typedef rrset_t chunk_t; // for now
   
-  ChunkedSigningPipe(const DNSName& signerName, bool mustSign, /* FIXME servers is unused? */ const string& servers=string(), unsigned int numWorkers=3);
+  ChunkedSigningPipe(const DNSName& signerName, bool mustSign, unsigned int numWorkers=3);
   ~ChunkedSigningPipe();
   bool submit(const DNSZoneRecord& rr);
   chunk_t getChunk(bool final=false);
+  unsigned int getReady() const;
 
   std::atomic<unsigned long> d_signed;
-  int d_queued;
-  int d_outstanding;
-  unsigned int getReady();
+  unsigned int d_queued;
+  unsigned int d_outstanding;
+
 private:
   void flushToSign();  
   void dedupRRSet();
@@ -61,7 +62,7 @@ private:
   void worker(int fd);
 
   unsigned int d_numworkers;
-  int d_submitted;
+  unsigned int d_submitted;
 
   rrset_t* d_rrsetToSign;
   std::deque< std::vector<DNSZoneRecord> > d_chunks;
@@ -71,6 +72,8 @@ private:
   
   std::vector<int> d_sockets;
   std::set<int> d_eof;
+  std::map<int,int> d_outstandings;
+
   vector<std::thread> d_threads;
   bool d_mustSign;
   bool d_final;
index d6f241b0701c6b84e3a8b6c64c8ad93cffe0ca6a..53ff0c1e26e29fc228c166ccecb287bd4a6552cf 100644 (file)
@@ -673,7 +673,7 @@ int TCPNameserver::doAXFR(const DNSName &target, shared_ptr<DNSPacket> q, int ou
   trc.d_mac = outpacket->d_trc.d_mac;
   outpacket = getFreshAXFRPacket(q);
   
-  ChunkedSigningPipe csp(target, securedZone, "", ::arg().asNum("signing-threads", 1));
+  ChunkedSigningPipe csp(target, securedZone, ::arg().asNum("signing-threads", 1));
   
   typedef map<string, NSECXEntry> nsecxrepo_t;
   nsecxrepo_t nsecxrepo;