]> granicus.if.org Git - pdns/commitdiff
Use zeromq library C API, based on work by @wtfuzz and @hexwave. Fixes #1760.
authorAki Tuomi <cmouse@desteem.org>
Tue, 22 Jul 2014 19:59:10 +0000 (22:59 +0300)
committerRuben Kerkhof <ruben@rubenkerkhof.com>
Tue, 20 Jan 2015 20:26:47 +0000 (21:26 +0100)
(cherry picked from commit 49e4360a664e68a62eda2101e9e6af9b18d6005f)

Conflicts:
.travis.yml

.travis.yml
m4/pdns_enable_remotebackend_zeromq.m4
modules/remotebackend/Gemfile.lock
modules/remotebackend/regression-tests/Gemfile
modules/remotebackend/regression-tests/Gemfile.lock
modules/remotebackend/regression-tests/zeromq-backend.rb
modules/remotebackend/remotebackend.hh
modules/remotebackend/test-remotebackend-zeromq.cc
modules/remotebackend/unittest_zeromq.rb
modules/remotebackend/zmqconnector.cc
regression-tests/backends/remote-master

index dfd83156e8559816a8933f9fab6609887f2733ab..b5070eac79b4ad88ecc2684138262cbc4c1bdcaf 100644 (file)
@@ -8,10 +8,10 @@ before_script:
  - sudo /sbin/ip addr add 1.2.3.4/32 dev lo
  - sudo rm /etc/apt/sources.list.d/travis_ci_zeromq3-source.list
  - sudo apt-get update
- - sudo apt-get install --no-install-recommends libboost-all-dev libtolua-dev bc libcdb-dev libnet-dns-perl unbound-host ldnsutils dnsutils bind9utils libtool libcdb-dev xmlto links asciidoc ruby-json ruby-sqlite3 rubygems libcurl4-openssl-dev ruby1.9.1 socat time libzmq1 libzmq-dev pkg-config daemontools authbind liblua5.1-posix1 libopendbx1-dev libopendbx1-sqlite3 python-virtualenv libldap2-dev softhsm libp11-kit-dev p11-kit moreutils libgeoip-dev geoip-database
+ - sudo apt-get install --no-install-recommends libboost-all-dev libtolua-dev bc libcdb-dev libnet-dns-perl unbound-host ldnsutils dnsutils bind9utils libtool libcdb-dev xmlto links asciidoc ruby-json ruby-sqlite3 rubygems libcurl4-openssl-dev ruby1.9.1 socat time pkg-config daemontools authbind liblua5.1-posix1 libopendbx1-dev libopendbx1-sqlite3 python-virtualenv libldap2-dev softhsm libp11-kit-dev p11-kit moreutils libgeoip-dev geoip-database
  - sudo sh -c 'sed s/precise/trusty/g /etc/apt/sources.list > /etc/apt/sources.list.d/trusty.list'
  - sudo apt-get update
- - sudo apt-get install liblmdb0 liblmdb-dev lmdb-utils libyaml-cpp-dev 
+ - sudo apt-get install liblmdb0 liblmdb-dev lmdb-utils libyaml-cpp-dev libzmq3-dev
  - sudo update-alternatives --set ruby /usr/bin/ruby1.9.1
  - sudo touch /etc/authbind/byport/53
  - sudo chmod 755 /etc/authbind/byport/53
index 10f41606a68478ac7e0967f5f22d16fdde989a74..9fba38fdfa35772e21e36e85117b7adecb141f50 100644 (file)
@@ -18,9 +18,21 @@ AC_DEFUN([PDNS_ENABLE_REMOTEBACKEND_ZEROMQ],[
         AC_DEFINE([HAVE_LIBZMQ], [1], [Define to 1 if you have libzmq])
         AC_DEFINE([REMOTEBACKEND_ZEROMQ], [1], [Define to 1 if you have the ZeroMQ connector])
         REMOTEBACKEND_ZEROMQ=yes
+
       ],
       [AC_MSG_ERROR([Could not find libzmq])]
     )]
+
+    old_CXXFLAGS="$CXXFLAGS"
+    old_LDFLAGS="$LDFLAGS"
+    CXXFLAGS="$CFLAGS $LIBZMQ_CFLAGS"
+    LDFLAGS="$LDFLAGS $LIBZMQ_LIBS"
+    AC_CHECK_LIB([zmq], [zmq_msg_send],
+      [
+        AC_DEFINE([HAVE_ZMQ_MSG_SEND], [1], [Define to 1 if the ZeroMQ 3.x or greater API is available])
+      ])
+    CXXFLAGS="$old_CXXFLAGS"
+    LDFLAGS="$old_LDFLAGS"
   )
 ])
 
