]> granicus.if.org Git - pdns/commitdiff
auth: Handle signing pipe worker dying with work still pending
authorRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 6 Oct 2017 10:48:26 +0000 (12:48 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 6 Oct 2017 10:51:03 +0000 (12:51 +0200)
(cherry picked from commit e3200e070e7cc4e243676776c41eb806c4edb7a5)

pdns/pdnsutil.cc
pdns/signingpipe.cc
pdns/signingpipe.hh
pdns/tcpreceiver.cc

index 2361bee48077b10a0a84fcad0b35f7dfa95ca358..437bbe26d685f3d1b9926ec249d36474773a24f5 100644 (file)
@@ -1371,7 +1371,7 @@ void testSpeed(DNSSECKeeper& dk, const DNSName& zone, const string& remote, int
     throw runtime_error("No backends available for DNSSEC key storage");
   }
 
-  ChunkedSigningPipe csp(DNSName(zone), 1, remote, cores);
+  ChunkedSigningPipe csp(DNSName(zone), 1, cores);
   
   vector<DNSResourceRecord> signatures;
   uint32_t rnd;
index 8c1616f627e1715a0ee3c23dbb22c880e9a3c624..ddffe9649b1a5c55726f9b2d6d2992ae1fe0809e 100644 (file)
@@ -70,7 +70,7 @@ catch(...) {
   return 0;
 }
 
-ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign, const string& servers, unsigned int workers)
+ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign, unsigned int workers)
   : d_signed(0), d_queued(0), d_outstanding(0), d_numworkers(workers), d_submitted(0), d_signer(signerName),
     d_maxchunkrecords(100), d_tids(d_numworkers), d_mustSign(mustSign), d_final(false)
 {
@@ -90,6 +90,7 @@ ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign,
     pthread_create(&d_tids[n], 0, helperWorker, (void*) new StartHelperStruct(this, n, fds[1]));
     setNonBlocking(fds[0]);
     d_sockets.push_back(fds[0]);
+    d_outstandings[fds[0]] = 0;
   }
 }
 
@@ -145,7 +146,7 @@ bool ChunkedSigningPipe::submit(const DNSResourceRecord& rr)
 pair<vector<int>, vector<int> > ChunkedSigningPipe::waitForRW(bool rd, bool wr, int seconds)
 {
   vector<pollfd> pfds;
-  
+
   for(unsigned int n = 0; n < d_sockets.size(); ++n) {    
     if(d_eof.count(d_sockets[n]))  
       continue;
@@ -158,7 +159,7 @@ pair<vector<int>, vector<int> > ChunkedSigningPipe::waitForRW(bool rd, bool wr,
       pfd.events |= POLLOUT;
     pfds.push_back(pfd);
   }
-  
+
   int res = poll(&pfds[0], pfds.size(), (seconds < 0) ? -1 : (seconds * 1000)); // -1 = infinite
   if(res < 0)
     unixDie("polling for activity from signers, "+std::to_string(d_sockets.size()));
@@ -209,12 +210,13 @@ void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
   
   pair<vector<int>, vector<int> > rwVect;
   
-  rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen  
+  rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen
   
   if(wantWrite && !rwVect.second.empty()) {
     random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
     writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
     d_rrsetToSign = new rrset_t;
+    d_outstandings[*rwVect.second.begin()]++;
     d_outstanding++;
     d_queued++;
     wantWrite=false;
@@ -231,6 +233,9 @@ void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
         while(d_outstanding) {
           int res = readn(fd, &chunk, sizeof(chunk));
           if(!res) {
+            if (d_outstandings[fd] > 0) {
+              throw std::runtime_error("A signing pipe worker died while we were waiting for its result");
+            }
             d_eof.insert(fd);
             break;
           }
@@ -242,6 +247,7 @@ void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
           }
           
           --d_outstanding;
+          d_outstandings[fd]--;
           
           addSignedToChunks(chunk);
           
@@ -250,22 +256,23 @@ void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
       }
       if(!d_outstanding || !d_final)
         break;
-      rwVect = waitForRW(1, 0, -1); // wait for something to happen  
+      rwVect = waitForRW(true, false, -1); // wait for something to happen
     }
   }
   
   if(wantWrite) {  // our optimization above failed, we now wait synchronously
-    rwVect = waitForRW(0, wantWrite, -1); // wait for something to happen  
+    rwVect = waitForRW(false, wantWrite, -1); // wait for something to happen
     random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
     writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
     d_rrsetToSign = new rrset_t;
+    d_outstandings[*rwVect.second.begin()]++;
     d_outstanding++;
     d_queued++;
   }
   
 }
 
-unsigned int ChunkedSigningPipe::getReady()
+unsigned int ChunkedSigningPipe::getReady() const
 {
    unsigned int sum=0; 
    for(const std::vector<DNSResourceRecord>& v :  d_chunks) {
index 99884982cafd46884c9a12de3f1ced1c57a3ac66..241b42c375b574bde4cf1f5280df5bf7c2c0df0a 100644 (file)
@@ -40,15 +40,16 @@ public:
   typedef vector<DNSResourceRecord> rrset_t; 
   typedef rrset_t chunk_t; // for now
   
-  ChunkedSigningPipe(const DNSName& signerName, bool mustSign, /* FIXME servers is unused? */ const string& servers=string(), unsigned int numWorkers=3);
+  ChunkedSigningPipe(const DNSName& signerName, bool mustSign, unsigned int numWorkers=3);
   ~ChunkedSigningPipe();
   bool submit(const DNSResourceRecord& rr);
   chunk_t getChunk(bool final=false);
+  unsigned int getReady() const;
 
   std::atomic<unsigned long> d_signed;
-  int d_queued;
-  int d_outstanding;
-  unsigned int getReady();
+  unsigned int d_queued;
+  unsigned int d_outstanding;
+
 private:
   void flushToSign();  
   void dedupRRSet();
@@ -61,7 +62,7 @@ private:
   static void* helperWorker(void* p);
 
   unsigned int d_numworkers;
-  int d_submitted;
+  unsigned int d_submitted;
 
   rrset_t* d_rrsetToSign;
   std::deque< std::vector<DNSResourceRecord> > d_chunks;
@@ -72,6 +73,8 @@ private:
   std::vector<int> d_sockets;
   std::set<int> d_eof;
   vector<pthread_t> d_tids;
+  std::map<int,int> d_outstandings;
+
   bool d_mustSign;
   bool d_final;
 };
index 31c4d0b7b69d325e40f82752fe3ea796fcda0bbb..ec8a4aac3e64e5bdd3a51aa9c32a8c55e3628561 100644 (file)
@@ -653,7 +653,7 @@ int TCPNameserver::doAXFR(const DNSName &target, shared_ptr<DNSPacket> q, int ou
   trc.d_mac = outpacket->d_trc.d_mac;
   outpacket = getFreshAXFRPacket(q);
   
-  ChunkedSigningPipe csp(target, securedZone, "", ::arg().asNum("signing-threads", 1));
+  ChunkedSigningPipe csp(target, securedZone, ::arg().asNum("signing-threads", 1));
   
   typedef map<string, NSECXEntry> nsecxrepo_t;
   nsecxrepo_t nsecxrepo;