From 0efb53ddad5498ac346a88d7db8fadcc3b9a7110 Mon Sep 17 00:00:00 2001 From: Remi Gacogne Date: Fri, 6 Oct 2017 12:48:26 +0200 Subject: [PATCH] auth: Handle signing pipe worker dying with work still pending (cherry picked from commit e3200e070e7cc4e243676776c41eb806c4edb7a5) --- pdns/pdnsutil.cc | 2 +- pdns/signingpipe.cc | 21 ++++++++++++++------- pdns/signingpipe.hh | 13 ++++++++----- pdns/tcpreceiver.cc | 2 +- 4 files changed, 24 insertions(+), 14 deletions(-) diff --git a/pdns/pdnsutil.cc b/pdns/pdnsutil.cc index 2361bee48..437bbe26d 100644 --- a/pdns/pdnsutil.cc +++ b/pdns/pdnsutil.cc @@ -1371,7 +1371,7 @@ void testSpeed(DNSSECKeeper& dk, const DNSName& zone, const string& remote, int throw runtime_error("No backends available for DNSSEC key storage"); } - ChunkedSigningPipe csp(DNSName(zone), 1, remote, cores); + ChunkedSigningPipe csp(DNSName(zone), 1, cores); vector signatures; uint32_t rnd; diff --git a/pdns/signingpipe.cc b/pdns/signingpipe.cc index 8c1616f62..ddffe9649 100644 --- a/pdns/signingpipe.cc +++ b/pdns/signingpipe.cc @@ -70,7 +70,7 @@ catch(...) { return 0; } -ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign, const string& servers, unsigned int workers) +ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign, unsigned int workers) : d_signed(0), d_queued(0), d_outstanding(0), d_numworkers(workers), d_submitted(0), d_signer(signerName), d_maxchunkrecords(100), d_tids(d_numworkers), d_mustSign(mustSign), d_final(false) { @@ -90,6 +90,7 @@ ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign, pthread_create(&d_tids[n], 0, helperWorker, (void*) new StartHelperStruct(this, n, fds[1])); setNonBlocking(fds[0]); d_sockets.push_back(fds[0]); + d_outstandings[fds[0]] = 0; } } @@ -145,7 +146,7 @@ bool ChunkedSigningPipe::submit(const DNSResourceRecord& rr) pair, vector > ChunkedSigningPipe::waitForRW(bool rd, bool wr, int seconds) { vector pfds; - + for(unsigned int n = 0; n < d_sockets.size(); ++n) { if(d_eof.count(d_sockets[n])) continue; @@ -158,7 +159,7 @@ pair, vector > ChunkedSigningPipe::waitForRW(bool rd, bool wr, pfd.events |= POLLOUT; pfds.push_back(pfd); } - + int res = poll(&pfds[0], pfds.size(), (seconds < 0) ? -1 : (seconds * 1000)); // -1 = infinite if(res < 0) unixDie("polling for activity from signers, "+std::to_string(d_sockets.size())); @@ -209,12 +210,13 @@ void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist! pair, vector > rwVect; - rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen + rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen if(wantWrite && !rwVect.second.empty()) { random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign)); d_rrsetToSign = new rrset_t; + d_outstandings[*rwVect.second.begin()]++; d_outstanding++; d_queued++; wantWrite=false; @@ -231,6 +233,9 @@ void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist! while(d_outstanding) { int res = readn(fd, &chunk, sizeof(chunk)); if(!res) { + if (d_outstandings[fd] > 0) { + throw std::runtime_error("A signing pipe worker died while we were waiting for its result"); + } d_eof.insert(fd); break; } @@ -242,6 +247,7 @@ void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist! } --d_outstanding; + d_outstandings[fd]--; addSignedToChunks(chunk); @@ -250,22 +256,23 @@ void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist! } if(!d_outstanding || !d_final) break; - rwVect = waitForRW(1, 0, -1); // wait for something to happen + rwVect = waitForRW(true, false, -1); // wait for something to happen } } if(wantWrite) { // our optimization above failed, we now wait synchronously - rwVect = waitForRW(0, wantWrite, -1); // wait for something to happen + rwVect = waitForRW(false, wantWrite, -1); // wait for something to happen random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign)); d_rrsetToSign = new rrset_t; + d_outstandings[*rwVect.second.begin()]++; d_outstanding++; d_queued++; } } -unsigned int ChunkedSigningPipe::getReady() +unsigned int ChunkedSigningPipe::getReady() const { unsigned int sum=0; for(const std::vector& v : d_chunks) { diff --git a/pdns/signingpipe.hh b/pdns/signingpipe.hh index 99884982c..241b42c37 100644 --- a/pdns/signingpipe.hh +++ b/pdns/signingpipe.hh @@ -40,15 +40,16 @@ public: typedef vector rrset_t; typedef rrset_t chunk_t; // for now - ChunkedSigningPipe(const DNSName& signerName, bool mustSign, /* FIXME servers is unused? */ const string& servers=string(), unsigned int numWorkers=3); + ChunkedSigningPipe(const DNSName& signerName, bool mustSign, unsigned int numWorkers=3); ~ChunkedSigningPipe(); bool submit(const DNSResourceRecord& rr); chunk_t getChunk(bool final=false); + unsigned int getReady() const; std::atomic d_signed; - int d_queued; - int d_outstanding; - unsigned int getReady(); + unsigned int d_queued; + unsigned int d_outstanding; + private: void flushToSign(); void dedupRRSet(); @@ -61,7 +62,7 @@ private: static void* helperWorker(void* p); unsigned int d_numworkers; - int d_submitted; + unsigned int d_submitted; rrset_t* d_rrsetToSign; std::deque< std::vector > d_chunks; @@ -72,6 +73,8 @@ private: std::vector d_sockets; std::set d_eof; vector d_tids; + std::map d_outstandings; + bool d_mustSign; bool d_final; }; diff --git a/pdns/tcpreceiver.cc b/pdns/tcpreceiver.cc index 31c4d0b7b..ec8a4aac3 100644 --- a/pdns/tcpreceiver.cc +++ b/pdns/tcpreceiver.cc @@ -653,7 +653,7 @@ int TCPNameserver::doAXFR(const DNSName &target, shared_ptr q, int ou trc.d_mac = outpacket->d_trc.d_mac; outpacket = getFreshAXFRPacket(q); - ChunkedSigningPipe csp(target, securedZone, "", ::arg().asNum("signing-threads", 1)); + ChunkedSigningPipe csp(target, securedZone, ::arg().asNum("signing-threads", 1)); typedef map nsecxrepo_t; nsecxrepo_t nsecxrepo; -- 2.40.0