1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
3 #include "base/stream.hpp"
4 #include <boost/algorithm/string/trim.hpp>
6 using namespace icinga;
/**
 * Registers a handler on the OnDataAvailable signal.
 *
 * The handler is connected after SupportsWaiting() has been consulted.
 *
 * @param handler Callable receiving a Stream::Ptr; it is connected to OnDataAvailable.
 * @throws std::runtime_error with the message "Stream does not support waiting."
 *         NOTE(review): presumably only when SupportsWaiting() is false; the branch
 *         separating the connect from the throw is not visible here - confirm.
 */
8 void Stream::RegisterDataHandler(const std::function<void(const Stream::Ptr&)>& handler)
10 if (SupportsWaiting())
11 OnDataAvailable.connect(handler);
13 BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support waiting."));
/**
 * Capability query consulted before any wait-related operation.
 *
 * RegisterDataHandler() checks it before connecting a handler, both WaitForData()
 * overloads throw std::runtime_error when it returns false, and
 * StreamReadContext::FillFromStream() only waits when it returns true.
 *
 * @return Whether waiting is supported. NOTE(review): the value returned by this
 *         implementation is not visible here - confirm.
 */
16 bool Stream::SupportsWaiting() const
/**
 * Data-availability query.
 *
 * Used as part of the loop condition and return value of both WaitForData()
 * overloads, and as the continuation condition of the read loop in
 * StreamReadContext::FillFromStream().
 *
 * @return Presumably whether data can currently be read (judging by the name);
 *         the value returned by this implementation is not visible here - confirm.
 */
21 bool Stream::IsDataAvailable() const
/**
 * Shutdown() implementation that reports the operation as unsupported.
 *
 * @throws std::runtime_error with the message "Stream does not support Shutdown()."
 *
 * NOTE(review): presumably a default that concrete stream types override - confirm
 * against the class declaration.
 */
26 void Stream::Shutdown()
28 BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support Shutdown()."));
/**
 * Peek() implementation that reports the operation as unsupported.
 *
 * None of the parameters is used by the visible code.
 *
 * @param buffer        Destination buffer (unused here).
 * @param count         Requested byte count (unused here).
 * @param allow_partial Partial-read flag (unused here).
 * @throws std::runtime_error with the message "Stream does not support Peek()."
 *
 * NOTE(review): presumably a default that concrete stream types override - confirm
 * against the class declaration.
 */
31 size_t Stream::Peek(void *buffer, size_t count, bool allow_partial)
33 BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support Peek()."));
/**
 * Announces that data is available on this stream.
 *
 * First invokes the OnDataAvailable signal with this stream as argument, then
 * acquires m_Mutex, the same mutex the WaitForData() overloads hold while waiting.
 *
 * NOTE(review): what happens under the lock is not visible here; presumably waiters
 * on m_CV are notified - confirm.
 */
36 void Stream::SignalDataAvailable()
/* Run the registered handlers before taking the lock. */
38 OnDataAvailable(this);
41 boost::mutex::scoped_lock lock(m_Mutex);
/**
 * Waits, without a timeout, until the stream has data or has reached EOF.
 *
 * @return The value of IsDataAvailable() || IsEof(), evaluated after the loop while
 *         m_Mutex is still held.
 * @throws std::runtime_error if SupportsWaiting() returns false.
 */
46 bool Stream::WaitForData()
48 if (!SupportsWaiting())
49 BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support waiting."));
/* The condition below is checked with m_Mutex held. */
51 boost::mutex::scoped_lock lock(m_Mutex);
/* Keep looping while there is neither data nor EOF.
 * NOTE(review): the loop body is not visible here; presumably it blocks on m_CV
 * using `lock` - confirm. */
53 while (!IsDataAvailable() && !IsEof())
56 return IsDataAvailable() || IsEof();
/**
 * Waits until the stream has data, has reached EOF, or the timeout has expired.
 *
 * @param timeout Maximum time to wait, in seconds (converted with
 *                boost::posix_time::seconds).
 * @return The value of IsDataAvailable() || IsEof() after the wait; false therefore
 *         means the deadline passed with neither data nor EOF.
 * @throws std::runtime_error if SupportsWaiting() returns false.
 * @throws std::runtime_error with the message "Timeout can't be negative".
 *         NOTE(review): the condition guarding this throw is not visible here;
 *         presumably a check for timeout < 0 - confirm.
 */
59 bool Stream::WaitForData(int timeout)
61 if (!SupportsWaiting())
62 BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support waiting."));
65 BOOST_THROW_EXCEPTION(std::runtime_error("Timeout can't be negative"));
/* Compute the absolute deadline once, before locking, so that repeated wake-ups
 * in the loop below do not extend the total waiting time. */
