#include "sodcrypto.hh"
#include "base64.hh"
#include <fstream>
+#include "lock.hh"
using std::thread;
g_lua.writeFunction("topClients", [](unsigned int top) {
map<ComboAddress, int,ComboAddress::addressOnlyLessThan > counts;
unsigned int total=0;
- for(const auto& c : g_rings.queryRing) {
- counts[c.requestor]++;
- total++;
+ {
+ ReadLock rl(&g_rings.queryLock);
+ for(const auto& c : g_rings.queryRing) {
+ counts[c.requestor]++;
+ total++;
+ }
}
vector<pair<int, ComboAddress>> rcounts;
for(const auto& c : counts)
map<DNSName, int> counts;
unsigned int total=0;
if(!labels) {
+ ReadLock rl(&g_rings.queryLock);
for(const auto& a : g_rings.queryRing) {
counts[a.name]++;
total++;
}
else {
unsigned int lab = *labels;
+ ReadLock rl(&g_rings.queryLock);
for(auto a : g_rings.queryRing) {
a.name.trimToLabels(lab);
counts[a.name]++;
total++;
}
-
}
// cout<<"Looked at "<<total<<" queries, "<<counts.size()<<" different ones"<<endl;
vector<pair<int, DNSName>> rcounts;
#include "dnsrulactions.hh"
#include <grp.h>
#include <pwd.h>
+#include "lock.hh"
#include <getopt.h>
/* Known sins:
continue;
IDState* ids = &state->idStates[dh->id];
- if(ids->origFD < 0) // duplicate
+ int origFD;
+ {
+ ReadLock rl(&(ids->lock));
+ origFD = ids->origFD;
+ }
+ if(origFD < 0) // duplicate
continue;
else
--state->outstanding; // you'd think an attacker could game this, but we're using connected socket
g_stats.responses++;
if(ids->delayMsec && g_delay) {
- DelayedPacket dp{ids->origFD, string(packet,len), ids->origRemote, ids->origDest};
+ DelayedPacket dp{origFD, string(packet,len), ids->origRemote, ids->origDest};
g_delay->submit(dp, ids->delayMsec);
}
else {
if(ids->origDest.sin4.sin_family == 0)
- sendto(ids->origFD, packet, len, 0, (struct sockaddr*)&ids->origRemote, ids->origRemote.getSocklen());
+ sendto(origFD, packet, len, 0, (struct sockaddr*)&ids->origRemote, ids->origRemote.getSocklen());
else
- sendfromto(ids->origFD, packet, len, 0, ids->origDest, ids->origRemote);
+ sendfromto(origFD, packet, len, 0, ids->origDest, ids->origRemote);
}
double udiff = ids->sentTime.udiff();
vinfolog("Got answer from %s, relayed to %s, took %f usec", state->remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff);
doAvg(g_stats.latencyAvg10000, udiff, 10000);
doAvg(g_stats.latencyAvg1000000, udiff, 1000000);
- ids->origFD = -1;
+ {
+ WriteLock wl(&(ids->lock));
+ if (ids->origFD == origFD)
+ ids->origFD = -1;
+ }
}
return 0;
}
/* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
poss.reserve(servers.size());
- for(auto& d : servers) {
+ for(auto& d : servers) {
if(d.second->isUp()) {
poss.push_back({make_tuple(d.second->outstanding.load(), d.second->order, d.second->latencyUsec), d.second});
}
DNSName qname(packet, len, 12, false, &qtype);
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
- g_rings.queryRing.push_back({now,remote,qname,qtype}); // XXX LOCK?!
+ {
+ WriteLock wl(&g_rings.queryLock);
+ g_rings.queryRing.push_back({now,remote,qname,qtype});
+ }
if(localDynBlock->match(remote)) {
vinfolog("Query from %s dropped because of dynamic block", remote.toStringWithPort());
ss->queries++;
unsigned int idOffset = (ss->idOffset++) % ss->idStates.size();
- IDState* ids = &ss->idStates[idOffset];
-
- if(ids->origFD < 0) // if we are reusing, no change in outstanding
- ss->outstanding++;
- else {
- ss->reuseds++;
- g_stats.downstreamTimeouts++;
+ {
+ IDState* ids = &ss->idStates[idOffset];
+ WriteLock wl(&ids->lock);
+
+ if(ids->origFD < 0) // if we are reusing, no change in outstanding
+ ss->outstanding++;
+ else {
+ ss->reuseds++;
+ g_stats.downstreamTimeouts++;
+ }
+
+ ids->origFD = cs->udpFD;
+ ids->age = 0;
+ ids->origID = dh->id;
+ ids->origRemote = remote;
+ ids->sentTime.start();
+ ids->qname = qname;
+ ids->qtype = qtype;
+ ids->origDest.sin4.sin_family=0;
+ ids->delayMsec = delayMsec;
+ ids->origFlags = origFlags;
+ HarvestDestinationAddress(&msgh, &ids->origDest);
}
-
- ids->origFD = cs->udpFD;
- ids->age = 0;
- ids->origID = dh->id;
- ids->origRemote = remote;
- ids->sentTime.start();
- ids->qname = qname;
- ids->qtype = qtype;
- ids->origDest.sin4.sin_family=0;
- ids->delayMsec = delayMsec;
- ids->origFlags = origFlags;
- HarvestDestinationAddress(&msgh, &ids->origDest);
-
+
dh->id = idOffset;
len = send(ss->fd, packet, len, 0);
dss->prev.reuseds.store(dss->reuseds.load());
for(IDState& ids : dss->idStates) { // timeouts
+ WriteLock wl(&(ids.lock));
if(ids.origFD >=0 && ids.age++ > 2) {
ids.age = 0;
ids.origFD = -1;
struct IDState
{
- IDState() : origFD(-1), delayMsec(0) { origDest.sin4.sin_family = 0;}
+ IDState() : origFD(-1), delayMsec(0) { origDest.sin4.sin_family = 0; pthread_rwlock_init(&lock, 0);}
IDState(const IDState& orig)
{
origFD = orig.origFD;
origDest = orig.origDest;
delayMsec = orig.delayMsec;
age.store(orig.age.load());
+ pthread_rwlock_init(&lock, 0);
}
int origFD; // set to <0 to indicate this state is empty // 4
ComboAddress origDest; // 28
StopWatch sentTime; // 16
DNSName qname; // 80
+ pthread_rwlock_t lock;
std::atomic<uint16_t> age; // 4
uint16_t qtype; // 2
uint16_t origID; // 2
{
queryRing.set_capacity(10000);
respRing.set_capacity(10000);
+ pthread_rwlock_init(&queryLock, 0);
}
struct Query
{
};
boost::circular_buffer<Response> respRing;
std::mutex respMutex;
+ pthread_rwlock_t queryLock;
};
-extern Rings g_rings; // XXX locking for this is still substandard, queryRing and clientRing need RW lock
+extern Rings g_rings;
struct ClientState
{