]> granicus.if.org Git - icinga2/blob - lib/base/stream.cpp
Merge pull request #7185 from Icinga/bugfix/gelfwriter-wrong-log-facility
[icinga2] / lib / base / stream.cpp
1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "base/stream.hpp"
4 #include <boost/algorithm/string/trim.hpp>
5
6 using namespace icinga;
7
8 void Stream::RegisterDataHandler(const std::function<void(const Stream::Ptr&)>& handler)
9 {
10         if (SupportsWaiting())
11                 OnDataAvailable.connect(handler);
12         else
13                 BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support waiting."));
14 }
15
16 bool Stream::SupportsWaiting() const
17 {
18         return false;
19 }
20
21 bool Stream::IsDataAvailable() const
22 {
23         return false;
24 }
25
26 void Stream::Shutdown()
27 {
28         BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support Shutdown()."));
29 }
30
31 size_t Stream::Peek(void *buffer, size_t count, bool allow_partial)
32 {
33         BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support Peek()."));
34 }
35
36 void Stream::SignalDataAvailable()
37 {
38         OnDataAvailable(this);
39
40         {
41                 boost::mutex::scoped_lock lock(m_Mutex);
42                 m_CV.notify_all();
43         }
44 }
45
46 bool Stream::WaitForData()
47 {
48         if (!SupportsWaiting())
49                 BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support waiting."));
50
51         boost::mutex::scoped_lock lock(m_Mutex);
52
53         while (!IsDataAvailable() && !IsEof())
54                 m_CV.wait(lock);
55
56         return IsDataAvailable() || IsEof();
57 }
58
59 bool Stream::WaitForData(int timeout)
60 {
61         if (!SupportsWaiting())
62                 BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support waiting."));
63
64         if (timeout < 0)
65                 BOOST_THROW_EXCEPTION(std::runtime_error("Timeout can't be negative"));
66
67         boost::system_time const point_of_timeout = boost::get_system_time() + boost::posix_time::seconds(timeout);
68
69         boost::mutex::scoped_lock lock(m_Mutex);
70
71         while (!IsDataAvailable() && !IsEof() && point_of_timeout > boost::get_system_time())
72                 m_CV.timed_wait(lock, point_of_timeout);
73
74         return IsDataAvailable() || IsEof();
75 }
76
77 static void StreamDummyCallback()
78 { }
79
80 void Stream::Close()
81 {
82         OnDataAvailable.disconnect_all_slots();
83
84         /* Force signals2 to remove the slots, see https://stackoverflow.com/questions/2049291/force-deletion-of-slot-in-boostsignals2
85          * for details. */
86         OnDataAvailable.connect(std::bind(&StreamDummyCallback));
87 }
88
89 StreamReadStatus Stream::ReadLine(String *line, StreamReadContext& context, bool may_wait)
90 {
91         if (context.Eof)
92                 return StatusEof;
93
94         if (context.MustRead) {
95                 if (!context.FillFromStream(this, may_wait)) {
96                         context.Eof = true;
97
98                         *line = String(context.Buffer, &(context.Buffer[context.Size]));
99                         boost::algorithm::trim_right(*line);
100
101                         return StatusNewItem;
102                 }
103         }
104
105         for (size_t i = 0; i < context.Size; i++) {
106                 if (context.Buffer[i] == '\n') {
107                         *line = String(context.Buffer, context.Buffer + i);
108                         boost::algorithm::trim_right(*line);
109
110                         context.DropData(i + 1u);
111
112                         context.MustRead = !context.Size;
113                         return StatusNewItem;
114                 }
115         }
116
117         context.MustRead = true;
118         return StatusNeedData;
119 }
120
121 bool StreamReadContext::FillFromStream(const Stream::Ptr& stream, bool may_wait)
122 {
123         if (may_wait && stream->SupportsWaiting())
124                 stream->WaitForData();
125
126         size_t count = 0;
127
128         do {
129                 Buffer = (char *)realloc(Buffer, Size + 4096);
130
131                 if (!Buffer)
132                         throw std::bad_alloc();
133
134                 if (stream->IsEof())
135                         break;
136
137                 size_t rc = stream->Read(Buffer + Size, 4096, true);
138
139                 Size += rc;
140                 count += rc;
141         } while (count < 64 * 1024 && stream->IsDataAvailable());
142
143         if (count == 0 && stream->IsEof())
144                 return false;
145         else
146                 return true;
147 }
148
149 void StreamReadContext::DropData(size_t count)
150 {
151         ASSERT(count <= Size);
152         memmove(Buffer, Buffer + count, Size - count);
153         Size -= count;
154 }