From: Remi Gacogne Date: Fri, 6 Oct 2017 10:48:26 +0000 (+0200) Subject: auth: Handle signing pipe worker dying with work still pending X-Git-Tag: rec-4.1.0-rc2~32^2 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=e3200e070e7cc4e243676776c41eb806c4edb7a5;p=pdns auth: Handle signing pipe worker dying with work still pending --- diff --git a/pdns/pdnsutil.cc b/pdns/pdnsutil.cc index db522db8d..69edd8905 100644 --- a/pdns/pdnsutil.cc +++ b/pdns/pdnsutil.cc @@ -1424,7 +1424,7 @@ void testSpeed(DNSSECKeeper& dk, const DNSName& zone, const string& remote, int throw runtime_error("No backends available for DNSSEC key storage"); } - ChunkedSigningPipe csp(DNSName(zone), 1, remote, cores); + ChunkedSigningPipe csp(DNSName(zone), 1, cores); vector signatures; uint32_t rnd; diff --git a/pdns/signingpipe.cc b/pdns/signingpipe.cc index ffe5b2996..2f8a2acd6 100644 --- a/pdns/signingpipe.cc +++ b/pdns/signingpipe.cc @@ -55,7 +55,7 @@ catch(...) { return nullptr; } -ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign, const string& servers, unsigned int workers) +ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign, unsigned int workers) : d_signed(0), d_queued(0), d_outstanding(0), d_numworkers(workers), d_submitted(0), d_signer(signerName), d_maxchunkrecords(100), d_threads(d_numworkers), d_mustSign(mustSign), d_final(false) { @@ -75,6 +75,7 @@ ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign, d_threads[n] = std::thread(helperWorker, this, fds[1]); setNonBlocking(fds[0]); d_sockets.push_back(fds[0]); + d_outstandings[fds[0]] = 0; } } @@ -131,7 +132,7 @@ bool ChunkedSigningPipe::submit(const DNSZoneRecord& rr) pair, vector > ChunkedSigningPipe::waitForRW(bool rd, bool wr, int seconds) { vector pfds; - + for(unsigned int n = 0; n < d_sockets.size(); ++n) { if(d_eof.count(d_sockets[n])) continue; @@ -144,7 +145,7 @@ pair, vector > ChunkedSigningPipe::waitForRW(bool rd, bool wr, 
pfd.events |= POLLOUT; pfds.push_back(pfd); } - + int res = poll(&pfds[0], pfds.size(), (seconds < 0) ? -1 : (seconds * 1000)); // -1 = infinite if(res < 0) unixDie("polling for activity from signers, "+std::to_string(d_sockets.size())); @@ -195,12 +196,13 @@ void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist! pair<vector<int>, vector<int> > rwVect; - rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen + rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen if(wantWrite && !rwVect.second.empty()) { random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign)); d_rrsetToSign = new rrset_t; + d_outstandings[*rwVect.second.begin()]++; d_outstanding++; d_queued++; wantWrite=false; @@ -217,6 +219,9 @@ void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist! while(d_outstanding) { int res = readn(fd, &chunk, sizeof(chunk)); if(!res) { + if (d_outstandings[fd] > 0) { + throw std::runtime_error("A signing pipe worker died while we were waiting for its result"); + } d_eof.insert(fd); break; } @@ -228,6 +233,7 @@ void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist! } --d_outstanding; + d_outstandings[fd]--; addSignedToChunks(chunk); @@ -236,22 +242,23 @@ void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist! 
} if(!d_outstanding || !d_final) break; - rwVect = waitForRW(1, 0, -1); // wait for something to happen + rwVect = waitForRW(true, false, -1); // wait for something to happen } } if(wantWrite) { // our optimization above failed, we now wait synchronously - rwVect = waitForRW(0, wantWrite, -1); // wait for something to happen + rwVect = waitForRW(false, wantWrite, -1); // wait for something to happen random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign)); d_rrsetToSign = new rrset_t; + d_outstandings[*rwVect.second.begin()]++; d_outstanding++; d_queued++; } } -unsigned int ChunkedSigningPipe::getReady() +unsigned int ChunkedSigningPipe::getReady() const { unsigned int sum=0; for(const auto& v : d_chunks) { diff --git a/pdns/signingpipe.hh b/pdns/signingpipe.hh index 7bea5c269..eb10b2fa6 100644 --- a/pdns/signingpipe.hh +++ b/pdns/signingpipe.hh @@ -41,15 +41,16 @@ public: typedef vector rrset_t; typedef rrset_t chunk_t; // for now - ChunkedSigningPipe(const DNSName& signerName, bool mustSign, /* FIXME servers is unused? 
*/ const string& servers=string(), unsigned int numWorkers=3); + ChunkedSigningPipe(const DNSName& signerName, bool mustSign, unsigned int numWorkers=3); ~ChunkedSigningPipe(); bool submit(const DNSZoneRecord& rr); chunk_t getChunk(bool final=false); + unsigned int getReady() const; std::atomic d_signed; - int d_queued; - int d_outstanding; - unsigned int getReady(); + unsigned int d_queued; + unsigned int d_outstanding; + private: void flushToSign(); void dedupRRSet(); @@ -61,7 +62,7 @@ private: void worker(int fd); unsigned int d_numworkers; - int d_submitted; + unsigned int d_submitted; rrset_t* d_rrsetToSign; std::deque< std::vector > d_chunks; @@ -71,6 +72,8 @@ private: std::vector d_sockets; std::set d_eof; + std::map d_outstandings; + vector d_threads; bool d_mustSign; bool d_final; diff --git a/pdns/tcpreceiver.cc b/pdns/tcpreceiver.cc index d6f241b07..53ff0c1e2 100644 --- a/pdns/tcpreceiver.cc +++ b/pdns/tcpreceiver.cc @@ -673,7 +673,7 @@ int TCPNameserver::doAXFR(const DNSName &target, shared_ptr q, int ou trc.d_mac = outpacket->d_trc.d_mac; outpacket = getFreshAXFRPacket(q); - ChunkedSigningPipe csp(target, securedZone, "", ::arg().asNum("signing-threads", 1)); + ChunkedSigningPipe csp(target, securedZone, ::arg().asNum("signing-threads", 1)); typedef map nsecxrepo_t; nsecxrepo_t nsecxrepo;