]> granicus.if.org Git - pdns/commitdiff
dnsdist: Use a separate lock for accessing the pool's servers
authorRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 23 Mar 2018 22:13:52 +0000 (23:13 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 23 Mar 2018 22:13:52 +0000 (23:13 +0100)
We used to hold the Lua lock while applying the load-balancing policy
to select a backend, which is only needed by Lua policies, not core
ones. We do need a lock to make sure that the vector of servers is
not altered under our feet, but a per-pool read-write lock is enough
and reduces contention a lot, especially when the maintenance thread
is doing some heavy-lifting.

pdns/dnsdist-carbon.cc
pdns/dnsdist-lua-bindings.cc
pdns/dnsdist-lua.cc
pdns/dnsdist-tcp.cc
pdns/dnsdist-web.cc
pdns/dnsdist.cc
pdns/dnsdist.hh

index 6ef88767f041a083603ce516e29504984a04b895..32cab5d9a69b46523d612a19d51bd8d2db03dbab 100644 (file)
@@ -111,8 +111,8 @@ try
           }
           const string base = "dnsdist." + hostname + ".main.pools." + poolName + ".";
           const std::shared_ptr<ServerPool> pool = entry.second;
-          str<<base<<"servers" << " " << pool->servers.size() << " " << now << "\r\n";
-          str<<base<<"servers-up" << " " << pool->countServersUp() << " " << now << "\r\n";
+          str<<base<<"servers" << " " << pool->countServers(false) << " " << now << "\r\n";
+          str<<base<<"servers-up" << " " << pool->countServers(true) << " " << now << "\r\n";
           if (pool->packetCache != nullptr) {
             const auto& cache = pool->packetCache;
             str<<base<<"cache-size" << " " << cache->getMaxEntries() << " " << now << "\r\n";
index a317e79d162ac1a490670a0039eb9407b76c5be9..24e31590af56d9d81020df624d33435fd4ce13ad 100644 (file)
@@ -45,15 +45,16 @@ void setupLuaBindings(bool client)
     });
 
   /* ServerPolicy */
-  g_lua.writeFunction("newServerPolicy", [](string name, policyfunc_t policy) { return ServerPolicy{name, policy};});
+  g_lua.writeFunction("newServerPolicy", [](string name, policyfunc_t policy) { return ServerPolicy{name, policy, true};});
   g_lua.registerMember("name", &ServerPolicy::name);
   g_lua.registerMember("policy", &ServerPolicy::policy);
+  g_lua.registerMember("isLua", &ServerPolicy::isLua);
 