index 6750b1184fdcd91050351b48c1bb3933cfe81fe7..276a96c9c08288d69b63add53b3cb89d00c637a3 100644 (file)
@@ -1,14 +1,16 @@
 GEM
   remote: https://rubygems.org/
   specs:
-    ffi (1.9.3)
-    ffi-rzmq (1.0.3)
-      ffi
+    ffi (1.9.6)
+    ffi-rzmq (2.0.1)
+      ffi-rzmq-core (>= 1.0.1)
+    ffi-rzmq-core (1.0.3)
+      ffi (~> 1.9)
     json (1.8.1)
-    sqlite3 (1.3.8)
+    sqlite3 (1.3.9)
     webrick (1.3.1)
-    zeromqrb (0.1.1)
-      ffi-rzmq (~> 1.0)
+    zeromqrb (0.1.3)
+      ffi-rzmq
 
 PLATFORMS
   ruby
index 019fc2a224c7d4da9dd714293021e186cf02fd35..2762b5430878ef78bbc9ef74096dbf4931518875 100644 (file)
@@ -1,5 +1,6 @@
-source 'https://rubygems.org'
+source "https://rubygems.org"
 
-gem 'webrick'
-gem 'sqlite3'
-gem 'zeromqrb'
+gem "json"
+gem "webrick"
+gem "zeromqrb"
+gem "sqlite3"
index 44b2bee99727c78a837fc049175c0f949410e016..276a96c9c08288d69b63add53b3cb89d00c637a3 100644 (file)
@@ -1,18 +1,22 @@
 GEM
   remote: https://rubygems.org/
   specs:
-    ffi (1.9.3)
-    ffi-rzmq (1.0.3)
-      ffi
-    sqlite3 (1.3.8)
+    ffi (1.9.6)
+    ffi-rzmq (2.0.1)
+      ffi-rzmq-core (>= 1.0.1)
+    ffi-rzmq-core (1.0.3)
+      ffi (~> 1.9)
+    json (1.8.1)
+    sqlite3 (1.3.9)
     webrick (1.3.1)
-    zeromqrb (0.1.1)
-      ffi-rzmq (~> 1.0)
+    zeromqrb (0.1.3)
+      ffi-rzmq
 
 PLATFORMS
   ruby
 
 DEPENDENCIES
+  json
   sqlite3
   webrick
   zeromqrb
index c8f72582163ba69badfe0acc447d2dc5b90eaafc..2af3bd9f9d2d09b4b7fcd641844a2c09722b4a5d 100755 (executable)
@@ -14,7 +14,7 @@ f.sync = true
 begin
   context = ZeroMQ::Context.new
   socket = context.socket ZMQ::REP
-  socket.bind("ipc:///tmp/pdns.0")
+  socket.bind("ipc:///tmp/pdns.0") or raise "Cannot bind to IPC socket"
 
   while(true) do
     line = ""
index 38118efbcf763d2990ec6f814377f503df576ed1..b730c25647a10f90827b0c994a78e6fe92769a4a 100644 (file)
 #include "sstuff.hh"
 
 #ifdef REMOTEBACKEND_ZEROMQ
-#include <zmq.hpp>
+#include <zmq.h>
+
+// If the available ZeroMQ library version is < 2.x, create macros for the zmq_msg_send/recv functions
+#ifndef HAVE_ZMQ_MSG_SEND
+#define zmq_msg_send(msg, socket, flags) zmq_send(socket, msg, flags)
+#define zmq_msg_recv(msg, socket, flags) zmq_recv(socket, msg, flags)
+#endif
 #endif
 #define JSON_GET(obj,val,def) (obj.HasMember(val)?obj["" val ""]:def)
 #define JSON_ADD_MEMBER(obj, name, val, alloc) { rapidjson::Value __xval; __xval = val; obj.AddMember(name, __xval, alloc); }
@@ -90,8 +96,8 @@ class ZeroMQConnector: public Connector {
     int d_timeout;
     int d_timespent;
     std::map<std::string,std::string> d_options;
-    zmq::context_t d_ctx;
-    zmq::socket_t d_sock;
+    void *d_ctx;
+    void *d_sock; 
 };
 #endif
 
