* @exception invalid_argument The input stream is invalid.
* @see https://github.com/PeterScott/netstring-c/blob/master/netstring.c
*/
-StreamReadStatus NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str, StreamReadContext& context)
+StreamReadStatus NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str, StreamReadContext& context, bool may_wait)
{
if (context.Eof)
return StatusEof;
if (context.MustRead) {
- if (!context.FillFromStream(stream)) {
+ if (!context.FillFromStream(stream, may_wait)) {
context.Eof = true;
return StatusEof;
}
class I2_BASE_API NetString
{
public:
- static StreamReadStatus ReadStringFromStream(const Stream::Ptr& stream, String *message, StreamReadContext& context);
+ static StreamReadStatus ReadStringFromStream(const Stream::Ptr& stream, String *message, StreamReadContext& context, bool may_wait = false);
static void WriteStringToStream(const Stream::Ptr& stream, const String& message);
private:
}
}
+bool StdioStream::IsDataAvailable(void) const
+{
+ return !IsEof();
+}
+
bool StdioStream::IsEof(void) const
{
return !m_InnerStream->good();
virtual void Close(void);
+ virtual bool IsDataAvailable(void) const;
virtual bool IsEof(void) const;
private:
m_CV.wait(lock);
}
-StreamReadStatus Stream::ReadLine(String *line, StreamReadContext& context)
+StreamReadStatus Stream::ReadLine(String *line, StreamReadContext& context, bool may_wait)
{
if (context.Eof)
return StatusEof;
if (context.MustRead) {
- if (!context.FillFromStream(this)) {
+ if (!context.FillFromStream(this, may_wait)) {
context.Eof = true;
*line = String(context.Buffer, &(context.Buffer[context.Size]));
if (count == 1)
first_newline = i;
+ else if (count > 1)
+ break;
}
}
return StatusNeedData;
}
-bool StreamReadContext::FillFromStream(const Stream::Ptr& stream)
+bool StreamReadContext::FillFromStream(const Stream::Ptr& stream, bool may_wait)
{
- if (Wait && stream->SupportsWaiting())
+ if (may_wait && stream->SupportsWaiting())
stream->WaitForData();
size_t count = 0;
Size += rc;
count += rc;
- } while (stream->IsDataAvailable());
+ } while (count < 64 * 1024 && stream->IsDataAvailable());
if (count == 0 && stream->IsEof())
return false;
struct StreamReadContext
{
- StreamReadContext(bool wait = true)
- : Buffer(NULL), Size(0), MustRead(true), Eof(false), Wait(wait)
+ StreamReadContext(void)
+ : Buffer(NULL), Size(0), MustRead(true), Eof(false)
{ }
~StreamReadContext(void)
free(Buffer);
}
- bool FillFromStream(const intrusive_ptr<Stream>& stream);
+ bool FillFromStream(const intrusive_ptr<Stream>& stream, bool may_wait);
void DropData(size_t count);
char *Buffer;
size_t Size;
bool MustRead;
bool Eof;
- bool Wait;
};
enum StreamReadStatus
void RegisterDataHandler(const boost::function<void(void)>& handler);
- StreamReadStatus ReadLine(String *line, StreamReadContext& context);
+ StreamReadStatus ReadLine(String *line, StreamReadContext& context, bool may_wait = false);
protected:
void SignalDataAvailable(void);
ApiClient::ApiClient(const String& identity, bool authenticated, const TlsStream::Ptr& stream, ConnectionRole role)
: m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role), m_Seen(Utility::GetTime()),
- m_NextHeartbeat(0), m_HeartbeatTimeout(0), m_Context(false)
+ m_NextHeartbeat(0), m_HeartbeatTimeout(0)
{
boost::call_once(l_ApiClientOnceFlag, &ApiClient::StaticInitialize);
void ApiClient::Start(void)
{
m_Stream->RegisterDataHandler(boost::bind(&ApiClient::DataAvailableHandler, this));
+ if (m_Stream->IsDataAvailable())
+ DataAvailableHandler();
}
String ApiClient::GetIdentity(void) const
void ApiClient::DataAvailableHandler(void)
{
+ boost::mutex::scoped_lock lock(m_DataHandlerMutex);
+
try {
while (ProcessMessage())
; /* empty loop body */
double m_NextHeartbeat;
double m_HeartbeatTimeout;
Timer::Ptr m_TimeoutTimer;
+ boost::mutex m_DataHandlerMutex;
StreamReadContext m_Context;