return nullptr;
}
-ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign, const string& servers, unsigned int workers)
+ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign, unsigned int workers)
: d_signed(0), d_queued(0), d_outstanding(0), d_numworkers(workers), d_submitted(0), d_signer(signerName),
d_maxchunkrecords(100), d_threads(d_numworkers), d_mustSign(mustSign), d_final(false)
{
d_threads[n] = std::thread(helperWorker, this, fds[1]);
setNonBlocking(fds[0]);
d_sockets.push_back(fds[0]);
+ d_outstandings[fds[0]] = 0;
}
}
pair<vector<int>, vector<int> > ChunkedSigningPipe::waitForRW(bool rd, bool wr, int seconds)
{
vector<pollfd> pfds;
-
+
for(unsigned int n = 0; n < d_sockets.size(); ++n) {
if(d_eof.count(d_sockets[n]))
continue;
pfd.events |= POLLOUT;
pfds.push_back(pfd);
}
-
+
int res = poll(&pfds[0], pfds.size(), (seconds < 0) ? -1 : (seconds * 1000)); // -1 = infinite
if(res < 0)
unixDie("polling for activity from signers, "+std::to_string(d_sockets.size()));
pair<vector<int>, vector<int> > rwVect;
- rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen
+ rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen
if(wantWrite && !rwVect.second.empty()) {
random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
d_rrsetToSign = new rrset_t;
+ d_outstandings[*rwVect.second.begin()]++;
d_outstanding++;
d_queued++;
wantWrite=false;
while(d_outstanding) {
int res = readn(fd, &chunk, sizeof(chunk));
if(!res) {
+ if (d_outstandings[fd] > 0) {
+ throw std::runtime_error("A signing pipe worker died while we were waiting for its result");
+ }
d_eof.insert(fd);
break;
}
}
--d_outstanding;
+ d_outstandings[fd]--;
addSignedToChunks(chunk);
}
if(!d_outstanding || !d_final)
break;
- rwVect = waitForRW(1, 0, -1); // wait for something to happen
+ rwVect = waitForRW(true, false, -1); // wait for something to happen
}
}
if(wantWrite) { // our optimization above failed, we now wait synchronously
- rwVect = waitForRW(0, wantWrite, -1); // wait for something to happen
+ rwVect = waitForRW(false, wantWrite, -1); // wait for something to happen
random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
d_rrsetToSign = new rrset_t;
+ d_outstandings[*rwVect.second.begin()]++;
d_outstanding++;
d_queued++;
}
}
-unsigned int ChunkedSigningPipe::getReady()
+unsigned int ChunkedSigningPipe::getReady() const
{
unsigned int sum=0;
for(const auto& v : d_chunks) {
typedef vector<DNSZoneRecord> rrset_t;
typedef rrset_t chunk_t; // for now
- ChunkedSigningPipe(const DNSName& signerName, bool mustSign, /* FIXME servers is unused? */ const string& servers=string(), unsigned int numWorkers=3);
+ ChunkedSigningPipe(const DNSName& signerName, bool mustSign, unsigned int numWorkers=3);
~ChunkedSigningPipe();
bool submit(const DNSZoneRecord& rr);
chunk_t getChunk(bool final=false);
+ unsigned int getReady() const;
std::atomic<unsigned long> d_signed;
- int d_queued;
- int d_outstanding;
- unsigned int getReady();
+ unsigned int d_queued;
+ unsigned int d_outstanding;
+
private:
void flushToSign();
void dedupRRSet();
void worker(int fd);
unsigned int d_numworkers;
- int d_submitted;
+ unsigned int d_submitted;
rrset_t* d_rrsetToSign;
std::deque< std::vector<DNSZoneRecord> > d_chunks;
std::vector<int> d_sockets;
std::set<int> d_eof;
+ std::map<int,int> d_outstandings;
+
vector<std::thread> d_threads;
bool d_mustSign;
bool d_final;