From 74d13635b99021efbccf8056ff1e56a9f03b7fe1 Mon Sep 17 00:00:00 2001 From: Aki Tuomi Date: Sat, 1 Nov 2014 10:56:34 +0200 Subject: [PATCH] Reconnect on read error, also fix whitespace --- modules/remotebackend/unixconnector.cc | 293 +++++++++++++------------ 1 file changed, 147 insertions(+), 146 deletions(-) diff --git a/modules/remotebackend/unixconnector.cc b/modules/remotebackend/unixconnector.cc index 1b9e05c43..516e3b52d 100644 --- a/modules/remotebackend/unixconnector.cc +++ b/modules/remotebackend/unixconnector.cc @@ -8,173 +8,174 @@ #endif UnixsocketConnector::UnixsocketConnector(std::map options) { - if (options.count("path") == 0) { - L<timeout = 2000; - if (options.find("timeout") != options.end()) { - this->timeout = boost::lexical_cast(options.find("timeout")->second); - } - this->path = options.find("path")->second; - this->options = options; - this->connected = false; - this->fd = -1; + if (options.count("path") == 0) { + L<timeout = 2000; + if (options.find("timeout") != options.end()) { + this->timeout = boost::lexical_cast(options.find("timeout")->second); + } + this->path = options.find("path")->second; + this->options = options; + this->connected = false; + this->fd = -1; } UnixsocketConnector::~UnixsocketConnector() { - if (this->connected) { - L<connected) { + L<write(data); - if (rv == -1) - return -1; - return rv; + std::string data; + int rv; + data = makeStringFromDocument(input); + data = data + "\n"; + rv = this->write(data); + if (rv == -1) + return -1; + return rv; } int UnixsocketConnector::recv_message(rapidjson::Document &output) { - int rv,nread; - std::string s_output; - rapidjson::GenericReader , rapidjson::MemoryPoolAllocator<> > r; - - struct timeval t0,t; - - nread = 0; - gettimeofday(&t0, NULL); - memcpy(&t,&t0,sizeof(t0)); - s_output = ""; - - while((t.tv_sec - t0.tv_sec)*1000 + (t.tv_usec - t0.tv_usec)/1000 < this->timeout) { - std::string temp; - temp.clear(); - - rv = this->read(temp); - if (rv == -1) - return -1; - - if (rv>0) { - nread += rv; - s_output.append(temp); - rapidjson::StringStream ss(s_output.c_str()); - output.ParseStream<0>(ss); - if (output.HasParseError() == false) - return s_output.size(); - } - gettimeofday(&t, NULL); - } - - return -1; + int rv,nread; + std::string s_output; + rapidjson::GenericReader , rapidjson::MemoryPoolAllocator<> > r; + + struct timeval t0,t; + + nread = 0; + gettimeofday(&t0, NULL); + memcpy(&t,&t0,sizeof(t0)); + s_output = ""; + + while((t.tv_sec - t0.tv_sec)*1000 + (t.tv_usec - t0.tv_usec)/1000 < this->timeout) { + std::string temp; + temp.clear(); + + rv = this->read(temp); + if (rv == -1) + return -1; + + if (rv>0) { + nread += rv; + s_output.append(temp); + rapidjson::StringStream ss(s_output.c_str()); + output.ParseStream<0>(ss); + if (output.HasParseError() == false) + return s_output.size(); + } + gettimeofday(&t, NULL); + } + + close(fd); + connected = false; // we need to reconnect + return -1; } ssize_t UnixsocketConnector::read(std::string &data) { - ssize_t nread; - char buf[1500] = {0}; + ssize_t nread; + char buf[1500] = {0}; - reconnect(); - if (!connected) return -1; - nread = ::read(this->fd, buf, sizeof buf); + reconnect(); + if (!connected) return -1; + nread = ::read(this->fd, buf, sizeof buf); - // just try again later... - if (nread==-1 && errno == EAGAIN) return 0; + // just try again later... + if (nread==-1 && errno == EAGAIN) return 0; - if (nread==-1) { - connected = false; - close(fd); - return -1; - } + if (nread==-1) { + connected = false; + close(fd); + return -1; + } - data.append(buf, nread); - return nread; + data.append(buf, nread); + return nread; } ssize_t UnixsocketConnector::write(const std::string &data) { - ssize_t nwrite, nbuf; - size_t pos; - char buf[1500]; - - reconnect(); - if (!connected) return -1; - pos = 0; - nwrite = 0; - while(pos < data.size()) { - nbuf = data.copy(buf, sizeof buf, pos); // copy data and write - nwrite = ::write(fd, buf, nbuf); - pos = pos + sizeof(buf); - if (nwrite == -1) { - connected = false; - close(fd); - return -1; - } + ssize_t nwrite, nbuf; + size_t pos; + char buf[1500]; + + reconnect(); + if (!connected) return -1; + pos = 0; + nwrite = 0; + while(pos < data.size()) { + nbuf = data.copy(buf, sizeof buf, pos); // copy data and write + nwrite = ::write(fd, buf, nbuf); + pos = pos + sizeof(buf); + if (nwrite == -1) { + connected = false; + close(fd); + return -1; } - return nwrite; + } + return nwrite; } void UnixsocketConnector::reconnect() { - struct sockaddr_un sock; - rapidjson::Document init,res; - rapidjson::Value val; - int rv; - - if (connected) return; // no point reconnecting if connected... - connected = true; - - L<(&sock), sizeof sock))==-1 && (errno == EINPROGRESS)) { - waitForData(fd, 0, -1); - rv = connect(fd, reinterpret_cast(&sock), sizeof sock); - } - - if (rv != 0 && errno != EISCONN && errno != 0) { - L<::iterator i = options.begin(); i != options.end(); i++) { - val = i->second.c_str(); - init["parameters"].AddMember(i->first.c_str(), val, init.GetAllocator()); - } - - this->send_message(init); - if (this->recv_message(res) == false) { - L<connected = false; - } + struct sockaddr_un sock; + rapidjson::Document init,res; + rapidjson::Value val; + int rv; + + if (connected) return; // no point reconnecting if connected... + connected = true; + + L<(&sock), sizeof sock))==-1 && (errno == EINPROGRESS)) { + waitForData(fd, 0, -1); + rv = connect(fd, reinterpret_cast(&sock), sizeof sock); + } + + if (rv != 0 && errno != EISCONN && errno != 0) { + L<::iterator i = options.begin(); i != options.end(); i++) { + val = i->second.c_str(); + init["parameters"].AddMember(i->first.c_str(), val, init.GetAllocator()); + } + + this->send_message(init); + if (this->recv_message(res) == false) { + L<connected = false; + } } - -- 2.40.0