msg.SetParams(params);
params.Set("name", object->GetName());
- params.Set("type", object->GetType());
+ params.Set("type", object->GetType()->GetName());
String source = object->GetSource();
std::ostringstream msgbuf;
msgbuf << "Sending " << modifiedObjects.size() << " replication updates.";
- Log(LogDebug, "replication", msgbuf.str());
+ Log(LogInformation, "replication", msgbuf.str());
BOOST_FOREACH(const DynamicObject::WeakPtr& wobject, modifiedObjects) {
DynamicObject::Ptr object = wobject.lock();
using namespace icinga;
BufferedStream::BufferedStream(const Stream::Ptr& innerStream)
- : m_InnerStream(innerStream), m_RecvQ(boost::make_shared<FIFO>()), m_SendQ(boost::make_shared<FIFO>())
+ : m_InnerStream(innerStream), m_RecvQ(boost::make_shared<FIFO>()), m_SendQ(boost::make_shared<FIFO>()),
+ m_Exception(), m_Blocking(true)
{
boost::thread readThread(boost::bind(&BufferedStream::ReadThreadProc, this));
readThread.detach();
m_ReadCV.notify_all();
}
} catch (const std::exception& ex) {
- std::ostringstream msgbuf;
- msgbuf << "Error for buffered stream (Read): " << boost::diagnostic_information(ex);
- Log(LogWarning, "base", msgbuf.str());
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
+
+ if (!m_Exception)
+ m_Exception = boost::current_exception();
- Close();
+ m_ReadCV.notify_all();
+ }
}
}
m_InnerStream->Write(buffer, rc);
}
} catch (const std::exception& ex) {
- std::ostringstream msgbuf;
- msgbuf << "Error for buffered stream (Write): " << boost::diagnostic_information(ex);
- Log(LogWarning, "base", msgbuf.str());
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
+
+ if (!m_Exception)
+ m_Exception = boost::current_exception();
- Close();
+ m_WriteCV.notify_all();
+ }
}
}
size_t BufferedStream::Read(void *buffer, size_t count)
{
boost::mutex::scoped_lock lock(m_Mutex);
+
+ if (m_Blocking)
+ InternalWaitReadable(count, lock);
+
+ if (m_Exception)
+ boost::rethrow_exception(m_Exception);
+
return m_RecvQ->Read(buffer, count);
}
void BufferedStream::Write(const void *buffer, size_t count)
{
boost::mutex::scoped_lock lock(m_Mutex);
+
+ if (m_Exception)
+ boost::rethrow_exception(m_Exception);
+
m_SendQ->Write(buffer, count);
m_WriteCV.notify_all();
}
{
boost::mutex::scoped_lock lock(m_Mutex);
- while (m_RecvQ->GetAvailableBytes() < count)
+ InternalWaitReadable(count, lock);
+}
+
+void BufferedStream::InternalWaitReadable(size_t count, boost::mutex::scoped_lock& lock)
+{
+ while (m_RecvQ->GetAvailableBytes() < count && !m_Exception)
m_ReadCV.wait(lock);
}
void BufferedStream::WaitWritable(size_t count)
{ /* Nothing to do here. */ }
+
+void BufferedStream::MakeNonBlocking(void)
+{
+ boost::mutex::scoped_lock lock(m_Mutex);
+
+ m_Blocking = false;
+}
+
void WaitReadable(size_t count);
void WaitWritable(size_t count);
+ void MakeNonBlocking(void);
+
private:
Stream::Ptr m_InnerStream;
FIFO::Ptr m_RecvQ;
FIFO::Ptr m_SendQ;
+
+ bool m_Blocking;
boost::exception_ptr m_Exception;
void ReadThreadProc(void);
void WriteThreadProc(void);
+
+ void InternalWaitReadable(size_t count, boost::mutex::scoped_lock& lock);
};
}
if (!m_InnerStream)
m_InnerStream = boost::make_shared<BufferedStream>(innerStream);
+
+ m_InnerStream->MakeNonBlocking();
m_SSL = shared_ptr<SSL>(SSL_new(m_SSLContext.get()), SSL_free);
shared_ptr<SSL_CTX> sslContext = shared_ptr<SSL_CTX>(SSL_CTX_new(TLSv1_method()), SSL_CTX_free);
- SSL_CTX_set_mode(sslContext.get(), SSL_MODE_ENABLE_PARTIAL_WRITE | SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER);
+ SSL_CTX_set_mode(sslContext.get(), 0);
if (!SSL_CTX_use_certificate_chain_file(sslContext.get(), pubkey.CStr())) {
BOOST_THROW_EXCEPTION(openssl_error()
REGISTER_TYPE(Endpoint);
boost::signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected;
-boost::signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnDisconnected;
/**
* Constructor for the Endpoint class.
RegisterSubscription(topic);
}
-void Endpoint::UnregisterTopicHandler(const String&, const boost::function<Endpoint::Callback>&)
-{
- // TODO: implement
- //m_TopicHandlers[method] -= callback;
- //UnregisterSubscription(method);
-
- BOOST_THROW_EXCEPTION(std::runtime_error("Not implemented."));
-}
-
void Endpoint::ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage& request)
{
if (!IsConnected()) {
Utility::QueueAsyncCallback(boost::bind(boost::ref(*it->second), GetSelf(), sender, request));
} else {
- JsonRpc::SendMessage(GetClient(), request);
+ try {
+ JsonRpc::SendMessage(GetClient(), request);
+ } catch (const std::exception& ex) {
+ std::ostringstream msgbuf;
+ msgbuf << "Error while sending JSON-RPC message for endpoint '" << GetName() << "': " << boost::diagnostic_information(ex);
+ Log(LogWarning, "remoting", msgbuf.str());
+
+ m_Client.reset();
+ }
}
}
if (IsLocalEndpoint())
EndpointManager::GetInstance()->ProcessResponseMessage(sender, response);
else {
- JsonRpc::SendMessage(GetClient(), response);
+ try {
+ JsonRpc::SendMessage(GetClient(), response);
+ } catch (const std::exception& ex) {
+ std::ostringstream msgbuf;
+ msgbuf << "Error while sending JSON-RPC message for endpoint '" << GetName() << "': " << boost::diagnostic_information(ex);
+ Log(LogWarning, "remoting", msgbuf.str());
+
+ m_Client.reset();
+ }
}
}
void Endpoint::MessageThreadProc(const Stream::Ptr& stream)
{
- try {
- for (;;) {
- MessagePart message = JsonRpc::ReadMessage(stream);
- Endpoint::Ptr sender = GetSelf();
-
- if (ResponseMessage::IsResponseMessage(message)) {
- /* rather than routing the message to the right virtual
- * endpoint we just process it here right away. */
- EndpointManager::GetInstance()->ProcessResponseMessage(sender, message);
- return;
- }
-
- RequestMessage request = message;
-
- String method;
- if (!request.GetMethod(&method))
- return;
-
- String id;
- if (request.GetID(&id))
- EndpointManager::GetInstance()->SendAnycastMessage(sender, request);
- else
- EndpointManager::GetInstance()->SendMulticastMessage(sender, request);
- }
- } catch (const std::exception& ex) {
- Log(LogWarning, "jsonrpc", "Lost connection to endpoint '" + GetName() + "': " + boost::diagnostic_information(ex));
+ for (;;) {
+ MessagePart message;
- {
- ObjectLock olock(this);
+ try {
+ message = JsonRpc::ReadMessage(stream);
+ } catch (const std::exception& ex) {
+ Log(LogWarning, "jsonrpc", "Error while reading JSON-RPC message for endpoint '" + GetName() + "': " + boost::diagnostic_information(ex));
- // TODO: _only_ clear non-persistent subscriptions
- // unregister ourselves if no persistent subscriptions are left (use a
- // timer for that, once we have a TTL property for the topics)
- ClearSubscriptions();
+ GetClient()->Close();
m_Client.reset();
}
- OnDisconnected(GetSelf());
+ Endpoint::Ptr sender = GetSelf();
+
+ if (ResponseMessage::IsResponseMessage(message)) {
+ /* rather than routing the message to the right virtual
+ * endpoint we just process it here right away. */
+ EndpointManager::GetInstance()->ProcessResponseMessage(sender, message);
+ return;
+ }
+
+ RequestMessage request = message;
+
+ String method;
+ if (!request.GetMethod(&method))
+ return;
+
+ String id;
+ if (request.GetID(&id))
+ EndpointManager::GetInstance()->SendAnycastMessage(sender, request);
+ else
+ EndpointManager::GetInstance()->SendMulticastMessage(sender, request);
}
}
void ClearSubscriptions(void);
void RegisterTopicHandler(const String& topic, const boost::function<Callback>& callback);
- void UnregisterTopicHandler(const String& topic, const boost::function<Callback>& callback);
String GetNode(void) const;
String GetService(void) const;
static Endpoint::Ptr MakeEndpoint(const String& name, bool replicated, bool local = true);
static boost::signals2::signal<void (const Endpoint::Ptr&)> OnConnected;
- static boost::signals2::signal<void (const Endpoint::Ptr&)> OnDisconnected;
private:
Attribute<bool> m_Local;
if (!endpoint)
endpoint = Endpoint::MakeEndpoint(identity, true);
- endpoint->SetClient(tlsStream);
+ BufferedStream::Ptr bufferedStream = boost::make_shared<BufferedStream>(tlsStream);
+
+ endpoint->SetClient(bufferedStream);
}
/**