#include <mutex>
#include <thread>
#include <dirent.h>
+#include <queue>
+#include <condition_variable>
#include "ixfr.hh"
#include "ixfrutils.hh"
#include "resolver.hh"
std::map<DNSName, ixfrinfo_t> g_soas;
std::mutex g_soas_mutex;
+// Condition variable for TCP handling
+std::condition_variable g_tcpHandlerCV;
+std::queue<pair<int, ComboAddress>> g_tcpRequestFDs;
+std::mutex g_tcpRequestFDsMutex;
+
using namespace boost::multi_index;
namespace po = boost::program_options;
return;
}
- char buf[4096];
- ssize_t res;
- try {
- uint16_t toRead;
- readn2(cfd, &toRead, sizeof(toRead));
- toRead = std::min(ntohs(toRead), static_cast<uint16_t>(sizeof(buf)));
- res = readn2WithTimeout(cfd, &buf, toRead, 2);
- } catch (runtime_error &e) {
- cerr<<"[WARNING] Could not read message from "<<saddr.toStringWithPort()<<": "<<e.what()<<endl;
- close(cfd);
- return;
+ {
+ std::lock_guard<std::mutex> lg(g_tcpRequestFDsMutex);
+ g_tcpRequestFDs.push({cfd, saddr});
}
+ g_tcpHandlerCV.notify_one();
+}
- try {
- MOADNSParser mdp(true, string(buf, res));
+/* Thread to handle TCP traffic
+ */
+void tcpWorker(int tid) {
+ string prefix = "TCP Worker " + std::to_string(tid) + ": ";
+
+ while(true) {
+ if (g_debug) {
+ cerr<<"[DEBUG] "<<prefix<<"ready for a new request!"<<endl;
+ }
+ std::unique_lock<std::mutex> lk(g_tcpRequestFDsMutex);
+ g_tcpHandlerCV.wait(lk, []{return g_tcpRequestFDs.size() || g_exiting ;});
+ if (g_exiting) {
+ if (g_debug) {
+ cerr<<"[DEBUG] "<<prefix<<"stopping thread"<<endl;
+ }
+ break;
+ }
+ if (g_debug) {
+ cerr<<"[DEBUG] "<<prefix<<"Going to handle a query"<<endl;
+ }
+ auto request = g_tcpRequestFDs.front();
+ g_tcpRequestFDs.pop();
+ lk.unlock();
- if (!checkQuery(mdp, saddr, false)) {
+ int cfd = request.first;
+ ComboAddress saddr = request.second;
+
+ char buf[4096];
+ ssize_t res;
+ try {
+ uint16_t toRead;
+ readn2(cfd, &toRead, sizeof(toRead));
+ toRead = std::min(ntohs(toRead), static_cast<uint16_t>(sizeof(buf)));
+ res = readn2WithTimeout(cfd, &buf, toRead, 2);
+ } catch (runtime_error &e) {
+ cerr<<"[WARNING] "<<prefix<<"Could not read message from "<<saddr.toStringWithPort()<<": "<<e.what()<<endl;
close(cfd);
- return;
+ continue;
}
- vector<vector<uint8_t>> packets;
- if (mdp.d_qtype == QType::SOA) {
- vector<uint8_t> packet;
- bool ret = makeSOAPacket(mdp, packet);
- if (!ret) {
+ try {
+ MOADNSParser mdp(true, string(buf, res));
+
+ if (!checkQuery(mdp, saddr, false)) {
close(cfd);
- return;
+ continue;
}
- packets.push_back(packet);
- }
- if (mdp.d_qtype == QType::AXFR) {
- if (!makeAXFRPackets(mdp, packets)) {
- close(cfd);
- return;
+ vector<vector<uint8_t>> packets;
+ if (mdp.d_qtype == QType::SOA) {
+ vector<uint8_t> packet;
+ bool ret = makeSOAPacket(mdp, packet);
+ if (!ret) {
+ close(cfd);
+ continue;
+ }
+ packets.push_back(packet);
}
- }
- if (mdp.d_qtype == QType::IXFR) {
- /* RFC 1995 section 3:
- * The IXFR query packet format is the same as that of a normal DNS
- * query, but with the query type being IXFR and the authority section
- * containing the SOA record of client's version of the zone.
- */
- shared_ptr<SOARecordContent> clientSOA;
- for (auto &answer : mdp.d_answers) {
- // from dnsparser.hh:
- // typedef vector<pair<DNSRecord, uint16_t > > answers_t;
- if (answer.first.d_type == QType::SOA && answer.first.d_place == DNSResourceRecord::AUTHORITY) {
- clientSOA = getRR<SOARecordContent>(answer.first);
- if (clientSOA != nullptr) {
- break;
+ if (mdp.d_qtype == QType::AXFR) {
+ if (!makeAXFRPackets(mdp, packets)) {
+ close(cfd);
+ continue;
+ }
+ }
+
+ if (mdp.d_qtype == QType::IXFR) {
+ /* RFC 1995 section 3:
+ * The IXFR query packet format is the same as that of a normal DNS
+ * query, but with the query type being IXFR and the authority section
+ * containing the SOA record of client's version of the zone.
+ */
+ shared_ptr<SOARecordContent> clientSOA;
+ for (auto &answer : mdp.d_answers) {
+ // from dnsparser.hh:
+ // typedef vector<pair<DNSRecord, uint16_t > > answers_t;
+ if (answer.first.d_type == QType::SOA && answer.first.d_place == DNSResourceRecord::AUTHORITY) {
+ clientSOA = getRR<SOARecordContent>(answer.first);
+ if (clientSOA != nullptr) {
+ break;
+ }
}
+ } /* for (auto const &answer : mdp.d_answers) */
+
+ if (clientSOA == nullptr) {
+ cerr<<"[WARNING] "<<prefix<<"IXFR request packet did not contain a SOA record in the AUTHORITY section"<<endl;
+ close(cfd);
+ continue;
}
- } /* for (auto const &answer : mdp.d_answers) */
- if (clientSOA == nullptr) {
- cerr<<"[WARNING] IXFR request packet did not contain a SOA record in the AUTHORITY section"<<endl;
- close(cfd);
- return;
- }
+ if (!makeIXFRPackets(mdp, clientSOA, packets)) {
+ close(cfd);
+ continue;
+ }
+ } /* if (mdp.d_qtype == QType::IXFR) */
- if (!makeIXFRPackets(mdp, clientSOA, packets)) {
- close(cfd);
- return;
- }
- } /* if (mdp.d_qtype == QType::IXFR) */
+ for (const auto& packet : packets) {
+ char sendBuf[2];
+ sendBuf[0]=packet.size()/256;
+ sendBuf[1]=packet.size()%256;
- for (const auto& packet : packets) {
- char sendBuf[2];
- sendBuf[0]=packet.size()/256;
- sendBuf[1]=packet.size()%256;
+ ssize_t send = writen2(cfd, sendBuf, 2);
+ send += writen2(cfd, &packet[0], packet.size());
+ }
+ shutdown(cfd, 2);
+ } catch (MOADNSException &e) {
+ cerr<<"[WARNING] "<<prefix<<"Could not parse DNS packet from "<<saddr.toStringWithPort()<<": "<<e.what()<<endl;
+ } catch (runtime_error &e) {
+ cerr<<"[WARNING] "<<prefix<<"Could not write reply to "<<saddr.toStringWithPort()<<": "<<e.what()<<endl;
+ }
+ // bye!
+ close(cfd);
- ssize_t send = writen2(cfd, sendBuf, 2);
- send += writen2(cfd, &packet[0], packet.size());
+ if (g_exiting) {
+ break;
}
- shutdown(cfd, 2);
- } catch (MOADNSException &e) {
- cerr<<"[WARNING] Could not parse DNS packet from "<<saddr.toStringWithPort()<<": "<<e.what()<<endl;
- } catch (runtime_error &e) {
- cerr<<"[WARNING] Could not write reply to "<<saddr.toStringWithPort()<<": "<<e.what()<<endl;
}
- // bye!
- close(cfd);
}
int main(int argc, char** argv) {
("server-address", po::value<string>()->default_value("127.0.0.1:5300"), "server address")
("work-dir", po::value<string>()->default_value("."), "Directory for storing AXFR and IXFR data")
("keep", po::value<uint16_t>()->default_value(KEEP_DEFAULT), "Number of old zone versions to retain")
- ("axfr-timeout", po::value<uint16_t>()->default_value(AXFRTIMEOUT_DEFAULT), "Timeout in seconds for an AXFR to complete")
+ ("axfr-timeout", po::value<uint16_t>()->default_value(AXFRTIMEOUT_DEFAULT), "Timeout in seconds for an inbound AXFR to complete")
+ ("tcp-out-threads", po::value<uint16_t>()->default_value(10), "Number of maximum simultaneous outbound TCP connections")
;
po::options_description alloptions;
po::options_description hidden("hidden options");
signal(SIGTERM, handleSignal);
signal(SIGINT, handleSignal);
signal(SIGSTOP, handleSignal);
+ signal(SIGPIPE, SIG_IGN);
// Init the things we need
reportAllTypes();
cout<<"[INFO] IXFR distributor starting up!"<<endl;
std::thread ut(updateThread);
+ std::thread tcpHandlers[g_vm["tcp-out-threads"].as<uint16_t>()];
+ for (int i=0; i<g_vm["tcp-out-threads"].as<uint16_t>(); i++) {
+ tcpHandlers[i] = std::thread(tcpWorker, i);
+ }
struct timeval now;
for(;;) {
}
}
ut.join();
+ g_tcpHandlerCV.notify_all();
+ for (int i=0; i<g_vm["tcp-out-threads"].as<uint16_t>(); i++) {
+ tcpHandlers[i].join();
+ }
if (g_verbose) {
cerr<<"[INFO] IXFR distributor stopped"<<endl;
}