-  g_lua.writeVariable("firstAvailable", ServerPolicy{"firstAvailable", firstAvailable});
-  g_lua.writeVariable("roundrobin", ServerPolicy{"roundrobin", roundrobin});
-  g_lua.writeVariable("wrandom", ServerPolicy{"wrandom", wrandom});
-  g_lua.writeVariable("whashed", ServerPolicy{"whashed", whashed});
-  g_lua.writeVariable("leastOutstanding", ServerPolicy{"leastOutstanding", leastOutstanding});
+  g_lua.writeVariable("firstAvailable", ServerPolicy{"firstAvailable", firstAvailable, false});
+  g_lua.writeVariable("roundrobin", ServerPolicy{"roundrobin", roundrobin, false});
+  g_lua.writeVariable("wrandom", ServerPolicy{"wrandom", wrandom, false});
+  g_lua.writeVariable("whashed", ServerPolicy{"whashed", whashed, false});
+  g_lua.writeVariable("leastOutstanding", ServerPolicy{"leastOutstanding", leastOutstanding, false});
 
   /* ServerPool */
   g_lua.registerFunction<void(std::shared_ptr<ServerPool>::*)(std::shared_ptr<DNSDistPacketCache>)>("setCache", [](std::shared_ptr<ServerPool> pool, std::shared_ptr<DNSDistPacketCache> cache) {
index 0e573f16d172cdf79aa0d720c0be4df31c8a6b49..1701af44e76920e680fba5d33ac876a09cb423be 100644 (file)
@@ -411,7 +411,7 @@ void setupLuaConfig(bool client)
     });
   g_lua.writeFunction("setServerPolicyLua", [](string name, policyfunc_t policy)  {
       setLuaSideEffect();
-      g_policy.setState(ServerPolicy{name, policy});
+      g_policy.setState(ServerPolicy{name, policy, true});
     });
 
   g_lua.writeFunction("showServerPolicy", []() {
@@ -1069,7 +1069,7 @@ void setupLuaConfig(bool client)
           }
           string servers;
 
-          for (const auto& server: pool->servers) {
+          for (const auto& server: pool->getServers()) {
             if (!servers.empty()) {
               servers += ", ";
             }
@@ -1353,7 +1353,7 @@ void setupLuaConfig(bool client)
   g_lua.writeFunction("setPoolServerPolicyLua", [](string name, policyfunc_t policy, string pool) {
       setLuaSideEffect();
       auto localPools = g_pools.getCopy();
-      setPoolPolicy(localPools, pool, std::make_shared<ServerPolicy>(ServerPolicy{name, policy}));
+      setPoolPolicy(localPools, pool, std::make_shared<ServerPolicy>(ServerPolicy{name, policy, true}));
       g_pools.setState(localPools);
     });
 
index b3b74864ebf338d83d14568213efd9b848705b63..74613c32edd54b8f6ade75a5efc7259940b65fc4 100644 (file)
@@ -385,15 +385,19 @@ void* tcpClientThread(int pipefd)
         }
 
         std::shared_ptr<ServerPool> serverPool = getPool(*holders.pools, poolname);
-        std::shared_ptr<DNSDistPacketCache> packetCache = nullptr;
-        auto policy = holders.policy->policy;
+        std::shared_ptr<DNSDistPacketCache> packetCache = serverPool->packetCache;
+
+        auto policy = *(holders.policy);
         if (serverPool->policy != nullptr) {
-          policy = serverPool->policy->policy;
+          policy = *(serverPool->policy);
         }
-        {
+        auto servers = serverPool->getServers();
+        if (policy.isLua) {
           std::lock_guard<std::mutex> lock(g_luamutex);
-          ds = policy(serverPool->servers, &dq);
-          packetCache = serverPool->packetCache;
+          ds = policy.policy(servers, &dq);
+        }
+        else {
+          ds = policy.policy(servers, &dq);
         }
 
         if (dq.useECS && ds && ds->useECS) {
index b2bca433514cbd52292c94488c1b5bc8d61efa8b..8ebb4818eda911ada9a3b46daf96fae25f663f96 100644 (file)
@@ -448,7 +448,7 @@ static void connectionThread(int sock, ComboAddress remote, string password, str
         Json::object entry {
           { "id", num++ },
           { "name", pool.first },
-          { "serversCount", (int) pool.second->servers.size() },
+          { "serversCount", (int) pool.second->countServers(false) },
           { "cacheSize", (double) (cache ? cache->getMaxEntries() : 0) },
           { "cacheEntries", (double) (cache ? cache->getEntriesCount() : 0) },
           { "cacheHits", (double) (cache ? cache->getHits() : 0) },
index 958701c06efaeb537c6813bb23962c4fc8360492..c928994a17bb2a4e887aa8d47ab535f8b5667878 100644 (file)
@@ -790,22 +790,12 @@ void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr<Serve
 void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
 {
   std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
-  unsigned int count = (unsigned int) pool->servers.size();
   if (!poolName.empty()) {
     vinfolog("Adding server to pool %s", poolName);
   } else {
     vinfolog("Adding server to default pool");
   }
-  pool->servers.push_back(make_pair(++count, server));
-  /* we need to reorder based on the server 'order' */
-  std::stable_sort(pool->servers.begin(), pool->servers.end(), [](const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& a, const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& b) {
-      return a.second->order < b.second->order;
-    });
-  /* and now we need to renumber for Lua (custom policies) */
-  size_t idx = 1;
-  for (auto& serv : pool->servers) {
-    serv.first = idx++;
-  }
+  pool->addServer(server);
 }
 
 void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
@@ -819,23 +809,7 @@ void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_pt
     vinfolog("Removing server from default pool");
   }
 
-  size_t idx = 1;
-  bool found = false;
-  for (NumberedVector<shared_ptr<DownstreamState> >::iterator it = pool->servers.begin(); it != pool->servers.end();) {
-    if (found) {
-      /* we need to renumber the servers placed
-         after the removed one, for Lua (custom policies) */
-      it->first = idx++;
-      it++;
-    }
-    else if (it->second == server) {
-      it = pool->servers.erase(it);
-      found = true;
-    } else {
-      idx++;
-      it++;
-    }
-  }
+  pool->removeServer(server);
 }
 
 std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poolName)
@@ -849,10 +823,10 @@ std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poo
   return it->second;
 }
 
-const NumberedServerVector& getDownstreamCandidates(const pools_t& pools, const std::string& poolName)
+NumberedServerVector getDownstreamCandidates(const pools_t& pools, const std::string& poolName)
 {
   std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
-  return pool->servers;
+  return pool->getServers();
 }
 
 // goal in life - if you send us a reasonably normal packet, we'll get Z for you, otherwise 0
@@ -1358,15 +1332,18 @@ static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct
 
     DownstreamState* ss = nullptr;
     std::shared_ptr<ServerPool> serverPool = getPool(*holders.pools, poolname);
-    std::shared_ptr<DNSDistPacketCache> packetCache = nullptr;
-    auto policy = holders.policy->policy;
+    std::shared_ptr<DNSDistPacketCache> packetCache = serverPool->packetCache;
+    auto policy = *(holders.policy);
     if (serverPool->policy != nullptr) {
-      policy = serverPool->policy->policy;
+      policy = *(serverPool->policy);
     }
-    {
+    auto servers = serverPool->getServers();
+    if (policy.isLua) {
       std::lock_guard<std::mutex> lock(g_luamutex);
-      ss = policy(serverPool->servers, &dq).get();
-      packetCache = serverPool->packetCache;
+      ss = policy.policy(servers, &dq).get();
+    }
+    else {
+      ss = policy.policy(servers, &dq).get();
     }
 
     bool ednsAdded = false;
@@ -1804,10 +1781,7 @@ void* maintThread()
       const auto localPools = g_pools.getCopy();
       std::shared_ptr<DNSDistPacketCache> packetCache = nullptr;
       for (const auto& entry : localPools) {
-        {
-          std::lock_guard<std::mutex> lock(g_luamutex);
-          packetCache = entry.second->packetCache;
-        }
+        packetCache = entry.second->packetCache;
         if (packetCache) {
           size_t upTo = (packetCache->getMaxEntries()* (100 - g_cacheCleaningPercentage)) / 100;
           packetCache->purgeExpired(upTo);
@@ -2272,7 +2246,7 @@ try
     }
   }
 
-  ServerPolicy leastOutstandingPol{"leastOutstanding", leastOutstanding};
+  ServerPolicy leastOutstandingPol{"leastOutstanding", leastOutstanding, false};
 
   g_policy.setState(leastOutstandingPol);
   if(g_cmdLine.beClient || !g_cmdLine.command.empty()) {
index 7eca6bd72bd6edb8ed0185c56a42ef44771880c2..862efa60b5ac51a3890bbd5f7f1f16c3bb87d8bf 100644 (file)
@@ -645,25 +645,84 @@ struct ServerPolicy
 {
   string name;
   policyfunc_t policy;
+  bool isLua;
 };
 
 struct ServerPool
 {
+  ServerPool()
+  {
+    pthread_rwlock_init(&d_lock, nullptr);
+  }
+
   const std::shared_ptr<DNSDistPacketCache> getCache() const { return packetCache; };
 
-  NumberedVector<shared_ptr<DownstreamState>> servers;
   std::shared_ptr<DNSDistPacketCache> packetCache{nullptr};
   std::shared_ptr<ServerPolicy> policy{nullptr};
 
-  size_t countServersUp() const {
-    size_t upFound = 0;
-    for (const auto& server : servers) {
-      if (std::get<1>(server)->isUp() ) {
-        upFound++;
+  size_t countServers(bool upOnly)
+  {
+    size_t count = 0;
+    ReadLock rl(&d_lock);
+    for (const auto& server : d_servers) {
+      if (!upOnly || std::get<1>(server)->isUp() ) {
+        count++;
       };
     };
-    return upFound;
-  };
+    return count;
+  }
+
+  NumberedVector<shared_ptr<DownstreamState>> getServers()
+  {
+    NumberedVector<shared_ptr<DownstreamState>> result;
+    {
+      ReadLock rl(&d_lock);
+      result = d_servers;
+    }
+    return result;
+  }
+
+  void addServer(shared_ptr<DownstreamState>& server)
+  {
+    WriteLock wl(&d_lock);
+    unsigned int count = (unsigned int) d_servers.size();
+    d_servers.push_back(make_pair(++count, server));
+    /* we need to reorder based on the server 'order' */
+    std::stable_sort(d_servers.begin(), d_servers.end(), [](const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& a, const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& b) {
+      return a.second->order < b.second->order;
+    });
+    /* and now we need to renumber for Lua (custom policies) */
+    size_t idx = 1;
+    for (auto& serv : d_servers) {
+      serv.first = idx++;
+    }
+  }
+
+  void removeServer(shared_ptr<DownstreamState>& server)
+  {
+    WriteLock wl(&d_lock);
+    size_t idx = 1;
+    bool found = false;
+    for (auto it = d_servers.begin(); it != d_servers.end();) {
+      if (found) {
+        /* we need to renumber the servers placed
+           after the removed one, for Lua (custom policies) */
+        it->first = idx++;
+        it++;
+      }
+      else if (it->second == server) {
+        it = d_servers.erase(it);
+        found = true;
+      } else {
+        idx++;
+        it++;
+      }
+    }
+  }
+
+private:
+  NumberedVector<shared_ptr<DownstreamState>> d_servers;
+  pthread_rwlock_t d_lock;
 };
 using pools_t=map<std::string,std::shared_ptr<ServerPool>>;
 void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr<ServerPolicy> policy);
@@ -786,7 +845,7 @@ void controlThread(int fd, ComboAddress local);
 vector<std::function<void(void)>> setupLua(bool client, const std::string& config);
 std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poolName);
 std::shared_ptr<ServerPool> createPoolIfNotExists(pools_t& pools, const string& poolName);
-const NumberedServerVector& getDownstreamCandidates(const pools_t& pools, const std::string& poolName);
+NumberedServerVector getDownstreamCandidates(const pools_t& pools, const std::string& poolName);
 
 std::shared_ptr<DownstreamState> firstAvailable(const NumberedServerVector& servers, const DNSQuestion* dq);