continue;
unixDie("Unable to poll for new UDP events");
}
-
for(struct pollfd &pfd : fds) {
if(pfd.revents & POLLIN) {
setSocketBuffer(fd, SO_SNDBUF, size);
}
-void sendThread(const vector<Socket*>* sockets, const vector<vector<uint8_t> >* packets, int qps, ComboAddress dest)
+void sendPackets(const vector<Socket*>* sockets, const vector<vector<uint8_t>* >& packets, int qps, ComboAddress dest)
{
unsigned int burst=100;
struct timespec nsec;
vector<unique_ptr<Unit> > units;
int ret;
- for(const auto& p : *packets) {
+ for(const auto& p : packets) {
count++;
Unit u;
- fillMSGHdr(&u.msgh, &u.iov, u.cbuf, 0, (char*)&p[0], p.size(), &dest);
+ fillMSGHdr(&u.msgh, &u.iov, u.cbuf, 0, (char*)&(*p)[0], p->size(), &dest);
if((ret=sendmsg((*sockets)[count % sockets->size()]->getHandle(),
&u.msgh, 0)))
if(ret < 0)
}
-// calidns queryfile destination qps
+/*
+ New plan. Set cache hit percentage, which we achieve on a per second basis.
+ So we start with 10000 qps for example, and for 90% cache hit ratio means
+ we take 1000 unique queries and each send them 10 times.
+
+ We then move the 1000 unique queries to the 'known' pool.
+
+ For the next second, say 20000 qps, we know we are going to need 2000 new queries,
+ so we take 2000 from the unknown pool. Then we need 18000 cache hits. We can get 1000 from
+ the known pool, leaving us down 17000. Or, we have 3000 in total now and we need 2000. We simply
+ repeat the 3000 mix we have ~7 times. The 2000 can now go to the known pool too.
+
+ For the next second, say 30000 qps, we'll need 3000 cache misses, which we get from
+ the unknown pool. To this we add 3000 queries from the known pool. Next up we repeat this batch 5
+ times.
+
+ In general the algorithm therefore is:
+
+ 1) Calculate number of cache misses required, get them from the unknown pool
+ 2) Move those to the known pool
+ 3) Fill up to amount of queries we need with random picks from the known pool
+
+*/
int main(int argc, char** argv)
try
struct sched_param param;
param.sched_priority=99;
- if(argc != 4) {
- cerr<<"Syntax: calidns file-with-queries destination qps-goal"<<endl;
+ if(argc != 5) {
+ cerr<<"Syntax: calidns file-with-queries destination qps-start hitperc"<<endl;
exit(EXIT_FAILURE);
}
if(sched_setscheduler(0, SCHED_FIFO, ¶m) < 0)
- unixDie("setting scheduler");
+ cerr<<"Unable to set SCHED_FIFO: "<<strerror(errno)<<endl;
+ double hitrate=atof(argv[4])/100.0;
+ int qpsstart=atoi(argv[3]);
ifstream ifs(argv[1]);
string line;
reportAllTypes();
- vector<vector<uint8_t> > packets;
+ vector<std::shared_ptr<vector<uint8_t> > > unknown, known;
while(getline(ifs, line)) {
vector<uint8_t> packet;
boost::trim(line);
DNSPacketWriter pw(packet, DNSName(p.first), DNSRecordContent::TypeToNumber(p.second));
pw.getHeader()->rd=1;
pw.getHeader()->id=random();
- packets.push_back(packet);
+ unknown.emplace_back(std::make_shared<vector<uint8_t>>(packet));
}
- cout<<"Generated "<<packets.size()<<" queries"<<endl;
- random_shuffle(packets.begin(), packets.end());
+ random_shuffle(unknown.begin(), unknown.end());
+ cout<<"Generated "<<unknown.size()<<" ready to use queries"<<endl;
vector<Socket*> sockets;
- ComboAddress dest(argv[2]);
+ ComboAddress dest(argv[2], 53);
for(int i=0; i < 24; ++i) {
Socket *sock = new Socket(AF_INET, SOCK_DGRAM);
-
// sock->connect(dest);
setSocketSendBuffer(sock->getHandle(), 2000000);
setSocketReceiveBuffer(sock->getHandle(), 2000000);
sockets.push_back(sock);
-
}
new thread(recvThread, &sockets);
int qps=atoi(argv[3]);
ofstream plot("plot");
- for(qps=10000;;qps+=5000) {
- cout<<"Aiming at "<<qps<< "qps"<<endl;
+ for(qps=qpsstart;;qps *= 1.1) {
+ double seconds=1;
+ cout<<"Aiming at "<<qps<< "qps for "<<seconds<<" seconds at cache hitrate "<<100.0*hitrate<<"%";
+ unsigned int misses=(1-hitrate)*qps*seconds;
+ unsigned int total=qps*seconds;
+ cout<<", need "<<misses<<" misses, "<<total<<" queries, have "<<unknown.size()<<" unknown left!"<<endl;
+
+ vector<vector<uint8_t>*> toSend;
+ unsigned int n;
+ for(n=0; n < misses; ++n) {
+ auto ptr=unknown.back();
+ unknown.pop_back();
+ toSend.push_back(ptr.get());
+ known.push_back(ptr);
+ }
+ for(;n < total; ++n) {
+ toSend.push_back(known[random()%known.size()].get());
+ }
+ random_shuffle(toSend.begin(), toSend.end());
g_recvcounter.store(0);
DTime dt;
dt.set();
- sendThread(&sockets, &packets, qps, dest);
+ sendPackets(&sockets, toSend, qps, dest);
auto udiff = dt.udiff();
- auto realqps=packets.size()/(udiff/1000000.0);
+ auto realqps=toSend.size()/(udiff/1000000.0);
cout<<"Achieved "<<realqps<<"qps"<< " over "<< udiff/1000000.0<<" seconds"<<endl;
usleep(50000);
- double perc=g_recvcounter.load()*100.0/packets.size();
+ double perc=g_recvcounter.load()*100.0/toSend.size();
cout<<"Received "<<g_recvcounter.load()<<" packets ("<<perc<<"%)"<<endl;
plot<<qps<<" "<<realqps<<" "<<perc<<" "<<g_recvcounter.load()/(udiff/1000000.0)<<endl;
}