#include "rapidjson/stringbuffer.h"
#include "rapidjson/writer.h"
-ZeroMQConnector::ZeroMQConnector(std::map<std::string,std::string> options) : d_ctx(1), d_sock(d_ctx, ZMQ_REQ) {
+ZeroMQConnector::ZeroMQConnector(std::map<std::string,std::string> options) {
rapidjson::Value val;
rapidjson::Document init,res;
+ int opt=0;
// lookup timeout, target and stuff
if (options.count("endpoint") == 0) {
this->d_timeout = boost::lexical_cast<int>(options.find("timeout")->second);
}
- d_sock.connect(d_endpoint.c_str());
+ d_ctx = zmq_init(2);
+ d_sock = zmq_socket(this->d_ctx, ZMQ_REQ);
+ zmq_setsockopt(d_sock, ZMQ_LINGER, &opt, sizeof(opt));
+
+ if(zmq_connect(this->d_sock, this->d_endpoint.c_str()) < 0)
+ {
+ L<<Logger::Error<<"zmq_connect() failed"<< zmq_strerror(errno)<<std::endl;;
+ throw PDNSException("Cannot find 'endpoint' option in connection string");
+ }
init.SetObject();
val = "initialize";
this->send(init);
if (this->recv(res)==false) {
L<<Logger::Error<<"Failed to initialize zeromq"<<std::endl;
+ throw PDNSException("Failed to initialize zeromq");
}
};
ZeroMQConnector::~ZeroMQConnector() {
+ zmq_close(this->d_sock);
+ zmq_term(this->d_ctx);
};
int ZeroMQConnector::send_message(const rapidjson::Document &input) {
std::string line;
line = makeStringFromDocument(input);
- zmq::message_t message(line.size()+1);
- line.copy(reinterpret_cast<char*>(message.data()), line.size());
- reinterpret_cast<char*>(message.data())[line.size()]=0;
+ zmq_msg_t message;
+
+ zmq_msg_init_size(&message, line.size()+1);
+ line.copy(reinterpret_cast<char*>(zmq_msg_data(&message)), line.size());
+ ((char *)zmq_msg_data(&message))[line.size()] = '\0';
try {
zmq_pollitem_t item;
// poll until it's sent or timeout is spent. try to leave
// leave few cycles for read. just in case.
for(d_timespent = 0; d_timespent < d_timeout-5; d_timespent++) {
- if (zmq::poll(&item, 1, 1000)>0) {
- if (d_sock.send(message, 0) == false) {
+ if (zmq_poll(&item, 1, 1)>0) {
+ if(zmq_msg_send(&message, this->d_sock, 0) == -1) {
// message was not sent
- L<<Logger::Error<<"Cannot send to " << this->d_endpoint << ": " << errno;
- return 0;
- }
- return line.size();
+ L<<Logger::Error<<"Cannot send to " << this->d_endpoint << ": " << zmq_strerror(errno)<<std::endl;
+ } else
+ return line.size();
}
}
} catch (std::exception &ex) {
- L<<Logger::Error<<"Cannot send to " << this->d_endpoint << ": " << ex.what();
+ L<<Logger::Error<<"Cannot send to " << this->d_endpoint << ": " << ex.what()<<std::endl;
throw PDNSException(ex.what());
}
// try to receive message
zmq_pollitem_t item;
rapidjson::GenericReader<rapidjson::UTF8<> , rapidjson::MemoryPoolAllocator<> > r;
- zmq::message_t message;
+ zmq_msg_t message;
item.socket = d_sock;
item.events = ZMQ_POLLIN;
// d_timespent should always be initialized by send_message, recv should never
// be called without send first.
for(; d_timespent < d_timeout; d_timespent++) {
- if (zmq::poll(&item, 1, 1000)>0) {
+ if (zmq_poll(&item, 1, 1)>0) {
// we have an event
if ((item.revents & ZMQ_POLLIN) == ZMQ_POLLIN) {
char *data;
+ size_t msg_size;
+ zmq_msg_init(&message);
// read something
- if (d_sock.recv(&message, 0) && message.size() > 0) {
- data = new char[message.size()+1];
- // convert it into json
- memcpy(data, message.data(), message.size());
- data[message.size()]=0;
+ if(zmq_msg_recv(&message, this->d_sock, ZMQ_NOBLOCK)>0) {
+ msg_size = zmq_msg_size(&message);
+ data = new char[msg_size+1];
+ memcpy(data, zmq_msg_data(&message), msg_size);
+ data[msg_size] = '\0';
+ zmq_msg_close(&message);
+
rapidjson::StringStream ss(data);
output.ParseStream<0>(ss);
- delete [] data;
+ delete[] data;
+
if (output.HasParseError() == false)
- rv = message.size();
+ rv = msg_size;
else
- L<<Logger::Error<<"Cannot parse JSON reply from " << this->d_endpoint;
+ L<<Logger::Error<<"Cannot parse JSON reply from " << this->d_endpoint<<std::endl;
break;
} else if (errno == EAGAIN) { continue; // try again }
} else {
}
}
} catch (std::exception &ex) {
- L<<Logger::Error<<"Cannot receive from " << this->d_endpoint << ": " << ex.what();
+ L<<Logger::Error<<"Cannot receive from " << this->d_endpoint << ": " << ex.what()<<std::endl;
throw PDNSException(ex.what());
}