fifo.cpp \
fifo.h \
i2-base.h \
+ ioqueue.h \
logger.cpp \
logger.h \
object.cpp \
try {
Run();
- } catch (const exception& ex) {
+ } catch (const exception&) {
FinishException(boost::current_exception());
}
}
<ClInclude Include="configobject.h" />
<ClInclude Include="dictionary.h" />
<ClInclude Include="event.h" />
+ <ClInclude Include="ioqueue.h" />
<ClInclude Include="scriptfunction.h" />
<ClInclude Include="scripttask.h" />
<ClInclude Include="logger.h" />
<ClInclude Include="scriptfunction.h">
<Filter>Headerdateien</Filter>
</ClInclude>
+ <ClInclude Include="ioqueue.h">
+ <Filter>Headerdateien</Filter>
+ </ClInclude>
</ItemGroup>
<ItemGroup>
<Filter Include="Quelldateien">
}
/**
- * Returns the number of bytes that are contained in the FIFO.
- *
- * @returns The number of bytes.
+ * Implements IOQueue::GetAvailableBytes().
*/
-size_t FIFO::GetSize(void) const
+size_t FIFO::GetAvailableBytes(void) const
{
return m_DataSize;
}
*
* @returns Pointer to the read buffer.
*/
-const void *FIFO::GetReadBuffer(void) const
+/*const void *FIFO::GetReadBuffer(void) const
{
return m_Buffer + m_Offset;
-}
+}*/
/**
- * Reads data from the FIFO and places it in the specified buffer.
- *
- * @param buffer The buffer where the data should be placed (can be NULL if
- * the reader is not interested in the data).
- * @param count The number of bytes to read.
- * @returns The number of bytes read which may be less than what was requested.
+ * Implements IOQueue::Peek.
*/
-size_t FIFO::Read(void *buffer, size_t count)
+void FIFO::Peek(void *buffer, size_t count)
{
- count = (count <= m_DataSize) ? count : m_DataSize;
+ assert(m_DataSize >= count);
if (buffer != NULL)
memcpy(buffer, m_Buffer + m_Offset, count);
+}
+
+/**
+ * Implements IOQueue::Read.
+ */
+void FIFO::Read(void *buffer, size_t count)
+{
+ Peek(buffer, count);
m_DataSize -= count;
m_Offset += count;
Optimize();
-
- return count;
}
/**
* contains the actual size of the available buffer which can
* be larger than the requested size.
*/
-void *FIFO::GetWriteBuffer(size_t *count)
+/*void *FIFO::GetWriteBuffer(size_t *count)
{
ResizeBuffer(m_Offset + m_DataSize + *count);
*count = m_AllocSize - m_Offset - m_DataSize;
return m_Buffer + m_Offset + m_DataSize;
-}
+}*/
/**
- * Writes data to the FIFO.
- *
- * @param buffer The data that is to be written (can be NULL if the writer has
- * already filled the write buffer, e.g. via GetWriteBuffer()).
- * @param count The number of bytes to write.
- * @returns The number of bytes written
+ * Implements IOQueue::Write.
*/
-size_t FIFO::Write(const void *buffer, size_t count)
+void FIFO::Write(const void *buffer, size_t count)
{
- if (buffer != NULL) {
- size_t bufferSize = count;
- void *target_buffer = GetWriteBuffer(&bufferSize);
- memcpy(target_buffer, buffer, count);
- }
-
+ ResizeBuffer(m_Offset + m_DataSize + count);
+ memcpy(m_Buffer + m_Offset + m_DataSize, buffer, count);
m_DataSize += count;
-
- return count;
}
*
* @ingroup base
*/
-class I2_BASE_API FIFO : public Object
+class I2_BASE_API FIFO : public Object, public IOQueue
{
public:
static const size_t BlockSize = 16 * 1024;
FIFO(void);
~FIFO(void);
- size_t GetSize(void) const;
+ /*const void *GetReadBuffer(void) const;
+ void *GetWriteBuffer(size_t *count);*/
- const void *GetReadBuffer(void) const;
- void *GetWriteBuffer(size_t *count);
-
- size_t Read(void *buffer, size_t count);
- size_t Write(const void *buffer, size_t count);
+ virtual size_t GetAvailableBytes(void) const;
+ virtual void Peek(void *buffer, size_t count);
+ virtual void Read(void *buffer, size_t count);
+ virtual void Write(const void *buffer, size_t count);
private:
char *m_Buffer;
#include "dictionary.h"
#include "ringbuffer.h"
#include "timer.h"
+#include "ioqueue.h"
#include "fifo.h"
#include "socket.h"
#include "tcpsocket.h"
--- /dev/null
+#ifndef IOQUEUE_H
+#define IOQUEUE_H
+
+namespace icinga
+{
+
+/**
+ * An I/O queue.
+ */
+class IOQueue
+{
+public:
+ /**
+ * Retrieves the number of bytes available for reading.
+ *
+ * @returns The number of available bytes.
+ */
+ virtual size_t GetAvailableBytes(void) const = 0;
+
+ /**
+ * Reads data from the queue without advancing the read pointer. Trying
+ * to read more data than is available in the queue is a programming error.
+ * Use GetBytesAvailable() to check how much data is available.
+ *
+ * @buffer The buffer where data should be stored. May be NULL if you're
+ * not actually interested in the data.
+ * @param count The number of bytes to read from the queue.
+ */
+ virtual void Peek(void *buffer, size_t count) = 0;
+
+ /**
+ * Reads data from the queue. Trying to read more data than is
+ * available in the queue is a programming error. Use GetBytesAvailable()
+ * to check how much data is available.
+ *
+ * @param buffer The buffer where data should be stored. May be NULL if you're
+ * not actually interested in the data.
+ * @param count The number of bytes to read from the queue.
+ */
+ virtual void Read(void *buffer, size_t count) = 0;
+
+ /**
+ * Writes data to the queue.
+ *
+ * @param buffer The data that is to be written.
+ * @param count The number of bytes to write.
+ * @returns The number of bytes written
+ */
+ virtual void Write(const void *buffer, size_t count) = 0;
+};
+
+}
+
+#endif /* IOQUEUE_H */
\ No newline at end of file
"Could not create a suitable socket."));
}
+void TcpClient::HandleWritable(void)
+{
+ int rc;
+ char data[1024];
+ size_t count;
+
+ for (;;) {
+ count = m_SendQueue->GetAvailableBytes();
+
+ if (count == 0)
+ break;
+
+ if (count > sizeof(data))
+ count = sizeof(data);
+
+ m_SendQueue->Peek(data, count);
+
+ rc = send(GetFD(), (const char *)data, count, 0);
+
+ if (rc <= 0) {
+ HandleSocketError(SocketException("send() failed", GetError()));
+ return;
+ }
+
+ m_SendQueue->Read(NULL, rc);
+ }
+}
+
/**
- * Retrieves the send queue for the socket.
- *
- * @returns The send queue.
+ * Implements IOQueue::GetAvailableBytes.
*/
-FIFO::Ptr TcpClient::GetSendQueue(void)
+size_t TcpClient::GetAvailableBytes(void) const
{
- return m_SendQueue;
+ mutex::scoped_lock lock(GetMutex());
+
+ return m_RecvQueue->GetAvailableBytes();
}
-void TcpClient::HandleWritable(void)
+/**
+ * Implements IOQueue::Peek.
+ */
+void TcpClient::Peek(void *buffer, size_t count)
{
- int rc;
+ mutex::scoped_lock lock(GetMutex());
- rc = send(GetFD(), (const char *)m_SendQueue->GetReadBuffer(), m_SendQueue->GetSize(), 0);
+ m_RecvQueue->Peek(buffer, count);
+}
- if (rc <= 0) {
- HandleSocketError(SocketException("send() failed", GetError()));
- return;
- }
+/**
+ * Implements IOQueue::Read.
+ */
+void TcpClient::Read(void *buffer, size_t count)
+{
+ mutex::scoped_lock lock(GetMutex());
- m_SendQueue->Read(NULL, rc);
+ m_RecvQueue->Read(buffer, count);
}
/**
- * Retrieves the recv queue for the socket.
- *
- * @returns The recv queue.
+ * Implements IOQueue::Write.
*/
-FIFO::Ptr TcpClient::GetRecvQueue(void)
+void TcpClient::Write(const void *buffer, size_t count)
{
- return m_RecvQueue;
+ mutex::scoped_lock lock(GetMutex());
+
+ m_SendQueue->Write(buffer, count);
}
void TcpClient::HandleReadable(void)
{
for (;;) {
- size_t bufferSize = FIFO::BlockSize / 2;
- char *buffer = (char *)m_RecvQueue->GetWriteBuffer(&bufferSize);
- int rc = recv(GetFD(), buffer, bufferSize, 0);
+ char data[1024];
+ int rc = recv(GetFD(), data, sizeof(data), 0);
#ifdef _WIN32
if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK)
return;
}
- m_RecvQueue->Write(NULL, rc);
+ m_RecvQueue->Write(data, rc);
}
Event::Post(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
*/
bool TcpClient::WantsToWrite(void) const
{
- return (m_SendQueue->GetSize() > 0);
+ return (m_SendQueue->GetAvailableBytes() > 0);
}
/**
*
* @ingroup base
*/
-class I2_BASE_API TcpClient : public TcpSocket
+class I2_BASE_API TcpClient : public TcpSocket, public IOQueue
{
public:
typedef shared_ptr<TcpClient> Ptr;
void Connect(const string& node, const string& service);
- FIFO::Ptr GetSendQueue(void);
- FIFO::Ptr GetRecvQueue(void);
-
boost::signal<void (const TcpClient::Ptr&)> OnDataAvailable;
+ virtual size_t GetAvailableBytes(void) const;
+ virtual void Peek(void *buffer, size_t count);
+ virtual void Read(void *buffer, size_t count);
+ virtual void Write(const void *buffer, size_t count);
+
protected:
virtual bool WantsToRead(void) const;
virtual bool WantsToWrite(void) const;
virtual void HandleReadable(void);
virtual void HandleWritable(void);
-private:
- TcpClientRole m_Role;
-
FIFO::Ptr m_SendQueue;
FIFO::Ptr m_RecvQueue;
+
+private:
+ TcpClientRole m_Role;
};
/**
result = 0;
for (;;) {
- size_t bufferSize = FIFO::BlockSize / 2;
- char *buffer = (char *)GetRecvQueue()->GetWriteBuffer(&bufferSize);
- int rc = SSL_read(m_SSL.get(), buffer, bufferSize);
+ char data[1024];
+ int rc = SSL_read(m_SSL.get(), data, sizeof(data));
if (rc <= 0) {
switch (SSL_get_error(m_SSL.get(), rc)) {
}
}
- GetRecvQueue()->Write(NULL, rc);
+ m_RecvQueue->Write(data, rc);
}
post_event:
m_BlockRead = false;
m_BlockWrite = false;
- int rc = SSL_write(m_SSL.get(), (const char *)GetSendQueue()->GetReadBuffer(), GetSendQueue()->GetSize());
-
- if (rc <= 0) {
- switch (SSL_get_error(m_SSL.get(), rc)) {
- case SSL_ERROR_WANT_READ:
- m_BlockWrite = true;
- /* fall through */
- case SSL_ERROR_WANT_WRITE:
- return;
- case SSL_ERROR_ZERO_RETURN:
- CloseInternal(false);
- return;
- default:
- HandleSocketError(OpenSSLException(
- "SSL_write failed", ERR_get_error()));
- return;
+ char data[1024];
+ size_t count;
+
+ for (;;) {
+ count = m_SendQueue->GetAvailableBytes();
+
+ if (count == 0)
+ break;
+
+ if (count > sizeof(data))
+ count = sizeof(data);
+
+ m_SendQueue->Peek(data, count);
+
+ int rc = SSL_write(m_SSL.get(), (const char *)data, count);
+
+ if (rc <= 0) {
+ switch (SSL_get_error(m_SSL.get(), rc)) {
+ case SSL_ERROR_WANT_READ:
+ m_BlockWrite = true;
+ /* fall through */
+ case SSL_ERROR_WANT_WRITE:
+ return;
+ case SSL_ERROR_ZERO_RETURN:
+ CloseInternal(false);
+ return;
+ default:
+ HandleSocketError(OpenSSLException(
+ "SSL_write failed", ERR_get_error()));
+ return;
+ }
}
- }
- GetSendQueue()->Read(NULL, rc);
+ m_SendQueue->Read(NULL, rc);
+ }
}
/**