index 266ea19160f3a3c7cc508b0d2fa3d75ab1fac715..b096f8d8533b7e02ccfa89220f69fec2f561c647 100644 (file)
@@ -49,7 +49,7 @@ struct RemotebackendSetup {
                 new RemoteLoader();
                BackendMakers().launch("remote");
                 // then get us a instance of it 
-                ::arg().set("remote-connection-string")="zeromq:endpoint=tcp://127.0.0.1:43622";
+                ::arg().set("remote-connection-string")="zeromq:endpoint=ipc:///tmp/remotebackend.0";
                 ::arg().set("remote-dnssec")="yes";
                 be = BackendMakers().all()[0];
                // load few record types to help out
index a61a979421a9200bd8f256349b3811e9e5b0fa00..7f1b82af7bee0cef29430d70d2ab5d5eb07e9e2c 100755 (executable)
@@ -18,7 +18,7 @@ trap('TERM') { runcond = false }
 begin
   context = ZeroMQ::Context.new
   socket = context.socket ZMQ::REP
-  socket.bind("tcp://127.0.0.1:43622")
+  socket.bind("ipc:///tmp/remotebackend.0")
  
   print "[#{Time.now.to_s}] ZeroMQ unit test responder running\n"
 
@@ -43,10 +43,10 @@ begin
       else
          res, log = h.send(method)
       end
-      socket.send_string ({:result => res, :log => log}).to_json, 0
+      socket.send_string ({:result => res, :log => log}).to_json + "\n" , 0
       f.puts "#{Time.now.to_f} [zmq]: #{({:result => res, :log => log}).to_json}"
     rescue JSON::ParserError
-      socket.send_string ({:result => false, :log => "Cannot parse input #{line}"}).to_json
+      socket.send_string ({:result => false, :log => "Cannot parse input #{line}"}).to_json + "\n";
       f.puts "#{Time.now.to_f} [zmq]: #{({:result => false, :log => "Cannot parse input #{line}"}).to_json}"
       next
     end
index 5f863d2e8c7194ce95a73726d6f3b00e86bbe518..8e427c49f3a88587a795ebac15fd544edb2472d4 100644 (file)
@@ -9,9 +9,10 @@
 #include "rapidjson/stringbuffer.h"
 #include "rapidjson/writer.h"
 
-ZeroMQConnector::ZeroMQConnector(std::map<std::string,std::string> options) : d_ctx(1), d_sock(d_ctx, ZMQ_REQ)  {
+ZeroMQConnector::ZeroMQConnector(std::map<std::string,std::string> options) {
   rapidjson::Value val;
   rapidjson::Document init,res;
+  int opt=0;
 
   // lookup timeout, target and stuff
   if (options.count("endpoint") == 0) {
@@ -26,7 +27,15 @@ ZeroMQConnector::ZeroMQConnector(std::map<std::string,std::string> options) : d_
      this->d_timeout = boost::lexical_cast<int>(options.find("timeout")->second);
   }
 
-  d_sock.connect(d_endpoint.c_str());
+  d_ctx = zmq_init(2);
+  d_sock = zmq_socket(this->d_ctx, ZMQ_REQ);
+  zmq_setsockopt(d_sock, ZMQ_LINGER, &opt, sizeof(opt));
+
+  if(zmq_connect(this->d_sock, this->d_endpoint.c_str()) < 0)
+  {
+    L<<Logger::Error<<"zmq_connect() failed"<< zmq_strerror(errno)<<std::endl;;
+    throw PDNSException("Cannot find 'endpoint' option in connection string");
+  }
 
   init.SetObject();
   val = "initialize";
@@ -43,18 +52,23 @@ ZeroMQConnector::ZeroMQConnector(std::map<std::string,std::string> options) : d_
   this->send(init);
   if (this->recv(res)==false) {
     L<<Logger::Error<<"Failed to initialize zeromq"<<std::endl;
+    throw PDNSException("Failed to initialize zeromq");
   } 
 };
 
 ZeroMQConnector::~ZeroMQConnector() {
+  zmq_close(this->d_sock);
+  zmq_term(this->d_ctx);
 };
 
 int ZeroMQConnector::send_message(const rapidjson::Document &input) {
    std::string line;
    line = makeStringFromDocument(input);
-   zmq::message_t message(line.size()+1);   
-   line.copy(reinterpret_cast<char*>(message.data()), line.size());
-   reinterpret_cast<char*>(message.data())[line.size()]=0;
+   zmq_msg_t message;
+
+   zmq_msg_init_size(&message, line.size()+1);
+   line.copy(reinterpret_cast<char*>(zmq_msg_data(&message)), line.size());
+   ((char *)zmq_msg_data(&message))[line.size()] = '\0';
 
    try {
      zmq_pollitem_t item;
@@ -63,17 +77,16 @@ int ZeroMQConnector::send_message(const rapidjson::Document &input) {
      // poll until it's sent or timeout is spent. try to leave 
      // leave few cycles for read. just in case. 
      for(d_timespent = 0; d_timespent < d_timeout-5; d_timespent++) {
-       if (zmq::poll(&item, 1, 1000)>0) {
-         if (d_sock.send(message, 0) == false) {
+       if (zmq_poll(&item, 1, 1)>0) {
+         if(zmq_msg_send(&message, this->d_sock, 0) == -1) {
            // message was not sent
-           L<<Logger::Error<<"Cannot send to " << this->d_endpoint << ": " << errno;
-           return 0;
-         }
-         return line.size();
+           L<<Logger::Error<<"Cannot send to " << this->d_endpoint << ": " << zmq_strerror(errno)<<std::endl;
+         } else
+           return line.size();
        }
      }
    } catch (std::exception &ex) {
-     L<<Logger::Error<<"Cannot send to " << this->d_endpoint << ": " << ex.what();
+     L<<Logger::Error<<"Cannot send to " << this->d_endpoint << ": " << ex.what()<<std::endl;
      throw PDNSException(ex.what());
    }
 
@@ -85,7 +98,7 @@ int ZeroMQConnector::recv_message(rapidjson::Document &output) {
    // try to receive message
    zmq_pollitem_t item;
    rapidjson::GenericReader<rapidjson::UTF8<> , rapidjson::MemoryPoolAllocator<> > r;
-   zmq::message_t message;
+   zmq_msg_t message;
 
    item.socket = d_sock;
    item.events = ZMQ_POLLIN;
@@ -95,23 +108,28 @@ int ZeroMQConnector::recv_message(rapidjson::Document &output) {
      // d_timespent should always be initialized by send_message, recv should never
      // be called without send first.
      for(; d_timespent < d_timeout; d_timespent++) {
-       if (zmq::poll(&item, 1, 1000)>0) {
+       if (zmq_poll(&item, 1, 1)>0) {
          // we have an event
          if ((item.revents & ZMQ_POLLIN) == ZMQ_POLLIN) {
            char *data;
+           size_t msg_size;
+           zmq_msg_init(&message);
            // read something
-             if (d_sock.recv(&message, 0) && message.size() > 0) {
-               data = new char[message.size()+1];
-               // convert it into json
-               memcpy(data, message.data(), message.size());
-               data[message.size()]=0;
+           if(zmq_msg_recv(&message, this->d_sock, ZMQ_NOBLOCK)>0) {
+               msg_size = zmq_msg_size(&message);
+               data = new char[msg_size+1];
+               memcpy(data, zmq_msg_data(&message), msg_size);
+               data[msg_size] = '\0';
+               zmq_msg_close(&message);
+
                rapidjson::StringStream ss(data);
                output.ParseStream<0>(ss);
-               delete [] data;
+               delete[] data;
+
                if (output.HasParseError() == false)
-                 rv = message.size();
+                 rv = msg_size;
                else 
-                 L<<Logger::Error<<"Cannot parse JSON reply from " << this->d_endpoint;
+                 L<<Logger::Error<<"Cannot parse JSON reply from " << this->d_endpoint<<std::endl;
                break;
              } else if (errno == EAGAIN) { continue; // try again }
              } else {
@@ -121,7 +139,7 @@ int ZeroMQConnector::recv_message(rapidjson::Document &output) {
         }
      }
    } catch (std::exception &ex) {
-     L<<Logger::Error<<"Cannot receive from " << this->d_endpoint << ": " << ex.what();
+     L<<Logger::Error<<"Cannot receive from " << this->d_endpoint << ": " << ex.what()<<std::endl;
      throw PDNSException(ex.what());
    }
 
index 0dd7360a0eccdac69df4f33cd981404e6d69fff3..563ce32e34553f5f143d122068f985a4d73cd27c 100644 (file)
@@ -39,6 +39,7 @@ case $context in
                        connstr="zeromq:endpoint=ipc:///tmp/pdns.0"
                        $testsdir/zeromq-backend.rb &
                        echo $! > pdns-remotebackend.pid
+                       sleep 20 # just a test
                        ;;
                unix)
                        connstr="unix:path=/tmp/remote.socket"