67 boost::system_time const point_of_timeout = boost::get_system_time() + boost::posix_time::seconds(timeout);
69 boost::mutex::scoped_lock lock(m_Mutex);
/* Re-check the condition after every wake-up and stop once the deadline passed. */
71 while (!IsDataAvailable() && !IsEof() && point_of_timeout > boost::get_system_time())
72 m_CV.timed_wait(lock, point_of_timeout);
74 return IsDataAvailable() || IsEof();
/**
 * File-local callback that takes no arguments.
 *
 * Its address is bound with std::bind and connected to OnDataAvailable right after
 * all slots have been disconnected from that signal.
 * NOTE(review): the body is not visible here; presumably it does nothing - confirm.
 */
77 static void StreamDummyCallback()
/* Detach every handler from OnDataAvailable, then connect StreamDummyCallback.
 * The existing comment below gives the reason: the new connection forces signals2
 * to actually remove the disconnected slots.
 * NOTE(review): the header of the enclosing function is not visible here. */
82 OnDataAvailable.disconnect_all_slots();
84 /* Force signals2 to remove the slots, see https://stackoverflow.com/questions/2049291/force-deletion-of-slot-in-boostsignals2
86 OnDataAvailable.connect(std::bind(&StreamDummyCallback));
/**
 * Extracts the next line from the read context, refilling it from this stream
 * when the context asks for more data.
 *
 * @param line     Output; receives the extracted text with trailing whitespace
 *                 removed by boost::algorithm::trim_right.
 * @param context  Buffering state (Buffer, Size, MustRead) carried between calls.
 * @param may_wait Forwarded to StreamReadContext::FillFromStream().
 * @return StatusNewItem when *line was filled, StatusNeedData when the buffer holds
 *         no newline yet.
 *
 * NOTE(review): statements preceding the MustRead check are not visible here; there
 * may be further return paths - confirm.
 */
89 StreamReadStatus Stream::ReadLine(String *line, StreamReadContext& context, bool may_wait)
94 if (context.MustRead) {
95 if (!context.FillFromStream(this, may_wait)) {
/* The refill reported failure: hand out everything that is buffered
 * (Buffer[0] up to Buffer[Size]) as one line. */
98 *line = String(context.Buffer, &(context.Buffer[context.Size]));
99 boost::algorithm::trim_right(*line);
101 return StatusNewItem;
/* Look for the first newline among the buffered bytes. */
105 for (size_t i = 0; i < context.Size; i++) {
106 if (context.Buffer[i] == '\n') {
/* The line is everything before the newline; the newline itself is excluded. */
107 *line = String(context.Buffer, context.Buffer + i);
108 boost::algorithm::trim_right(*line);
/* Consume the line together with its newline (i + 1 bytes). */
110 context.DropData(i + 1u);
/* Request a refill on the next call if nothing is left in the buffer. */
112 context.MustRead = !context.Size;
113 return StatusNewItem;
/* No newline buffered yet: more data must be read before a line can be returned. */
117 context.MustRead = true;
118 return StatusNeedData;
/**
 * Appends data read from the stream to this context's Buffer.
 *
 * @param stream   Stream to read from.
 * @param may_wait When true and the stream supports waiting, WaitForData() is called
 *                 first; its return value is ignored.
 * @return NOTE(review): the return statements are not visible here. Stream::ReadLine()
 *         treats false as "nothing more to read"; presumably false is returned when
 *         no bytes were read and the stream is at EOF - confirm.
 * @throws std::bad_alloc NOTE(review): presumably when realloc() fails; the guarding
 *         condition is not visible here - confirm.
 */
121 bool StreamReadContext::FillFromStream(const Stream::Ptr& stream, bool may_wait)
123 if (may_wait && stream->SupportsWaiting())
124 stream->WaitForData();
/* Grow the buffer so that 4096 more bytes fit behind the Size bytes already held.
 * NOTE(review): the realloc() result is assigned straight to Buffer; if realloc()
 * fails, the previous block is no longer referenced by Buffer and may leak -
 * consider a temporary pointer. */
129 Buffer = (char *)realloc(Buffer, Size + 4096);
132 throw std::bad_alloc();
/* Read at most 4096 bytes into the space behind the existing data.
 * NOTE(review): the third argument is presumably an allow-partial flag - confirm. */
137 size_t rc = stream->Read(Buffer + Size, 4096, true);
/* Keep reading while `count` is below 64 KiB and the stream reports more data.
 * NOTE(review): the declaration and update of `count` are not visible here;
 * presumably it totals the bytes read during this call - confirm. */
141 } while (count < 64 * 1024 && stream->IsDataAvailable());
143 if (count == 0 && stream->IsEof())
/**
 * Discards the first `count` bytes of Buffer.
 *
 * @param count Number of leading bytes to drop; asserted to be at most Size.
 *
 * NOTE(review): no update of Size is visible here, although Stream::ReadLine() reads
 * context.Size right after calling this; presumably Size is reduced by `count`
 * after the memmove - confirm.
 */
149 void StreamReadContext::DropData(size_t count)
151 ASSERT(count <= Size);
/* Shift the remaining Size - count bytes to the front; memmove is used because
 * source and destination overlap. */
152 memmove(Buffer, Buffer + count, Size - count);